Skip to content

Commit

Permalink
Exception in log
Browse files Browse the repository at this point in the history
  • Loading branch information
golanha committed Mar 6, 2024
1 parent c06cd99 commit 00b87b7
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 4 deletions.
Expand Up @@ -48,7 +48,7 @@ def setupStreamingListeners(self, listenerConfig, parents, nodeName):
options['remoteAddress'] = remoteAddressUrl
options['messageOriginNodeName'] = parentName
def is_active():
return self._messageListeners[remoteAddressUrl] is not None and self.listeningToMessages # pylint: disable=cell-var-from-loop
return self._messageListeners.get(remoteAddressUrl) is not None and self.listeningToMessages # pylint: disable=cell-var-from-loop
listener = MessageListener(options, nodeName,is_active)
listener.registerMessageListener(self.onMessage)
self._messageListeners[remoteAddressUrl] = listener
Expand Down
5 changes: 2 additions & 3 deletions hkube_python_wrapper/wrapper/algorunner.py
Expand Up @@ -360,7 +360,7 @@ def onStatistics(statistics):
onStatistics, producerConfig, self._job.childs, nodeName)

def _start(self, options):
if (self._job.isStreaming):
if (self._job and self._job.isStreaming):
self.streamingManager.setParsedFlows(self._job.parsedFlow, self._job.defaultFlow)
if (self._job.childs):
self._setupStreamingProducer(self._job.nodeName)
Expand Down Expand Up @@ -481,10 +481,9 @@ def _stopAlgorithm(self, options):
else:
log.info('stopping gracefully')

if (self._job.isStreaming):
if (self._job and self._job.isStreaming):
if (forceStop is False):
stoppingState = True

def stopping():
while (stoppingState):
self._sendCommand(messages.outgoing.stopping, None)
Expand Down

0 comments on commit 00b87b7

Please sign in to comment.