Permalink
Browse files

Merge pull request #215 from rbranson/atomic-batches

Support for atomic batches
  • Loading branch information...
thobbs committed Oct 3, 2013
2 parents d5f764f + 5224ada commit f52e6544677db5e6a59a330103dbf410b55299fc
Showing with 49 additions and 9 deletions.
  1. +26 −6 pycassa/batch.py
  2. +3 −2 pycassa/columnfamily.py
  3. +2 −1 pycassa/pool.py
  4. +18 −0 tests/test_batch_mutation.py
View
@@ -60,6 +60,22 @@
>>> cf.batch().remove('foo').remove('bar').send()
+To use atomic batches (supported in Cassandra 1.2 and later), pass the atomic
+option in when creating the batch:
+
+.. code-block:: python
+
+ >>> cf.batch(atomic=True)
+
+or when sending it:
+
+.. code-block:: python
+
+ >>> b = cf.batch()
+ >>> b.insert('key1', {'col1':'val2'})
+ >>> b.insert('key2', {'col1':'val2'})
+ >>> b.send(atomic=True)
+
"""
import threading
@@ -75,7 +91,7 @@ class Mutator(object):
is full or `send` is called explicitly.
"""
- def __init__(self, pool, queue_size=100, write_consistency_level=None, allow_retries=True):
+ def __init__(self, pool, queue_size=100, write_consistency_level=None, allow_retries=True, atomic=False):
"""
`pool` is the :class:`~pycassa.pool.ConnectionPool` that will be used
for operations.
@@ -88,6 +104,7 @@ def __init__(self, pool, queue_size=100, write_consistency_level=None, allow_ret
self.pool = pool
self.limit = queue_size
self.allow_retries = allow_retries
+ self.atomic = atomic
if write_consistency_level is None:
self.write_consistency_level = ConsistencyLevel.ONE
else:
@@ -110,10 +127,12 @@ def _enqueue(self, key, column_family, mutations):
self._lock.release()
return self
- def send(self, write_consistency_level=None):
+ def send(self, write_consistency_level=None, atomic=None):
""" Sends all operations currently in the batch and clears the batch. """
if write_consistency_level is None:
write_consistency_level = self.write_consistency_level
+ if atomic is None:
+ atomic = self.atomic
mutations = {}
conn = None
self._lock.acquire()
@@ -122,8 +141,9 @@ def send(self, write_consistency_level=None):
mutations.setdefault(key, {}).setdefault(column_family, []).extend(cols)
if mutations:
conn = self.pool.get()
- conn.batch_mutate(mutations, write_consistency_level,
- allow_retries=self.allow_retries)
+ mutatefn = conn.atomic_batch_mutate if atomic else conn.batch_mutate
+ mutatefn(mutations, write_consistency_level,
+ allow_retries=self.allow_retries)
self._buffer = []
finally:
if conn:
@@ -179,13 +199,13 @@ class CfMutator(Mutator):
"""
def __init__(self, column_family, queue_size=100, write_consistency_level=None,
- allow_retries=True):
+ allow_retries=True, atomic=False):
"""
`column_family` is the :class:`~pycassa.columnfamily.ColumnFamily`
that all operations will be executed on.
"""
wcl = write_consistency_level or column_family.write_consistency_level
- Mutator.__init__(self, column_family.pool, queue_size, wcl, allow_retries)
+ Mutator.__init__(self, column_family.pool, queue_size, wcl, allow_retries, atomic)
self._column_family = column_family
def insert(self, key, cols, timestamp=None, ttl=None):
View
@@ -1107,7 +1107,7 @@ def remove_counter(self, key, column, super_column=None, write_consistency_level
self.pool.execute('remove_counter', packed_key, cp,
write_consistency_level or self.write_consistency_level)
- def batch(self, queue_size=100, write_consistency_level=None):
+ def batch(self, queue_size=100, write_consistency_level=None, atomic=None):
"""
Create batch mutator for doing multiple insert, update, and remove
operations using as few roundtrips as possible.
@@ -1120,7 +1120,8 @@ def batch(self, queue_size=100, write_consistency_level=None):
return CfMutator(self, queue_size,
write_consistency_level or self.write_consistency_level,
- allow_retries=self._allow_retries)
+ allow_retries=self._allow_retries,
+ atomic=atomic)
def truncate(self):
"""
View
@@ -192,7 +192,8 @@ def __str__(self):
retryable = ('get', 'get_slice', 'multiget_slice', 'get_count', 'multiget_count',
'get_range_slices', 'get_indexed_slices', 'batch_mutate', 'add',
- 'insert', 'remove', 'remove_counter', 'truncate', 'describe_keyspace')
+ 'insert', 'remove', 'remove_counter', 'truncate', 'describe_keyspace',
+ 'atomic_batch_mutate')
for fname in retryable:
new_f = ConnectionWrapper._retry(getattr(Connection, fname))
setattr(ConnectionWrapper, fname, new_f)
@@ -132,3 +132,21 @@ def test_multi_column_family(self):
batch.send()
assert cf.get('2') == ROWS['2']
assert_raises(NotFoundException, cf.get, '1')
+
+ def test_atomic_insert_at_mutator_creation(self):
+ batch = cf.batch(atomic=True)
+ for key, cols in ROWS.iteritems():
+ batch.insert(key, cols)
+ batch.send()
+ for key, cols in ROWS.items():
+ assert cf.get(key) == cols
+
+ def test_atomic_insert_at_send(self):
+ batch = cf.batch(atomic=True)
+ for key, cols in ROWS.iteritems():
+ batch.insert(key, cols)
+ batch.send(atomic=True)
+ for key, cols in ROWS.items():
+ assert cf.get(key) == cols
+
+

0 comments on commit f52e654

Please sign in to comment.