-
Notifications
You must be signed in to change notification settings - Fork 0
/
storage.py
103 lines (77 loc) · 3.76 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import pickle
import sqlite3
from typing import Any
class ObjectStorage:
    '''
    Simple class that stores pickled (serialized) python objects in an sqlite database table.
    It can retrieve the stored objects by their tags directly as python objects.
    Objects are referenced with unique tags.

    The instance can also be used as a context manager; the database connection
    is closed when the ``with`` block is left.

    SECURITY NOTE: stored blobs are read back with ``pickle.loads``, which can
    execute arbitrary code while unpickling. Only open database files you trust.
    '''

    def __init__(self, sqlite_db_path: str, table_name: str):
        '''
        Open the database and make sure the storage table exists.

        :param sqlite_db_path: path passed to ``sqlite3.connect``.
        :param table_name: storage table name (alphanumerics and underscore only).
        :raises ValueError: if ``table_name`` is empty or has other characters.
        '''
        self.sqlite_db_path = sqlite_db_path
        # Validate before connecting so a bad name never touches the database.
        self.table_name = self._scrub_table_name(table_name)
        self.sqlite_connection = sqlite3.connect(self.sqlite_db_path)
        self.sqlite_cursor = self.sqlite_connection.cursor()
        self.create_table()

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self) -> None:
        ''' Close the database connection if one was opened. '''
        # __init__ may have raised before the connection attribute was set,
        # and __del__ still runs on such a partially built instance.
        connection = getattr(self, 'sqlite_connection', None)
        if connection is not None:
            connection.close()

    @staticmethod
    def _scrub_table_name(table_name: str):
        '''
        Allows only alphanumerics and underscore in the storage table name.

        :raises ValueError: if the name is empty or contains other characters.
        '''
        if not table_name:
            # An empty name would otherwise surface later as an SQL syntax error.
            raise ValueError('Table name must not be empty.')
        if not all(char.isalnum() or char == '_' for char in table_name):
            raise ValueError('Table name must only contain alphanumerics and underscore.')
        return table_name

    @staticmethod
    def _scrub_tag(tag: str):
        '''
        Allows only alphanumerics, underscore and hyphen in tags.

        :raises ValueError: if the tag contains other characters.
        '''
        if not all(char.isalnum() or char in '_-' for char in tag):
            raise ValueError('Tags must only contain alphanumerics, underscore and hyphen.')
        return tag

    def _quoted_table(self) -> str:
        ''' Return the re-validated table name as a double-quoted SQL identifier. '''
        # Table names cannot be bound as SQL parameters, so the name is
        # formatted into the statement. It is re-validated on every use (the
        # attribute is public and may have been reassigned) and quoted so that
        # keyword-like or digit-leading names such as "order" or "123" work.
        return '"{0}"'.format(self._scrub_table_name(self.table_name))

    def create_table(self) -> None:
        ''' Initializes the storage table. '''
        self.sqlite_cursor.execute('''CREATE TABLE IF NOT EXISTS {0} (
            id INTEGER PRIMARY KEY ASC,
            tag TEXT UNIQUE,
            object BLOB
        )'''.format(self._quoted_table()))

    def store(self, tag: str, object_: Any) -> None:
        '''
        Store one object in the sqlite table with a given tag.

        :raises ValueError: if the tag contains forbidden characters.
        :raises sqlite3.IntegrityError: if the tag already exists.
        '''
        statement = 'INSERT INTO {0} (tag, object) VALUES (?, ?)'.format(self._quoted_table())
        parameters = (self._scrub_tag(tag), pickle.dumps(object_, protocol=4))
        try:
            # The connection context manager commits on success and rolls
            # back on error, so a failed insert leaves no open transaction.
            with self.sqlite_connection:
                self.sqlite_cursor.execute(statement, parameters)
        except sqlite3.IntegrityError:
            print('Object with tag <{0}> exists in the database! Tags must be unique.'.format(tag))
            raise

    def get(self, tag: str) -> Any:
        ''' Get one object from the sqlite table with a given tag. Return None if doesn't exist. '''
        self.sqlite_cursor.execute(
            'SELECT object FROM {0} WHERE tag=?'.format(self._quoted_table()), (tag, ))
        row = self.sqlite_cursor.fetchone()
        if row is None:
            return None
        # NOTE(security): unpickling runs arbitrary code; the database must be trusted.
        return pickle.loads(row[0])

    def get_all(self) -> dict:
        ''' Return all stored objects in a dict with their tags as keys. '''
        rows = self.sqlite_cursor.execute(
            'SELECT tag, object FROM {0} ORDER BY id'.format(self._quoted_table()))
        # NOTE(security): unpickling runs arbitrary code; the database must be trusted.
        return {tag: pickle.loads(blob) for tag, blob in rows}

    def delete(self, tag: str) -> None:
        ''' Delete one object from storage with the given tag. '''
        with self.sqlite_connection:
            self.sqlite_cursor.execute(
                'DELETE FROM {0} WHERE tag=?'.format(self._quoted_table()), (tag, ))

    def delete_all(self) -> None:
        ''' Purge storage of all objects. '''
        # Commit explicitly so the purge is not lost if a transaction is open.
        with self.sqlite_connection:
            self.sqlite_cursor.execute(
                'DROP TABLE IF EXISTS {0}'.format(self._quoted_table()))
            self.create_table()