/
storage.py
166 lines (128 loc) · 4.67 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Copyright (c) 2016-2017, the ElectrumX authors
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Backend database abstraction.'''
import os
from functools import partial
import electrumx.lib.util as util
def db_class(name):
'''Returns a DB engine class.'''
for db_class in util.subclasses(Storage):
if db_class.__name__.lower() == name.lower():
db_class.import_module()
return db_class
raise RuntimeError('unrecognised DB engine "{}"'.format(name))
class Storage(object):
'''Abstract base class of the DB backend abstraction.'''
def __init__(self, name, for_sync):
self.is_new = not os.path.exists(name)
self.for_sync = for_sync or self.is_new
self.open(name, create=self.is_new)
@classmethod
def import_module(cls):
'''Import the DB engine module.'''
raise NotImplementedError
def open(self, name, create):
'''Open an existing database or create a new one.'''
raise NotImplementedError
def close(self):
'''Close an existing database.'''
raise NotImplementedError
def get(self, key):
raise NotImplementedError
def put(self, key, value):
raise NotImplementedError
def write_batch(self):
'''Return a context manager that provides `put` and `delete`.
Changes should only be committed when the context manager
closes without an exception.
'''
raise NotImplementedError
def iterator(self, prefix=b'', reverse=False):
'''Return an iterator that yields (key, value) pairs from the
database sorted by key.
If `prefix` is set, only keys starting with `prefix` will be
included. If `reverse` is True the items are returned in
reverse order.
'''
raise NotImplementedError
class LevelDB(Storage):
'''LevelDB database engine.'''
@classmethod
def import_module(cls):
import plyvel
cls.module = plyvel
def open(self, name, create):
mof = 512 if self.for_sync else 128
# Use snappy compression (the default)
self.db = self.module.DB(name, create_if_missing=create,
max_open_files=mof)
self.close = self.db.close
self.get = self.db.get
self.put = self.db.put
self.iterator = self.db.iterator
self.write_batch = partial(self.db.write_batch, transaction=True,
sync=True)
class RocksDB(Storage):
'''RocksDB database engine.'''
@classmethod
def import_module(cls):
import rocksdb
cls.module = rocksdb
def open(self, name, create):
mof = 512 if self.for_sync else 128
# Use snappy compression (the default)
options = self.module.Options(create_if_missing=create,
use_fsync=True,
target_file_size_base=33554432,
max_open_files=mof)
self.db = self.module.DB(name, options)
self.get = self.db.get
self.put = self.db.put
def close(self):
# PyRocksDB doesn't provide a close method; hopefully this is enough
self.db = self.get = self.put = None
import gc
gc.collect()
def write_batch(self):
return RocksDBWriteBatch(self.db)
def iterator(self, prefix=b'', reverse=False):
return RocksDBIterator(self.db, prefix, reverse)
class RocksDBWriteBatch(object):
'''A write batch for RocksDB.'''
def __init__(self, db):
self.batch = RocksDB.module.WriteBatch()
self.db = db
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch)
class RocksDBIterator(object):
'''An iterator for RocksDB.'''
def __init__(self, db, prefix, reverse):
self.prefix = prefix
if reverse:
self.iterator = reversed(db.iteritems())
nxt_prefix = util.increment_byte_string(prefix)
if nxt_prefix:
self.iterator.seek(nxt_prefix)
try:
next(self.iterator)
except StopIteration:
self.iterator.seek(nxt_prefix)
else:
self.iterator.seek_to_last()
else:
self.iterator = db.iteritems()
self.iterator.seek(prefix)
def __iter__(self):
return self
def __next__(self):
k, v = next(self.iterator)
if not k.startswith(self.prefix):
raise StopIteration
return k, v