Permalink
Browse files

added sqlite3 support

  • Loading branch information...
1 parent a669eab commit 212f56ddedb96ffb170f49884a6deab4903c1e45 @ownport committed Sep 10, 2012
Showing with 268 additions and 51 deletions.
  1. +268 −51 kvlite.py
View
319 kvlite.py
@@ -37,11 +37,13 @@
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE."""
+import os
import cmd
import sys
import zlib
import uuid
import pprint
+import sqlite3
import binascii
__all__ = ['open', 'remove',]
@@ -52,23 +54,82 @@
print >> sys.stderr, 'Error! MySQLdb package is not installed, please install python-mysqldb'
sys.exit()
+
try:
import cPickle as pickle
except ImportError:
import pickle
-
-# TODO add JSON support for serialization
+# TODO add test cases for serializers
# TODO add support for user-specific serializers
-from json import loads as json_decode
-from json import dumps as json_encode
SUPPORTED_BACKENDS = ['mysql', 'sqlite', ]
+'''
+A collection is a group of documents stored in kvlite2,
+and can be thought of as roughly the equivalent of a
+table in a relational database.
+
+
+
+To use JSON for serialization:
+
+>>> import json
+>>> collection = open('sqlite://test.sqlite:test', serializer=json)
+>>>
+
+'''
+# -----------------------------------------------------------------
+# cPickleSerializer class
+# -----------------------------------------------------------------
+
+class cPickleSerializer(object):
+ ''' cPickleSerializer '''
+
+ @staticmethod
+ def dumps(v):
+ ''' dumps value '''
+ if isinstance(v, unicode):
+ v = str(v)
+ return pickle.dumps(v)
+
+ @staticmethod
+ def loads(v):
+ ''' loads value '''
+ if isinstance(v, unicode):
+ v = str(v)
+ return pickle.loads(v)
+
+# -----------------------------------------------------------------
+# CompressedJsonSerializer class
+# -----------------------------------------------------------------
+
+class CompressedJsonSerializer(object):
+ ''' CompressedJsonSerializer '''
+
+ @staticmethod
+ def dumps(v):
+ ''' dumps value '''
+ return zlib.compress(json_encode(v))
+
+ @staticmethod
+ def loads(v):
+ ''' loads value '''
+ return json_decode(zlib.decompress(v))
+
+# -----------------------------------------------------------------
+# SERIALIZERS
+# -----------------------------------------------------------------
+
+SERIALIZERS = {
+ 'pickle': cPickleSerializer,
+ 'completed_json': CompressedJsonSerializer,
+}
+
# -----------------------------------------------------------------
# KVLite utils
# -----------------------------------------------------------------
-def open(uri, serializer=pickle):
+def open(uri, serializer=cPickleSerializer):
'''
open collection by URI,
if collection does not exist kvlite will try to create it
@@ -88,7 +149,7 @@ def open(uri, serializer=pickle):
params = manager.parse_uri(uri)
if params['collection'] not in manager.collections():
manager.create(params['collection'])
- return manager.get_collection(params['collection'])(uri, serializer)
+ return manager.collection_class(manager.connection, params['collection'], serializer)
def remove(uri):
'''
@@ -127,29 +188,39 @@ def __init__(self, uri):
backend, rest_uri = uri.split('://')
if backend in SUPPORTED_BACKENDS:
if backend == 'mysql':
- self.manager = MysqlCollectionManager
+ self.backend_manager = MysqlCollectionManager(uri)
elif backend == 'sqlite':
- self.manager = SqliteCollectionManager
+ self.backend_manager = SqliteCollectionManager(uri)
else:
raise NotImplementedError()
else:
raise RuntimeError('Unknown backend: {}'.format(backend))
+ def parse_uri(self, uri):
+ ''' parse_uri '''
+ return self.backend_manager.parse_uri(uri)
+
def create(self, name):
''' create collection '''
- self.manager.create(name)
+ self.backend_manager.create(name)
- def get_collection(self, name):
+ @property
+ def collection_class(self):
''' return the backend collection class (MysqlCollection or SqliteCollection) '''
- return self.manager.get_collection(name)
+ return self.backend_manager.collection_class
+
+ @property
+ def connection(self):
+ ''' return reference to backend connection '''
+ return self.backend_manager.connection
def collections(self):
''' return list of collections '''
- return self.manager.collections()
+ return self.backend_manager.collections()
def remove(self, name):
''' remove collection '''
- self.manager.remove(name)
+ self.backend_manager.remove(name)
# -----------------------------------------------------------------
# MysqlCollectionManager class
@@ -160,6 +231,7 @@ class MysqlCollectionManager(object):
def __init__(self, uri):
params = self.parse_uri(uri)
+
try:
self.__conn = MySQLdb.connect(
host=params['host'], port = params['port'],
@@ -194,7 +266,7 @@ def parse_uri(uri):
parsed_uri['db'] = rest_uri
parsed_uri['collection'] = None
return parsed_uri
-
+
def create(self, name):
''' create collection '''
@@ -207,9 +279,15 @@ def create(self, name):
self.__cursor.execute(SQL_CREATE_TABLE % name)
self.__conn.commit()
- def get_collection(self, name):
+ @property
+ def connection(self):
+ ''' return connection '''
+ return self.__conn
+
+ @property
+ def collection_class(self):
''' return the MysqlCollection class '''
- return MysqlCollection(self.__conn, name)
+ return MysqlCollection
def collections(self):
''' return collection list'''
@@ -235,11 +313,13 @@ def close(self):
class MysqlCollection(object):
''' Mysql Collection '''
- def __init__(self, connection, collection_name):
+ def __init__(self, connection, collection_name, serializer=cPickleSerializer):
self.__conn = connection
self.__cursor = self.__conn.cursor()
self.__collection = collection_name
+ self.__serializer = serializer
+
self.__uuid_cache = list()
def get_uuid(self):
@@ -271,9 +351,9 @@ def __get_many(self):
rowid = r[0]
k = binascii.b2a_hex(r[1])
try:
- v = self.unpack(r[2])
+ v = self.__serializer.loads(r[2])
except Exception, err:
- raise ValueUnpackError('key %s, %s' % (k, err))
+ raise RuntimeError('key %s, %s' % (k, err))
yield (k, v)
def get(self, k=None):
@@ -292,9 +372,9 @@ def get(self, k=None):
result = self.__cursor.fetchone()
if result:
try:
- v = self.unpack(result[1])
+ v = self.__serializer.loads(result[1])
except Exception, err:
- raise ValueUnpackError('key %s, %s' % (k, err))
+ raise RuntimeError('key %s, %s' % (k, err))
return (binascii.b2a_hex(result[0]), v)
else:
return (None, None)
@@ -303,11 +383,14 @@ def get(self, k=None):
def put(self, k, v):
''' put document in collection '''
+
+ # TODO many k/v in put()
+
if len(k) > 40:
raise RuntimeError('The length of key is more than 40 bytes')
SQL_INSERT = 'INSERT INTO %s (k,v) ' % self.__collection
SQL_INSERT += 'VALUES (%s,%s) ON DUPLICATE KEY UPDATE v=%s;;'
- v = self.pack(v)
+ v = self.__serializer.dumps(v)
try:
self.__cursor.execute(SQL_INSERT, (binascii.a2b_hex(k), v, v))
except TypeError, err:
@@ -337,6 +420,7 @@ def keys(self):
k = binascii.b2a_hex(r[1])
yield k
+ @property
def count(self):
''' return amount of documents in collection'''
self.__cursor.execute('SELECT count(*) FROM %s;' % self.__collection)
@@ -354,50 +438,183 @@ def close(self):
# -----------------------------------------------------------------
class SqliteCollectionManager(object):
''' Sqlite Collection Manager '''
- pass
+ def __init__(self, uri):
+
+ params = self.parse_uri(uri)
+
+ self.__conn = sqlite3.connect(params['db'])
+ self.__cursor = self.__conn.cursor()
+
+ @staticmethod
+ def parse_uri(uri):
+ '''parse URI
+
+ return dict with keys 'backend', 'db' and 'collection'
+ '''
+ parsed_uri = dict()
+ parsed_uri['backend'], rest_uri = uri.split('://', 1)
+ if ':' in rest_uri:
+ parsed_uri['db'], parsed_uri['collection'] = rest_uri.split(':',1)
+ else:
+ parsed_uri['db'] = rest_uri
+ parsed_uri['collection'] = None
+ if parsed_uri['db'] == 'memory':
+ parsed_uri['db'] = ':memory:'
+ return parsed_uri
+
+ @property
+ def connection(self):
+ ''' return connection '''
+ return self.__conn
+
+ @property
+ def collection_class(self):
+ ''' return the SqliteCollection class '''
+ return SqliteCollection
+
+ def collections(self):
+ ''' return collection list'''
+
+ self.__cursor.execute('SELECT name FROM sqlite_master WHERE type="table";')
+ return [t[0] for t in self.__cursor.fetchall()]
+ def create(self, name):
+ ''' create collection '''
+
+ SQL_CREATE_TABLE = '''CREATE TABLE IF NOT EXISTS %s (
+ k NOT NULL, v, UNIQUE (k) );'''
+
+ self.__cursor.execute(SQL_CREATE_TABLE % name)
+ self.__conn.commit()
+
+ def remove(self, name):
+ ''' remove collection '''
+ if name in self.collections():
+ self.__cursor.execute('DROP TABLE %s;' % name)
+ self.__conn.commit()
+ else:
+ raise RuntimeError('No collection with name: {}'.format(name))
+
+ def close(self):
+ ''' close connection to database '''
+ self.__conn.close()
+
# -----------------------------------------------------------------
# SqliteCollection class
# -----------------------------------------------------------------
class SqliteCollection(object):
''' Sqlite Collection'''
- def __init__(self, uri):
- raise NotImplementedError('SqliteCollection is not implemented yet')
+ def __init__(self, connection, collection_name, serializer=cPickleSerializer):
-class Collection(object):
- '''
- kvlite2 collection
+ self.__conn = connection
+ self.__cursor = self.__conn.cursor()
+ self.__collection = collection_name
+ self.__serializer = serializer
- A collection is a group of documents stored in kvlite2,
- and can be thought of as roughly the equivalent of a
- table in a relational database.
+ self.__uuid_cache = list()
- '''
- def __init__(self, db_uri):
- '''
- db_uri - URI to databases,
- URI format: driver://username:passwd@host[:port]/database.collection
- '''
- params = parse_uri(db_uri)
- self.__conn = MySQLdb.connect(host=params['host'], port = params['port'],
- user=params['usr'], passwd=params['pwd'], db=params['db'])
- self.__collection = params['coll']
- self.__cursor = self.__conn.cursor()
- self.__uuids = []
+ def get_uuid(self):
+ """ return id based on uuid """
- def pack(self, v):
- ''' pack value
+ if not self.__uuid_cache:
+ for uuid in get_uuid():
+ self.__uuid_cache.append(uuid)
+ return self.__uuid_cache.pop()
+
+ def put(self, k, v):
+ ''' put document in collection '''
- Note: before pack the value it's better to encode it by base64
- '''
- return zlib.compress(json_encode(v))
+ # TODO many k/v in put()
+
+ if len(k) > 40:
+ raise RuntimeError('The length of key is more than 40 bytes')
+ SQL_INSERT = 'INSERT OR REPLACE INTO %s (k,v) ' % self.__collection
+ SQL_INSERT += 'VALUES (?,?)'
+ v = self.__serializer.dumps(v)
+ try:
+ self.__cursor.execute(SQL_INSERT, (k, v))
+ except TypeError, err:
+ raise RuntimeError(err)
- def unpack(self, v):
- ''' unpack value
+ def __get_many(self):
+ ''' return all docs '''
+ rowid = 0
+ while True:
+ SQL_SELECT_MANY = 'SELECT rowid, k,v FROM %s WHERE rowid > %d LIMIT 1000 ;' % (self.__collection, rowid)
+ self.__cursor.execute(SQL_SELECT_MANY)
+ result = self.__cursor.fetchall()
+ if not result:
+ break
+ for r in result:
+ rowid = r[0]
+ k = r[1]
+ try:
+ v = self.__serializer.loads(r[2])
+ except Exception, err:
+ raise RuntimeError('key %s, %s' % (k, err))
+ yield (k, v)
+
+ def get(self, k=None):
+ '''
+ return document by key from collection
+ return documents if key is not defined
'''
- return json_decode(zlib.decompress(v))
+ if k:
+ if len(k) > 40:
+ raise RuntimeError('The key length is more than 40 bytes')
+ SQL = 'SELECT k,v FROM %s WHERE k = ?;' % self.__collection
+ try:
+ self.__cursor.execute(SQL, (k,))
+ except TypeError, err:
+ raise WronKeyValue(err)
+ result = self.__cursor.fetchone()
+ if result:
+ try:
+ v = self.__serializer.loads(result[1])
+ except Exception, err:
+ raise RuntimeError('key %s, %s' % (k, err))
+ return (result[0], v)
+ else:
+ return (None, None)
+ else:
+ return self.__get_many()
+
+ def keys(self):
+ ''' return document keys in collection'''
+ rowid = 0
+ while True:
+ SQL_SELECT_MANY = 'SELECT rowid, k FROM %s WHERE rowid > %d LIMIT 1000 ;' % (self.__collection, rowid)
+ self.__cursor.execute(SQL_SELECT_MANY)
+ result = self.__cursor.fetchall()
+ if not result:
+ break
+ for r in result:
+ rowid = r[0]
+ yield r[1]
+ def delete(self, k):
+ ''' delete document by k '''
+ if len(k) > 40:
+ raise RuntimeError('The key length is more than 40 bytes')
+ SQL_DELETE = '''DELETE FROM %s WHERE k = ?;''' % self.__collection
+ try:
+ self.__cursor.execute(SQL_DELETE, (k,))
+ except TypeError, err:
+ raise WronKeyValue(err)
+
+ @property
+ def count(self):
+ ''' return amount of documents in collection'''
+ self.__cursor.execute('SELECT count(*) FROM %s;' % self.__collection)
+ return int(self.__cursor.fetchone()[0])
+
+ def commit(self):
+ self.__conn.commit()
+
+ def close(self):
+ ''' close connection to database '''
+ self.__conn.close()
# -----------------------------------------------------------------
# Console class

0 comments on commit 212f56d

Please sign in to comment.