Disable retries for counter mutations.
spladug committed Jan 30, 2012
1 parent ab4bd16 commit fe8b761
Showing 4 changed files with 58 additions and 19 deletions.
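
Counter increments are not idempotent: if a write times out but was in fact applied, replaying it adds the value a second time. With this commit, a column family whose default validation class is CounterColumnType skips the pool's retry logic by default. A minimal sketch of the user-facing effect — the keyspace and column family names here are hypothetical; `retry_counter_mutations` is the actual opt-in knob introduced by the diff:

from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

pool = ConnectionPool('MyKeyspace')          # hypothetical keyspace

# A CF backed by CounterColumnType now fails fast instead of retrying:
views = ColumnFamily(pool, 'PageViews')      # hypothetical counter CF
views.add('page-1', 'hits')                  # increment by 1; no silent replays

# Callers who prefer availability over exact counts can opt back in:
views_retrying = ColumnFamily(pool, 'PageViews', retry_counter_mutations=True)
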
10 changes: 7 additions & 3 deletions pycassa/batch.py
@@ -76,7 +76,7 @@ class Mutator(object):
     """
 
-    def __init__(self, pool, queue_size=100, write_consistency_level=None):
+    def __init__(self, pool, queue_size=100, write_consistency_level=None, allow_retries=True):
         """Creates a new Mutator object.
 
         `pool` is the :class:`~pycassa.pool.ConnectionPool` that will be used
@@ -90,6 +90,7 @@ def __init__(self, pool, queue_size=100, write_consistency_level=None):
         self._lock = threading.RLock()
         self.pool = pool
         self.limit = queue_size
+        self.allow_retries = allow_retries
         if write_consistency_level is None:
             self.write_consistency_level = ConsistencyLevel.ONE
         else:
@@ -124,7 +125,8 @@ def send(self, write_consistency_level=None):
                 mutations.setdefault(key, {}).setdefault(column_family, []).extend(cols)
             if mutations:
                 conn = self.pool.get()
-                conn.batch_mutate(mutations, write_consistency_level)
+                conn.batch_mutate(mutations, write_consistency_level,
+                                  allow_retries=self.allow_retries)
             self._buffer = []
         finally:
             if conn:
@@ -177,7 +179,8 @@ class CfMutator(Mutator):
     """
 
-    def __init__(self, column_family, queue_size=100, write_consistency_level=None):
+    def __init__(self, column_family, queue_size=100, write_consistency_level=None,
+                 allow_retries=True):
         """ A :class:`~pycassa.batch.Mutator` that deals only with one column family.
 
         `column_family` is the :class:`~pycassa.columnfamily.ColumnFamily`
@@ -186,6 +189,7 @@ def __init__(self, column_family, queue_size=100, write_consistency_level=None):
         """
         wcl = write_consistency_level or column_family.write_consistency_level
         super(CfMutator, self).__init__(column_family.pool, queue_size=queue_size,
+                                        allow_retries=allow_retries,
                                         write_consistency_level=wcl)
         self._column_family = column_family
 
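Batches follow the same rule: `ColumnFamily.batch()` now builds its `CfMutator` with `allow_retries` taken from the column family (see the columnfamily.py hunks below), so a counter batch is sent at most once. A brief sketch reusing the hypothetical `views` column family from above and assuming `CfMutator`'s usual `insert(key, columns)` / `send()` API:

b = views.batch(queue_size=50)       # CfMutator; allow_retries=False for counter CFs
b.insert('page-1', {'hits': 1})      # queued locally, nothing sent yet
b.send()                             # one batch_mutate attempt, no replays
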
41 changes: 26 additions & 15 deletions pycassa/columnfamily.py
@@ -112,6 +112,12 @@ class ColumnFamily(object):
     By default, this is :const:`True`.
     """
 
+    retry_counter_mutations = False
+    """ Whether to retry failed counter mutations. Counter mutations are
+    not idempotent, so retrying could result in double counting.
+    By default, this is :const:`False`.
+    """
+
     def _set_column_name_class(self, t):
         if isinstance(t, types.CassandraType):
             self._column_name_class = t
@@ -158,23 +164,23 @@ def _set_default_validation_class(self, t):
             self._default_validation_class = t
             self._default_value_packer = t.pack
             self._default_value_unpacker = t.unpack
-            have_counters = isinstance(t, types.CounterColumnType)
+            self._have_counters = isinstance(t, types.CounterColumnType)
         else:
             self._default_validation_class = marshal.extract_type_name(t)
             self._default_value_packer = marshal.packer_for(t)
             self._default_value_unpacker = marshal.unpacker_for(t)
-            have_counters = self._default_validation_class == "CounterColumnType"
+            self._have_counters = self._default_validation_class == "CounterColumnType"
 
         if not self.super:
-            if have_counters:
+            if self._have_counters:
                 def _make_cosc(name, value, timestamp, ttl):
                     return ColumnOrSuperColumn(counter_column=CounterColumn(name, value))
             else:
                 def _make_cosc(name, value, timestamp, ttl):
                     return ColumnOrSuperColumn(Column(name, value, timestamp, ttl))
             self._make_cosc = _make_cosc
         else:
-            if have_counters:
+            if self._have_counters:
                 def _make_column(name, value, timestamp, ttl):
                     return CounterColumn(name, value)
                 self._make_column = _make_column
@@ -201,6 +207,10 @@ def _get_default_validation_class(self):
         include an instance of any class in :mod:`pycassa.types`, such as ``LongType()``.
         """
 
+    @property
+    def _allow_retries(self):
+        return not self._have_counters or self.retry_counter_mutations
+
     def _set_column_validators(self, other_dict):
         self._column_validators = ColumnValidatorDict(other_dict)
 
@@ -258,7 +268,8 @@ def __init__(self, pool, column_family, **kwargs):
         recognized_kwargs = ["buffer_size", "read_consistency_level",
                              "write_consistency_level", "timestamp",
                              "dict_class", "buffer_size", "autopack_names",
-                             "autopack_values", "autopack_keys"]
+                             "autopack_values", "autopack_keys",
+                             "retry_counter_mutations"]
         for kw in recognized_kwargs:
             if kw in kwargs:
                 setattr(self, kw, kwargs[kw])
@@ -862,12 +873,14 @@ def insert(self, key, columns, timestamp=None, ttl=None,
             column = Column(colname, colval, timestamp, ttl)
 
             self.pool.execute('insert', packed_key, cp, column,
-                              write_consistency_level or self.write_consistency_level)
+                              write_consistency_level or self.write_consistency_level,
+                              allow_retries=self._allow_retries)
         else:
             mut_list = self._make_mutation_list(columns, timestamp, ttl)
             mutations = {packed_key: {self.column_family: mut_list}}
             self.pool.execute('batch_mutate', mutations,
-                              write_consistency_level or self.write_consistency_level)
+                              write_consistency_level or self.write_consistency_level,
+                              allow_retries=self._allow_retries)
 
         return timestamp
 
@@ -894,7 +907,8 @@ def batch_insert(self, rows, timestamp=None, ttl=None, write_consistency_level = None):
 
         if mutations:
             self.pool.execute('batch_mutate', mutations,
-                              write_consistency_level or self.write_consistency_level)
+                              write_consistency_level or self.write_consistency_level,
+                              allow_retries=self._allow_retries)
 
         return timestamp
 
@@ -905,11 +919,6 @@ def add(self, key, column, value=1, super_column=None, write_consistency_level=None):
         `value` should be an integer, either positive or negative, to be added
         to a counter column. By default, `value` is 1.
 
-        .. note:: This method is not idempotent. Retrying a failed add may result
-                  in a double count. You should consider using a separate
-                  ConnectionPool with retries disabled for column families
-                  with counters.
-
         .. versionadded:: 1.1.0
             Available in Cassandra 0.8.0 and later.
@@ -918,7 +927,8 @@ def add(self, key, column, value=1, super_column=None, write_consistency_level=None):
         cp = self._column_parent(super_column)
         column = self._pack_name(column)
         self.pool.execute('add', packed_key, cp, CounterColumn(column, value),
-                          write_consistency_level or self.write_consistency_level)
+                          write_consistency_level or self.write_consistency_level,
+                          allow_retries=self._allow_retries)
 
     def remove(self, key, columns=None, super_column=None,
                write_consistency_level=None, timestamp=None, counter=None):
@@ -974,7 +984,8 @@ def batch(self, queue_size=100, write_consistency_level=None):
         """
 
         return CfMutator(self, queue_size,
-                         write_consistency_level or self.write_consistency_level,
+                         write_consistency_level or self.write_consistency_level,
+                         allow_retries=self._allow_retries)
 
     def truncate(self):
         """
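The whole decision collapses into the `_allow_retries` property: retries stay on unless the CF holds counters and `retry_counter_mutations` was left at its default. An illustrative standalone version of that predicate — not part of the API, names simply mirror the diff:

def allow_retries(have_counters, retry_counter_mutations=False):
    # Mirrors ColumnFamily._allow_retries above.
    return not have_counters or retry_counter_mutations

assert allow_retries(False) is True        # regular CF: retried as before
assert allow_retries(True) is False        # counter CF: fail fast by default
assert allow_retries(True, True) is True   # explicit opt-in restores retries
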
4 changes: 3 additions & 1 deletion pycassa/pool.py
@@ -114,6 +114,7 @@ def _retry(cls, f):
         def new_f(self, *args, **kwargs):
             self.operation_count += 1
             try:
+                allow_retries = kwargs.pop('allow_retries', True)
                 if kwargs.pop('reset', False):
                     self._pool._replace_wrapper() # puts a new wrapper in the queue
                     self._replace(self._pool.get()) # swaps out transport
@@ -134,7 +135,8 @@ def new_f(self, *args, **kwargs):
                 self._pool._clear_current()
 
                 self._retry_count += 1
-                if self.max_retries != -1 and self._retry_count > self.max_retries:
+                if (not allow_retries or
+                        (self.max_retries != -1 and self._retry_count > self.max_retries)):
                     raise MaximumRetryException('Retried %d times. Last failure was %s: %s' %
                                                 (self._retry_count, exc.__class__.__name__, exc))
                 # Exponential backoff
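On the pool side, `allow_retries` is popped from the kwargs so it never reaches the underlying Thrift call, and a disallowed retry raises `MaximumRetryException` on the first failure — exactly what the new test below asserts. A simplified, self-contained sketch of that control flow (the real wrapper also swaps out broken connections; this version is illustrative only):

import time

class MaximumRetryException(Exception):
    pass

def call_with_retries(call, args=(), max_retries=3, allow_retries=True):
    # Illustrative stand-in for the pool's _retry decorator.
    retry_count = 0
    while True:
        try:
            return call(*args)
        except Exception as exc:
            retry_count += 1
            if (not allow_retries or
                    (max_retries != -1 and retry_count > max_retries)):
                raise MaximumRetryException('Retried %d times. Last failure was %s: %s'
                                            % (retry_count, exc.__class__.__name__, exc))
            time.sleep(0.01 * 2 ** retry_count)  # exponential backoff, as in pool.py

With allow_retries=False, the except branch raises on the very first failure ("Retried 1 times"), which matches the test's expectation that listener.failure_count is 1.
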
22 changes: 22 additions & 0 deletions tests/test_connection_pooling.py
@@ -439,6 +439,28 @@ def test_queue_threadlocal_retry_limit(self):
 
         pool.dispose()
 
+    def test_queue_failure_with_no_retries(self):
+        listener = _TestListener()
+        pool = ConnectionPool(pool_size=5, max_overflow=5, recycle=10000,
+                              prefill=True, max_retries=3, # allow 3 retries
+                              keyspace='PycassaTestKeyspace', credentials=_credentials,
+                              listeners=[listener], use_threadlocal=False,
+                              server_list=['localhost:9160', 'localhost:9160'])
+
+        # Corrupt all of the connections
+        for i in range(5):
+            conn = pool.get()
+            setattr(conn, 'send_batch_mutate', conn._fail_once)
+            conn._should_fail = True
+            conn.return_to_pool()
+
+        cf = ColumnFamily(pool, 'Counter1')
+        assert_raises(MaximumRetryException, cf.insert, 'key', {'col': 2, 'col2': 2})
+        assert_equal(listener.failure_count, 1) # didn't retry at all
+
+        pool.dispose()
+
+
 class _TestListener(PoolListener):
 
     def __init__(self):
