Skip to content

Commit

Permalink
Merge f2add73 into e5048eb
Browse files Browse the repository at this point in the history
  • Loading branch information
Golan Hallel committed Dec 3, 2020
2 parents e5048eb + f2add73 commit f45367f
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 170 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
strategy:
matrix:
python-version: ["2.7", "3.5", "3.6", "3.7", "3.8"]
fail-fast: false
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
- uses: actions/checkout@v2
Expand Down
68 changes: 8 additions & 60 deletions hkube_python_wrapper/codeApi/hkube_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,87 +5,35 @@
from .execution import Execution
from .waitFor import WaitForData
from hkube_python_wrapper.util.queueImpl import Empty
from ..communication.streaming.MessageListener import MessageListener
from ..communication.streaming.MessageProducer import MessageProducer



class HKubeApi:
"""Hkube interface for code-api operations"""

def __init__(self, wc, wrapper, dataAdapter, storage):
def __init__(self, wc, wrapper, dataAdapter, storage, streamingManager):
self._wc = wc
self._wrapper = wrapper
self._dataAdapter = dataAdapter
self._storage = storage
self._executions = {}
self._lastExecId = 0
self.messageProducer = None
self._messageListeners = dict()
self._inputListener = []
self.listeningToMessages = False

def setupStreamingProducer(self, onStatistics, producerConfig, nextNodes):
self.messageProducer = MessageProducer(producerConfig, nextNodes)
self.messageProducer.registerStatisticsListener(onStatistics)
if (nextNodes):
self.messageProducer.start()
self.streamingManager = streamingManager

def sendError(self, e):
self._wrapper.sendError(e)

def setupStreamingListeners(self, listenerConfig, parents, nodeName):
print("parents" + str(parents))
for predecessor in parents:
remoteAddress = 'tcp://' + \
predecessor['address']['host'] + ':' + \
str(predecessor['address']['port'])
if (predecessor['type'] == 'Add'):
options = {}
options.update(listenerConfig)
options['remoteAddress'] = remoteAddress
options['messageOriginNodeName'] = predecessor['nodeName']
listener = MessageListener(options, nodeName, self)
listener.registerMessageListener(self._onMessage)
self._messageListeners[remoteAddress] = listener
if (self.listeningToMessages):
listener.start()
if (predecessor['type'] == 'Del'):
if (self.listeningToMessages):
self._messageListeners[remoteAddress].close()
del self._messageListeners[remoteAddress]

def registerInputListener(self, onMessage):
self._inputListener.append(onMessage)

def _onMessage(self, msg, origin):
for listener in self._inputListener:
try:
listener(msg, origin)
except Exception as e:
print("hkube_api message listener through exception: " + str(e))
self.streamingManager.registerInputListener(onMessage)

def startMessageListening(self):
self.listeningToMessages = True
for listener in self._messageListeners.values():
if not (listener.is_alive()):
listener.start()
self.streamingManager.startMessageListening()

def sendMessage(self, msg):
if (self.messageProducer is None):
raise Exception('Trying to send a message from a none stream pipeline or after close had been sent to algorithm')
if (self.messageProducer.nodeNames):
self.messageProducer.produce(msg)
def sendMessage(self, msg, flowName=None):
self.streamingManager.sendMessage(msg, flowName)

def stopStreaming(self):
if (self.listeningToMessages):
for listener in self._messageListeners.values():
listener.close()
self._messageListeners = dict()
self.listeningToMessages = False
self._inputListener = []
if (self.messageProducer is not None):
self.messageProducer.close()
self.messageProducer = None
self.streamingManager.stopStreaming()

def _generateExecId(self):
self._lastExecId += 1
Expand Down
22 changes: 10 additions & 12 deletions hkube_python_wrapper/communication/streaming/MessageListener.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
from hkube_python_wrapper.communication.zmq.streaming.ZMQListener import ZMQListener
import time

from hkube_python_wrapper.util.encoding import Encoding

#
from hkube_python_wrapper.util.DaemonThread import DaemonThread
import time


class MessageListener(DaemonThread):

def __init__(self, options, receiverNode, errorHandler=None):
self.errorHandler = errorHandler
remoteAddress = options['remoteAddress']
self.adapater = ZMQListener(remoteAddress, self.onMessage, receiverNode)
self.messageOriginNodeName = options['messageOriginNodeName']
encodingType = options['encoding']
self._encoding = Encoding(encodingType)
self.adapater = ZMQListener(remoteAddress, self.onMessage, self._encoding, receiverNode)
self.messageOriginNodeName = options['messageOriginNodeName']

