Skip to content
Browse files

Merge pull request #110 from carlopires/nofail-with-schema-disagreements

Make schema updates resilient to schema disagreements
  • Loading branch information...
2 parents 066c4bc + f99612e commit 2b5dafabc61226215fdf1edbf21b64b4b3e481ac @thobbs thobbs committed Jan 13, 2012
Showing with 22 additions and 17 deletions.
  1. +22 −17 pycassa/system_manager.py
View
39 pycassa/system_manager.py
@@ -3,7 +3,7 @@
from pycassa.connection import Connection
from pycassa.cassandra.ttypes import IndexType, KsDef, CfDef, ColumnDef,\
- InvalidRequestException
+ InvalidRequestException, SchemaDisagreementException
from pycassa.cassandra.constants import *
import pycassa.util as util
import pycassa.marshal as marshal
@@ -148,14 +148,10 @@ def describe_snitch(self):
return snitch[snitch.rfind('.') + 1: ]
def _system_add_keyspace(self, ksdef):
- schema_version = self._conn.system_add_keyspace(ksdef)
- self._wait_for_agreement()
- return schema_version
+ return self._schema_update(self._conn.system_add_keyspace, ksdef)
def _system_update_keyspace(self, ksdef):
- schema_version = self._conn.system_update_keyspace(ksdef)
- self._wait_for_agreement()
- return schema_version
+ return self._schema_update(self._conn.system_update_keyspace, ksdef)
def create_keyspace(self, name,
replication_strategy=SIMPLE_STRATEGY,
@@ -238,14 +234,11 @@ def drop_keyspace(self, keyspace):
Drops a keyspace from the cluster.
"""
- schema_version = self._conn.system_drop_keyspace(keyspace)
- self._wait_for_agreement()
+ self._schema_update(self._conn.system_drop_keyspace, keyspace)
def _system_add_column_family(self, cfdef):
self._conn.set_keyspace(cfdef.keyspace)
- schema_version = self._conn.system_add_column_family(cfdef)
- self._wait_for_agreement()
- return schema_version
+ return self._schema_update(self._conn.system_add_column_family, cfdef)
def _qualify_type_class(self, classname):
if classname:
@@ -417,9 +410,7 @@ def _raise_ire(self, why):
raise ire
def _system_update_column_family(self, cfdef):
- schema_version = self._conn.system_update_column_family(cfdef)
- self._wait_for_agreement()
- return schema_version
+ return self._schema_update(self._conn.system_update_column_family, cfdef)
def alter_column_family(self, keyspace, column_family,
key_cache_size=None,
@@ -481,8 +472,7 @@ def drop_column_family(self, keyspace, column_family):
"""
self._conn.set_keyspace(keyspace)
- schema_version = self._conn.system_drop_column_family(column_family)
- self._wait_for_agreement()
+ self._schema_update(self._conn.system_drop_column_family, column_family)
def alter_column(self, keyspace, column_family, column, value_type):
"""
@@ -597,3 +587,18 @@ def _wait_for_agreement(self):
break
else:
time.sleep(_SAMPLE_PERIOD)
+
+ def _schema_update(self, schema_func, *args):
+ """
+ Call schema updates functions and properly
+ waits for agreement if needed.
+ """
+ while True:
+ try:
+ schema_version = schema_func(*args)
+ except SchemaDisagreementException:
+ self._wait_for_agreement()
+ else:
+ break
+ return schema_version
+

0 comments on commit 2b5dafa

Please sign in to comment.
Something went wrong with that request. Please try again.