Skip to content

Commit

Permalink
Added support for DynamicCompositeType.
Browse files Browse the repository at this point in the history
  • Loading branch information
umairmufti committed Jan 8, 2013
1 parent 74a5a32 commit 0e3c1b5
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 5 deletions.
89 changes: 89 additions & 0 deletions pycassa/marshal.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def extract_type_name(typestr):
if typestr is None:
return 'BytesType'

if "DynamicCompositeType" in typestr:
return _get_composite_name(typestr)

if "CompositeType" in typestr:
return _get_composite_name(typestr)

Expand Down Expand Up @@ -140,10 +143,93 @@ def unpack_composite(bytestr):

return unpack_composite

def get_dynamic_composite_packer(typestr):
    """
    Build a packer for DynamicCompositeType column names.

    `typestr` is the comparator type string.  Every inner entry of the
    form ``alias=>type`` registers `alias` as shorthand for `type`.

    The returned ``pack_dynamic_composite(items, slice_start=None)``
    accepts a sequence of components and returns their packed form.
    Each component is either

      * ``(type_or_alias, value)``, or
      * ``((type_or_alias, value), inclusive)`` when building a slice
        boundary.

    `slice_start` is ``True`` when packing the start of a slice, ``False``
    when packing the end of a slice, and ``None`` for a plain column name.
    """
    cassandra_types = {}
    for inner_type in _get_inner_types(typestr):
        alias, separator, cassandra_type = inner_type.partition('=>')
        if not separator:
            # Not an alias definition.  Presumably this is what is seen
            # for a type string that declares no aliases (TODO confirm
            # against _get_inner_types); skip it instead of failing on
            # tuple unpacking.
            continue
        cassandra_types[alias.strip()] = cassandra_type.strip()

    len_packer = _short_packer.pack

    def pack_dynamic_composite(items, slice_start=None):
        last_index = len(items) - 1
        # Pieces are collected and joined once at the end rather than
        # growing a string with += on every component.
        pieces = []
        for i, (alias, item) in enumerate(items):
            # End-of-component byte; '\x00' unless a slice bound says
            # otherwise.
            eoc = '\x00'
            if isinstance(alias, tuple):
                # Slice form: ((type_or_alias, value), inclusive)
                inclusive = item
                alias, item = alias
                if inclusive:
                    if slice_start:
                        eoc = '\xff'
                    elif slice_start is False:
                        eoc = '\x01'
                else:
                    if slice_start:
                        eoc = '\x01'
                    elif slice_start is False:
                        eoc = '\xff'
            elif i == last_index:
                # A plain final component is treated as inclusive.
                if slice_start:
                    eoc = '\xff'
                elif slice_start is False:
                    eoc = '\x01'

            if isinstance(alias, str) and len(alias) == 1:
                # Aliased component: a '\x80' marker byte followed by the
                # one-character alias.
                header = '\x80' + alias
                packer = packer_for(cassandra_types[alias])
            else:
                # Full type name: drop anything from the first '(' on,
                # then emit <name length><name>.
                cassandra_type = str(alias).split('(')[0]
                header = len_packer(len(cassandra_type)) + cassandra_type
                packer = packer_for(cassandra_type)

            packed = packer(item)
            pieces.extend((header, len_packer(len(packed)), packed, eoc))
        return ''.join(pieces)

    return pack_dynamic_composite

def get_dynamic_composite_unpacker(typestr):
    """
    Build an unpacker for DynamicCompositeType column names.

    `typestr` is the comparator type string.  Every inner entry of the
    form ``alias=>type`` registers `alias` as shorthand for `type`.

    The returned ``unpack_dynamic_composite(bytestr)`` returns a tuple of
    ``(type_or_alias, value)`` pairs, one per component, in order.
    """
    cassandra_types = {}
    for inner_type in _get_inner_types(typestr):
        alias, separator, cassandra_type = inner_type.partition('=>')
        if not separator:
            # Not an alias definition.  Presumably this is what is seen
            # for a type string that declares no aliases (TODO confirm
            # against _get_inner_types); skip it instead of failing on
            # tuple unpacking.
            continue
        cassandra_types[alias.strip()] = cassandra_type.strip()

    def len_unpacker(v):
        return _short_packer.unpack(v)[0]

    def unpack_dynamic_composite(bytestr):
        # The composite format for each component is:
        #   <header>  |  <len>   | <value> | <eoc>
        #   ? bytes   |  2 bytes | ? bytes | 1 byte
        components = []
        while bytestr:
            header = len_unpacker(bytestr[:2])
            if header & 0x8000:
                # High bit set: aliased component, and the second header
                # byte is the alias itself.
                type_name = bytestr[1]
                cassandra_type = cassandra_types[type_name]
                bytestr = bytestr[2:]
            else:
                # Otherwise the header is the length of the type name
                # that immediately follows it.
                type_name = bytestr[2:2 + header]
                cassandra_type = type_name
                bytestr = bytestr[2 + header:]

            unpacker = unpacker_for(cassandra_type)
            length = len_unpacker(bytestr[:2])
            components.append((type_name, unpacker(bytestr[2:2 + length])))
            # Skip the 2-byte length, the value and the trailing eoc byte.
            bytestr = bytestr[3 + length:]
        return tuple(components)

    return unpack_dynamic_composite

