/
dagbuilder.go
227 lines (195 loc) · 5.78 KB
/
dagbuilder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package helpers
import (
"context"
"io"
"os"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
chunker "gx/ipfs/QmWo8jYc19ppG7YoTsrr2kEtLRbARTJho5oNXFTR6B7Peq/go-ipfs-chunker"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
files "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)
// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
//
// Instances are created through DagBuilderParams.New.
type DagBuilderHelper struct {
// dserv is the DAGService handed out by GetDagServ and written to by Add.
dserv ipld.DAGService
// spl is the splitter that prepareNext pulls chunks from.
spl chunker.Splitter
// recvdErr holds a non-EOF error returned by the splitter; Next reports it.
recvdErr error
// rawLeaves makes GetNextDataNode emit raw ipld nodes instead of unixfs
// TRaw blocks.
rawLeaves bool
nextData []byte // the next item to return.
// maxlinks is the per-node child limit used by FillNodeLayer.
maxlinks int
// batch is created over the DAGService in New and committed by Close.
batch *ipld.Batch
// fullPath and stat are only set by New when NoCopy is requested and the
// splitter's reader implements files.FileInfo; SetPosInfo forwards them.
fullPath string
stat os.FileInfo
// prefix is the optional CID prefix applied to nodes built by this helper;
// it may be nil.
prefix *cid.Prefix
}
// DagBuilderParams wraps configuration options to create a DagBuilderHelper
// from a chunker.Splitter.
//
// Call New on a populated DagBuilderParams to obtain the helper.
type DagBuilderParams struct {
// Maximum number of links per intermediate node
// (FillNodeLayer stops adding children once a node reaches this count).
Maxlinks int
// RawLeaves signifies that the importer should use raw ipld nodes as leaves
// instead of using the unixfs TRaw type
RawLeaves bool
// CID Prefix to use if set
// (a nil Prefix is allowed; GetNextDataNode checks for it).
Prefix *cid.Prefix
// DAGService to write blocks to (required)
Dagserv ipld.DAGService
// NoCopy signals to the chunker that it should track fileinfo for
// filestore adds
// In New, this makes the helper record the reader's absolute path and stat
// info, provided the splitter's reader implements files.FileInfo.
NoCopy bool
}
// New builds a DagBuilderHelper that reads its data from spl and takes its
// configuration (DAGService, raw-leaves flag, CID prefix, link limit) from
// dbp. A fresh batch over dbp.Dagserv is created for the helper.
//
// When dbp.NoCopy is set and the splitter's underlying reader implements
// files.FileInfo, the reader's absolute path and stat info are recorded on
// the helper so that SetPosInfo can attach them to nodes.
func (dbp *DagBuilderParams) New(spl chunker.Splitter) *DagBuilderHelper {
	helper := &DagBuilderHelper{
		dserv:     dbp.Dagserv,
		spl:       spl,
		rawLeaves: dbp.RawLeaves,
		prefix:    dbp.Prefix,
		maxlinks:  dbp.Maxlinks,
		batch:     ipld.NewBatch(context.TODO(), dbp.Dagserv),
	}

	// The reader is inspected unconditionally; the result is only used
	// when NoCopy was requested.
	info, isFileInfo := spl.Reader().(files.FileInfo)
	if isFileInfo && dbp.NoCopy {
		helper.fullPath = info.AbsPath()
		helper.stat = info.Stat()
	}
	return helper
}
// prepareNext pulls the next chunk from the splitter into nextData, and any
// error other than io.EOF into recvdErr. It does nothing while a chunk or an
// error is already pending, so it is safe to call repeatedly.
func (db *DagBuilderHelper) prepareNext() {
	// Nothing to do while a chunk or an error is still waiting.
	if db.nextData != nil || db.recvdErr != nil {
		return
	}

	chunk, err := db.spl.NextBytes()
	if err == io.EOF {
		// End of input is signalled by a nil chunk, not by an error.
		err = nil
	}
	db.nextData, db.recvdErr = chunk, err
}
// Done reports whether the incoming data has been fully consumed.
//
// It first makes sure the next chunk has been fetched, since Done may be
// called before Next. While a splitter error is pending it returns false, so
// a caller looping on !Done() goes on to call Next and receives that error.
func (db *DagBuilderHelper) Done() bool {
	db.prepareNext() // no-op if something is already pending
	return db.recvdErr == nil && db.nextData == nil
}
// Next hands out the next chunk of data to be inserted into the dag.
// A nil chunk with a nil error means the stream has ended and the current
// building operation should finish. If the splitter reported an error, that
// error is returned together with a nil chunk.
func (db *DagBuilderHelper) Next() ([]byte, error) {
	db.prepareNext() // no-op if something is already pending

	// Take ownership of the pending chunk and mark it as consumed.
	var chunk []byte
	chunk, db.nextData = db.nextData, nil

	if err := db.recvdErr; err != nil {
		return nil, err
	}
	return chunk, nil
}
// GetDagServ exposes the DAGService this helper was configured with.
func (db *DagBuilderHelper) GetDagServ() ipld.DAGService {
	service := db.dserv
	return service
}
// NewUnixfsNode returns an empty UnixfsNode of type TFile, backed by a new
// ProtoNode and carrying the helper's CID prefix.
func (db *DagBuilderHelper) NewUnixfsNode() *UnixfsNode {
	fileNode := new(UnixfsNode)
	fileNode.node = new(dag.ProtoNode)
	fileNode.ufmt = &ft.FSNode{Type: ft.TFile}
	fileNode.SetPrefix(db.prefix)
	return fileNode
}
// newUnixfsBlock returns an empty UnixfsNode of type TRaw (a raw data
// block), backed by a new ProtoNode and carrying the helper's CID prefix.
func (db *DagBuilderHelper) newUnixfsBlock() *UnixfsNode {
	block := new(UnixfsNode)
	block.node = new(dag.ProtoNode)
	block.ufmt = &ft.FSNode{Type: ft.TRaw}
	block.SetPrefix(db.prefix)
	return block
}
// FillNodeLayer appends data nodes as children of node until either the
// node holds db.maxlinks children or the input is exhausted. It returns the
// first error hit while fetching or attaching a child.
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
	for {
		// Stop once the node is full; only then consult the data source.
		if node.NumChildren() >= db.maxlinks || db.Done() {
			return nil
		}

		child, err := db.GetNextDataNode()
		if err != nil {
			return err
		}
		if err = node.AddChild(child, db); err != nil {
			return err
		}
	}
}
// GetNextDataNode wraps the next chunk from the splitter in a UnixfsNode.
//
// It returns (nil, nil) when the input is exhausted, and
// ErrSizeLimitExceeded when the chunk is larger than BlockSizeLimit. With
// raw leaves enabled the chunk becomes a raw ipld node (built with the
// helper's CID prefix when one is set); otherwise it becomes a unixfs TRaw
// block.
func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
	chunk, err := db.Next()
	switch {
	case err != nil:
		return nil, err
	case chunk == nil:
		// End of stream.
		return nil, nil
	case len(chunk) > BlockSizeLimit:
		return nil, ErrSizeLimitExceeded
	}

	if !db.rawLeaves {
		leaf := db.newUnixfsBlock()
		leaf.SetData(chunk)
		return leaf, nil
	}

	// Raw leaves: without a prefix the default raw-node constructor is used.
	if db.prefix == nil {
		return &UnixfsNode{rawnode: dag.NewRawNode(chunk), raw: true}, nil
	}

	rawLeaf, err := dag.NewRawNodeWPrefix(chunk, *db.prefix)
	if err != nil {
		return nil, err
	}
	return &UnixfsNode{rawnode: rawLeaf, raw: true}, nil
}
// SetPosInfo records offset on node together with the helper's full path
// and stat info. It is a no-op when the helper has no full path, which is
// the case unless New captured file info for the reader.
func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
	if db.fullPath == "" {
		return
	}
	node.SetPosInfo(offset, db.fullPath, db.stat)
}
// Add converts node to its dag node, writes it to the helper's DAGService
// and returns the dag node. The write goes directly to the DAGService, not
// through the helper's batch.
func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
	dagNode, err := node.GetDagNode()
	if err != nil {
		return nil, err
	}

	if err := db.dserv.Add(context.TODO(), dagNode); err != nil {
		return nil, err
	}
	return dagNode, nil
}
// Maxlinks reports the maximum number of links per node that this helper
// was configured with.
func (db *DagBuilderHelper) Maxlinks() int {
	limit := db.maxlinks
	return limit
}
// Close commits the helper's batch and returns the result of that commit.
// It should be called at the end of the building process to make sure all
// data is persisted.
func (db *DagBuilderHelper) Close() error {
	err := db.batch.Commit()
	return err
}