-
Notifications
You must be signed in to change notification settings - Fork 11
/
badger.go
188 lines (157 loc) · 4.12 KB
/
badger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package fsm
import (
"encoding/json"
"fmt"
"github.com/dgraph-io/badger/v2"
"github.com/hashicorp/raft"
"io"
"os"
"strings"
)
// badgerFSM is the raft.FSM implementation of this package; it keeps its
// key/value state in a BadgerDB database.
type badgerFSM struct {
// db is the BadgerDB handle used by the get, set and delete helpers.
db *badger.DB
}
// get reads the JSON document stored under key and decodes it.
//
// On success it returns the decoded value and a nil error; when the stored
// value is empty the result is nil with a nil error. On any failure (lookup
// error, value read error, invalid JSON) it returns an empty
// map[string]interface{} together with the error, so callers always receive a
// non-nil placeholder alongside a failure.
func (b badgerFSM) get(key string) (interface{}, error) {
	// This is a read-only transaction: there is nothing to commit, and
	// Badger requires Discard to be called to release the transaction.
	txn := b.db.NewTransaction(false)
	defer txn.Discard()

	item, err := txn.Get([]byte(key))
	if err != nil {
		return map[string]interface{}{}, err
	}

	// Copy the value out so it remains usable independently of the
	// transaction's internal buffers.
	value, err := item.ValueCopy(nil)
	if err != nil {
		return map[string]interface{}{}, err
	}

	var data interface{}
	if len(value) == 0 {
		return data, nil
	}
	if err := json.Unmarshal(value, &data); err != nil {
		return map[string]interface{}{}, err
	}
	return data, nil
}
// set JSON-encodes value and stores the result under key using a single
// read-write transaction.
//
// It returns the encoding error if value cannot be marshalled, otherwise the
// error (if any) reported by BadgerDB while writing or committing.
func (b badgerFSM) set(key string, value interface{}) error {
	data, err := json.Marshal(value)
	if err != nil {
		return err
	}
	if len(data) == 0 {
		return nil
	}

	txn := b.db.NewTransaction(true)
	// Badger documents Discard as safe to call after Commit, so deferring it
	// releases the transaction on every return path.
	defer txn.Discard()

	if err := txn.Set([]byte(key), data); err != nil {
		return err
	}
	return txn.Commit()
}
// delete removes key from BadgerDB using a single read-write transaction.
//
// It returns the error reported by BadgerDB while deleting or committing.
func (b badgerFSM) delete(key string) error {
	txn := b.db.NewTransaction(true)
	// Discard (not Commit) is deferred: a failed Delete must not be
	// committed, and the transaction must be committed only once. Badger
	// documents Discard as safe to call after Commit.
	defer txn.Discard()

	if err := txn.Delete([]byte(key)); err != nil {
		return err
	}
	return txn.Commit()
}
// Apply is invoked once a log entry is committed.
// It returns a value which will be made available in the
// ApplyFuture returned by Raft.Apply method if that
// method was called on the same Raft node as the FSM.
//
// Only raft.LogCommand entries are handled. The entry data is decoded as a
// JSON CommandPayload whose Operation (case-insensitive, surrounding spaces
// ignored) selects the action:
//
//   - SET:    stores payload.Value under payload.Key
//   - GET:    reads the value stored under payload.Key
//   - DELETE: removes payload.Key
//
// For these operations the result is an *ApplyResponse carrying the error and
// data of the action. For any other log type, an undecodable payload, or an
// unknown operation, a diagnostic is written to stderr and nil is returned.
func (b badgerFSM) Apply(log *raft.Log) interface{} {
	if log.Type != raft.LogCommand {
		_, _ = fmt.Fprintf(os.Stderr, "not raft log command type\n")
		return nil
	}

	var payload CommandPayload
	if err := json.Unmarshal(log.Data, &payload); err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "error unmarshalling store payload %s\n", err.Error())
		return nil
	}

	switch op := strings.ToUpper(strings.TrimSpace(payload.Operation)); op {
	case "SET":
		return &ApplyResponse{
			Error: b.set(payload.Key, payload.Value),
			Data:  payload.Value,
		}
	case "GET":
		data, err := b.get(payload.Key)
		return &ApplyResponse{
			Error: err,
			Data:  data,
		}
	case "DELETE":
		return &ApplyResponse{
			Error: b.delete(payload.Key),
			Data:  nil,
		}
	default:
		// The entry is a command, but its operation is not one we know;
		// report that instead of claiming the log type was wrong.
		_, _ = fmt.Fprintf(os.Stderr, "unknown store operation %q\n", op)
		return nil
	}
}
// Snapshot is called by raft when it wants to take a snapshot, which raft
// uses to support log compaction.
// Apply already persists every change to disk through BadgerDB, so no state
// is collected here; the result of newSnapshotNoop is returned as is.
func (b badgerFSM) Snapshot() (raft.FSMSnapshot, error) {
	snapshot, err := newSnapshotNoop()
	return snapshot, err
}
// Restore is used to restore an FSM from a Snapshot. It is not called
// concurrently with any other command.
//
// The snapshot is expected to be a JSON array of CommandPayload objects; each
// element's Value is written to BadgerDB under its Key. An empty stream is
// treated as a snapshot without entries and restores nothing. Keys that are
// already stored but absent from the snapshot are left untouched.
//
// rClose is always closed before returning. Progress and failures are
// reported on stdout; the first decode or persist error aborts the restore
// and is returned.
func (b badgerFSM) Restore(rClose io.ReadCloser) error {
	defer func() {
		if err := rClose.Close(); err != nil {
			_, _ = fmt.Fprintf(os.Stdout, "[FINALLY RESTORE] close error %s\n", err.Error())
		}
	}()

	_, _ = fmt.Fprintf(os.Stdout, "[START RESTORE] read all message from snapshot\n")

	var totalRestored int
	decoder := json.NewDecoder(rClose)

	// Read the opening bracket. Without consuming it, the decoder would try
	// to decode the whole array into a single CommandPayload and fail.
	tok, err := decoder.Token()
	if err == io.EOF {
		// Nothing was written to the snapshot: there is nothing to restore.
		_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] success restore %d messages in snapshot\n", totalRestored)
		return nil
	}
	if err != nil {
		_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error %s\n", err.Error())
		return err
	}
	if delim, ok := tok.(json.Delim); !ok || delim != '[' {
		err = fmt.Errorf("unexpected token %v at start of snapshot, want '['", tok)
		_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error %s\n", err.Error())
		return err
	}

	// Stream the elements one by one so the snapshot never has to fit in
	// memory as a whole.
	for decoder.More() {
		var data CommandPayload
		if err := decoder.Decode(&data); err != nil {
			_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error decode data %s\n", err.Error())
			return err
		}

		if err := b.set(data.Key, data.Value); err != nil {
			_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error persist data %s\n", err.Error())
			return err
		}

		totalRestored++
	}

	// Read the closing bracket; this also surfaces a truncated or malformed
	// stream that made More stop early.
	if _, err := decoder.Token(); err != nil {
		_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error %s\n", err.Error())
		return err
	}

	_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] success restore %d messages in snapshot\n", totalRestored)
	return nil
}
// NewBadger returns a raft.FSM whose state is stored in the given BadgerDB
// handle.
func NewBadger(badgerDB *badger.DB) raft.FSM {
	fsm := new(badgerFSM)
	fsm.db = badgerDB
	return fsm
}