Skip to content

Commit

Permalink
Merge pull request #117 from kube-HPC/producer_process
Browse files Browse the repository at this point in the history
Producer process
  • Loading branch information
golanha committed Apr 18, 2024
2 parents 4c310ed + 1ef300f commit da6b1c9
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 35 deletions.
3 changes: 3 additions & 0 deletions hkube_python_wrapper/codeApi/hkube_api.py
Expand Up @@ -56,6 +56,7 @@ def isListeningToMessages(self):
return self.streamingManager.listeningToMessages

def get_streaming_statistics(self):
# Not supported since done in separate process
if (self.streamingManager.messageProducer is not None):
return self.streamingManager.messageProducer.getStatistics()
print("get_streaming_statistics returns no value in debug")
Expand Down Expand Up @@ -245,12 +246,14 @@ def start_raw_subpipeline(self, name, nodes, flowInput, options={}, webhooks={},
return self._waitForResult(execution)
# reset all pending messages to zero.
def resetQueue(self):
# Not supported since done in separate process
if (self.streamingManager.messageProducer is not None):
self.streamingManager.resetQueue()
else:
log.info('reset queue does not work when debuging localy')

def resetQueuePartial(self, numberOfMessagesToRemove):
# Not supported since done in separate process
if (self.streamingManager.messageProducer is not None):
self.streamingManager.resetQueuePartial(numberOfMessagesToRemove)
else:
Expand Down
84 changes: 59 additions & 25 deletions hkube_python_wrapper/communication/streaming/StreamingManager.py
@@ -1,39 +1,66 @@
import multiprocessing
import threading

from .MessageListener import MessageListener
from .MessageProducer import MessageProducer
from hkube_python_wrapper.util.logger import log, algorithmLogger
from .producerProcess import ProducerRunner


class StreamingManager():

def __init__(self):
self.threadLocalStorage = threading.local()
self.threadLocalStorage.sendMessageId = None
self.messageProducer = None
self._messageListeners = dict()
self._inputListener = []
self.listeningToMessages = False
self._isStarted = False
self.parsedFlows = {}
self.defaultFlow = None

self.method_invoke_queue = None
self.nextNodes = []
self.statistics_queue = None

def setParsedFlows(self, flows, defaultFlow):
self.parsedFlows = flows
self.defaultFlow = defaultFlow

def setupStreamingProducer(self, onStatistics, producerConfig, nextNodes, nodeName):
self.messageProducer = MessageProducer(producerConfig, nextNodes, nodeName)
self.messageProducer.registerStatisticsListener(onStatistics)
if (nextNodes):
self.messageProducer.start()

def resetQueue(self):
self.messageProducer.resetQueue()

def resetQueuePartial(self, numberOfMessagesToRemove):
self.messageProducer.resetQueuePartial(numberOfMessagesToRemove)
def setupStreamingProducer(self, options, onStatistics, producerConfig, nextNodes, nodeName):
    """Start the message producer in a child process and relay its statistics.

    Creates the request queue (``self.method_invoke_queue``) and the
    statistics queue (``self.statistics_queue``), launches a
    ``ProducerRunner`` in a separate process, and starts a daemon thread
    that hands every statistics item received from that process to
    ``onStatistics``.

    Args:
        options: passed through to ``ProducerRunner``.
        onStatistics: callable invoked with each statistics item read from
            the statistics queue.
        producerConfig: passed through to ``ProducerRunner``.
        nextNodes: the nodes to produce to; also stored on
            ``self.nextNodes``.
        nodeName: name of the current node, passed to ``ProducerRunner``.
    """
    self.nextNodes = nextNodes
    self.method_invoke_queue = multiprocessing.Queue()
    self.statistics_queue = multiprocessing.Queue()

    # A bound method of a module-level class is used as the process target
    # instead of a nested closure: closures cannot be pickled, which the
    # "spawn" and "forkserver" start methods require of the target.
    producerRunner = ProducerRunner(self.method_invoke_queue, self.statistics_queue, options, nextNodes,
                                    producerConfig, nodeName)
    # Keep the handle so the process can later be joined or inspected.
    self.producerProcess = multiprocessing.Process(target=producerRunner.run)
    self.producerProcess.start()

    statistics_queue = self.statistics_queue

    def sendStatisticsEvery():
        while True:
            try:
                statistics = statistics_queue.get()
            except (EOFError, OSError, ValueError) as e:
                # The queue was closed or its pipe broke; retrying would
                # only spin forever, so the relay thread ends here.
                log.error(e)
                return
            except Exception as e:
                log.error(e)
                continue
            print("got stats")
            try:
                onStatistics(statistics)
            except Exception as e:
                # A failing listener must not stop the relay.
                log.error(e)

    runThread = threading.Thread(name="get-stats", target=sendStatisticsEvery)
    runThread.daemon = True
    runThread.start()

# self.messageProducer = MessageProducer(producerConfig, nextNodes, nodeName)
# self.messageProducer.registerStatisticsListener(onStatistics)
# if (nextNodes):
# self.messageProducer.start()

# def resetQueue(self):
# self.messageProducer.resetQueue()
#
# def resetQueuePartial(self, numberOfMessagesToRemove):
# self.messageProducer.resetQueuePartial(numberOfMessagesToRemove)

def setupStreamingListeners(self, listenerConfig, parents, nodeName):
log.debug("parents {parents}", parents=str(parents))
Expand All @@ -47,12 +74,15 @@ def setupStreamingListeners(self, listenerConfig, parents, nodeName):
options.update(listenerConfig)
options['remoteAddress'] = remoteAddressUrl
options['messageOriginNodeName'] = parentName

def is_active():
return self._messageListeners.get(remoteAddressUrl) is not None and self.listeningToMessages # pylint: disable=cell-var-from-loop
listener = MessageListener(options, nodeName,is_active)
return self._messageListeners.get(
remoteAddressUrl) is not None and self.listeningToMessages # pylint: disable=cell-var-from-loop

listener = MessageListener(options, nodeName, is_active)
listener.registerMessageListener(self.onMessage)
self._messageListeners[remoteAddressUrl] = listener
if(self.listeningToMessages):
if (self.listeningToMessages):
listener.start()

if (parent['type'] == 'Del'):
Expand Down Expand Up @@ -91,12 +121,14 @@ def startMessageListening(self):
listener.start()

def sendMessage(self, msg, flowName=None):
if (self.messageProducer is None):
raise Exception('Trying to send a message from a none stream pipeline or after close had been applied on algorithm')
if (self.messageProducer.nodeNames):
if (self.method_invoke_queue is None):
raise Exception(
'Trying to send a message from a none stream pipeline or after close had been applied on algorithm')
if (self.nextNodes):
parsedFlow = None
if (flowName is None):
if hasattr(self.threadLocalStorage, 'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern:
if hasattr(self.threadLocalStorage,
'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern:
parsedFlow = self.threadLocalStorage.messageFlowPattern
else:
if (self.defaultFlow is None):
Expand All @@ -106,7 +138,7 @@ def sendMessage(self, msg, flowName=None):
parsedFlow = self.parsedFlows.get(flowName)
if (parsedFlow is None):
raise Exception("No such flow " + flowName)
self.messageProducer.produce(parsedFlow, msg)
self.method_invoke_queue.put({"flow": parsedFlow, "msg": msg})
else:
log.error("messageProducer has no consumers")

Expand All @@ -122,9 +154,11 @@ def stopStreaming(self, force=True):
self._messageListeners = dict()
self._inputListener.clear()

if (self.messageProducer is not None):
self.messageProducer.close(force)
self.messageProducer = None
if (self.method_invoke_queue is not None):
self.method_invoke_queue.put({"action": "stop", "force": True})
print("sent stop")
done = self.method_invoke_queue.get()
print("got " + str(done) + " stop from producer process")

def clearMessageListeners(self):
self._messageListeners = dict()
60 changes: 60 additions & 0 deletions hkube_python_wrapper/communication/streaming/producerProcess.py
@@ -0,0 +1,60 @@
from hkube_python_wrapper.communication.streaming.MessageProducer import MessageProducer
from hkube_python_wrapper.util.logger import log
from hkube_python_wrapper.wrapper.messages import messages



class ProducerRunner:
    """Owns a MessageProducer and serves requests read from a queue.

    Requests are dicts taken from ``method_invoke_queue``:

    * ``{"action": "stop", "force": <bool>}`` closes the producer, replies
      ``"done"`` and ends the loop.
    * ``{"action": "queuesize"}`` replies with the number of messages in
      the producer's queue.
    * anything else is treated as a message: ``{"flow": ..., "msg": ...}``
      is handed to ``MessageProducer.produce``.

    Statistics reported by the producer are put on ``statistics_queue``.
    """

    def __init__(self, method_invoke_queue, statistics_queue, options, childs, producerConfig, node,
                 response_queue=None):
        """Store the queues and the producer settings; nothing is started here.

        Args:
            method_invoke_queue: queue the requests are read from.
            statistics_queue: queue that receives the producer statistics.
            options: stored on the instance; not read by this class.
            childs: the next nodes, passed to ``MessageProducer``. The
                producer is only started when this is truthy.
            producerConfig: configuration passed to ``MessageProducer``.
            node: name of the current node, passed to ``MessageProducer``.
            response_queue: optional queue for the replies to ``stop`` and
                ``queuesize``. Defaults to ``method_invoke_queue``.
        """
        self._options = options
        self.node = node
        self._method_invoke_queue = method_invoke_queue
        # NOTE(review): replying on the request queue lets either side
        # dequeue its own item (the sender of "queuesize" can read its own
        # request back, this loop can read its own reply). Callers should
        # pass a dedicated response_queue; the default keeps the old wiring.
        self._response_queue = method_invoke_queue if response_queue is None else response_queue
        self._statistics_queue = statistics_queue
        self._wsc = None
        self.messageProducer = None
        self._childs = childs
        self._producerConfig = producerConfig

    def run(self):
        """Create the producer, then serve requests until a stop request arrives."""
        self._setupStreamingProducer()
        while True:
            arg = self._method_invoke_queue.get()
            if not isinstance(arg, dict):
                # Typically one of our own replies read back from a shared
                # queue; calling .get() on it would crash the loop.
                log.error('Ignoring unexpected item on the request queue: {item}', item=str(arg))
                continue
            action = arg.get("action")
            if action == 'stop':
                print("got stop")
                self.messageProducer.close(arg.get("force"))
                print("stopeped")
                self._response_queue.put("done")
                print("sent back")
                break
            if action == "queuesize":
                self._response_queue.put(len(self.messageProducer.adapter.messageQueue.queue))
                # A size query carries no message; without this the loop
                # went on to call produce(None, None).
                continue
            self.messageProducer.produce(arg.get("flow"), arg.get("msg"))

    def _setupStreamingProducer(self):
        """Build the MessageProducer, hook up statistics, and start it if there are next nodes."""
        def onStatistics(statistics):
            self._sendStatistics(statistics)

        self.messageProducer = MessageProducer(self._producerConfig, self._childs, self.node)
        self.messageProducer.registerStatisticsListener(onStatistics)
        if self._childs:
            self.messageProducer.start()

    def _sendStatistics(self, data):
        """Put one statistics item on the statistics queue; failures go to sendError."""
        try:
            self._statistics_queue.put(data)
        except Exception as e:
            self.sendError(e)

    def sendError(self, error):
        """Log ``error`` and, when a websocket client is set, send it as an error command."""
        try:
            log.error(error)
            if self._wsc is None:
                # No client is ever assigned in this class; skipping avoids
                # a second, misleading AttributeError in the log.
                return
            self._wsc.send({
                'command': messages.outgoing.error,
                'error': {
                    'code': 'Failed',
                    'message': str(error)
                }
            })
        except Exception as e:
            log.error(e)
11 changes: 6 additions & 5 deletions hkube_python_wrapper/wrapper/algorunner.py
Expand Up @@ -357,14 +357,13 @@ def onStatistics(statistics):
producerConfig['encoding'] = config.discovery['encoding']
producerConfig['statisticsInterval'] = config.discovery['streaming']['statisticsInterval']
self.streamingManager.setupStreamingProducer(
onStatistics, producerConfig, self._job.childs, nodeName)
config, onStatistics, producerConfig, self._job.childs, nodeName)

def _start(self, options):
if (self._job and self._job.isStreaming):
self.streamingManager.setParsedFlows(self._job.parsedFlow, self._job.defaultFlow)
if (self._job.childs):
self._setupStreamingProducer(self._job.nodeName)
self.streamingManager.clearMessageListeners()
# pylint: disable=unused-argument
span = None
self._initDataServer(config)
Expand Down Expand Up @@ -532,14 +531,16 @@ def _exit(self, options):

def _checkQueueSize(self, event):
if (self._job and self._job.isStreaming):
if (self.streamingManager.messageProducer):
if (self.streamingManager.method_invoke_queue):
try:
self.streamingManager.method_invoke_queue.put({"action": "queuesize"})
len = self.streamingManager.method_invoke_queue.get()
log.info('Messages left in queue on {event}={queue}', event=event,
queue=str(len(self.streamingManager.messageProducer.adapter.messageQueue.queue)))
queue=str(len))
except Exception:
log.error('Failed to print number of messages left in queue on {event}', event=event)
else:
log.info('MessageProducer already None on {event}', event=event)
log.info('Queue size not reachable {event}', event=event)

def _sendCommand(self, command, data):
try:
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.6.0-dev10
current_version = 2.6.0-dev13
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<build>\d+))?
Expand Down
4 changes: 3 additions & 1 deletion setup.py
Expand Up @@ -6,7 +6,9 @@

