Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'batch' of http://github.com/dln/pycassa

Conflicts:
	pycassa/columnfamily.py
  • Loading branch information...
commit fc796dc431b23bb0e6ccf7fea1f1058503c4826a 2 parents c7292df + 9698981
@thobbs thobbs authored
View
53 README.mkd
@@ -240,6 +240,59 @@ You may also use a ColumnFamilyMap with SuperColumns:
These output values retain the same format as given by the Cassandra thrift interface.
+Batch Mutations
+---------------
+
+The batch interface allows insert/update/remove operations to be performed in
+batches. This provides a convenient mechanism for streaming updates or doing a large number of operations while reducing the number of RPC roundtrips.
+
+Batch mutator objects are synchronized and can be safely passed around threads.
+
+ >>> b = cf.batch(queue_size=10)
+ >>> b.insert('key1', {'col1':'value11', 'col2':'value21'})
+ >>> b.insert('key2', {'col1':'value12', 'col2':'value22'}, ttl=15)
+ >>> b.remove('key1', ['col2'])
+ >>> b.remove('key2')
+ >>> b.send()
+
+One can use the `queue_size` argument to control how many mutations will be
+queued before an automatic `send` is performed. This allows simple streaming of
+updates. If set to `None`, automatic checkpoints are disabled. Default is 100.
+
+Supercolumns are supported:
+
+ >>> b = scf.batch()
+ >>> b.insert('key1', {'supercol1': {'colA':'value1a', 'colB':'value1b'},
+                       'supercol2': {'colA':'value2a', 'colB':'value2b'}})
+ >>> b.remove('key1', ['colA'], 'supercol1')
+ >>> b.send()
+
+You may also create a batch mutator from a client instance, allowing operations
+on multiple column families:
+
+ >>> b = client.batch()
+ >>> b.insert(cf, 'key1', {'col1':'value1', 'col2':'value2'})
+ >>> b.insert(supercf, 'key1', {'subkey1': {'col1':'value1', 'col2':'value2'}})
+ >>> b.send()
+
+Note: This interface does not implement atomic operations across column
+ families. All the limitations of the `batch_mutate` Thrift API call
+ apply. Remember, a mutation in Cassandra is always atomic per key per
+ column family only.
+
+Note: If a single operation in a batch fails, the whole batch fails.
+
+In Python >= 2.5, mutators can be used as context managers, where an implicit
+`send` will be called upon exit.
+
+ >>> with cf.batch() as b:
+ >>> b.insert('key1', {'col1':'value11', 'col2':'value21'})
+ >>> b.insert('key2', {'col1':'value12', 'col2':'value22'})
+
+Calls to `insert` and `remove` can also be chained:
+
+ >>> cf.batch().remove('foo').remove('bar').send()
+
Advanced
--------
View
109 pycassa/batch.py
@@ -0,0 +1,109 @@
+import threading
+from cassandra.ttypes import (Clock, Column, ColumnOrSuperColumn, ConsistencyLevel,
+ Deletion, Mutation, SlicePredicate, SuperColumn)
+
+
class Mutator(object):
    """Batch update convenience mechanism.

    Queues insert/update/remove operations and executes them when the queue
    is filled up or explicitly using `send`.

    Instances are synchronized with an RLock and can be safely shared
    between threads.
    """

    def __init__(self, client, queue_size=100, write_consistency_level=None):
        """
        Parameters
        ----------
        client : connection
            Cassandra client used to issue `batch_mutate` calls
        queue_size : int
            Number of queued mutations that triggers an automatic `send`.
            Falsy values (None, 0) disable automatic sends.
        write_consistency_level : ConsistencyLevel
            Default consistency level for sends; defaults to ONE.
        """
        self._buffer = []
        self._lock = threading.RLock()
        self.client = client
        self.limit = queue_size
        if write_consistency_level is None:
            self.write_consistency_level = ConsistencyLevel.ONE
        else:
            self.write_consistency_level = write_consistency_level

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Flush any queued mutations when leaving a `with` block.
        self.send()

    def _enqueue(self, key, column_family, mutations):
        """Buffer `mutations` for `key`, auto-sending once the limit is hit."""
        self._lock.acquire()
        try:
            mutation = (key, column_family.column_family, mutations)
            self._buffer.append(mutation)
            if self.limit and len(self._buffer) >= self.limit:
                self.send()
        finally:
            self._lock.release()
        return self

    def send(self, write_consistency_level=None):
        """Flush all queued mutations with a single `batch_mutate` call.

        The buffer is cleared only after a successful send, so a failed
        send can be retried.
        """
        if write_consistency_level is None:
            write_consistency_level = self.write_consistency_level
        mutations = {}
        self._lock.acquire()
        try:
            # Regroup the flat buffer into {key: {column_family: [Mutation]}}
            # as expected by the Thrift batch_mutate API.
            for key, column_family, cols in self._buffer:
                mutations.setdefault(key, {}).setdefault(column_family, []).extend(cols)
            if mutations:
                self.client.batch_mutate(mutations, write_consistency_level)
            self._buffer = []
        finally:
            self._lock.release()

    def _make_mutations_insert(self, column_family, columns, clock, ttl):
        """Yield Thrift Mutation objects for inserting `columns`."""
        _pack_name = column_family._pack_name
        _pack_value = column_family._pack_value
        for c, v in columns.iteritems():
            cos = ColumnOrSuperColumn()
            if column_family.super:
                # `v` is a {subcolumn: value} dict for supercolumn families.
                subc = [Column(name=_pack_name(subname),
                               value=_pack_value(subvalue, subname),
                               clock=clock, ttl=ttl)
                        for subname, subvalue in v.iteritems()]
                cos.super_column = SuperColumn(name=_pack_name(c, True),
                                               columns=subc)
            else:
                cos.column = Column(name=_pack_name(c), value=_pack_value(v, c),
                                    clock=clock, ttl=ttl)
            yield Mutation(column_or_supercolumn=cos)

    def insert(self, column_family, key, columns, clock=None, ttl=None):
        """Queue an insert/update of `columns` for `key`. Chainable."""
        if columns:
            clock = clock or Clock(timestamp=column_family.timestamp())
            # Materialize the generator: a generator in the buffer would be
            # consumed by a failed send(), silently dropping the mutations
            # from any retry.
            mutations = list(self._make_mutations_insert(column_family,
                                                         columns, clock, ttl))
            self._enqueue(key, column_family, mutations)
        return self

    def remove(self, column_family, key, columns=None, super_column=None, clock=None):
        """Queue a removal of `columns` (or the whole row) for `key`. Chainable."""
        clock = clock or Clock(timestamp=column_family.timestamp())
        deletion = Deletion(clock=clock)
        _pack_name = column_family._pack_name
        if columns:
            packed_cols = [_pack_name(col, column_family.super)
                           for col in columns]
            deletion.predicate = SlicePredicate(column_names=packed_cols)
        if super_column:
            # Pack the supercolumn name for consistency with
            # ColumnFamily.remove, which packs it before sending.
            deletion.super_column = _pack_name(super_column, True)
        mutation = Mutation(deletion=deletion)
        self._enqueue(key, column_family, (mutation,))
        return self
