Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Added support for DynamicCompositeType. #183

Merged
merged 1 commit into from

2 participants

@umairmufti

We needed to use DynamicCompositeTypes for our project so I went ahead and added support for it in Pycassa. I wanted to contribute back as a way to say thanks and in the hopes that some others may find it useful.

Usage is very similar to the way CompositeTypes work, except that every component is a 2-tuple (comparator type, value). The comparator type can either be a full CassandraType or an alias, if one was given when the column family was created.

In order to create a column family with aliases, simply create a DynamicCompositeType object with a dictionary of aliases and use that for the comparator.

Example:
comparator = DynamicCompositeType({'L': LongType(reversed=True), 'a': AsciiType()})
sys.create_column_family("Keyspace1", "CF1", comparator_type=comparator)

cf = ColumnFamily(pool, "CF1")
comp1 = ('L', 1234)
comp2 = ('a', "abc")
comp3 = ('UTF8Type', "look ma, no hands!")
cf.insert("key", {(comp1, comp2, comp3): "colval"})

Hope this helps!

@thobbs
Owner

Wow, this looks awesome, great work! Thank you very much for taking the time to write tests and good docs.

It will take a little while to review it, but I should be able to get this merged in pretty soon.

@umairmufti

Thanks, Tyler. Both, for the kind words, as well as for creating a kick-ass library to begin with.

@thobbs thobbs merged commit 79534e8 into pycassa:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 8, 2013
  1. @umairmufti
This page is out of date. Refresh to see the latest.
Showing with 263 additions and 5 deletions.
  1. +89 −0 pycassa/marshal.py
  2. +37 −1 pycassa/types.py
  3. +137 −4 tests/test_autopacking.py