self.messageListeners = []
DaemonThread.__init__(self, "MessageListener-"+ str(self.messageOriginNodeName))
DaemonThread.__init__(self, "MessageListener-" + str(self.messageOriginNodeName))

def registerMessageListener(self, listener):
self.messageListeners.append(listener)

def onMessage(self, header, msg):
def onMessage(self, messageFlowPattern, header, msg):
start = time.time()
decodedMsg = self._encoding.decode(header=header, value=msg)
for listener in self.messageListeners:
try:
listener(decodedMsg, self.messageOriginNodeName)
listener(messageFlowPattern, decodedMsg, self.messageOriginNodeName)
except Exception as e:
print('Error during MessageListener onMessage' + e)
print('Error during MessageListener onMessage' + str(e))

end = time.time()
duration = int((end - start) * 1000)
Expand All @@ -40,8 +38,8 @@ def run(self):
try:
self.adapater.start()
except Exception as e:
self.errorHandler.sendError(e)

if (self.errorHandler):
self.errorHandler.sendError(e)

def close(self):
self.messageListeners = []
Expand Down
12 changes: 6 additions & 6 deletions hkube_python_wrapper/communication/streaming/MessageProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@


class MessageProducer(DaemonThread):
def __init__(self, options, nodeNames):
self.nodeNames = nodeNames
def __init__(self, options, consumerNodes, me):
self.nodeNames = consumerNodes
port = options['port']
maxMemorySize = options['messagesMemoryBuff'] * 1024 * 1024
encodingType = options['encoding']
statisticsInterval = options['statisticsInterval']
self._encoding = Encoding(encodingType)
self.adapter = ZMQProducer(port, maxMemorySize, self.responseAccumulator, consumerTypes=nodeNames)
self.adapter = ZMQProducer(port, maxMemorySize, self.responseAccumulator, consumerTypes=self.nodeNames, encoding=self._encoding, me=me)
self.responsesCache = {}
self.responseCount = {}
self.active = True
self.printStatistics = 0
for nodeName in nodeNames:
for nodeName in consumerNodes:
self.responsesCache[nodeName] = FifoArray(RESPONSE_CACHE)
self.responseCount[nodeName] = 0
self.listeners = []
Expand All @@ -38,9 +38,9 @@ def sendStatisticsEvery(interval):
runThread.start()
DaemonThread.__init__(self, "MessageProducer")

def produce(self, obj):
def produce(self, meesageFlowPattern, obj):
header, encodedMessage = self._encoding.encode(obj)
self.adapter.produce(header, encodedMessage)
self.adapter.produce(header, encodedMessage, messageFlowPattern=meesageFlowPattern)

def responseAccumulator(self, response, consumerType):
decodedResponse = self._encoding.decode(value=response, plainEncode=True)
Expand Down
87 changes: 87 additions & 0 deletions hkube_python_wrapper/communication/streaming/StreamingManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import threading

from .MessageListener import MessageListener
from .MessageProducer import MessageProducer


class StreamingManager():
threadLocalStorage = threading.local()
def __init__(self):
self.messageProducer = None
self._messageListeners = dict()
self._inputListener = []
self.listeningToMessages = False
self.parsedFlows = {}
self.defaultFlow = None

def setParsedFlows(self, flows, defaultFlow):
self.parsedFlows = flows
self.defaultFlow = defaultFlow

def setupStreamingProducer(self, onStatistics, producerConfig, nextNodes, me):
self.messageProducer = MessageProducer(producerConfig, nextNodes, me)
self.messageProducer.registerStatisticsListener(onStatistics)
if (nextNodes):
self.messageProducer.start()

def setupStreamingListeners(self, listenerConfig, parents, nodeName):
print("parents" + str(parents))
for predecessor in parents:
remoteAddress = predecessor['address']
remoteAddressUrl = 'tcp://{host}:{port}'.format(host=remoteAddress['host'], port=remoteAddress['port'])
if (predecessor['type'] == 'Add'):
options = {}
options.update(listenerConfig)
options['remoteAddress'] = remoteAddressUrl
options['messageOriginNodeName'] = predecessor['nodeName']
listener = MessageListener(options, nodeName, self)
listener.registerMessageListener(self._onMessage)
self._messageListeners[remoteAddressUrl] = listener
if (self.listeningToMessages):
listener.start()
if (predecessor['type'] == 'Del'):
if (self.listeningToMessages):
self._messageListeners[remoteAddressUrl].close()
del self._messageListeners[remoteAddressUrl]

def registerInputListener(self, onMessage):
self._inputListener.append(onMessage)

