Skip to content
Browse files

last fixes

  • Loading branch information...
1 parent 191d23b commit 6757bbea0e94dd654d5f51fa713b57b49bfb0459 @thomasjungblut committed Sep 12, 2012
Showing with 47 additions and 74 deletions.
  1. +39 −62 BSPPeer.py
  2. +1 −5 BinaryProtocol.py
  3. +4 −6 HelloWorldBSP.py
  4. +3 −1 README
View
101 BSPPeer.py
@@ -6,8 +6,7 @@
"""
from BspJobConfiguration import BspJobConfiguration
-from sys import stdin
-from sys import stdout
+from sys import stdout, stdin
from BinaryProtocol import BinaryProtocol as bp
class BSPPeer:
@@ -27,29 +26,25 @@ def initialize(self):
NUMBER OF CONF ITEMS (#KEY + #VALUES)
N-LINES, where line is key and the following the value
"""
-
- self.log("HELLO FROM PYTHON OMG!")
# parse our initial values
- line = stdin.readline()
- self.log("LINE: " + line)
+ line = readLine()
# start code is the first
if line == bp.getProtocolString(bp.START):
# check the protocol compatibility
- protocolNumber = int(stdin.readline())
+ protocolNumber = int(readLine())
if protocolNumber != self.PROTOCOL_VERSION:
raise RuntimeError(
"Protocol version mismatch: Expected: " + str(self.PROTOCOL_VERSION) +
" but got: " + str(protocolNumber))
- line = stdin.readline()
- self.log("LINE: " + line)
+ line = readLine()
# parse the configurations
if line == bp.getProtocolString(bp.SET_BSPJOB_CONF):
- numberOfItems = stdin.readline()
+ numberOfItems = readLine()
key = None
value = None
for i in range(0, int(numberOfItems), 2):
- key = stdin.readline()
- value = stdin.readline()
+ key = readLine()
+ value = readLine()
self.config.put(key, value)
self.ack(bp.START)
@@ -59,142 +54,124 @@ def send(self, peer, msg):
println(peer)
println(msg)
-
def getCurrentMessage(self):
println(bp.getProtocolString(bp.GET_MSG))
- line = stdin.readline()
+ line = readLine()
# if no message is send it will send %%-1%%
- if line == "%%-1%%\n":
+ if line == "%%-1%%":
return -1
return line;
+ def getAllMessages(self):
+ msgs = []
+ numMessages = self.getNumCurrentMessages()
+ for i in range(int(numMessages)):
+ msgs.append(self.getCurrentMessage())
+ return msgs
def getNumCurrentMessages(self):
println(bp.getProtocolString(bp.GET_MSG_COUNT))
- return stdin.readline()
-
+ return readLine()
def sync(self):
println(bp.getProtocolString(bp.SYNC))
# this should block now until we get a response
- line = stdin.readline()
- if line != (bp.getProtocolString(bp.SYNC) + "_SUCCESS\n"):
+ line = readLine()
+ if line != (bp.getProtocolString(bp.SYNC) + "_SUCCESS"):
raise RuntimeError(
"Barrier sync failed!")
-
def getSuperstepCount(self):
println(bp.getProtocolString(bp.GET_SUPERSTEP_COUNT))
- return stdin.readline()
-
+ return readLine()
def getPeerName(self):
- return self.getPeername(self, -1)
-
+ return self.getPeerNameForIndex(-1)
- def getPeerName(self, index):
+ def getPeerNameForIndex(self, index):
println(bp.getProtocolString(bp.GET_PEERNAME))
- println(index);
- return stdin.readline()
-
+ println(str(index));
+ return readLine()
def getPeerIndex(self):
println(bp.getProtocolString(bp.GET_PEER_INDEX))
- return stdin.readline()
-
+ return readLine()
def getAllPeerNames(self):
println(bp.getProtocolString(bp.GET_ALL_PEERNAME))
- ln = stdin.readline()
- self.log("allpeernames " + ln)
+ ln = readLine()
names = []
for i in range(int(ln)):
- self.log("in the loop ")
- peerName = stdin.readLine # TODO WHY IS THIS BLOCKING SO HARD?
- self.log("peername " + peerName)
+ peerName = readLine()
names.append(peerName)
- self.log("allpeernames " + names)
return names
-
def getNumPeers(self):
println(bp.getProtocolString(bp.GET_PEER_COUNT))
- return stdin.readline()
-
+ return readLine()
def clear(self):
println(bp.getProtocolString(bp.CLEAR))
-
def write(self, key, value):
println(bp.getProtocolString(bp.WRITE_KEYVALUE))
println(key)
println(value)
-
def readNext(self):
println(bp.getProtocolString(bp.READ_KEYVALUE))
- line = stdin.readline()
- secondLine = stdin.readline()
+ line = readLine()
+ secondLine = readLine()
# if no message is send it will send %%-1%%
- if line == "%%-1%%\n" and secondLine == "%%-1%%\n":
+ if line == "%%-1%%" and secondLine == "%%-1%%":
return -1
return [line, secondLine]
-
def reopenInput(self):
println(bp.getProtocolString(bp.REOPEN_INPUT))
-
# TODO counter!
def runSetup(self):
self.log("Starting setup!")
- line = stdin.readline()
- self.log("read: " + line)
+ line = readLine()
# start code is the first
if line.startswith(bp.getProtocolString(bp.RUN_SETUP)):
self.bspClass.setup(self);
self.ack(bp.RUN_SETUP)
-
def runBSP(self):
self.log("Starting BSP!")
- line = stdin.readline()
- self.log("read: " + line)
+ line = readLine()
# start code is the first
if line.startswith(bp.getProtocolString(bp.RUN_BSP)):
self.bspClass.bsp(self);
self.ack(bp.RUN_BSP)
-
def runCleanup(self):
self.log("Starting cleanup!")
- line = stdin.readline()
- self.log("read: " + line)
+ line = readLine()
# start code is the first
if line.startswith(bp.getProtocolString(bp.RUN_CLEANUP)):
self.bspClass.cleanup(self);
self.ack(bp.RUN_CLEANUP)
-
def ack(self, code):
println(bp.getAckProtocolString(code))
-
def done(self):
println(bp.getProtocolString(bp.TASK_DONE))
println(bp.getProtocolString(bp.DONE))
-
def log(self, msg):
- println(bp.getProtocolStringNL(bp.LOG) + msg)
+ println(bp.getProtocolString(bp.LOG) + msg)
+
+
+def readLine():
+ return stdin.readline().rstrip('\n')
def println(text):
- if "\n" in text:
- print(text)
- else:
- print(text + "\n")
+ print(text)
stdout.flush()
View
6 BinaryProtocol.py
@@ -34,13 +34,9 @@ class BinaryProtocol:
@staticmethod
def getProtocolString(opCode):
- return "%" + str(opCode) + "%=\n";
-
- @staticmethod
- def getProtocolStringNL(opCode):
return "%" + str(opCode) + "%=";
@staticmethod
def getAckProtocolString(opCode):
- return "%ACK_" + str(opCode) + "%=\n";
+ return "%ACK_" + str(opCode) + "%=";
View
10 HelloWorldBSP.py
@@ -9,14 +9,12 @@
class HelloWorldBSP(BSP):
def bsp(self, peer):
+ name = peer.getPeerName()
for i in range(15):
for otherPeer in peer.getAllPeerNames():
- peer.send(otherPeer, "Hello from " + peer.getPeerName() + " in superstep " + i)
+ peer.send(otherPeer, ("Hello from " + name + " in superstep " + str(i)))
peer.sync()
-
- for msg in peer.getCurrentMessage():
- if not msg:
- break
- peer.log(msg)
+ for msg in peer.getAllMessages():
+ peer.log(msg)
View
4 README
@@ -1 +1,3 @@
-This file was created by PyCharm 2.6.1 for binding GitHub repository
+Hama Streaming protocol implementation for Python.
+
+Using Python 3.2.3 and PyCharm 2.6.1.

0 comments on commit 6757bbe

Please sign in to comment.
Something went wrong with that request. Please try again.