View
89 pycassa/marshal.py
@@ -34,6 +34,9 @@ def extract_type_name(typestr):
if typestr is None:
return 'BytesType'
+ if "DynamicCompositeType" in typestr:
+ return _get_composite_name(typestr)
+
if "CompositeType" in typestr:
return _get_composite_name(typestr)
@@ -140,10 +143,93 @@ def unpack_composite(bytestr):
return unpack_composite
+def get_dynamic_composite_packer(typestr):
+ cassandra_types = {}
+ for inner_type in _get_inner_types(typestr):
+ alias, cassandra_type = inner_type.split('=>')
+ cassandra_types[alias] = cassandra_type
+
+ len_packer = _short_packer.pack
+
+ def pack_dynamic_composite(items, slice_start=None):
+ last_index = len(items) - 1
+ s = ''
+ i = 0
+ for (alias, item) in items:
+ eoc = '\x00'
+ if isinstance(alias, tuple):
+ inclusive = item
+ alias, item = alias
+ if inclusive:
+ if slice_start:
+ eoc = '\xff'
+ elif slice_start is False:
+ eoc = '\x01'
+ else:
+ if slice_start:
+ eoc = '\x01'
+ elif slice_start is False:
+ eoc = '\xff'
+ elif i == last_index:
+ if slice_start:
+ eoc = '\xff'
+ elif slice_start is False:
+ eoc = '\x01'
+ if isinstance(alias, str) and len(alias) == 1:
+ header = '\x80' + alias
+ packer = packer_for(cassandra_types[alias])
+ else:
+ cassandra_type = str(alias).split('(')[0]
+ header = len_packer(len(cassandra_type)) + cassandra_type
+ packer = packer_for(cassandra_type)
+ i += 1
+
+ packed = packer(item)
+ s += ''.join((header, len_packer(len(packed)), packed, eoc))
+ return s
+
+ return pack_dynamic_composite
+
+def get_dynamic_composite_unpacker(typestr):
+ cassandra_types = {}
+ for inner_type in _get_inner_types(typestr):
+ alias, cassandra_type = inner_type.split('=>')
+ cassandra_types[alias] = cassandra_type
+
+ len_unpacker = lambda v: _short_packer.unpack(v)[0]
+
+ def unpack_dynamic_composite(bytestr):
+ # The composite format for each component is:
+ # <header> <len> <value> <eoc>
+ # ? bytes | 2 bytes | ? bytes | 1 byte
+ types = []
+ components = []
+ while bytestr:
+ header = len_unpacker(bytestr[:2])
+ if header & 0x8000:
+ alias = bytestr[1]
+ types.append(alias)
+ unpacker = unpacker_for(cassandra_types[alias])
+ bytestr = bytestr[2:]
+ else:
+ cassandra_type = bytestr[2:2 + header]
+ types.append(cassandra_type)
+ unpacker = unpacker_for(cassandra_type)
+ bytestr = bytestr[2 + header:]
+ length = len_unpacker(bytestr[:2])
+ components.append(unpacker(bytestr[2:2 + length]))
+ bytestr = bytestr[3 + length:]
+ return tuple(zip(types, components))
+
+ return unpack_dynamic_composite
+
def packer_for(typestr):
if typestr is None:
return lambda v: v
+ if "DynamicCompositeType" in typestr:
+ return get_dynamic_composite_packer(typestr)
+
if "CompositeType" in typestr:
return get_composite_packer(typestr)
@@ -238,6 +324,9 @@ def unpacker_for(typestr):
if typestr is None:
return lambda v: v
+ if "DynamicCompositeType" in typestr:
+ return get_dynamic_composite_unpacker(typestr)
+
if "CompositeType" in typestr:
return get_composite_unpacker(typestr)
View
38 pycassa/types.py
@@ -30,7 +30,8 @@
'AsciiType', 'UTF8Type', 'TimeUUIDType', 'LexicalUUIDType',
'CounterColumnType', 'DoubleType', 'FloatType', 'DecimalType',
'BooleanType', 'DateType', 'OldPycassaDateType',
- 'IntermediateDateType', 'CompositeType')
+ 'IntermediateDateType', 'CompositeType',
+ 'UUIDType', 'DynamicCompositeType')
class CassandraType(object):
"""
@@ -92,6 +93,10 @@ class UTF8Type(CassandraType):
""" Stores data as UTF8 encoded text """
pass
+class UUIDType(CassandraType):
+ """ Stores data as a type 1 or type 4 UUID """
+ pass
+
class TimeUUIDType(CassandraType):
""" Stores data as a version 1 UUID """
pass
@@ -251,3 +256,34 @@ def pack(self):
@property
def unpack(self):
return marshal.get_composite_unpacker(composite_type=self)
+
+class DynamicCompositeType(CassandraType):
+ """
+ A type composed of one or more components, each of
+ which have their own type. When sorted, items are
+ primarily sorted by their first component, secondarily
+ by their second component, and so on.
+
+ Unlike CompositeType, DynamicCompositeType columns
+ need not all be of the same structure. Each column
+ can be composed of different component types.
+
+ Components are specified using a 2-tuple made up of
+ a comparator type and value. Aliases for comparator
+ types can optionally be specified with a dictionary
+ during instantiation.
+
+ """
+
+ def __init__(self, *aliases):
+ self.aliases = {}
+ for alias in aliases:
+ if isinstance(alias, dict):
+ self.aliases.update(alias)
+
+ def __str__(self):
+ aliases = []
+ for k, v in self.aliases.iteritems():
+ aliases.append(k + '=>' + str(v))
+ return "DynamicCompositeType(" + ", ".join(aliases) + ")"
+
View
141 tests/test_autopacking.py
@@ -6,7 +6,8 @@
from pycassa.types import (LongType, IntegerType, TimeUUIDType, LexicalUUIDType,
AsciiType, UTF8Type, BytesType, CompositeType,
OldPycassaDateType, IntermediateDateType, DateType,
- BooleanType, CassandraType, DecimalType)
+ BooleanType, CassandraType, DecimalType,
+ FloatType, Int32Type, UUIDType, DoubleType, DynamicCompositeType)
from pycassa.index import create_index_expression, create_index_clause
import pycassa.marshal as marshal
@@ -20,6 +21,7 @@
import uuid
import unittest
import time
+from collections import namedtuple
TIME1 = uuid.UUID(hex='ddc6118e-a003-11df-8abf-00234d21610a')
TIME2 = uuid.UUID(hex='40ad6d4c-a004-11df-8abf-00234d21610a')
@@ -55,6 +57,13 @@ def setup_class(cls):
sys.create_column_family(TEST_KS, 'StdBytes', comparator_type=BytesType())
sys.create_column_family(TEST_KS, 'StdComposite',
comparator_type=CompositeType(LongType(), BytesType()))
+ sys.create_column_family(TEST_KS, 'StdDynamicComposite',
+ comparator_type=DynamicCompositeType({'a': AsciiType(),
+ 'b': BytesType(), 'c': DecimalType(), 'd': DateType(),
+ 'f': FloatType(), 'i': IntegerType(), 'l': LongType(),
+ 'n': Int32Type(), 's': UTF8Type(), 't': TimeUUIDType(),
+ 'u': UUIDType(), 'w': DoubleType(), 'x': LexicalUUIDType(),
+ 'y': BooleanType()}))
sys.close()
cls.cf_long = ColumnFamily(pool, 'StdLong')
@@ -67,10 +76,11 @@ def setup_class(cls):
cls.cf_utf8 = ColumnFamily(pool, 'StdUTF8')
cls.cf_bytes = ColumnFamily(pool, 'StdBytes')
cls.cf_composite = ColumnFamily(pool, 'StdComposite')
+ cls.cf_dynamic_composite = ColumnFamily(pool, 'StdDynamicComposite')
cls.cfs = [cls.cf_long, cls.cf_int, cls.cf_time, cls.cf_lex,
- cls.cf_ascii, cls.cf_utf8, cls.cf_bytes,
- cls.cf_composite]
+ cls.cf_ascii, cls.cf_utf8, cls.cf_bytes, cls.cf_composite,
+ cls.cf_dynamic_composite]
def tearDown(self):
for cf in TestCFs.cfs:
@@ -128,7 +138,17 @@ def test_standard_column_family(self):
composite_cols = [(1, 'foo'), (2, 'bar'), (3, 'baz')]
type_groups.append(self.make_group(TestCFs.cf_composite, composite_cols))
- # Begin the actual inserting and getting
+ dynamic_composite_cols = [(('LongType', 1), ('BytesType', 'foo')),
+ (('LongType', 2), ('BytesType', 'bar')),
+ (('LongType', 3), ('BytesType', 'baz'))]
+ type_groups.append(self.make_group(TestCFs.cf_dynamic_composite, dynamic_composite_cols))
+
+ dynamic_composite_alias_cols = [(('l', 1), ('b', 'foo')),
+ (('l', 2), ('b', 'bar')),
+ (('l', 3), ('b', 'baz'))]
+ type_groups.append(self.make_group(TestCFs.cf_dynamic_composite, dynamic_composite_alias_cols))
+
+ # Begin the actual inserting and getting
for group in type_groups:
cf = group.get('cf')
gdict = group.get('dict')
@@ -895,6 +915,118 @@ def test_single_component_composite(self):
cf.insert('key', {(123456,): 'val'})
assert_equal(cf.get('key'), {(123456,): 'val'})
+class TestDynamicComposites(unittest.TestCase):
+
+ @classmethod
+ def setup_class(cls):
+ sys = SystemManager()
+ sys.create_column_family(TEST_KS, 'StaticDynamicComposite',
+ comparator_type=DynamicCompositeType({'l': LongType(),
+ 'i': IntegerType(),
+ 'T': TimeUUIDType(reversed=True),
+ 'x': LexicalUUIDType(reversed=False),
+ 'a': AsciiType(),
+ 's': UTF8Type(),
+ 'b': BytesType()}))
+
+ @classmethod
+ def teardown_class(cls):
+ sys = SystemManager()
+ sys.drop_column_family(TEST_KS, 'StaticDynamicComposite')
+
+ def setUp(self):
+ global a, b, i, I, x, l, t, T, s
+
+ component = namedtuple('DynamicComponent', ['type','value'])
+ ascii_alias = component('a', None)
+ bytes_alias = component('b', None)
+ integer_alias = component('i', None)
+ integer_rev_alias = component('I', None)
+ lexicaluuid_alias = component('x', None)
+ long_alias = component('l', None)
+ timeuuid_alias = component('t', None)
+ timeuuid_rev_alias = component('T', None)
+ utf8_alias = component('s', None)
+
+ _r = lambda t, v: t._replace(value=v)
+ a = lambda v: _r(ascii_alias, v)
+ b = lambda v: _r(bytes_alias, v)
+ i = lambda v: _r(integer_alias, v)
+ I = lambda v: _r(integer_rev_alias, v)
+ x = lambda v: _r(lexicaluuid_alias, v)
+ l = lambda v: _r(long_alias, v)
+ t = lambda v: _r(timeuuid_alias, v)
+ T = lambda v: _r(timeuuid_rev_alias, v)
+ s = lambda v: _r(utf8_alias, v)
+
+ def test_static_composite_basic(self):
+ cf = ColumnFamily(pool, 'StaticDynamicComposite')
+ colname = (l(127312831239123123), i(1), T(uuid.uuid1()), x(uuid.uuid4()), a('foo'), s(u'ba\u0254r'), b('baz'))
+ cf.insert('key', {colname: 'val'})
+ assert_equal(cf.get('key'), {colname: 'val'})
+
+ def test_static_composite_slicing(self):
+ cf = ColumnFamily(pool, 'StaticDynamicComposite')
+ u1 = uuid.uuid1()
+ u4 = uuid.uuid4()
+ col0 = (l(0), i(1), T(u1), x(u4), a(''), s(''), b(''))
+ col1 = (l(1), i(1), T(u1), x(u4), a(''), s(''), b(''))
+ col2 = (l(1), i(2), T(u1), x(u4), a(''), s(''), b(''))
+ col3 = (l(1), i(3), T(u1), x(u4), a(''), s(''), b(''))
+ col4 = (l(2), i(1), T(u1), x(u4), a(''), s(''), b(''))
+ cf.insert('key2', {col0: '', col1: '', col2: '', col3: '', col4: ''})
+
+ result = cf.get('key2', column_start=((l(1), True),), column_finish=((l(1), True),))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ result = cf.get('key2', column_start=(l(1),), column_finish=((l(2), False), ))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ result = cf.get('key2', column_start=((l(1), True),), column_finish=((l(2), False), ))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ result = cf.get('key2', column_start=(l(1), ), column_finish=((l(2), False), ))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ result = cf.get('key2', column_start=((l(0), False), ), column_finish=((l(2), False), ))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ result = cf.get('key2', column_start=(l(1), i(1)), column_finish=(l(1), i(3)))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ result = cf.get('key2', column_start=(l(1), i(1)), column_finish=(l(1), (i(3), True)))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ result = cf.get('key2', column_start=(l(1), (i(1), True)), column_finish=((l(2), False), ))
+ assert_equal(result, {col1: '', col2: '', col3: ''})
+
+ def test_static_composite_get_partial_composite(self):
+ cf = ColumnFamily(pool, 'StaticDynamicComposite')
+ cf.insert('key3', {(l(123123), i(1)): 'val'})
+ assert_equal(cf.get('key3'), {(l(123123), i(1)): 'val'})
+
+ def test_uuid_composites(self):
+ sys = SystemManager()
+ sys.create_column_family(TEST_KS, 'UUIDDynamicComposite',
+ comparator_type=DynamicCompositeType({'I': IntegerType(reversed=True), 't': TimeUUIDType()}),
+ key_validation_class=TimeUUIDType(),
+ default_validation_class=UTF8Type())
+
+ key, u1, u2 = uuid.uuid1(), uuid.uuid1(), uuid.uuid1()
+ cf = ColumnFamily(pool, 'UUIDDynamicComposite')
+ cf.insert(key, {(I(123123), t(u1)): 'foo'})
+ cf.insert(key, {(I(123123), t(u1)): 'foo', (I(-1), t(u2)): 'bar', (I(-123123123), t(u1)): 'baz'})
+ assert_equal(cf.get(key), {(I(123123), t(u1)): 'foo', (I(-1), t(u2)): 'bar', (I(-123123123), t(u1)): 'baz'})
+
+ def test_single_component_composite(self):
+ sys = SystemManager()
+ sys.create_column_family(TEST_KS, 'SingleDynamicComposite',
+ comparator_type=DynamicCompositeType({'i': IntegerType()}))
+
+ cf = ColumnFamily(pool, 'SingleDynamicComposite')
+ cf.insert('key', {(i(123456),): 'val'})
+ assert_equal(cf.get('key'), {(i(123456),): 'val'})
+
class TestBigInt(unittest.TestCase):
@classmethod
@@ -1237,3 +1369,4 @@ def check(column_start, column_finish, col_reversed=False):
check(((dt2, False),), ((dt0, False),), True)
check(((dt2, False),), (dt1,), True)
check((dt1,), ((dt0, False),), True)
+
Something went wrong with that request. Please try again.