/
coalesce.go
140 lines (120 loc) · 2.91 KB
/
coalesce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package coalesce
import (
"io"
"sync"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Operation tags. Each request tracked by the coalescing datastore is
// labelled with one of these, so that different operations on the same
// datastore key are kept apart.
var (
	getKey    = "get"
	hasKey    = "has"
	putKey    = "put"
	deleteKey = "delete"
)
// keySync identifies one in-flight request. It is the key type of
// datastore.req, so two requests are coalesced only when all three
// fields compare equal.
type keySync struct {
// op is the operation tag: one of putKey, getKey, hasKey or deleteKey.
op string
// k is the datastore key the operation targets.
k ds.Key
// value is the value being written by a Put; Get, Has and Delete leave it nil.
// NOTE(review): keySync is used as a map key, so a value whose dynamic type
// is not comparable (e.g. a slice) would panic on lookup — confirm callers.
value interface{}
}
// valSync holds the outcome of one in-flight request together with the
// signal that tells coalesced callers the outcome is ready.
type valSync struct {
// val is the result of the child call, when the operation produces one
// (Get stores the value, Has stores a bool).
val interface{}
// err is the error returned by the child call.
err error
// done is closed by syncDone; callers coalesced onto this request
// block on it in sync before reading val and err.
done chan struct{}
}
// datastore wraps a child ds.Datastore and coalesces identical concurrent
// requests, tracking the ones currently in flight in req.
type datastore struct {
// child is the wrapped datastore that actually serves each request.
child ds.Datastore
// reqmu guards req.
reqmu sync.Mutex
// req maps each in-flight request to its shared result slot.
req map[keySync]*valSync
}
// Wrap returns a coalescing datastore layered on top of d.
// Simultaneous requests that share the same keys are served by a single
// call to d, and every caller receives that call's result. Note that the
// result is shared memory, not a copy: it is not possible to copy a
// generic interface{}.
func Wrap(d ds.Datastore) ds.Datastore {
	inflight := make(map[keySync]*valSync)
	return &datastore{
		child: d,
		req:   inflight,
	}
}
// sync registers interest in the request identified by k.
//
// If no such request is in flight, a fresh valSync is recorded for it and
// returned with found == false; the caller is then responsible for filling
// in the result and calling syncDone. If one is already in flight, sync
// blocks until it completes and returns the existing valSync with
// found == true.
func (d *datastore) sync(k keySync) (vs *valSync, found bool) {
	d.reqmu.Lock()
	if vs, found = d.req[k]; found {
		d.reqmu.Unlock()
		// someone else is already serving this request: wait for them,
		// outside the lock.
		<-vs.done
		return vs, true
	}

	// first caller for this key: publish a slot for the others to wait on.
	vs = &valSync{done: make(chan struct{})}
	d.req[k] = vs
	d.reqmu.Unlock()
	return vs, false
}
// syncDone marks the request identified by k as finished. It removes the
// request from the in-flight table and closes its done channel, which
// releases every caller blocked in sync on the same request.
//
// The callers in this file invoke it after storing the result in the
// valSync, from the caller for which sync reported found == false.
// It panics if no request for k is in flight.
func (d *datastore) syncDone(k keySync) {
	d.reqmu.Lock()
	vs, found := d.req[k]
	if !found {
		// Release the lock before panicking so that a recovered panic
		// does not leave the datastore permanently locked.
		d.reqmu.Unlock()
		panic("attempt to syncDone non-existent request")
	}
	delete(d.req, k)
	d.reqmu.Unlock()

	// release all the waiters.
	close(vs.done)
}
// Put stores the object `value` named by `key`.
//
// Concurrent Puts with an equal key and value are coalesced: one of them
// writes to the child datastore and all of them return that write's error.
//
// NOTE(review): value becomes part of a map key (see keySync), so a value
// whose dynamic type is not comparable (e.g. a slice) would panic in the
// map lookup inside sync — confirm what callers pass.
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
	ks := keySync{putKey, key, value}
	vs, found := d.sync(ks)
	if !found {
		vs.err = d.child.Put(key, value)
		d.syncDone(ks)
	}
	// Return the shared result. The named result err is never assigned,
	// so returning it would always report success.
	return vs.err
}
// Get retrieves the object `value` named by `key`.
// Concurrent Gets for the same key share a single call to the child
// datastore and all return its value and error.
func (d *datastore) Get(key ds.Key) (value interface{}, err error) {
	req := keySync{op: getKey, k: key}
	res, shared := d.sync(req)
	if shared {
		// another caller already fetched it; reuse their result.
		return res.val, res.err
	}

	res.val, res.err = d.child.Get(key)
	d.syncDone(req)
	return res.val, res.err
}
// Has returns whether the `key` is mapped to a `value`.
// Concurrent Has calls for the same key share a single call to the child
// datastore and all return its answer and error.
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
	req := keySync{op: hasKey, k: key}
	res, shared := d.sync(req)
	if shared {
		// another caller already asked; reuse their result.
		return res.val.(bool), res.err
	}

	// the bool answer is stored in the interface{} slot so waiters can read it.
	res.val, res.err = d.child.Has(key)
	d.syncDone(req)
	return res.val.(bool), res.err
}
// Delete removes the value for given `key`.
// Concurrent Deletes of the same key share a single call to the child
// datastore and all return its error.
func (d *datastore) Delete(key ds.Key) (err error) {
	req := keySync{op: deleteKey, k: key}
	res, shared := d.sync(req)
	if shared {
		// another caller already issued the delete; reuse their result.
		return res.err
	}

	res.err = d.child.Delete(key)
	d.syncDone(req)
	return res.err
}
// Query runs q against the child datastore and returns its results.
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
	// query not coalesced yet: pass it straight through.
	results, err := d.child.Query(q)
	return results, err
}
// Close waits for the requests that are in flight when it is called and
// then closes the child datastore, if the child implements io.Closer.
// It returns the child's Close error, or nil when the child is not a Closer.
//
// Requests that start after Close has taken its snapshot are not waited for.
func (d *datastore) Close() error {
	// Snapshot the in-flight requests under the lock, then wait with the
	// lock released: syncDone must acquire reqmu before it closes a done
	// channel, so waiting while holding reqmu would deadlock.
	d.reqmu.Lock()
	pending := make([]*valSync, 0, len(d.req))
	for _, s := range d.req {
		pending = append(pending, s)
	}
	d.reqmu.Unlock()

	for _, s := range pending {
		<-s.done
	}

	if c, ok := d.child.(io.Closer); ok {
		return c.Close()
	}
	return nil
}