+
+
class CfMutator(Mutator):
    """A `Mutator` bound to a single column family.

    Drops the `column_family` argument from `insert` and `remove`, and
    defaults the write consistency level to the column family's own.
    """

    def __init__(self, column_family, queue_size=100, write_consistency_level=None):
        # Use an explicit `is None` test: `or` would wrongly discard a
        # falsy consistency level (ConsistencyLevel.ZERO == 0), and the
        # base class already uses `is None` for the same check.
        wcl = write_consistency_level
        if wcl is None:
            wcl = column_family.write_consistency_level
        super(CfMutator, self).__init__(column_family.client, queue_size=queue_size,
                                        write_consistency_level=wcl)
        self._column_family = column_family

    def insert(self, key, cols, clock=None, ttl=None):
        """Queue an insert of `cols` for `key` on the bound column family."""
        return super(CfMutator, self).insert(self._column_family, key, cols,
                                             clock=clock, ttl=ttl)

    def remove(self, key, columns=None, super_column=None, clock=None):
        """Queue a removal for `key` on the bound column family."""
        return super(CfMutator, self).remove(self._column_family, key,
                                             columns=columns,
                                             super_column=super_column,
                                             clock=clock)
+
View
85 pycassa/columnfamily.py
@@ -1,13 +1,14 @@
from cassandra.ttypes import Column, ColumnOrSuperColumn, ColumnParent, \
ColumnPath, ConsistencyLevel, NotFoundException, SlicePredicate, \
- SliceRange, SuperColumn, Mutation, Deletion, Clock, KeyRange, \
- IndexExpression, IndexClause
+ SliceRange, SuperColumn, Clock, KeyRange, IndexExpression, IndexClause
import time
import sys
import uuid
import struct
+from batch import CfMutator
+
__all__ = ['gm_timestamp', 'ColumnFamily']
_TYPES = ['BytesType', 'LongType', 'IntegerType', 'UTF8Type', 'AsciiType',
'LexicalUUIDType', 'TimeUUIDType']
@@ -611,7 +612,8 @@ def get_range(self, start="", finish="", columns=None, column_start="",
last_key = key_slices[-1].key
i += 1
- def insert(self, key, columns, write_consistency_level=None):
+ def insert(self, key, columns, clock=None, ttl=None,
+ write_consistency_level=None):
"""
Insert or update columns for a key
@@ -631,10 +633,10 @@ def insert(self, key, columns, write_consistency_level=None):
-------
int timestamp
"""
- return self.batch_insert({key: columns},
- write_consistency_level = write_consistency_level)
+ return self.batch_insert({key: columns}, clock=clock, ttl=ttl,
+ write_consistency_level=write_consistency_level)
- def batch_insert(self, rows, write_consistency_level = None):
+ def batch_insert(self, rows, clock=None, ttl=None, write_consistency_level = None):
"""
Insert or update columns for multiple keys
@@ -652,29 +654,10 @@ def batch_insert(self, rows, write_consistency_level = None):
int timestamp
"""
clock = Clock(timestamp=self.timestamp())
-
- mutation_map = {}
-
- for row, cs in rows.iteritems():
- cols = []
-
- for c, v in cs.iteritems():
- if self.super:
- subc = [Column(name=self._pack_name(subname), \
- value=self._pack_value(subvalue, subname), clock=clock) \
- for subname, subvalue in v.iteritems()]
- column = SuperColumn(name=self._pack_name(c, is_supercol_name=True), columns=subc)
- cols.append(Mutation(column_or_supercolumn=ColumnOrSuperColumn(super_column=column)))
- else:
- column = Column(name=self._pack_name(c), value=self._pack_value(v, c), clock=clock)
- cols.append(Mutation(column_or_supercolumn=ColumnOrSuperColumn(column=column)))
-
- if cols:
- mutation_map[row] = {self.column_family: cols}
-
- self.client.batch_mutate(mutation_map,
- self._wcl(write_consistency_level))
-
+ batch = self.batch(write_consistency_level=write_consistency_level)
+ for key, columns in rows.iteritems():
+ batch.insert(key, columns, clock=clock, ttl=ttl)
+ batch.send()
return clock.timestamp
def remove(self, key, columns=None, super_column=None, write_consistency_level = None):
@@ -697,30 +680,32 @@ def remove(self, key, columns=None, super_column=None, write_consistency_level =
-------
int timestamp
"""
+ clock = Clock(timestamp=self.timestamp())
+ batch = self.batch(write_consistency_level=write_consistency_level)
+ batch.remove(key, columns, super_column, clock)
+ batch.send()
+ return clock.timestamp
- packed_cols = None
- if columns is not None:
- packed_cols = []
- for col in columns:
- packed_cols.append(self._pack_name(col, is_supercol_name = self.super))
+ def batch(self, queue_size=100, write_consistency_level=None):
+ """
+ Create batch mutator for doing multiple insert,update,remove
+ operations using as few roundtrips as possible.
- if super_column != '':
- super_column = self._pack_name(super_column, is_supercol_name=True)
+ Parameters
+ ----------
+ queue_size : int
+ Max number of mutations per request
+ write_consistency_level: ConsistencyLevel
+ Consistency level used for mutations.
- clock = Clock(timestamp=self.timestamp())
- if packed_cols is not None:
- # Deletion doesn't support SliceRange predicates as of Cassandra 0.6.0,
- # so we can't add column_start, column_finish, etc... yet
- sp = SlicePredicate(column_names=packed_cols)
- deletion = Deletion(clock=clock, super_column=super_column, predicate=sp)
- mutation = Mutation(deletion=deletion)
- self.client.batch_mutate({key: {self.column_family: [mutation]}},
- self._wcl(write_consistency_level))
- else:
- cp = ColumnPath(column_family=self.column_family, super_column=super_column)
- self.client.remove(key, cp, clock,
- self._wcl(write_consistency_level))
- return clock.timestamp
+ Returns
+ -------
+ CfMutator mutator
+ """
+ if write_consistency_level is None:
+ write_consistency_level = self.write_consistency_level
+ return CfMutator(self, queue_size=queue_size,
+ write_consistency_level=write_consistency_level)
def truncate(self):
"""
View
5 pycassa/connection.py
@@ -13,6 +13,8 @@
from cassandra.constants import VERSION
from cassandra.ttypes import AuthenticationRequest
+from batch import Mutator
+
__all__ = ['connect', 'connect_thread_local', 'NoServerAvailable']
DEFAULT_SERVER = 'localhost:9160'
@@ -233,3 +235,6 @@ def get_keyspace_description(self, keyspace=None):
new_metadata[datum.name] = datum
cf_def.column_metadata = new_metadata
return cf_defs
+
    def batch(self, *args, **kwargs):
        """Return a `Mutator` for batching operations on this connection.

        Unlike `ColumnFamily.batch`, the resulting mutator can span
        multiple column families; extra arguments are forwarded to
        `Mutator` (e.g. `queue_size`, `write_consistency_level`).
        """
        return Mutator(self, *args, **kwargs)
View
119 tests/test_batch_mutation.py
@@ -0,0 +1,119 @@
+import sys
+import unittest
+import uuid
+
+from nose import SkipTest
+from nose.tools import assert_raises
+from pycassa import connect, ColumnFamily, ConsistencyLevel, NotFoundException
+
+ROWS = {'1': {'a': '123', 'b':'123'},
+ '2': {'a': '234', 'b':'234'},
+ '3': {'a': '345', 'b':'345'}}
+
class TestMutator(unittest.TestCase):
    """Integration tests for batch mutators.

    Requires a running Cassandra node with 'Keyspace1' containing the
    'Standard2' (standard) and 'Super1' (super) column families.
    """

    def setUp(self):
        credentials = {'username': 'jsmith', 'password': 'havebadpass'}
        self.client = connect('Keyspace1', credentials=credentials)
        # Both column families use self.timestamp so clock values are a
        # deterministic, monotonically increasing counter.
        self.cf = ColumnFamily(self.client, 'Standard2',
                               write_consistency_level=ConsistencyLevel.ONE,
                               timestamp=self.timestamp)
        self.scf = ColumnFamily(self.client, 'Super1',
                                write_consistency_level=ConsistencyLevel.ONE,
                                super=True, timestamp=self.timestamp)
        try:
            # Resume the counter from a previous run so new clocks always
            # exceed any still stored in Cassandra.
            self.timestamp_n = int(self.cf.get('meta')['timestamp'])
        except NotFoundException:
            self.timestamp_n = 0
        self.clear()

    def clear(self):
        # Empty both column families so each test starts from scratch.
        self.cf.truncate()
        self.scf.truncate()

    def timestamp(self):
        # Deterministic clock source: each call returns the next integer.
        self.timestamp_n += 1
        return self.timestamp_n

    def test_insert(self):
        batch = self.cf.batch()
        for key, cols in ROWS.iteritems():
            batch.insert(key, cols)
        batch.send()
        for key, cols in ROWS.items():
            assert self.cf.get(key) == cols

    def test_insert_supercolumns(self):
        batch = self.scf.batch()
        batch.insert('one', ROWS)
        batch.insert('two', ROWS)
        batch.insert('three', ROWS)
        batch.send()
        assert self.scf.get('one') == ROWS
        assert self.scf.get('two') == ROWS
        assert self.scf.get('three') == ROWS

    def test_queue_size(self):
        # With queue_size=2, the second insert triggers an automatic send;
        # the third stays queued until the explicit send().
        batch = self.cf.batch(queue_size=2)
        batch.insert('1', ROWS['1'])
        batch.insert('2', ROWS['2'])
        batch.insert('3', ROWS['3'])
        assert self.cf.get('1') == ROWS['1']
        assert_raises(NotFoundException, self.cf.get, '3')
        batch.send()
        for key, cols in ROWS.items():
            assert self.cf.get(key) == cols

    def test_remove_key(self):
        # Remove queued after an insert of the same key within one batch:
        # the remove's later clock must win.
        batch = self.cf.batch()
        batch.insert('1', ROWS['1'])
        batch.remove('1')
        batch.send()
        assert_raises(NotFoundException, self.cf.get, '1')

    def test_remove_columns(self):
        batch = self.cf.batch()
        batch.insert('1', {'a':'123', 'b':'123'})
        batch.remove('1', ['a'])
        batch.send()
        assert self.cf.get('1') == {'b':'123'}

    def test_remove_supercolumns(self):
        # Remove subcolumn 'b' from supercolumn '2' of key 'two' only.
        batch = self.scf.batch()
        batch.insert('one', ROWS)
        batch.insert('two', ROWS)
        batch.insert('three', ROWS)
        batch.remove('two', ['b'], '2')
        batch.send()
        assert self.scf.get('one') == ROWS
        assert self.scf.get('two')['2'] == {'a': '234'}
        assert self.scf.get('three') == ROWS

    def test_chained(self):
        # insert() returns the mutator, so calls can be chained.
        batch = self.cf.batch()
        batch.insert('1', ROWS['1']).insert('2', ROWS['2']).insert('3', ROWS['3']).send()
        assert self.cf.get('1') == ROWS['1']
        assert self.cf.get('2') == ROWS['2']
        assert self.cf.get('3') == ROWS['3']

    def test_contextmgr(self):
        # `with` is a syntax error on Python < 2.5, so the block is
        # compiled lazily via exec and skipped on old interpreters.
        if sys.version_info < (2,5):
            raise SkipTest("No context managers in Python < 2.5")
        exec """with self.cf.batch(queue_size=2) as b:
    b.insert('1', ROWS['1'])
    b.insert('2', ROWS['2'])
    b.insert('3', ROWS['3'])
assert self.cf.get('3') == ROWS['3']"""

    def test_multi_column_family(self):
        # A connection-level Mutator accepts the column family per call.
        batch = self.client.batch()
        cf2 = self.cf
        batch.insert(self.cf, '1', ROWS['1'])
        batch.insert(self.cf, '2', ROWS['2'])
        # Passing a dict as `columns` removes its keys ('a', 'b'),
        # i.e. every column of row '1'.
        batch.remove(cf2, '1', ROWS['1'])
        batch.send()
        assert self.cf.get('2') == ROWS['2']
        assert_raises(NotFoundException, self.cf.get, '1')
+
+
+
Please sign in to comment.
Something went wrong with that request. Please try again.