Skip to content

Commit

Permalink
pylint and code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
parmitam committed May 17, 2017
1 parent 1534a92 commit 3f6dab0
Showing 1 changed file with 40 additions and 39 deletions.
79 changes: 40 additions & 39 deletions python/MyriaPythonWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,61 +72,61 @@ def write_long(value, stream):
class PickleSerializer(Serializer):

@classmethod
def read_item(self, stream, itemType, length):
def read_item(cls, stream, item_type, length):
obj = None
if itemType == DataType.INT:
obj = self.read_int(stream)
elif itemType == DataType.LONG:
obj = self.read_long(stream)
elif itemType == DataType.FLOAT:
obj = self.read_float(stream)
elif itemType == DataType.DOUBLE:
obj = self.read_double(stream)
elif itemType == DataType.BLOB:
obj = self.loads(stream.read(length))
if item_type == DataType.INT:
obj = cls.read_int(stream)
elif item_type == DataType.LONG:
obj = cls.read_long(stream)
elif item_type == DataType.FLOAT:
obj = cls.read_float(stream)
elif item_type == DataType.DOUBLE:
obj = cls.read_double(stream)
elif item_type == DataType.BLOB:
obj = cls.loads(stream.read(length))
return obj

@classmethod
def read_tuple(self, stream, tuplesize):
def read_tuple(cls, stream, tuplesize):
datalist = []
for i in range(tuplesize):
# first element read type
element_type = self.read_int(stream)
element_type = cls.read_int(stream)
# Second read the length
length = self.read_int(stream)
length = cls.read_int(stream)

if length == SpecialLengths.NULL or length == 0:
datalist.append(0)
# length is > 0, read the item now
elif length > 0:
obj = self.read_item(stream, element_type, length)
obj = cls.read_item(stream, element_type, length)
datalist.append(obj)
else:
raise ValueError("Invalid length for item.")

return datalist

@classmethod
def write_with_length( self, obj, stream, output_type):
def write_with_length(cls, obj, stream, output_type):
if output_type == DataType.INT:
self.write_int(DataType.INT, stream)
self.write_int(obj, stream)
cls.write_int(DataType.INT, stream)
cls.write_int(obj, stream)
elif output_type == DataType.LONG:
self.write_int(DataType.LONG, stream)
self.write_long(obj, stream)
cls.write_int(DataType.LONG, stream)
cls.write_long(obj, stream)
elif output_type == DataType.FLOAT:
self.write_int(DataType.FLOAT, stream)
self.write_float(obj, stream)
cls.write_int(DataType.FLOAT, stream)
cls.write_float(obj, stream)
elif output_type == DataType.DOUBLE:
self.write_int(DataType.DOUBLE, stream)
self.write_double(obj, stream)
cls.write_int(DataType.DOUBLE, stream)
cls.write_double(obj, stream)
elif output_type == DataType.BLOB:
self.write_int(DataType.BLOB, stream)
self.pickle_and_write(obj, stream)
cls.write_int(DataType.BLOB, stream)
cls.pickle_and_write(obj, stream)

@classmethod
def read_command(self, stream):
length = self.read_int(stream)
def read_command(cls, stream):
length = cls.read_int(stream)

if length < 0:
raise ValueError("Command cannot be less than zero.")
Expand All @@ -135,7 +135,7 @@ def read_command(self, stream):
if len(s) < length:
raise EOFError
unenc = base64.urlsafe_b64decode(s)
return self.loads(unenc)
return cls.loads(unenc)

@staticmethod
def dumps(obj):
Expand All @@ -147,15 +147,15 @@ def loads(obj):
return cPickle.loads(obj)

@classmethod
def pickle_and_write(self, obj, stream):
serialized = self.dumps(obj)
def pickle_and_write(cls, obj, stream):
serialized = cls.dumps(obj)

if serialized is None:
raise ValueError("Serialized value should not be None.")
elif len(serialized) > (1 << 31):
raise ValueError("Cannot serialize object larger than 2G.")

self.write_int(len(serialized), stream)
cls.write_int(len(serialized), stream)
stream.write(serialized)


Expand All @@ -180,7 +180,7 @@ def main(in_file, out_file):
tuple_list.append(pickle_serializer.read_tuple(in_file, tuple_size))

retval = func(tuple_list)
if is_flatmap > 0 :
if is_flatmap > 0:
count = len(retval)
pickle_serializer.write_int(count, out_file)
for i in range(count):
Expand All @@ -201,16 +201,17 @@ def main(in_file, out_file):
# JVM closed the socket
pass
except Exception:
print("Python worker process failed with exception:\n{}".format(traceback.format_exc()), file=sys.stderr)
print("Python worker process failed with exception:\n{}".
format(traceback.format_exc()), file=sys.stderr)
exit(-1)


if __name__ == '__main__':
# Read a local port to connect to from stdin
java_port = int(sys.stdin.readline())
port_number = int(sys.stdin.readline())
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1", java_port))
infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
main(infile, outfile)
sock.connect(("127.0.0.1", port_number))
with os.fdopen(os.dup(sock.fileno()), "rb", 65536) as infile,\
os.fdopen(os.dup(sock.fileno()), "wb", 65536) as outfile:
main(infile, outfile)

0 comments on commit 3f6dab0

Please sign in to comment.