-
Notifications
You must be signed in to change notification settings - Fork 38
/
injest.go
45 lines (39 loc) · 1.2 KB
/
injest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package reconciledloader
import (
"github.com/ipfs/go-cid"
"go.opentelemetry.io/otel/trace"
"github.com/ipfs/go-graphsync"
)
// IngestResponse ingests new remote items into the reconciled loader
func (rl *ReconciledLoader) IngestResponse(md graphsync.LinkMetadata, traceLink trace.Link, blocks map[cid.Cid][]byte) {
if md.Length() == 0 {
return
}
duplicates := make(map[cid.Cid]struct{}, md.Length())
items := make([]*remotedLinkedItem, 0, md.Length())
md.Iterate(func(link cid.Cid, action graphsync.LinkAction) {
newItem := newRemote()
newItem.link = link
newItem.action = action
if action == graphsync.LinkActionPresent {
if _, isDuplicate := duplicates[link]; !isDuplicate {
duplicates[link] = struct{}{}
newItem.block = blocks[link]
}
}
newItem.traceLink = traceLink
items = append(items, newItem)
})
rl.lock.Lock()
// refuse to queue items when the request is ofline
if !rl.open {
// don't hold block memory if we're dropping these
freeList(items)
rl.lock.Unlock()
return
}
buffered := rl.remoteQueue.queue(items)
rl.signal.Signal()
rl.lock.Unlock()
log.Debugw("injested blocks for new response", "request_id", rl.requestID, "total_queued_bytes", buffered)
}