/
kv.go
118 lines (97 loc) · 2.02 KB
/
kv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"bytes"
"encoding/binary"
"math/rand"
"time"
"github.com/cockroachdb/pebble"
"github.com/reusee/dscope"
"github.com/reusee/e4"
"github.com/reusee/sb"
"go.etcd.io/etcd/raft/v3"
"google.golang.org/protobuf/proto"
)
// KVScope is an empty carrier type: its methods are collected with
// dscope.Methods and added as definitions to the forked scope that serves
// the key-value store.
type KVScope struct{}
// NewKVScope is a factory function that returns a Scope. The implementation
// provided by NodeScope forks the surrounding scope with the KVScope
// definitions and the raft node.
type NewKVScope func() Scope
// NewKVScope provides the factory that derives a key-value scope from the
// node scope.
//
// Every call of the returned function collects the methods of a fresh KVScope,
// appends a pointer to the raft node as an additional definition, and forks
// the given scope with the combined definitions.
func (NodeScope) NewKVScope(
	node raft.Node,
	scope Scope,
) NewKVScope {
	return func() Scope {
		defs := append(dscope.Methods(new(KVScope)), &node)
		return scope.Fork(defs...)
	}
}
// Set stores value under key. The implementation built by KVScope.KV
// serializes both with the sb package and submits them as a raft proposal.
type Set func(key any, value any) error

// Get looks up key and decodes the stored value into target. The
// implementation built by KVScope.KV passes target to sb.Unmarshal
// (presumably it must be a pointer — confirm against sb's documentation).
type Get func(key any, target any) error
// KV builds the Set and Get functions of the key-value store.
//
// Parameters:
//   - peb: the pebble database that Get reads values from.
//   - node: the raft node used to propose writes and to request read indexes.
//   - wt: supplies the context (wt.Ctx) passed to the raft calls.
//   - reading: registry of pending read requests, keyed by an 8-byte request id
//     and holding the channel Get waits on.
//
// Set serializes the key as the tuple (NamespaceKV, key) and the value with sb,
// wraps both in a SetProposal and hands it to node.Propose. It returns as soon
// as Propose returns.
//
// Get registers a random request id in reading, issues node.ReadIndex with that
// id, and waits up to 8 seconds for the registered channel to become readable
// (presumably signalled by the code that processes raft read states — not
// visible here). It then reads the key from pebble and decodes the value into
// target. It returns ErrTimeout when the wait expires and ErrKeyNotFound when
// pebble has no such key; any other failure is reported through the returned
// error.
func (KVScope) KV(
	peb *pebble.DB,
	node raft.Node,
	wt NodeWaitTree,
	reading Reading,
) (
	set Set,
	get Get,
) {

	// encodeKey serializes key as (NamespaceKV, key) -> bytes. It reports
	// failures through ce, so it must only be called from a function that has
	// deferred he.
	encodeKey := func(key any) []byte {
		buf := new(bytes.Buffer)
		ce(sb.Copy(
			sb.Marshal(func() (Namespace, any) {
				return NamespaceKV, key
			}),
			sb.Encode(buf),
		))
		return buf.Bytes()
	}

	set = func(key any, value any) (err error) {
		defer he(&err)

		bsKey := encodeKey(key)

		valueBuf := new(bytes.Buffer)
		ce(sb.Copy(
			sb.Marshal(value),
			sb.Encode(valueBuf),
		))

		data, err := proto.Marshal(&SetProposal{
			Key:   bsKey,
			Value: valueBuf.Bytes(),
		})
		ce(err)

		ce(node.Propose(wt.Ctx, data))
		return
	}

	get = func(key any, target any) (err error) {
		defer he(&err)

		// Random 8-byte id that ties the ReadIndex request to the channel
		// registered in reading.
		data := make([]byte, 8)
		binary.LittleEndian.PutUint64(data, uint64(rand.Int63()))
		rKey := *(*[8]byte)(data)
		ready := make(chan struct{})
		reading.Store(rKey, ready)

		if readErr := node.ReadIndex(wt.Ctx, data); readErr != nil {
			// The request was never issued, so nobody will consume the
			// registration; remove it instead of leaking the entry.
			reading.Delete(rKey)
			ce(readErr)
		}

		// Use an explicit timer so it is released as soon as the read is
		// ready instead of lingering for the full timeout.
		timer := time.NewTimer(time.Second * 8)
		defer timer.Stop()
		select {
		case <-ready:
		case <-timer.C:
			reading.Delete(rKey)
			return we(ErrTimeout)
		}

		// (NamespaceKV, key) -> value
		bsKey := encodeKey(key)
		value, cl, err := peb.Get(bsKey)
		if err == pebble.ErrNotFound {
			return we(ErrKeyNotFound)
		}
		// Any other error leaves value and cl nil; surface it here rather than
		// decoding an empty value and closing a nil closer.
		ce(err)

		// On a decode error e4.Close(cl) releases the closer; on success it is
		// closed explicitly below.
		ce(sb.Copy(
			sb.Decode(bytes.NewReader(value)),
			sb.Unmarshal(target),
		), e4.Close(cl))
		ce(cl.Close())
		return
	}

	return
}