/
runtraversal.go
82 lines (72 loc) · 2.32 KB
/
runtraversal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package runtraversal
import (
"bytes"
"io"
logging "github.com/ipfs/go-log/v2"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipfs/go-graphsync/ipldutil"
)
// logger is the package-wide logger, created under the name "gs-traversal".
var logger = logging.Logger("gs-traversal")
// errorString is a string-backed error type. Because its underlying type is
// string, values of it can be declared as constants.
type errorString string

// Error implements the error interface by returning the underlying string.
func (s errorString) Error() string { return string(s) }
// ErrFirstBlockLoad is the sentinel error reported when the traversal could
// not load the very first block it needed.
const ErrFirstBlockLoad errorString = "Unable to load first block"
// ResponseSender is a callback that sends a response over the network for the
// given link, together with the block data that was read for it. A non-nil
// error reports that the response could not be sent.
type ResponseSender func(link ipld.Link, data []byte) error
// RunTraversal drives traverser to completion, loading each link it requests
// with loader and handing every link, together with the bytes read for it, to
// sendResponse.
//
// Each iteration of the loop:
//   - asks the traverser whether it is complete. If so, the completion error
//     is returned (nil on success). ErrFirstBlockLoad is returned instead
//     when no blocks were traversed and the completion error equals
//     traversal.SkipMe{}.
//   - loads the traverser's current link. A load failure is reported to the
//     traverser as traversal.SkipMe{}; a failure to buffer the loaded data is
//     reported to the traverser unchanged. In both cases sendResponse is
//     still invoked for the link, with nil data.
//   - otherwise advances the traverser with the buffered block and invokes
//     sendResponse with the block's bytes.
//
// An error from traverser.Advance or from sendResponse stops the traversal
// and is returned to the caller.
func RunTraversal(
	loader ipld.BlockReadOpener,
	traverser ipldutil.Traverser,
	sendResponse ResponseSender) error {
	for {
		isComplete, err := traverser.IsComplete()
		if isComplete {
			if err == nil {
				logger.Debugf("traversal completed successfully, nBlocksRead=%d", traverser.NBlocksTraversed())
				return nil
			}
			logger.Errorf("traversal completion check failed, nBlocksRead=%d, err=%s", traverser.NBlocksTraversed(), err)
			// The composite literal must stay parenthesized: an unparenthesized
			// `T{}` in an if header is parsed as the start of the if body.
			if traverser.NBlocksTraversed() == 0 && err == (traversal.SkipMe{}) {
				return ErrFirstBlockLoad
			}
			return err
		}

		lnk, lnkCtx := traverser.CurrentRequest()
		logger.Debugf("will load link=%s", lnk)
		result, err := loader(lnkCtx, lnk)

		// data stays nil when the block could not be loaded or buffered, so
		// sendResponse is still called for the link but without block bytes.
		var data []byte
		if err != nil {
			logger.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err)
			traverser.Error(traversal.SkipMe{})
		} else {
			// Reuse the loader's buffer when it already is one; otherwise
			// read the whole block into a fresh buffer.
			blockBuffer, ok := result.(*bytes.Buffer)
			if !ok {
				blockBuffer = new(bytes.Buffer)
				_, err = io.Copy(blockBuffer, result)
			}
			if err != nil {
				logger.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err)
				traverser.Error(err)
			} else {
				// Capture the bytes before the buffer is handed to Advance.
				// NOTE(review): presumably Advance may read (and so drain)
				// the buffer — confirm against the Traverser implementation.
				data = blockBuffer.Bytes()
				err = traverser.Advance(blockBuffer)
				if err != nil {
					logger.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err)
					return err
				}
				// Logged only on the real success path; previously this was
				// also emitted after a buffer-copy failure.
				logger.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, traverser.NBlocksTraversed())
			}
		}

		err = sendResponse(lnk, data)
		if err != nil {
			return err
		}
	}
}