def packer_for(typestr):
if typestr is None:
return lambda v: v

if "DynamicCompositeType" in typestr:
return get_dynamic_composite_packer(typestr)

if "CompositeType" in typestr:
return get_composite_packer(typestr)

Expand Down Expand Up @@ -238,6 +324,9 @@ def unpacker_for(typestr):
if typestr is None:
return lambda v: v

if "DynamicCompositeType" in typestr:
return get_dynamic_composite_unpacker(typestr)

if "CompositeType" in typestr:
return get_composite_unpacker(typestr)

Expand Down
38 changes: 37 additions & 1 deletion pycassa/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
'AsciiType', 'UTF8Type', 'TimeUUIDType', 'LexicalUUIDType',
'CounterColumnType', 'DoubleType', 'FloatType', 'DecimalType',
'BooleanType', 'DateType', 'OldPycassaDateType',
'IntermediateDateType', 'CompositeType')
'IntermediateDateType', 'CompositeType',
'UUIDType', 'DynamicCompositeType')

class CassandraType(object):
"""
Expand Down Expand Up @@ -92,6 +93,10 @@ class UTF8Type(CassandraType):
""" Stores data as UTF8 encoded text """
pass

class UUIDType(CassandraType):
    """ Stores data as a UUID (version 1 or version 4) """

class TimeUUIDType(CassandraType):
    """ Stores data as a time-based (version 1) UUID """
Expand Down Expand Up @@ -251,3 +256,34 @@ def pack(self):
@property
def unpack(self):
    """ The unpacker that ``marshal.get_composite_unpacker`` builds for this type """
    unpacker = marshal.get_composite_unpacker(composite_type=self)
    return unpacker

class DynamicCompositeType(CassandraType):
    """
    A type composed of one or more components, each of
    which has its own type.  When sorted, items are
    primarily sorted by their first component, secondarily
    by their second component, and so on.

    Unlike CompositeType, DynamicCompositeType columns
    need not all have the same structure: each column
    can be composed of different component types.

    Components are specified using a 2-tuple made up of
    a comparator type and a value.  Aliases for comparator
    types can optionally be specified with one or more
    dictionaries during instantiation.
    """

    def __init__(self, *aliases):
        # Maps alias -> comparator type.  When several dicts are given,
        # later ones override earlier ones on a key clash.
        self.aliases = {}
        for alias in aliases:
            # Only dict arguments carry alias definitions; any other
            # argument is ignored.
            if isinstance(alias, dict):
                self.aliases.update(alias)

    def __str__(self):
        # items() instead of the Python-2-only iteritems(): same pairs,
        # and it works on both Python 2 and Python 3.
        aliases = [k + '=>' + str(v) for k, v in self.aliases.items()]
        return "DynamicCompositeType(" + ", ".join(aliases) + ")"

141 changes: 137 additions & 4 deletions tests/test_autopacking.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from pycassa.types import (LongType, IntegerType, TimeUUIDType, LexicalUUIDType,
AsciiType, UTF8Type, BytesType, CompositeType,
OldPycassaDateType, IntermediateDateType, DateType,
BooleanType, CassandraType, DecimalType)
BooleanType, CassandraType, DecimalType,
FloatType, Int32Type, UUIDType, DoubleType, DynamicCompositeType)
from pycassa.index import create_index_expression, create_index_clause
import pycassa.marshal as marshal

Expand All @@ -20,6 +21,7 @@
import uuid
import unittest
import time
from collections import namedtuple

TIME1 = uuid.UUID(hex='ddc6118e-a003-11df-8abf-00234d21610a')
TIME2 = uuid.UUID(hex='40ad6d4c-a004-11df-8abf-00234d21610a')
Expand Down Expand Up @@ -55,6 +57,13 @@ def setup_class(cls):
sys.create_column_family(TEST_KS, 'StdBytes', comparator_type=BytesType())
sys.create_column_family(TEST_KS, 'StdComposite',
comparator_type=CompositeType(LongType(), BytesType()))
sys.create_column_family(TEST_KS, 'StdDynamicComposite',
comparator_type=DynamicCompositeType({'a': AsciiType(),
'b': BytesType(), 'c': DecimalType(), 'd': DateType(),
'f': FloatType(), 'i': IntegerType(), 'l': LongType(),
'n': Int32Type(), 's': UTF8Type(), 't': TimeUUIDType(),
'u': UUIDType(), 'w': DoubleType(), 'x': LexicalUUIDType(),
'y': BooleanType()}))
sys.close()

cls.cf_long = ColumnFamily(pool, 'StdLong')
Expand All @@ -67,10 +76,11 @@ def setup_class(cls):
cls.cf_utf8 = ColumnFamily(pool, 'StdUTF8')
cls.cf_bytes = ColumnFamily(pool, 'StdBytes')
cls.cf_composite = ColumnFamily(pool, 'StdComposite')
cls.cf_dynamic_composite = ColumnFamily(pool, 'StdDynamicComposite')

cls.cfs = [cls.cf_long, cls.cf_int, cls.cf_time, cls.cf_lex,
cls.cf_ascii, cls.cf_utf8, cls.cf_bytes,
cls.cf_composite]
cls.cf_ascii, cls.cf_utf8, cls.cf_bytes, cls.cf_composite,
cls.cf_dynamic_composite]

def tearDown(self):
for cf in TestCFs.cfs:
Expand Down Expand Up @@ -128,7 +138,17 @@ def test_standard_column_family(self):
composite_cols = [(1, 'foo'), (2, 'bar'), (3, 'baz')]
type_groups.append(self.make_group(TestCFs.cf_composite, composite_cols))

# Begin the actual inserting and getting
dynamic_composite_cols = [(('LongType', 1), ('BytesType', 'foo')),
(('LongType', 2), ('BytesType', 'bar')),
(('LongType', 3), ('BytesType', 'baz'))]
type_groups.append(self.make_group(TestCFs.cf_dynamic_composite, dynamic_composite_cols))

dynamic_composite_alias_cols = [(('l', 1), ('b', 'foo')),
(('l', 2), ('b', 'bar')),
(('l', 3), ('b', 'baz'))]
type_groups.append(self.make_group(TestCFs.cf_dynamic_composite, dynamic_composite_alias_cols))

# Begin the actual inserting and getting
for group in type_groups:
cf = group.get('cf')
gdict = group.get('dict')
Expand Down Expand Up @@ -895,6 +915,118 @@ def test_single_component_composite(self):
cf.insert('key', {(123456,): 'val'})
assert_equal(cf.get('key'), {(123456,): 'val'})

class TestDynamicComposites(unittest.TestCase):
    """
    Insert/get round-trips and slice queries against column families
    whose comparator is a DynamicCompositeType with one-letter aliases.
    """

    @classmethod
    def setup_class(cls):
        sysman = SystemManager()
        try:
            sysman.create_column_family(
                TEST_KS, 'StaticDynamicComposite',
                comparator_type=DynamicCompositeType({
                    'l': LongType(),
                    'i': IntegerType(),
                    'T': TimeUUIDType(reversed=True),
                    'x': LexicalUUIDType(reversed=False),
                    'a': AsciiType(),
                    's': UTF8Type(),
                    'b': BytesType()}))
        finally:
            # Always release the SystemManager connection.
            sysman.close()

    @classmethod
    def teardown_class(cls):
        sysman = SystemManager()
        try:
            sysman.drop_column_family(TEST_KS, 'StaticDynamicComposite')
        finally:
            sysman.close()

    def setUp(self):
        # The test bodies refer to these factories by bare one-letter
        # names, so they are published as module globals.
        global a, b, i, I, x, l, t, T, s

        component = namedtuple('DynamicComponent', ['type', 'value'])

        def factory(alias):
            # Returns a callable that builds the (alias, value) component.
            return lambda v: component(alias, v)

        a = factory('a')    # ascii
        b = factory('b')    # bytes
        i = factory('i')    # integer
        I = factory('I')    # integer, reversed
        x = factory('x')    # lexical uuid
        l = factory('l')    # long
        t = factory('t')    # time uuid
        T = factory('T')    # time uuid, reversed
        s = factory('s')    # utf8

    def test_static_composite_basic(self):
        cf = ColumnFamily(pool, 'StaticDynamicComposite')
        colname = (l(127312831239123123), i(1), T(uuid.uuid1()),
                   x(uuid.uuid4()), a('foo'), s(u'ba\u0254r'), b('baz'))
        cf.insert('key', {colname: 'val'})
        assert_equal(cf.get('key'), {colname: 'val'})

    def test_static_composite_slicing(self):
        cf = ColumnFamily(pool, 'StaticDynamicComposite')
        u1 = uuid.uuid1()
        u4 = uuid.uuid4()
        col0 = (l(0), i(1), T(u1), x(u4), a(''), s(''), b(''))
        col1 = (l(1), i(1), T(u1), x(u4), a(''), s(''), b(''))
        col2 = (l(1), i(2), T(u1), x(u4), a(''), s(''), b(''))
        col3 = (l(1), i(3), T(u1), x(u4), a(''), s(''), b(''))
        col4 = (l(2), i(1), T(u1), x(u4), a(''), s(''), b(''))
        cf.insert('key2', {col0: '', col1: '', col2: '', col3: '', col4: ''})

        # Every (column_start, column_finish) pair below must select
        # exactly the three columns whose first component is l(1).
        expected = {col1: '', col2: '', col3: ''}
        slices = [
            (((l(1), True),), ((l(1), True),)),
            ((l(1),), ((l(2), False),)),
            (((l(1), True),), ((l(2), False),)),
            ((l(1),), ((l(2), False),)),
            (((l(0), False),), ((l(2), False),)),
            ((l(1), i(1)), (l(1), i(3))),
            ((l(1), i(1)), (l(1), (i(3), True))),
            ((l(1), (i(1), True)), ((l(2), False),)),
        ]
        for column_start, column_finish in slices:
            result = cf.get('key2', column_start=column_start,
                            column_finish=column_finish)
            assert_equal(result, expected)

    def test_static_composite_get_partial_composite(self):
        cf = ColumnFamily(pool, 'StaticDynamicComposite')
        cf.insert('key3', {(l(123123), i(1)): 'val'})
        assert_equal(cf.get('key3'), {(l(123123), i(1)): 'val'})

    def test_uuid_composites(self):
        sysman = SystemManager()
        try:
            sysman.create_column_family(
                TEST_KS, 'UUIDDynamicComposite',
                comparator_type=DynamicCompositeType({
                    'I': IntegerType(reversed=True),
                    't': TimeUUIDType()}),
                key_validation_class=TimeUUIDType(),
                default_validation_class=UTF8Type())
        finally:
            sysman.close()

        key, u1, u2 = uuid.uuid1(), uuid.uuid1(), uuid.uuid1()
        cf = ColumnFamily(pool, 'UUIDDynamicComposite')
        cf.insert(key, {(I(123123), t(u1)): 'foo'})
        cf.insert(key, {(I(123123), t(u1)): 'foo',
                        (I(-1), t(u2)): 'bar',
                        (I(-123123123), t(u1)): 'baz'})
        assert_equal(cf.get(key), {(I(123123), t(u1)): 'foo',
                                   (I(-1), t(u2)): 'bar',
                                   (I(-123123123), t(u1)): 'baz'})

    def test_single_component_composite(self):
        sysman = SystemManager()
        try:
            sysman.create_column_family(
                TEST_KS, 'SingleDynamicComposite',
                comparator_type=DynamicCompositeType({'i': IntegerType()}))
        finally:
            sysman.close()

        cf = ColumnFamily(pool, 'SingleDynamicComposite')
        cf.insert('key', {(i(123456),): 'val'})
        assert_equal(cf.get('key'), {(i(123456),): 'val'})

class TestBigInt(unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -1237,3 +1369,4 @@ def check(column_start, column_finish, col_reversed=False):
check(((dt2, False),), ((dt0, False),), True)
check(((dt2, False),), (dt1,), True)
check((dt1,), ((dt0, False),), True)

0 comments on commit 0e3c1b5

Please sign in to comment.