/
snappy_db.go
115 lines (94 loc) · 2.52 KB
/
snappy_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package snappy
import (
"encoding/json"
"github.com/golang/snappy"
"github.com/pkg/errors"
tmdb "github.com/tendermint/tm-db"
"sync"
)
// Compatibility modes accepted by NewSnappyDB. They control how Get treats a
// stored value that cannot be snappy-decoded.
const (
	// CompatModeEnabled makes Get accept stored bytes that fail snappy
	// decoding as long as they are valid JSON; anything else is reported as
	// errUnknownData.
	CompatModeEnabled = 0
	// CompatModeDisabled makes Get return the snappy decode error as-is.
	CompatModeDisabled = 1
)
// Sentinel errors returned by SnappyDB.
var (
	// errUnknownData is returned by Get in compat mode when the stored bytes
	// are neither valid snappy data nor valid JSON.
	errUnknownData = errors.New("unknown format")
	// errIteratorNotSupported is returned by Iterator and ReverseIterator.
	errIteratorNotSupported = errors.New("iterator unsupported")
)
// Compile-time check that *SnappyDB satisfies the tmdb.DB interface.
var _ tmdb.DB = (*SnappyDB)(nil)
// SnappyDB implements a tmdb.DB overlay with snappy compression/decompression:
// Set compresses values before storing them and Get decompresses them on read.
//
// Iterators are NOT supported -- the main purpose of this library is to support
// indexer.db, which never makes use of iterators anyway.
// NOTE: implement iterators when needed.
// NOTE2: monitor memory pressure; optimize by pre-allocating the dst buffer
// when it becomes a bottleneck.
type SnappyDB struct {
// db is the wrapped database that holds the stored values.
db tmdb.DB
// mtx is held by Get while it rewrites an entry in compat mode.
mtx *sync.Mutex
// compatMode is compared against CompatModeEnabled in Get to decide how
// stored bytes that fail snappy decoding are handled.
compatMode int
}
// NewSnappyDB wraps db in a SnappyDB.
//
// compatMode is expected to be CompatModeEnabled or CompatModeDisabled and
// selects how Get treats stored values that are not valid snappy data. A fresh
// mutex is allocated for every instance.
func NewSnappyDB(db tmdb.DB, compatMode int) *SnappyDB {
	wrapped := new(SnappyDB)
	wrapped.db = db
	wrapped.mtx = &sync.Mutex{}
	wrapped.compatMode = compatMode
	return wrapped
}
// Get returns the decompressed value stored under key.
//
// A missing key yields (nil, nil), and errors from the underlying store are
// returned unchanged. When the stored bytes are not valid snappy data the
// outcome depends on the compat mode:
//   - CompatModeEnabled: if the raw bytes are valid JSON they are treated as a
//     legacy uncompressed entry, returned as-is, and rewritten in compressed
//     form on a best-effort basis; otherwise errUnknownData is returned.
//   - any other mode: the snappy decode error is returned.
func (s *SnappyDB) Get(key []byte) ([]byte, error) {
	item, err := s.db.Get(key)
	if err != nil {
		return nil, err
	}
	if item == nil {
		return nil, nil
	}

	decoded, decodeErr := snappy.Decode(nil, item)
	if decodeErr == nil {
		return decoded, nil
	}
	if s.compatMode != CompatModeEnabled {
		return decoded, decodeErr
	}

	// Compat mode: only recover blobs that are valid JSON.
	if !json.Valid(item) {
		return nil, errUnknownData
	}

	// Legacy uncompressed entry: run it through Set so the stored copy is
	// replaced by its snappy-encoded form (writing it straight to s.db would
	// just store the same raw bytes again).
	s.mtx.Lock()
	defer s.mtx.Unlock()
	// Best-effort migration: the caller already has the data, and if the
	// rewrite fails the entry stays readable through this same path.
	_ = s.Set(key, item)

	return item, nil
}
// Has reports whether key is present, delegating directly to the wrapped
// database. No value is read or decompressed.
func (s *SnappyDB) Has(key []byte) (bool, error) {
	found, err := s.db.Has(key)
	return found, err
}
// Set snappy-encodes value and stores the encoded bytes under key in the
// wrapped database, returning whatever error that write reports.
func (s *SnappyDB) Set(key []byte, value []byte) error {
	compressed := snappy.Encode(nil, value)
	return s.db.Set(key, compressed)
}
// SetSync snappy-encodes value and stores it under key using the wrapped
// database's SetSync, so the synchronous-write behaviour of the underlying
// store is preserved instead of being downgraded to a plain Set.
func (s *SnappyDB) SetSync(key []byte, value []byte) error {
	return s.db.SetSync(key, snappy.Encode(nil, value))
}
// Delete removes key from the wrapped database and returns its error, if any.
func (s *SnappyDB) Delete(key []byte) error {
	err := s.db.Delete(key)
	return err
}
// DeleteSync removes key using the wrapped database's DeleteSync, so the
// synchronous-write behaviour of the underlying store is preserved instead of
// being downgraded to a plain Delete.
func (s *SnappyDB) DeleteSync(key []byte) error {
	return s.db.DeleteSync(key)
}
// Iterator is not supported by SnappyDB. It ignores start and end and always
// returns a nil iterator together with errIteratorNotSupported.
func (s *SnappyDB) Iterator(start, end []byte) (tmdb.Iterator, error) {
	var none tmdb.Iterator
	return none, errIteratorNotSupported
}
// ReverseIterator is not supported by SnappyDB. It ignores start and end and
// always returns a nil iterator together with errIteratorNotSupported.
func (s *SnappyDB) ReverseIterator(start, end []byte) (tmdb.Iterator, error) {
	var none tmdb.Iterator
	return none, errIteratorNotSupported
}
// Close closes the wrapped database and returns its error, if any.
func (s *SnappyDB) Close() error {
	err := s.db.Close()
	return err
}
// NewBatch creates a batch on the wrapped database and returns it wrapped by
// NewSnappyBatch.
// NOTE(review): presumably the wrapper compresses values written through the
// batch -- confirm against NewSnappyBatch.
func (s *SnappyDB) NewBatch() tmdb.Batch {
	inner := s.db.NewBatch()
	return NewSnappyBatch(inner)
}
// Print delegates to the wrapped database's Print and returns its error.
func (s *SnappyDB) Print() error {
	err := s.db.Print()
	return err
}
// Stats returns the statistics map reported by the wrapped database,
// unmodified.
func (s *SnappyDB) Stats() map[string]string {
	stats := s.db.Stats()
	return stats
}