Skip to content

Commit

Permalink
Merge pull request #347 from pfafflabatuiuc/messages_from_apps
Browse files Browse the repository at this point in the history
Messages from apps from the AppManager.
  • Loading branch information
wpfff committed Nov 22, 2022
2 parents 1d04137 + 474d755 commit f877d2f
Showing 1 changed file with 36 additions and 15 deletions.
51 changes: 36 additions & 15 deletions plottr/apps/appmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class AppServer(QtCore.QObject):

messageReceived = Signal(object)

def __init__(self, context: zmq.Context, port: str, parent: Optional[QtCore.QObject] = None):
def __init__(self, port: str, parent: Optional[QtCore.QObject] = None):
"""
Constructor for :class: `.AppServer`
Expand All @@ -63,24 +63,27 @@ def __init__(self, context: zmq.Context, port: str, parent: Optional[QtCore.QObj
super().__init__(parent=parent)
self.port = port
self.address = '127.0.0.1'
self.context = context
self.context = zmq.Context()
self.t_blocking = 1000 # in ms
self.reply = None
self.running = True

self.socket = self.context.socket(zmq.REP)
self.socket: Optional[zmq.Socket] = self.context.socket(zmq.REP)
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)

def run(self) -> None:
"""
Connects the socket and starts listening for commands.
"""
assert isinstance(self.socket, zmq.Socket)
self.socket.bind(f'tcp://{self.address}:{self.port}')

while self.running:
# Check if there are any messages.
evts = self.poller.poll(self.t_blocking)
evts = []
if not self.socket._closed:
evts = self.poller.poll(self.t_blocking)
if len(evts) > 0:
message = self.socket.recv_pyobj()
if message == 'ping':
Expand Down Expand Up @@ -122,8 +125,6 @@ class App(QtCore.QObject):
Runs an :class:`.AppServer` in a separate thread, which allows to receive and send messages
from the parent :class:`.AppManager` to the app.
"""
#: Signal() -- emitted when the app is going to close. Used to close the AppServer
endProcess = Signal()

#: Signal(Any) -- emitted when the App has the reply for a message. The AppServer gets the signal and replies.
#: Arguments:
Expand All @@ -139,12 +140,10 @@ def __init__(self, setupFunc: AppType, port: int, parent: Optional[QtCore.QObjec
self.win.show()
self.win.windowClosed.connect(self.onQuit)

self.context = zmq.Context()
self.port = port
self.server: Optional[AppServer] = AppServer(self.context, str(port))
self.server: Optional[AppServer] = AppServer(str(port))
self.serverThread: Optional[QtCore.QThread] = QtCore.QThread()

self.endProcess.connect(self.server.quit)
self.replyReady.connect(self.server.loadReply)
self.server.messageReceived.connect(self.onMessageReceived)
self.server.moveToThread(self.serverThread)
Expand Down Expand Up @@ -209,11 +208,10 @@ def onMessageReceived(self, message: Tuple[str, str, Any]) -> None:
@Slot()
def onQuit(self) -> None:
"""
Gets called when win is about to close. Emits endProcess to stop the server. Destroys the zmq context and stops
the server thread.
Gets called when win is about to close. Stops the server and stops the server thread.
"""
self.endProcess.emit()
self.context.destroy(1)
if self.server is not None:
self.server.quit()

if self.server is not None and self.serverThread is not None:
self.serverThread.requestInterruption()
Expand All @@ -227,8 +225,9 @@ def onQuit(self) -> None:

class ProcessMonitor(QtCore.QObject):
"""
Helper class that runs in a separate thread. Its only job is to constantly check if a process is still running and
alert the AppManager when a process has been closed.
Helper class that runs in a separate thread. Its job is to constantly check if a process is still running and
alert the AppManager when a process has been closed and to print any standard output or standard error that
any process is sending.
"""

#: Signal(IdType) -- emitted when it detects that a process is closed.
Expand All @@ -250,6 +249,8 @@ def onNewProcess(self, Id: IdType, process: QtCore.QProcess) -> None:
:param process: The QProcess to keep track of.
"""
self.processes[Id] = process
self.processes[Id].readyReadStandardOutput.connect(self.onReadyStandardOutput)
self.processes[Id].readyReadStandardError.connect(self.onReadyStandardError)

def quit(self) -> None:
"""
Expand All @@ -271,6 +272,26 @@ def run(self) -> None:
self.processTerminated.emit(Id)
qtsleep(0.01)

@Slot()
def onReadyStandardOutput(self) -> None:
"""
Gets called when any process emits the readyReadStandardOutput signal, and prints any message it receives.
"""
for Id, process in self.processes.items():
output = str(process.readAllStandardOutput(), 'utf-8') # type: ignore[call-overload] # mypy complains about str() not accepting QbyteArray even though it is an object
if output != '':
print(f'Process {Id}: {output}')

@Slot()
def onReadyStandardError(self) -> None:
"""
Gets called when any process emits the readyReadStandardError signal, and prints any messages it receives.
"""
for Id, process in self.processes.items():
output = str(process.readAllStandardError(), 'utf-8') # type: ignore[call-overload] # mypy complains about str() not accepting QbyteArray even though it is an object.
if output != '':
print(f'Process {Id}: {output}')


class AppManager(QtWidgets.QWidget):
"""A widget that launches, manages, and communicates with app instances
Expand Down

0 comments on commit f877d2f

Please sign in to comment.