-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
199 lines (179 loc) · 4.48 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package store
import (
"bytes"
"path"
"sync"
"time"
"github.com/intob/rocketkv/cfg"
"github.com/intob/rocketkv/util"
"github.com/spf13/viper"
)
// Store is the root object of the key-value store. It owns the set of
// partitions that keys are routed to and remembers the directory it was
// configured with.
type Store struct {
// Parts holds the store's partitions. Lookups in this file only range
// over the values; NOTE(review): the meaning of the uint64 map key is
// not visible here — presumably a partition identifier, confirm.
Parts map[uint64]*Part
// Dir is populated by NewStore from the cfg.DIR configuration value.
Dir string
}
// NewStore creates a Store whose Dir comes from the cfg.DIR setting, runs
// ensureManifest and readFromBlockFiles on it, and starts the background
// workers.
//
// A goroutine running scanForExpiredKeys is always started with the
// configured cfg.SCAN_PERIOD. When cfg.PERSIST is enabled, a second
// goroutine running Persist is started with the configured directory and
// cfg.WRITE_PERIOD. Neither goroutine is given a stop signal here.
func NewStore() *Store {
	store := new(Store)
	store.Dir = viper.GetString(cfg.DIR)

	// NOTE(review): presumably these prepare the manifest and load any
	// persisted blocks into the store — confirm against their definitions.
	ensureManifest(store)
	readFromBlockFiles(store)

	scanPeriod := viper.GetInt(cfg.SCAN_PERIOD)
	go scanForExpiredKeys(store, scanPeriod)

	if persist := viper.GetBool(cfg.PERSIST); persist {
		writePeriod := viper.GetInt(cfg.WRITE_PERIOD)
		go store.Persist(viper.GetString(cfg.DIR), writePeriod)
	}

	return store
}
// Get looks up the slot stored under key.
//
// The key is split with path.Split into a namespace (everything up to and
// including the last slash) and a name (the final element). The hash of
// those selects the partition and block; the name alone indexes the
// block's slot map. The lookup is done under the block's read lock.
//
// The returned pointer refers to a local copy of the map value, and the
// boolean reports whether the key was present.
func (s *Store) Get(key string) (*Slot, bool) {
	namespace, name := path.Split(key)
	keyHash := hashKey(namespace, name)

	part := s.getClosestPart(keyHash)
	blk := part.getClosestBlock(keyHash)

	blk.Mutex.RLock()
	defer blk.Mutex.RUnlock()

	value, ok := blk.Slots[name]
	return &value, ok
}
// Set stores slot under key in the block responsible for that key.
//
// The key is split with path.Split into namespace and name; their hash
// selects the partition and block, and the name indexes the block's slot
// map.
//
// When repl is true the write comes from replication: it is dropped if the
// locally stored slot has a greater Modified value than the incoming slot.
// Otherwise slot.Modified is set to the current Unix time, the slot is
// stored and the block is flagged with MustWrite. Only non-replicated
// writes flag the block's replica states with MustSync.
//
// The whole check-and-write sequence runs under the block's write lock, so
// the staleness check cannot race with concurrent writers.
func (s *Store) Set(key string, slot Slot, repl bool) {
	ns, name := path.Split(key)
	h := hashKey(ns, name)
	block := s.getClosestPart(h).getClosestBlock(h)

	// Lock before reading the existing slot: an unlocked map read is a data
	// race with concurrent Set/Del calls, and checking outside the lock
	// would let a newer write slip in between the check and the store.
	block.Mutex.Lock()
	defer block.Mutex.Unlock()

	if repl && block.Slots[name].Modified > slot.Modified {
		// if key has been modified since, skip it
		return
	}

	// NOTE(review): replicated writes that pass the check above are also
	// re-stamped with the local time rather than keeping the sender's
	// Modified value — confirm this is intended.
	slot.Modified = time.Now().Unix()

	block.Slots[name] = slot
	block.MustWrite = true

	// don't re-replicate (for now)
	// TODO: think more about this, maybe it's better
	// to re-replicate except to origin of repl.
	// This would ensure that all replicas arrive at a consistent state,
	// even if they are not all connected. However, it increases the amount
	// of work that is done. Maybe we can make this a config option.
	if !repl {
		for _, replNodeState := range block.ReplState {
			if replNodeState != nil {
				replNodeState.MustSync = true
			}
		}
	}
}
// Del deletes the slot stored under key from the block responsible for it.
//
// Under the block's write lock, the entry is removed from the slot map, the
// block is flagged with MustWrite, and every non-nil replica state of the
// block is flagged with MustSync. Deleting a key that is not present still
// sets those flags.
//
// TODO: add key to list of deletes to replicate
func (s *Store) Del(key string) {
	namespace, name := path.Split(key)
	keyHash := hashKey(namespace, name)
	blk := s.getClosestPart(keyHash).getClosestBlock(keyHash)

	blk.Mutex.Lock()
	defer blk.Mutex.Unlock()

	delete(blk.Slots, name)
	blk.MustWrite = true

	for _, replica := range blk.ReplState {
		if replica == nil {
			continue
		}
		replica.MustSync = true
	}
}
// List streams keys matching key on the returned channel, which is created
// with a buffer of bufferSize and closed once all producers have finished.
//
// The key is split with path.Split. If the namespace part is empty, every
// partition is searched concurrently, one goroutine per partition. If a
// namespace is given, only the partition closest to the namespace hash is
// searched. The caller must drain the channel; producers block on a full
// buffer.
func (s *Store) List(key string, bufferSize int) <-chan string {
	output := make(chan string, bufferSize)
	ns, name := path.Split(key)

	// Namespaced lookup: a single partition, a single producer.
	if ns != "" {
		go func() {
			defer close(output)
			target := s.getClosestPart(hashKey(ns, name))
			target.listKeys(ns, name, output)
		}()
		return output
	}

	// No namespace: fan out over all partitions.
	var pending sync.WaitGroup
	for _, part := range s.Parts {
		pending.Add(1)
		go func(p *Part) {
			defer pending.Done()
			p.listKeys(ns, name, output)
		}(part)
	}

	// Close the channel only after every partition has finished sending.
	go func() {
		pending.Wait()
		close(output)
	}()

	return output
}
// Count returns the number of keys matching key.
//
// The key is split with path.Split. If a namespace is given, only the
// partition closest to the namespace hash is counted. Otherwise every
// partition is counted concurrently and the per-partition results are
// summed. In both cases only the name part is passed to countKeys.
func (s *Store) Count(key string) uint64 {
	ns, name := path.Split(key)

	// Namespaced: one partition answers the whole query.
	if ns != "" {
		keyHash := hashKey(ns, name)
		return s.getClosestPart(keyHash).countKeys(name)
	}

	// No namespace: count every partition in parallel and sum the results.
	var (
		total   uint64
		totalMu sync.Mutex
		pending sync.WaitGroup
	)
	for _, part := range s.Parts {
		pending.Add(1)
		go func(p *Part) {
			defer pending.Done()
			n := p.countKeys(name)
			totalMu.Lock()
			total += n
			totalMu.Unlock()
		}(part)
	}
	pending.Wait()

	return total
}
// getClosestPart returns the partition whose Id is closest to keyHash.
//
// Closeness is the XOR distance: util.FastXor combines keyHash with each
// partition's Id, and the resulting byte strings are ordered with
// bytes.Compare (most significant byte first). The partition with the
// smallest result wins.
//
// Returns nil when the store has no partitions; callers dereference the
// result, so the store must be populated before use.
func (s *Store) getClosestPart(keyHash []byte) *Part {
	var clDist []byte // winning distance
	var clPart *Part  // winning part

	// range through parts to find closest
	for _, part := range s.Parts {
		// A fresh, zeroed buffer per candidate, always sized by util.ID_LEN
		// so the buffer length cannot drift from the configured ID length.
		// The winning buffer is retained as clDist, so it must not be
		// reused for the next candidate.
		dist := make([]byte, util.ID_LEN)
		util.FastXor(dist, keyHash, part.Id)
		if clDist == nil || bytes.Compare(dist, clDist) < 0 {
			clPart = part
			clDist = dist
		}
	}
	return clPart
}
// hashKey returns the routing hash for an already-split key.
//
// Callers split the key into namespace and name (like dir and filename).
// When the namespace is non-empty only the namespace is hashed, so every
// key in a namespace yields the same hash. With no namespace, the name
// itself is hashed.
func hashKey(namespace, name string) []byte {
	if namespace != "" {
		return util.HashStr(namespace)
	}
	return util.HashStr(name)
}