Permalink
Browse files

Refactored IStore interface and modified backends, object store, coi …

…datastore, coi attrstore
  • Loading branch information...
Michael Meisinger
Michael Meisinger committed May 25, 2010
1 parent 5253cc7 commit f2ff6a5edf7cc553962f40a8cf9801fde90448e4
View
@@ -120,6 +120,9 @@ again (see above). Please review the branch logs for any hints.
Change log:
===========
+2010-05-24:
+- Modified the IStore interface to support a create_store factory method. This
+ method can yield and return a deferred. Modified and fixed IStore impls.
2010-05-22:
- Added timeout to BaseProcess.rpc_send. Use with kwarg timeout=<secs>
- CC-Agent detects missing known containers and removes them from the list
@@ -2,51 +2,51 @@
"""
@file ion/data/backends/cassandra.py
+@author Paul Hubbard
@author Michael Meisinger
-@author Dorian Raymer
+@author Dorian Raymer
@brief Implementation of ion.data.store.IStore using pycassa to interface a
-Cassandra datastore backend
+ Cassandra datastore backend
"""
import re
import logging
-
from twisted.internet import defer
import pycassa
-from ion.data import store
+from ion.data.store import IStore
-class CassandraStore(store.IStore):
+class CassandraStore(IStore):
"""
Store interface for interacting with the Cassandra key/value store
@see http://github.com/vomjom/pycassa
"""
- def __init__(self, cass_host_list=None):
- self.kvs=None
- self.cass_host_list = cass_host_list
-
- def _init(self, cass_host_list):
- if not cass_host_list:
- logging.info('Connecting to Cassandra on localhost...')
- else:
- logging.info('Connecting to Cassandra at "%s"...' % str(cass_host_list))
- client = pycassa.connect(cass_host_list)
- self.kvs = pycassa.ColumnFamily(client, 'Datasets', 'Catalog')
- logging.info('connected OK.')
- return True
+ def __init__(self, **kwargs):
+ self.kvs = None
+ self.cass_host_list = None
- def init(self):
+ @classmethod
+ def create_store(cls, **kwargs):
"""
- @brief Constructor, safe to use no arguments
+ @brief Factory method to create an instance of the cassandra store.
+ @param kwargs keyword arguments to configure the store.
@param cass_host_list List of hostname:ports for cassandra host or cluster
- @retval Deferred
+ @retval Deferred, for IStore instance.
"""
- #return defer.maybeDeferred(self._init, cass_host_list, None)
- return defer.succeed(self._init(self.cass_host_list))
-
+ inst = cls(**kwargs)
+ inst.kwargs = kwargs
+ inst.cass_host_list = kwargs.get('cass_host_list', None)
+ if not inst.cass_host_list:
+ logging.info('Connecting to Cassandra on localhost...')
+ else:
+ logging.info('Connecting to Cassandra at "%s"...' % str(inst.cass_host_list))
+ inst.client = pycassa.connect(inst.cass_host_list)
+ inst.kvs = pycassa.ColumnFamily(inst.client, 'Datasets', 'Catalog')
+ logging.info('connected OK.')
+ return defer.succeed(inst)
def get(self, key):
"""
@@ -57,7 +57,7 @@ def get(self, key):
value = None
try:
val = self.kvs.get(key)
- logging.info('Key "%s":"%s"' % (key, val))
+ #logging.info('Key "%s":"%s"' % (key, val))
value = val['value'] #this could fail if insert did it wrong
except pycassa.NotFoundException:
logging.info('Key "%s" not found' % key)
@@ -71,9 +71,9 @@ def put(self, key, value):
@note Value is composed into OOI dictionary under keyname 'value'
@retval None
"""
- logging.info('writing key %s value %s' % (key, value))
+ #logging.info('writing key %s value %s' % (key, value))
self.kvs.insert(key, {'value':value})
- logging.info('write complete')
+ #logging.info('write complete')
return defer.succeed(None)
def query(self, regex):
@@ -83,6 +83,7 @@ def query(self, regex):
@retval List, possibly empty, of keys that match.
@note Uses get_range generator of unknown efficiency.
"""
+ #@todo This implementation is totally inefficient. MUST replace.
matched_list = []
klist = self.kvs.get_range()
for x in klist:
@@ -97,12 +98,11 @@ def delete(self, key):
@retval None
@note Deletes are lazy, so key may still be visible for some time.
"""
- # Only except on specific exceptions.
+ # Only except on specific exceptions.
#try:
# self.kvs.remove(key)
#except: # Bad to except on anything and not re-raise!!
# logging.warn('Error removing key')
# return defer.fail()
self.kvs.remove(key)
return defer.succeed(None)
-
@@ -19,11 +19,8 @@ class CassandraStoreTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
- logging.basicConfig(level=logging.WARN, \
- format='%(asctime)s %(levelname)s [%(funcName)s] %(message)s')
clist = ['amoeba.ucsd.edu:9160']
- self.ds = cassandra.CassandraStore(cass_host_list=clist)
- yield self.ds.init()
+ self.ds = yield cassandra.CassandraStore.create_store(cass_host_list=clist)
self.key = self._mkey()
self.value = self._mkey()
View
@@ -14,10 +14,11 @@
except:
import simplejson as json
import logging
+import pickle
from twisted.internet import defer
from ion.data.dataobject import DataObject
-from ion.data.store import Store
+from ion.data.store import IStore, Store
import ion.util.procutils as pu
@@ -212,20 +213,26 @@ class ValueStore(object):
The GIT distributed repository model is a strong design reference.
Think GIT repository with commits, trees, blobs
"""
- def __init__(self, name=None, qualifier=None, backend=None):
+ def __init__(self, backend=None, backargs=None):
"""
- @param name name of this object store (= namespace for objects)
- @param qualifier namespace for the object store
@param backend Class object with a compliant Store or None for memory
+ @param backargs arbitrary keyword arguments, for the backend
"""
- self.name = name
- self.qualifier = qualifier
- if not backend:
- backend = Store
- self.backend = backend
+ self.backend = backend if backend else Store
+ self.backargs = backargs if backargs else {}
+ assert issubclass(self.backend, IStore)
+ assert type(self.backargs) is dict
# KVS with value ID -> value
- self.objstore = backend()
+ self.objstore = None
+
+ @defer.inlineCallbacks
+ def init(self):
+ """
+ Initializes the ValueStore class
+ @retval Deferred
+ """
+ self.objstore = yield self.backend.create_store(**self.backargs)
logging.info("ValueStore initialized")
def _num_values(self):
@@ -246,17 +253,19 @@ def put_value(self, value):
if oldval:
logging.info("put: value was already in obj store "+str(value.identity))
# Put value in object store
- yield self.objstore.put(value.identity, value)
+ yield self.objstore.put(value.identity, pickle.dumps(value))
else:
value = ValueObject(value)
# Put value in object store
- yield self.objstore.put(value.identity, value)
+ yield self.objstore.put(value.identity, pickle.dumps(value))
defer.returnValue(value._value_ref())
@defer.inlineCallbacks
def get_value(self, valid):
valid = _reftostr(valid)
val = yield self.objstore.get(valid)
+ if val:
+ val = pickle.loads(val)
defer.returnValue(val)
@defer.inlineCallbacks
@@ -395,14 +404,22 @@ class ObjectStore(object):
trees and blob values. It is always possible to get value objects.
@see ValueStore
"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, backend=None, backargs=None):
"""
- Initialized object store
+ Initializes object store
@see ValueStore
"""
- self.vs = ValueStore(*args, **kwargs)
+ self.vs = ValueStore(backend, backargs)
# KVS with entity ID -> most recent value ID
- self.entityidx = self.vs.backend()
+ self.entityidx = None
+
+ @defer.inlineCallbacks
+ def init(self):
+ """
+ Initializes the ObjectStore class
+ """
+ yield self.vs.init()
+ self.entityidx = yield self.vs.backend.create_store(**self.vs.backargs)
logging.info("ObjectStore initialized")
def _num_entities(self):
View
@@ -15,18 +15,39 @@
class IStore(object):
"""
- Interface for all store backend implementations. All operations are returning
- deferreds and operate asynchronously.
+ Interface and abstract base class for all store backend implementations.
+ All operations are returning deferreds and operate asynchronously.
"""
+ def __init__(self, **kwargs):
+ """
+ @brief Initializes store instance
+ @param kwargs arbitrary keyword arguments interpreted by the subclass
+ """
+ pass
+
+ @classmethod
+ def create_store(cls, **kwargs):
+ """
+ @brief Factory method to create an instance of the store.
+ @param kwargs arbitrary keyword arguments interpreted by the subclass to
+ configure the store.
+ @retval Deferred, for IStore instance.
+ """
+ instance = cls(**kwargs)
+ instance.kwargs = kwargs
+ return defer.succeed(instance)
+
def get(self, key):
"""
@param key an immutable key associated with a value
@retval Deferred, for value associated with key, or None if not existing.
"""
raise NotImplementedError, "Abstract Interface Not Implemented"
-
def read(self, *args, **kwargs):
+ """
+ Inheritance safe alias for get
+ """
return self.get(*args, **kwargs)
def put(self, key, value):
@@ -39,6 +60,9 @@ def put(self, key, value):
raise NotImplementedError, "Abstract Interface Not Implemented"
def write(self, *args, **kwargs):
+ """
+ Inheritance safe alias for put
+ """
return self.put(*args, **kwargs)
def query(self, regex):
@@ -55,19 +79,12 @@ def delete(self, key):
"""
raise NotImplementedError, "Abstract Interface Not Implemented"
- def init(self, **kwargs):
- """
- Configures this Store with arbitrary keyword arguments
- @param kwargs any keyword args
- @retval Deferred, for success of this operation
- """
- raise NotImplementedError, "Abstract Interface Not Implemented"
class Store(IStore):
"""
- Memory implementation of an asynchronous store, based on a dict.
+ Memory implementation of an asynchronous key/value store, using a dict.
"""
- def __init__(self):
+ def __init__(self, **kwargs):
self.kvs = {}
def get(self, key):
@@ -99,7 +116,4 @@ def delete(self, key):
def _delete(self, key):
del self.kvs[key]
- return
- def init(self, **kwargs):
- return defer.succeed(True)
Oops, something went wrong.

0 comments on commit f2ff6a5

Please sign in to comment.