/
carwriter.go
125 lines (120 loc) · 3.3 KB
/
carwriter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package carwriter
import (
"context"
"fmt"
"io"
"github.com/ipfs/go-cid"
stargate "github.com/ipfs/stargate/pkg"
"github.com/ipld/go-car"
"github.com/ipld/go-car/util"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
)
// WriteCar resolves a StarGate request with appResolver and streams the
// resulting StarGate CAR response to w.
//
// The output is produced in this order:
//  1. a CAR header with Version 1 and root as its only root;
//  2. one KindPath message (plus its blocks) for each call to
//     ResolvePathSegments, repeated until no path segments remain;
//  3. one KindDAG message (plus its blocks) for each step returned by the
//     query resolver, until it reports Done.
//
// The first error encountered is returned, wrapped with the step that failed.
// Anything already written to w before the failure is not rolled back.
func WriteCar(ctx context.Context, w io.Writer, root cid.Cid, paths stargate.PathSegments, query stargate.Query, appResolver stargate.AppResolver) error {
	// The header goes out before any resolution work is attempted.
	if err := car.WriteHeader(&car.CarHeader{Version: 1, Roots: []cid.Cid{root}}, w); err != nil {
		return fmt.Errorf("writing car header: %w", err)
	}

	// The link system obtained here is reused to load blocks for every
	// message written below.
	lsys, current, err := appResolver.GetResolver(ctx, root)
	if err != nil {
		return fmt.Errorf("error loading root resolver: %w", err)
	}

	// Each resolution step returns the remaining segments and the resolver to
	// continue with, so both are replaced on every iteration.
	for len(paths) > 0 {
		var resolved *stargate.Path
		resolved, paths, current, err = current.ResolvePathSegments(ctx, paths)
		if err != nil {
			return fmt.Errorf("resolving path segments: %w", err)
		}
		pathMsg := stargate.StarGateMessage{
			Kind: stargate.KindPath,
			Path: resolved,
		}
		if err = writeStarGateMessageAndBlocks(ctx, w, pathMsg, lsys); err != nil {
			return fmt.Errorf("encoding stargate message and blocks: %w", err)
		}
	}

	// The query is resolved against whichever resolver the path walk ended on.
	steps, err := current.ResolveQuery(ctx, query)
	if err != nil {
		return fmt.Errorf("resolving Query: %w", err)
	}
	for {
		if steps.Done() {
			return nil
		}
		dag, err := steps.Next()
		if err != nil {
			return fmt.Errorf("resolving Query Step: %w", err)
		}
		dagMsg := stargate.StarGateMessage{
			Kind: stargate.KindDAG,
			DAG:  dag,
		}
		if err := writeStarGateMessageAndBlocks(ctx, w, dagMsg, lsys); err != nil {
			return fmt.Errorf("encoding stargate message and blocks: %w", err)
		}
	}
}
// bytesReader is satisfied by readers that can hand back their content as a
// byte slice through Bytes. writeStarGateMessageAndBlocks checks for it so
// that such readers are used directly instead of being drained with
// io.ReadAll.
type bytesReader interface {
// Bytes returns the reader's content as a byte slice.
Bytes() []byte
}
// writeStarGateMessageAndBlocks serializes a StarGate message and its
// associated blocks to w.
//
// The message is encoded as dag-cbor and written first with util.LdWrite,
// keyed by a CIDv1 (dag-cbor codec, sha2-256) computed over the encoded bytes.
// Then every entry of the message's block metadata whose Status is
// BlockStatusPresent is loaded through lsys and written with util.LdWrite,
// keyed by the entry's Link. Entries with any other status are skipped.
//
// A message of kind KindPath takes its block metadata from msg.Path; every
// other kind is treated as a DAG message and takes it from msg.DAG.
//
// The first error encountered is returned, wrapped with the step (and, for
// blocks, the CID) that failed. Data already written to w is not rolled back.
func writeStarGateMessageAndBlocks(ctx context.Context, w io.Writer, msg stargate.StarGateMessage, lsys *ipld.LinkSystem) error {
	raw, err := stargate.BindnodeRegistry.TypeToBytes(&msg, dagcbor.Encode)
	if err != nil {
		return fmt.Errorf("encoding message: %w", err)
	}
	messageLink, err := cid.Prefix{
		Version: 1,
		Codec:   uint64(multicodec.DagCbor),
		MhType:  multihash.SHA2_256,
		// -1 asks for the hash function's default digest length.
		MhLength: -1,
	}.Sum(raw)
	if err != nil {
		return fmt.Errorf("computing message cid: %w", err)
	}
	if err := util.LdWrite(w, messageLink.Bytes(), raw); err != nil {
		return fmt.Errorf("writing message: %w", err)
	}

	var blockMetadata stargate.BlockMetadata
	if msg.Kind == stargate.KindPath {
		blockMetadata = msg.Path.Blocks
	} else {
		blockMetadata = msg.DAG.Blocks
	}
	for _, blockMetadatum := range blockMetadata {
		if blockMetadatum.Status != stargate.BlockStatusPresent {
			continue
		}
		if err := writeBlockSection(ctx, w, lsys, blockMetadatum.Link); err != nil {
			return err
		}
	}
	return nil
}

// writeBlockSection loads the block identified by c through lsys and writes
// it to w with util.LdWrite, keyed by c.
//
// It is a separate function so that a reader which implements io.Closer is
// closed once per block, and only after its bytes have been written.
func writeBlockSection(ctx context.Context, w io.Writer, lsys *ipld.LinkSystem, c cid.Cid) error {
	// Calling a nil opener would panic; report it as an ordinary error.
	if lsys == nil || lsys.StorageReadOpener == nil {
		return fmt.Errorf("reading block %s: no storage configured for reading", c)
	}
	reader, err := lsys.StorageReadOpener(linking.LinkContext{
		Ctx: ctx,
	}, cidlink.Link{Cid: c})
	if err != nil {
		return fmt.Errorf("opening block %s: %w", c, err)
	}
	if closer, ok := reader.(io.Closer); ok {
		// The close error is deliberately ignored: the handle is read-only
		// and by the time it runs the data has been fully consumed.
		defer closer.Close()
	}

	var data []byte
	if br, ok := reader.(bytesReader); ok {
		// Use the reader's own bytes rather than draining it into a new slice.
		data = br.Bytes()
	} else {
		data, err = io.ReadAll(reader)
		if err != nil {
			return fmt.Errorf("reading block %s: %w", c, err)
		}
	}
	if err := util.LdWrite(w, c.Bytes(), data); err != nil {
		return fmt.Errorf("writing block %s: %w", c, err)
	}
	return nil
}