Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

173 lines (128 sloc) 5.075 kb
import sqlite3
import cPickle
import json
from UserDict import DictMixin
class SqliteDict(DictMixin):
"""SQLite-backed dictionary"""
def __init__(self, database=None, table="dict"):
self.database = database or ':memory:'
self.table = table
# about check_same_thread: http://twistedmatrix.com/trac/ticket/4040
self.conn = sqlite3.connect(self.database, check_same_thread=False)
q = "create table if not exists %s (key text primary key, value blob)" \
% table
self.conn.execute(q)
def __getitem__(self, key):
key = self.encode(key)
q = "select value from %s where key=?" % self.table
value = self.conn.execute(q, (key,)).fetchone()
if value:
return self.decode(value[0])
raise KeyError(key)
def __setitem__(self, key, value):
key, value = self.encode(key), self.encode(value)
q = "insert or replace into %s (key, value) values (?,?)" % self.table
self.conn.execute(q, (key, value))
self.conn.commit()
def __delitem__(self, key):
key = self.encode(key)
q = "delete from %s where key=?" % self.table
self.conn.execute(q, (key,))
self.conn.commit()
def iterkeys(self):
q = "select key from %s" % self.table
return (self.decode(x[0]) for x in self.conn.execute(q))
def keys(self):
return list(self.iterkeys())
def itervalues(self):
q = "select value from %s" % self.table
return (self.decode(x[0]) for x in self.conn.execute(q))
def values(self):
return list(self.itervalues())
def iteritems(self):
q = "select key, value from %s" % self.table
return ((self.decode(x[0]), self.decode(x[1])) for x in self.conn.execute(q))
def items(self):
return list(self.iteritems())
def encode(self, obj):
return obj
def decode(self, text):
return text
class PickleSqliteDict(SqliteDict):
def encode(self, obj):
return buffer(cPickle.dumps(obj, protocol=2))
def decode(self, text):
return cPickle.loads(str(text))
class JsonSqliteDict(SqliteDict):
def encode(self, obj):
return json.dumps(obj)
def decode(self, text):
return json.loads(text)
class SqlitePriorityQueue(object):
"""SQLite priority queue. It relies on SQLite concurrency support for
providing atomic inter-process operations.
"""
def __init__(self, database=None, table="queue"):
self.database = database or ':memory:'
self.table = table
# about check_same_thread: http://twistedmatrix.com/trac/ticket/4040
self.conn = sqlite3.connect(self.database, check_same_thread=False)
q = "create table if not exists %s (id integer primary key, " \
"priority real key, message blob)" % table
self.conn.execute(q)
def put(self, message, priority=0.0):
args = (priority, self.encode(message))
q = "insert into %s (priority, message) values (?,?)" % self.table
self.conn.execute(q, args)
self.conn.commit()
def pop(self):
q = "select id, message from %s order by priority desc limit 1" \
% self.table
idmsg = self.conn.execute(q).fetchone()
if idmsg is None:
return
id, msg = idmsg
q = "delete from %s where id=?" % self.table
c = self.conn.execute(q, (id,))
if not c.rowcount: # record vanished, so let's try again
self.conn.rollback()
return self.pop()
self.conn.commit()
return self.decode(msg)
def remove(self, func):
q = "select id, message from %s" % self.table
n = 0
for id, msg in self.conn.execute(q):
if func(self.decode(msg)):
q = "delete from %s where id=?" % self.table
c = self.conn.execute(q, (id,))
if not c.rowcount: # record vanished, so let's try again
self.conn.rollback()
return self.remove(func)
n += 1
self.conn.commit()
return n
def clear(self):
self.conn.execute("delete from %s" % self.table)
self.conn.commit()
def __len__(self):
q = "select count(*) from %s" % self.table
return self.conn.execute(q).fetchone()[0]
def __iter__(self):
q = "select message, priority from %s order by priority desc" % \
self.table
return ((self.decode(x), y) for x, y in self.conn.execute(q))
def encode(self, obj):
return obj
def decode(self, text):
return text
class PickleSqlitePriorityQueue(SqlitePriorityQueue):
def encode(self, obj):
return buffer(cPickle.dumps(obj, protocol=2))
def decode(self, text):
return cPickle.loads(str(text))
class JsonSqlitePriorityQueue(SqlitePriorityQueue):
def encode(self, obj):
return json.dumps(obj)
def decode(self, text):
return json.loads(text)
Jump to Line
Something went wrong with that request. Please try again.