Permalink
Browse files

initial commit

  • Loading branch information...
0 parents commit ea7cca8e2c047df777acc9651ded5c23937dfc5a Klaas Bosteels committed Jan 22, 2009
Showing with 233 additions and 0 deletions.
  1. +1 −0 README
  2. +12 −0 setup.py
  3. +26 −0 tests/testio.py
  4. +194 −0 typedbytes.py
1 README
@@ -0,0 +1 @@
+This a simple module to deal with so called "typed bytes".
12 setup.py
@@ -0,0 +1,12 @@
+from setuptools import setup
+
+setup(name='typedbytes',
+ version='0.1',
+ description='A Python module for dealing with so called "typed bytes"',
+ author='Klaas Bosteels',
+ author_email='klaas@last.fm',
+ url='http://github.com/klbostee/typedbytes',
+ py_modules=['typedbytes'],
+ test_suite='nose.collector',
+ tests_require=['nose']
+ )
26 tests/testio.py
@@ -0,0 +1,26 @@
+import os
+import unittest
+import typedbytes
+
+
+class TestIO(unittest.TestCase):
+
+ def testio(self):
+ objects = [True, 1234, 12345L, "trala",
+ (1,2,3), [1,2,3,4], {1:2,3:4}, set([1,2,3])]
+ file = open("test.bin", "wb")
+ output = typedbytes.Output(file)
+ for obj in objects:
+ output.write(obj)
+ file.close()
+ file = open("test.bin", "rb")
+ input = typedbytes.Input(file)
+ for (index, record) in enumerate(input):
+ self.assertEqual(objects[index], record)
+ file.close()
+ os.remove("test.bin")
+
+
+if __name__ == "__main__":
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestIO)
+ unittest.TextTestRunner(verbosity=2).run(suite)
194 typedbytes.py
@@ -0,0 +1,194 @@
+import cPickle as pickle
+import types
+from struct import pack, unpack
+
+
+BYTES = 0
+BYTE = 1
+BOOL = 2
+INT = 3
+LONG = 4
+FLOAT = 5
+DOUBLE = 6
+STRING = 7
+VECTOR = 8
+LIST = 9
+MAP = 10
+
+MARKER = 255
+
+
+class Output(object):
+
+ def __init__(self, file):
+ self.file = file
+
+ def write(self, obj):
+ t = type(obj)
+ if t == types.BooleanType:
+ self.write_bool(obj)
+ elif t == types.IntType:
+ self.write_long(obj) # Python ints are 64 bit
+ elif t == types.FloatType:
+ self.write_double(obj) # Python floats are 64 bit
+ elif t == types.StringType:
+ self.write_string(obj)
+ elif t == types.TupleType:
+ self.write_vector(obj)
+ elif t == types.ListType:
+ self.write_list(obj)
+ elif t == types.DictType:
+ self.write_map(obj)
+ else:
+ self.write_bytes(pickle.dumps(obj, protocol=2))
+
+ def write_bytes(self, bytes):
+ self.file.write(pack('!Bi', BYTES, len(bytes)))
+ self.file.write(bytes)
+
+ def write_byte(self, byte):
+ self.file.write(pack('!Bb', BYTE, byte))
+
+ def write_bool(self, bool_):
+ self.file.write(pack('!Bb', BOOL, int(bool_)))
+
+ def write_int(self, int_):
+ self.file.write(pack('!Bi', INT, int_))
+
+ def write_long(self, long_):
+ self.file.write(pack('!Bq', LONG, long_))
+
+ def write_float(self, float_):
+ self.file.write(pack('!Bf', FLOAT, float_))
+
+ def write_double(self, double):
+ self.file.write(pack('!Bd', DOUBLE, double))
+
+ def write_string(self, string):
+ self.file.write(pack('!Bi', STRING, len(string)))
+ self.file.write(string)
+
+ def write_vector(self, vector):
+ self.file.write(pack('!Bi', VECTOR, len(vector)))
+ for obj in vector:
+ Output.write(self, obj)
+
+ def write_list(self, list_):
+ self.file.write(pack('!B', LIST))
+ for obj in list_:
+ Output.write(self, obj)
+ self.file.write(pack('!B', MARKER))
+
+ def write_map(self, map):
+ self.file.write(pack('!Bi', MAP, len(map)))
+ for (key, value) in map.iteritems():
+ Output.write(self, key)
+ Output.write(self, value)
+
+
+class Input(object):
+
+ def __init__(self, file):
+ self.file = file
+
+ def read(self):
+ t = self.read_type()
+ if t == None:
+ return None
+ elif t == BYTE:
+ return self.read_byte()
+ elif t == BOOL:
+ return self.read_bool()
+ elif t == INT:
+ return self.read_int()
+ elif t == LONG:
+ return self.read_long()
+ elif t == FLOAT:
+ return self.read_float()
+ elif t == DOUBLE:
+ return self.read_double()
+ elif t == STRING:
+ return self.read_string()
+ elif t == VECTOR:
+ return self.read_vector()
+ elif t == LIST:
+ return self.read_list()
+ elif t == MAP:
+ return self.read_map()
+ elif t == BYTES:
+ return pickle.loads(self.read_bytes())
+ elif t == MARKER:
+ return None
+ else:
+ raise ValueError("invalid type byte: " + str(t))
+
+ def __iter__(self):
+ record = self.read()
+ while record != None:
+ yield record
+ record = self.read()
+
+ def read_type(self):
+ byte = self.file.read(1)
+ if byte:
+ return unpack('!B', byte)[0]
+
+ def read_bytes(self):
+ count = unpack('!i', self.file.read(4))[0]
+ return self.file.read(count)
+
+ def read_byte(self):
+ return unpack('!b', self.file.read(1))[0]
+
+ def read_bool(self):
+ return bool(unpack('!b', self.file.read(1))[0])
+
+ def read_int(self):
+ return unpack('!i', self.file.read(4))[0]
+
+ def read_long(self):
+ return unpack('!q', self.file.read(8))[0]
+
+ def read_float(self):
+ return unpack('!f', self.file.read(4))[0]
+
+ def read_double(self):
+ return unpack('!d', self.file.read(8))[0]
+
+ def read_string(self):
+ count = unpack('!i', self.file.read(4))[0]
+ return self.file.read(count)
+
+ def read_vector(self):
+ count = unpack('!i', self.file.read(4))[0]
+ return tuple(Input.read(self) for i in xrange(count))
+
+ def read_list(self):
+ obj = Input.read(self)
+ list_ = []
+ while obj != None:
+ list_.append(obj)
+ obj = Input.read(self)
+ return list_
+
+ def read_map(self):
+ count = unpack('!i', self.file.read(4))[0]
+ return dict((Input.read(self), Input.read(self)) \
+ for i in xrange(count))
+
+
+class PairedInput(Input):
+
+ def read(self):
+ record = Input.read(self)
+ if record != None:
+ return (record, Input.read(self))
+
+
+class PairedOutput(Output):
+
+ def write(self, obj1, obj2):
+ Output.write(self, obj1)
+ Output.write(self, obj2)
+
+

0 comments on commit ea7cca8

Please sign in to comment.