Permalink
Browse files

First commit

  • Loading branch information...
thomasjungblut committed Sep 12, 2012
0 parents commit 191d23b5bd4302996cea45a053f0cf498a03cb35
Showing with 323 additions and 0 deletions.
  1. +20 −0 BSP.py
  2. +200 −0 BSPPeer.py
  3. +20 −0 BSPRunner.py
  4. +46 −0 BinaryProtocol.py
  5. +14 −0 BspJobConfiguration.py
  6. +22 −0 HelloWorldBSP.py
  7. +1 −0 README
20 BSP.py
@@ -0,0 +1,20 @@
+"""
+
+BSP Class that can be overridden to implement the computation logic.
+
+"""
+from BSPPeer import BSPPeer
+
+class BSP:
+
+ def __init__(self):
+ pass
+
+ def setup(self, peer):
+ pass
+
+ def bsp(self, peer):
+ pass
+
+ def cleanup(self, peer):
+ pass
@@ -0,0 +1,200 @@
+"""
+
+The BSPPeer handles the incoming protocol requests and forwards it to the BSP class.
+Basically you can register the to be executed BSP class into this peer,
+it will then callback the according methods.
+
+"""
+from BspJobConfiguration import BspJobConfiguration
+from sys import stdin
+from sys import stdout
+from BinaryProtocol import BinaryProtocol as bp
+
+class BSPPeer:
+ PROTOCOL_VERSION = 0
+
+ def __init__(self, bspClass):
+ self.config = BspJobConfiguration()
+ self.bspClass = bspClass;
+ self.initialize()
+
+ def initialize(self):
+ """
+ INIT protocol works as follows:
+ START OP_CODE
+ PROTOCOL_NUMBER
+ SET_BSPJOB_CONF OP_CODE
+ NUMBER OF CONF ITEMS (#KEY + #VALUES)
+ N-LINES, where line is key and the following the value
+ """
+
+ self.log("HELLO FROM PYTHON OMG!")
+ # parse our initial values
+ line = stdin.readline()
+ self.log("LINE: " + line)
+ # start code is the first
+ if line == bp.getProtocolString(bp.START):
+ # check the protocol compatibility
+ protocolNumber = int(stdin.readline())
+ if protocolNumber != self.PROTOCOL_VERSION:
+ raise RuntimeError(
+ "Protocol version mismatch: Expected: " + str(self.PROTOCOL_VERSION) +
+ " but got: " + str(protocolNumber))
+ line = stdin.readline()
+ self.log("LINE: " + line)
+ # parse the configurations
+ if line == bp.getProtocolString(bp.SET_BSPJOB_CONF):
+ numberOfItems = stdin.readline()
+ key = None
+ value = None
+ for i in range(0, int(numberOfItems), 2):
+ key = stdin.readline()
+ value = stdin.readline()
+ self.config.put(key, value)
+
+ self.ack(bp.START)
+
+ def send(self, peer, msg):
+ println(bp.getProtocolString(bp.SEND_MSG))
+ println(peer)
+ println(msg)
+
+
+ def getCurrentMessage(self):
+ println(bp.getProtocolString(bp.GET_MSG))
+ line = stdin.readline()
+ # if no message is send it will send %%-1%%
+ if line == "%%-1%%\n":
+ return -1
+
+ return line;
+
+
+ def getNumCurrentMessages(self):
+ println(bp.getProtocolString(bp.GET_MSG_COUNT))
+ return stdin.readline()
+
+
+ def sync(self):
+ println(bp.getProtocolString(bp.SYNC))
+ # this should block now until we get a response
+ line = stdin.readline()
+ if line != (bp.getProtocolString(bp.SYNC) + "_SUCCESS\n"):
+ raise RuntimeError(
+ "Barrier sync failed!")
+
+
+ def getSuperstepCount(self):
+ println(bp.getProtocolString(bp.GET_SUPERSTEP_COUNT))
+ return stdin.readline()
+
+
+ def getPeerName(self):
+ return self.getPeername(self, -1)
+
+
+ def getPeerName(self, index):
+ println(bp.getProtocolString(bp.GET_PEERNAME))
+ println(index);
+ return stdin.readline()
+
+
+ def getPeerIndex(self):
+ println(bp.getProtocolString(bp.GET_PEER_INDEX))
+ return stdin.readline()
+
+
+ def getAllPeerNames(self):
+ println(bp.getProtocolString(bp.GET_ALL_PEERNAME))
+ ln = stdin.readline()
+ self.log("allpeernames " + ln)
+ names = []
+ for i in range(int(ln)):
+ self.log("in the loop ")
+ peerName = stdin.readLine # TODO WHY IS THIS BLOCKING SO HARD?
+ self.log("peername " + peerName)
+ names.append(peerName)
+ self.log("allpeernames " + names)
+ return names
+
+
+ def getNumPeers(self):
+ println(bp.getProtocolString(bp.GET_PEER_COUNT))
+ return stdin.readline()
+
+
+ def clear(self):
+ println(bp.getProtocolString(bp.CLEAR))
+
+
+ def write(self, key, value):
+ println(bp.getProtocolString(bp.WRITE_KEYVALUE))
+ println(key)
+ println(value)
+
+
+ def readNext(self):
+ println(bp.getProtocolString(bp.READ_KEYVALUE))
+ line = stdin.readline()
+ secondLine = stdin.readline()
+ # if no message is send it will send %%-1%%
+ if line == "%%-1%%\n" and secondLine == "%%-1%%\n":
+ return -1
+ return [line, secondLine]
+
+
+ def reopenInput(self):
+ println(bp.getProtocolString(bp.REOPEN_INPUT))
+
+
+ # TODO counter!
+
+ def runSetup(self):
+ self.log("Starting setup!")
+ line = stdin.readline()
+ self.log("read: " + line)
+ # start code is the first
+ if line.startswith(bp.getProtocolString(bp.RUN_SETUP)):
+ self.bspClass.setup(self);
+ self.ack(bp.RUN_SETUP)
+
+
+ def runBSP(self):
+ self.log("Starting BSP!")
+ line = stdin.readline()
+ self.log("read: " + line)
+ # start code is the first
+ if line.startswith(bp.getProtocolString(bp.RUN_BSP)):
+ self.bspClass.bsp(self);
+ self.ack(bp.RUN_BSP)
+
+
+ def runCleanup(self):
+ self.log("Starting cleanup!")
+ line = stdin.readline()
+ self.log("read: " + line)
+ # start code is the first
+ if line.startswith(bp.getProtocolString(bp.RUN_CLEANUP)):
+ self.bspClass.cleanup(self);
+ self.ack(bp.RUN_CLEANUP)
+
+
+ def ack(self, code):
+ println(bp.getAckProtocolString(code))
+
+
+ def done(self):
+ println(bp.getProtocolString(bp.TASK_DONE))
+ println(bp.getProtocolString(bp.DONE))
+
+
+ def log(self, msg):
+ println(bp.getProtocolStringNL(bp.LOG) + msg)
+
+
+def println(text):
+ if "\n" in text:
+ print(text)
+ else:
+ print(text + "\n")
+ stdout.flush()
@@ -0,0 +1,20 @@
+"""
+
+Main Runner utility that will get the bsp class from the user, passed via args and start
+it with the whole context and stuff.
+
+"""
+import sys
+from BSPPeer import BSPPeer
+
+className = sys.argv[1]
+module = __import__(className)
+class_ = getattr(module, className)
+
+bspInstance = class_()
+
+peer = BSPPeer(bspInstance)
+peer.runSetup()
+peer.runBSP()
+peer.runCleanup()
+peer.done()
@@ -0,0 +1,46 @@
+"""
+
+Binary protocol to communicate with the Java BSP task via streams.
+
+"""
+
+class BinaryProtocol:
+ START = 0
+ SET_BSPJOB_CONF = 1
+ SET_INPUT_TYPES = 2
+ RUN_SETUP = 3
+ RUN_BSP = 4
+ RUN_CLEANUP = 5
+ READ_KEYVALUE = 6
+ WRITE_KEYVALUE = 7
+ GET_MSG = 8
+ GET_MSG_COUNT = 9
+ SEND_MSG = 10
+ SYNC = 11
+ GET_ALL_PEERNAME = 12
+ GET_PEERNAME = 13
+ GET_PEER_INDEX = 14
+ GET_PEER_COUNT = 15
+ GET_SUPERSTEP_COUNT = 16
+ REOPEN_INPUT = 17
+ CLEAR = 18
+ CLOSE = 19
+ ABORT = 20
+ DONE = 21
+ TASK_DONE = 22
+ REGISTER_COUNTER = 23
+ INCREMENT_COUNTER = 24
+ LOG = 25
+
+ @staticmethod
+ def getProtocolString(opCode):
+ return "%" + str(opCode) + "%=\n";
+
+ @staticmethod
+ def getProtocolStringNL(opCode):
+ return "%" + str(opCode) + "%=";
+
+ @staticmethod
+ def getAckProtocolString(opCode):
+ return "%ACK_" + str(opCode) + "%=\n";
+
@@ -0,0 +1,14 @@
+"""
+
+A mimic configuration object contains a dictionary that maps keys to values to store information.
+
+"""
+class BspJobConfiguration:
+ def __init__(self):
+ self.conf = {}
+
+ def get(self, key):
+ return self.conf[key]
+
+ def put(self, key, value):
+ self.conf[key] = value
@@ -0,0 +1,22 @@
+"""
+
+Basic Hello World BSP, in Hama this is called serialize printing.
+Each task sends its peer name to each other task who reads the
+message and outputs it to console.
+
+"""
+from BSP import BSP
+
+class HelloWorldBSP(BSP):
+ def bsp(self, peer):
+ for i in range(15):
+ for otherPeer in peer.getAllPeerNames():
+ peer.send(otherPeer, "Hello from " + peer.getPeerName() + " in superstep " + i)
+ peer.sync()
+
+ for msg in peer.getCurrentMessage():
+ if not msg:
+ break
+ peer.log(msg)
+
+
1 README
@@ -0,0 +1 @@
+This file was created by PyCharm 2.6.1 for binding GitHub repository

0 comments on commit 191d23b

Please sign in to comment.