def _onMessage(self, messageFlowPattern, msg, origin):
self.threadLocalStorage.messageFlowPattern = messageFlowPattern
for listener in self._inputListener:
try:
listener(msg, origin)
except Exception as e:
print("hkube_api message listener through exception: " + str(e))
self.threadLocalStorage.messageFlowPattern = []

def startMessageListening(self):
self.listeningToMessages = True
for listener in self._messageListeners.values():
if not (listener.is_alive()):
listener.start()

def sendMessage(self, msg, flowName=None):
if (self.messageProducer is None):
raise Exception('Trying to send a message from a none stream pipeline or after close had been applied on algorithm')
if (self.messageProducer.nodeNames):
if (flowName is None):
if (self.defaultFlow is None):
raise Exception("Streaming default flow is None")
flowName = self.defaultFlow
parsedFlow = self.parsedFlows.get(flowName)
if (parsedFlow is None):
raise Exception("No such flow " + flowName)
self.messageProducer.produce(parsedFlow, msg)

def stopStreaming(self):
if (self.listeningToMessages):
for listener in self._messageListeners.values():
listener.close()
self._messageListeners = dict()
self.listeningToMessages = False
self._inputListener = []
if (self.messageProducer is not None):
self.messageProducer.close()
self.messageProducer = None
24 changes: 16 additions & 8 deletions hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
import zmq
import msgpack
import uuid

HEARTBEAT_LIVENESS = 3
Expand All @@ -14,7 +13,8 @@

class ZMQListener(object):

def __init__(self, remoteAddress, onMessage, consumerType):
def __init__(self, remoteAddress, onMessage, encoding, consumerType):
self.encoding = encoding
self.onMessage = onMessage
self.consumerType = consumerType
self.remoteAddress = remoteAddress
Expand All @@ -29,7 +29,7 @@ def worker_socket(self, context, remoteAddress):
worker.setsockopt(zmq.IDENTITY, identity)
worker.connect(remoteAddress)
print("zmq listener connecting to " + remoteAddress)
worker.send_multipart([PPP_READY, msgpack.packb(self.consumerType)])
worker.send_multipart([PPP_READY, self.encoding.encode(self.consumerType, plainEncode=True)])
return worker

def start(self): # pylint: disable=too-many-branches
Expand All @@ -47,6 +47,7 @@ def start(self): # pylint: disable=too-many-branches
if (self.active):
print(e)
raise e
break
# Handle worker activity on backend
if result == zmq.POLLIN:
# Get message
Expand All @@ -59,21 +60,24 @@ def start(self): # pylint: disable=too-many-branches
if (self.active):
print(e)
raise e
break
if not frames:
if (self.active):
raise Exception("Connection to producer on " + self.remoteAddress + " interrupted")
break

if len(frames) == 2:
if len(frames) == 3:
liveness = HEARTBEAT_LIVENESS
result = self.onMessage(frames[0], frames[1])
newFrames = [result, msgpack.packb(self.consumerType)]
messageFlowPattern = self.encoding.decode(value=frames[0], plainEncode=True)
result = self.onMessage(messageFlowPattern, frames[1], frames[2])
newFrames = [result, self.encoding.encode(self.consumerType, plainEncode=True)]
try:
self.worker.send_multipart(newFrames)
except Exception as e:
if (self.active):
print(e)
raise e

break
elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
liveness = HEARTBEAT_LIVENESS
else:
Expand All @@ -96,16 +100,20 @@ def start(self): # pylint: disable=too-many-branches
except Exception as e:
if (self.active):
print(e)
else:
break
self.worker = self.worker_socket(context, self.remoteAddress)
liveness = HEARTBEAT_LIVENESS

if time.time() > heartbeat_at:
heartbeat_at = time.time() + HEARTBEAT_INTERVAL
try:
self.worker.send_multipart([PPP_HEARTBEAT, msgpack.packb(self.consumerType)])
self.worker.send_multipart([PPP_HEARTBEAT, self.encoding.encode(self.consumerType, plainEncode=True)])
except Exception as e:
if (self.active):
print(e)
else:
break

def close(self):
if not (self.active):
Expand Down
20 changes: 20 additions & 0 deletions hkube_python_wrapper/communication/zmq/streaming/producer/Flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class Flow:
def __init__(self, flow, meName):
self.flow = flow
self.me = None
for node in flow:
if self.me is None:
if node['source'] == meName:
self.me = node

def isNextInFlow(self, next):
if (self.me is None):
return False
return next in self.me['next']

def getRestOfFlow(self):
if (self.me is None):
return []
flowcopy = self.flow[:]
flowcopy.remove(self.me)
return flowcopy

0 comments on commit f45367f

Please sign in to comment.