Permalink
Browse files

Changed IStore delete to remove; parameterizable cassandra backend

  • Loading branch information...
1 parent d03c15a commit 3f881c961acc754732f02c7c3c93d5b122848c72 Michael Meisinger committed May 26, 2010
View
@@ -120,9 +120,12 @@ again (see above). Please review the branch logs for any hints.
Change log:
===========
-2010-05-24:
+2010-05-25:
+- Made Cassandra backend parameterizable with keyspace/colfamily and added
+ SuperColumn support.
- Modified the IStore interface to support a create_store factory method. This
method can yield and return a deferred. Modified and fixed IStore impls.
+ Changed delete to remove to be more compliant with standard collections.
2010-05-22:
- Added timeout to BaseProcess.rpc_send. Use with kwarg timeout=<secs>
- CC-Agent detects missing known containers and removes them from the list
@@ -3,10 +3,7 @@
@file ion/data/backends/cassandra.py
@author Paul Hubbard
@author Michael Meisinger
-<<<<<<< HEAD
-=======
@author Paul Hubbard
->>>>>>> 617e221edbc1767a3e0a36253b29e39cb8f94dfb
@author Dorian Raymer
@brief Implementation of ion.data.store.IStore using pycassa to interface a
Cassandra datastore backend
@@ -16,11 +13,16 @@
import logging
from twisted.internet import defer
-
import pycassa
+from ion.core import ioninit
from ion.data.store import IStore
+CONF = ioninit.config(__name__)
+CF_default_keyspace = CONF['default_keyspace']
+CF_default_colfamily = CONF['default_colfamily']
+CF_default_cf_super = CONF['default_cf_super']
+CF_default_namespace = CONF['default_namespace']
class CassandraStore(IStore):
"""
@@ -30,6 +32,10 @@ class CassandraStore(IStore):
def __init__(self, **kwargs):
self.kvs = None
self.cass_host_list = None
+ self.keyspace = None
+ self.colfamily = None
+ self.cf_super = False
+ self.namespace = None
@classmethod
def create_store(cls, **kwargs):
@@ -42,28 +48,44 @@ def create_store(cls, **kwargs):
inst = cls(**kwargs)
inst.kwargs = kwargs
inst.cass_host_list = kwargs.get('cass_host_list', None)
+ inst.keyspace = kwargs.get('keyspace', CF_default_keyspace)
+ inst.colfamily = kwargs.get('colfamily', CF_default_colfamily)
+ inst.cf_super = kwargs.get('cf_super', CF_default_cf_super)
+ inst.namespace = kwargs.get('namespace', CF_default_namespace)
+ if inst.cf_super and inst.namespace == None:
+ inst.namespace = ":"
+
if not inst.cass_host_list:
logging.info('Connecting to Cassandra on localhost...')
else:
- logging.info('Connecting to Cassandra at "%s"...' % str(inst.cass_host_list))
+ logging.info('Connecting to Cassandra ks:cf=%s:%s at %s ...' %
+ (inst.keyspace, inst.colfamily, inst.cass_host_list))
inst.client = pycassa.connect(inst.cass_host_list)
- inst.kvs = pycassa.ColumnFamily(inst.client, 'Datasets', 'Catalog')
- logging.info('connected OK.')
+ inst.kvs = pycassa.ColumnFamily(inst.client, inst.keyspace,
+ inst.colfamily, super=inst.cf_super)
+ logging.info('connected to Cassandra... OK.')
return defer.succeed(inst)
def get(self, key):
"""
@brief Return a value corresponding to a given key
@param key Cassandra key
- @retval Value from the ion dictionary, or None
+ @retval Deferred, for value from the ion dictionary, or None
"""
value = None
try:
- val = self.kvs.get(key)
- #logging.info('Key "%s":"%s"' % (key, val))
- value = val['value'] #this could fail if insert did it wrong
+ if self.cf_super:
+ val = self.kvs.get(key, super_column=self.namespace)
+ else:
+ if self.namespace:
+ key = self.namespace + ":" + key
+ val = self.kvs.get(key)
+ #logging.debug('Key "%s":"%s"' % (key, val))
+ #this could fail if insert did it wrong
+ value = val['value']
except pycassa.NotFoundException:
- logging.info('Key "%s" not found' % key)
+ #logging.debug('Key "%s" not found' % key)
+ pass
return defer.succeed(value)
def put(self, key, value):
@@ -72,40 +94,59 @@ def put(self, key, value):
@param key Lookup key
@param value Corresponding value
@note Value is composed into OOI dictionary under keyname 'value'
- @retval None
+ @retval Deferred for success
"""
- #logging.info('writing key %s value %s' % (key, value))
- self.kvs.insert(key, {'value':value})
- #logging.info('write complete')
+ #logging.debug('writing key %s value %s' % (key, value))
+ if self.cf_super:
+ self.kvs.insert(key, {self.namespace:{'value':value}})
+ else:
+ if self.namespace:
+ key = self.namespace + ":" + key
+ self.kvs.insert(key, {'value':value})
return defer.succeed(None)
def query(self, regex):
"""
@brief Search by regular expression
@param regex Regular expression to match against the keys
- @retval List, possibly empty, of keys that match.
+ @retval Deferred, for list, possibly empty, of keys that match.
@note Uses get_range generator of unknown efficiency.
"""
- #@todo This implementation is totally inefficient. MUST replace.
+ #@todo This implementation is very inefficient. Do smarter, but how?
matched_list = []
- klist = self.kvs.get_range()
- for x in klist:
- if re.search(regex, x[0]):
- matched_list.append(x)
+ if self.cf_super:
+ klist = self.kvs.get_range(super_column=self.namespace)
+ for x in klist:
+ if re.search(regex, x[0]):
+ matched_list.append(x)
+ else:
+ klist = self.kvs.get_range()
+ if self.namespace:
+ prefix = self.namespace+":" if self.namespace else ''
+ pl = len(prefix)
+ for x in klist:
+ key = x[0]
+ if key.startswith(prefix) and re.search(regex, key[pl:]):
+ y = (key[pl:], x[1])
+ matched_list.append(y)
+ else:
+ for x in klist:
+ if re.search(regex, x[0]):
+ matched_list.append(x)
+
return defer.succeed(matched_list)
- def delete(self, key):
+ def remove(self, key):
"""
@brief delete a key/value pair
@param key Key to delete
- @retval None
+ @retval Deferred, for success of operation
@note Deletes are lazy, so key may still be visible for some time.
"""
- # Only except on specific exceptions.
- #try:
- # self.kvs.remove(key)
- #except: # Bad to except on anything and not re-raise!!
- # logging.warn('Error removing key')
- # return defer.fail()
- self.kvs.remove(key)
+ if self.cf_super:
+ self.kvs.remove(key, super_column=self.namespace)
+ else:
+ if self.namespace:
+ key = self.namespace + ":" + key
+ self.kvs.remove(key)
return defer.succeed(None)
@@ -26,37 +26,62 @@ def setUp(self):
@defer.inlineCallbacks
def tearDown(self):
- yield self.ds.delete(self.key)
+ yield self.ds.remove(self.key)
del self.ds
@defer.inlineCallbacks
def test_get_404(self):
# Make sure we can't read the not-written
rc = yield self.ds.get(self.key)
- self.failUnlessEqual(rc, None)
+ self.assertEqual(rc, None)
@defer.inlineCallbacks
def test_write_and_delete(self):
# Hmm, simplest op, just looking for exceptions
yield self.ds.put(self.key, self.value)
@defer.inlineCallbacks
- def test_delete(self):
+ def test_remove(self):
yield self.ds.put(self.key, self.value)
- yield self.ds.delete(self.key)
+ yield self.ds.remove(self.key)
rc = yield self.ds.get(self.key)
- self.failUnlessEqual(rc, None)
+ self.assertEqual(rc, None)
+ yield self.ds.remove(self.key)
+ yield self.ds.remove('non_exist23231')
@defer.inlineCallbacks
def test_put_get_delete(self):
# Write, then read to verify same
yield self.ds.put(self.key, self.value)
b = yield self.ds.get(self.key)
- self.failUnlessEqual(self.value, b)
+ self.assertEqual(self.value, b)
@defer.inlineCallbacks
def test_query(self):
# Write a key, query for it, verify contents
yield self.ds.put(self.key, self.value)
rl = yield self.ds.query(self.key)
- self.failUnlessEqual(rl[0][0], self.key)
+ self.assertEqual(rl[0][0], self.key)
+
+class CassandraStoreNSTest(CassandraStoreTest):
+ @defer.inlineCallbacks
+ def setUp(self):
+ clist = ['amoeba.ucsd.edu:9160']
+ self.ds = yield cassandra.CassandraStore.create_store(
+ cass_host_list=clist,
+ namespace='n')
+ self.key = str(uuid4())
+ self.value = str(uuid4())
+
+class CassandraStoreSCTest(CassandraStoreTest):
+ @defer.inlineCallbacks
+ def setUp(self):
+ clist = ['amoeba.ucsd.edu:9160']
+ self.ds = yield cassandra.CassandraStore.create_store(
+ cass_host_list=clist,
+ keyspace='DatastoreTest',
+ colfamily='DS1',
+ cf_super=True,
+ namespace='n')
+ self.key = str(uuid4())
+ self.value = str(uuid4())
@@ -92,7 +92,7 @@ def scard(self, key):
except:
logging.warn("Error calculating cardinality")
- def delete(self, key):
+ def remove(self, key):
"""
@brief remove the entire set and key from the data store
@param key which is mapped to the set
@@ -125,7 +125,7 @@ def query(self, regex):
logging.error('Unable to find any keys')
return None
- def delete(self, key):
+ def remove(self, key):
"""
@brief delete a key/value pair
@param key Key to delete
@@ -26,7 +26,7 @@ def setUp(self):
self.value2 = self._mkey()
def tearDown(self):
- self.ds.delete(self.key)
+ self.ds.remove(self.key)
del self.ds
def _mkey(self):
@@ -27,9 +27,9 @@ def setUp(self):
self.set.add(self._mkey())
self.set.add(self._mkey())
self.set.add(self._mkey())
-
+
def tearDown(self):
- self.ds.delete(self.key)
+ self.ds.remove(self.key)
del self.ds
def _mkey(self):
@@ -47,10 +47,10 @@ def test_write_and_delete(self):
def test_delete(self):
self.ds.put(self.key, self.value)
- self.ds.delete(self.key)
+ self.ds.remove(self.key)
rc = self.ds.get(self.key)
self.failUnlessEqual(rc, None)
-
+
def test_val_put_get_delete(self):
# Write, then read to verify same
self.ds.put(self.key, self.value)
@@ -62,25 +62,25 @@ def test_dict_put_get_delete(self):
self.ds.put(self.key,self.dict)
b = self.ds.get(self.key)
self.failUnlessEqual(self.dict, b)
-
+
def test_set_put_get_delete(self):
# Write the dict, then read to verify the same
self.ds.put(self.key,self.set)
b = self.ds.get(self.key)
self.failUnlessEqual(self.set, b)
-
+
def test_incr(self):
a=self.ds.incr(self.key)
self.failUnlessEqual(1, a)
a=self.ds.incr(self.key)
self.failUnlessEqual(2, a)
-
-
-
+
+
+
def test_query(self):
# Write a key, query for it, verify contents
self.ds.put(self.key, self.value)
rl = self.ds.query(self.key)
- self.failUnlessEqual(rl[0][0], self.key)
+ self.failUnlessEqual(rl[0][0], self.key)
View
@@ -588,3 +588,23 @@ def getmult(self, keys):
val = yield self.get(key)
res.append(val)
defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def remove(self, key):
+ """
+ @brief removes the entiti with the given key from the object store
+ @param key identifier of a mutable entity
+ @retval Deferred
+ """
+ key = _reftostr(key)
+ yield self.entityidx.remove(key)
+ defer.returnValue(dobj)
+
+ @defer.inlineCallbacks
+ def size(self):
+ """
+ @brief returns the number of structured objects in the store
+ @param key identifier of a mutable entity
+ @retval Deferred, for number of objects in store
+ """
+ defer.succeed(self._num_entities())
Oops, something went wrong.

0 comments on commit 3f881c9

Please sign in to comment.