-
Notifications
You must be signed in to change notification settings - Fork 0
/
merkledag.go
320 lines (270 loc) · 6.93 KB
/
merkledag.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
// package merkledag implements the ipfs Merkle DAG datastructures.
package merkledag
import (
"fmt"
"sync"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
bserv "github.com/ipfs/go-ipfs/blockservice"
logging "github.com/ipfs/go-ipfs/vendor/QmTBXYb6y2ZcJmoXVKk3pf9rzSEjbCg7tQaJW7RSuH14nv/go-log"
)
var log = logging.Logger("merkledag")
var ErrNotFound = fmt.Errorf("merkledag: not found")
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (key.Key, error)
AddRecursive(*Node) error
Get(context.Context, key.Key) (*Node, error)
Remove(*Node) error
// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetDAG(context.Context, *Node) []NodeGetter
GetNodes(context.Context, []key.Key) []NodeGetter
Batch() *Batch
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}
// dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type dagService struct {
Blocks *bserv.BlockService
}
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (key.Key, error) {
if n == nil { // FIXME remove this assertion. protect with constructor invariant
return "", fmt.Errorf("dagService is nil")
}
d, err := nd.Encoded(false)
if err != nil {
return "", err
}
b := new(blocks.Block)
b.Data = d
b.Multihash, err = nd.Multihash()
if err != nil {
return "", err
}
return n.Blocks.AddBlock(b)
}
func (n *dagService) Batch() *Batch {
return &Batch{ds: n, MaxSize: 8 * 1024 * 1024}
}
// AddRecursive adds the given node and all child nodes to the BlockService
func (n *dagService) AddRecursive(nd *Node) error {
_, err := n.Add(nd)
if err != nil {
log.Info("AddRecursive Error: %s\n", err)
return err
}
for _, link := range nd.Links {
if link.Node != nil {
err := n.AddRecursive(link.Node)
if err != nil {
return err
}
}
}
return nil
}
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
if n == nil {
return nil, fmt.Errorf("dagService is nil")
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
b, err := n.Blocks.GetBlock(ctx, k)
if err != nil {
if err == bserv.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
return Decoded(b.Data)
}
// Remove deletes the given node and all of its children from the BlockService
func (n *dagService) Remove(nd *Node) error {
for _, l := range nd.Links {
if l.Node != nil {
n.Remove(l.Node)
}
}
k, err := nd.Key()
if err != nil {
return err
}
return n.Blocks.DeleteBlock(k)
}
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
log.Warning("Untested.")
var wg sync.WaitGroup
done := make(chan struct{})
for _, l := range root.Links {
wg.Add(1)
go func(lnk *Link) {
// Signal child is done on way out
defer wg.Done()
select {
case <-ctx.Done():
return
}
nd, err := lnk.GetNode(ctx, serv)
if err != nil {
log.Debug(err)
return
}
// Wait for children to finish
<-FetchGraph(ctx, nd, serv)
}(l)
}
go func() {
wg.Wait()
done <- struct{}{}
}()
return done
}
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
func FindLinks(links []key.Key, k key.Key, start int) []int {
var out []int
for i, lnk_k := range links[start:] {
if k == lnk_k {
out = append(out, i+start)
}
}
return out
}
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
var keys []key.Key
for _, lnk := range root.Links {
keys = append(keys, key.Key(lnk.Hash))
}
return ds.GetNodes(ctx, keys)
}
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
// Early out if no work to do
if len(keys) == 0 {
return nil
}
promises := make([]NodeGetter, len(keys))
sendChans := make([]chan<- *Node, len(keys))
for i := range keys {
promises[i], sendChans[i] = newNodePromise(ctx)
}
dedupedKeys := dedupeKeys(keys)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
for count := 0; count < len(keys); {
select {
case blk, ok := <-blkchan:
if !ok {
return
}
nd, err := Decoded(blk.Data)
if err != nil {
// NB: can happen with improperly formatted input data
log.Debug("Got back bad block!")
return
}
is := FindLinks(keys, blk.Key(), 0)
for _, i := range is {
count++
sendChans[i] <- nd
}
case <-ctx.Done():
return
}
}
}()
return promises
}
// Remove duplicates from a list of keys
func dedupeKeys(ks []key.Key) []key.Key {
kmap := make(map[key.Key]struct{})
var out []key.Key
for _, k := range ks {
if _, ok := kmap[k]; !ok {
kmap[k] = struct{}{}
out = append(out, k)
}
}
return out
}
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
ch := make(chan *Node, 1)
return &nodePromise{
recv: ch,
ctx: ctx,
}, ch
}
type nodePromise struct {
cache *Node
recv <-chan *Node
ctx context.Context
}
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
type NodeGetter interface {
Get(context.Context) (*Node, error)
}
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
if np.cache != nil {
return np.cache, nil
}
select {
case blk := <-np.recv:
np.cache = blk
case <-np.ctx.Done():
return nil, np.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
}
return np.cache, nil
}
type Batch struct {
ds *dagService
blocks []*blocks.Block
size int
MaxSize int
}
func (t *Batch) Add(nd *Node) (key.Key, error) {
d, err := nd.Encoded(false)
if err != nil {
return "", err
}
b := new(blocks.Block)
b.Data = d
b.Multihash, err = nd.Multihash()
if err != nil {
return "", err
}
k := key.Key(b.Multihash)
t.blocks = append(t.blocks, b)
t.size += len(b.Data)
if t.size > t.MaxSize {
return k, t.Commit()
}
return k, nil
}
func (t *Batch) Commit() error {
_, err := t.ds.Blocks.AddBlocks(t.blocks)
t.blocks = nil
t.size = 0
return err
}