here = os.path.abspath(os.path.dirname(__file__))

VERSION = '2.6.0-dev10'

VERSION = '2.6.0-dev13'




Expand Down
7 changes: 4 additions & 3 deletions tests/test_streaming.py
Expand Up @@ -2,6 +2,7 @@
from hkube_python_wrapper.communication.streaming.MessageListener import MessageListener
from hkube_python_wrapper.communication.streaming.MessageProducer import MessageProducer
import time
from tests.configs import config

parsedFlows = {
'analyze': [{'source': 'A', 'next': ['B']}, {'source': 'B', 'next': ['C']}, {'source': 'C', 'next': ['D']}],
Expand All @@ -25,7 +26,7 @@ def statsInvoked(args):
listen_config = {'encoding': 'msgpack', 'delay': 10}
streamingManager = StreamingManager()
streamingManager.setParsedFlows(parsedFlows, 'analyze')
streamingManager.setupStreamingProducer(statsInvoked, producer_config, [nodeName], 'A')
streamingManager.setupStreamingProducer(config, statsInvoked, producer_config, [nodeName], 'A')
streamingManager.setupStreamingListeners(listen_config, parents, nodeName)
streamingManager.registerInputListener(onMessage)
streamingManager.startMessageListening()
Expand Down Expand Up @@ -71,11 +72,11 @@ def statsInvoked(args):

streamingManagerA = StreamingManager()
streamingManagerA.setParsedFlows(parsedFlows, 'analyze')
streamingManagerA.setupStreamingProducer(statsInvoked, producer_configA, ['B'], 'A')
streamingManagerA.setupStreamingProducer(config, statsInvoked, producer_configA, ['B'],'A')

streamingManagerB = StreamingManager()
streamingManagerB.setParsedFlows(parsedFlows, 'analyze')
streamingManagerB.setupStreamingProducer(statsInvoked, producer_configB, ['C'], 'B')
streamingManagerB.setupStreamingProducer(config, statsInvoked, producer_configB, ['C'], 'B')
streamingManagerB.setupStreamingListeners(listen_config, parents1, 'B')
streamingManagerB.registerInputListener(onMessageAtB)
streamingManagerB.startMessageListening()
Expand Down

0 comments on commit da6b1c9

Please sign in to comment.