package piecer

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/dustin/go-humanize"
	commcid "github.com/filecoin-project/go-fil-commcid"
	commP "github.com/filecoin-project/go-fil-commp-hashhash"
	"github.com/ipfs/go-cid"
	httpapi "github.com/ipfs/go-ipfs-http-client"
	"github.com/ipfs/interface-go-ipfs-core/options"
	"github.com/ipld/go-car"
	"github.com/multiformats/go-multiaddr"
	"github.com/textileio/broker-core/broker"
	store "github.com/textileio/broker-core/cmd/piecerd/store"
	"github.com/textileio/broker-core/ipfsutil"
	"github.com/textileio/broker-core/metrics"
	mbroker "github.com/textileio/broker-core/msgbroker"
	logger "github.com/textileio/go-log/v2"
	"go.opentelemetry.io/otel/metric"
)
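
// maxPaddingSize is the largest padded piece size allowed (32 GiB).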
const maxPaddingSize = 32 << 30

var log = logger.Logger("piecer")

// Piecer provides a data-preparation pipeline for batches.
type Piecer struct {
	mb       mbroker.MsgBroker
	ipfsApis []ipfsutil.IpfsAPI
	store    *store.Store

	daemonFrequency time.Duration
	retryDelay      time.Duration
	padToSize       uint64

	newRequest chan struct{}

	daemonCtx       context.Context
	daemonCancelCtx context.CancelFunc
	daemonClosed    chan struct{}

	statLastSize              int64
	metricLastSize            metric.Int64GaugeObserver
	statLastDurationSeconds   int64
	metricLastDurationSeconds metric.Int64GaugeObserver
	metricNewPrepare          metric.Int64Counter
	statLastPrepared          time.Time
	metricLastPrepared        metric.Int64GaugeObserver
}

// New returns a new Piecer.
func New(
	postgresURI string,
	ipfsEndpoints []multiaddr.Multiaddr,
	mb mbroker.MsgBroker,
	daemonFrequency time.Duration,
	retryDelay time.Duration,
	padToSize uint64) (*Piecer, error) {
	ipfsApis := make([]ipfsutil.IpfsAPI, len(ipfsEndpoints))

	if padToSize&(padToSize-1) != 0 {
		return nil, fmt.Errorf("pad to size %d must be a power of two", padToSize)
	}
	if padToSize > maxPaddingSize {
		return nil, fmt.Errorf("pad to size can't be greater than 32GiB")
	}

	for i, endpoint := range ipfsEndpoints {
		api, err := httpapi.NewApi(endpoint)
		if err != nil {
			return nil, fmt.Errorf("creating ipfs api: %s", err)
		}
		coreapi, err := api.WithOptions(options.Api.Offline(true))
		if err != nil {
			return nil, fmt.Errorf("creating offline core api: %s", err)
		}
		ipfsApis[i] = ipfsutil.IpfsAPI{Address: endpoint, API: coreapi}
	}

	s, err := store.New(postgresURI)
	if err != nil {
		return nil, fmt.Errorf("initializing store: %s", err)
	}

	ctx, cls := context.WithCancel(context.Background())
	p := &Piecer{
		store:           s,
		ipfsApis:        ipfsApis,
		mb:              mb,
		daemonFrequency: daemonFrequency,
		retryDelay:      retryDelay,
		padToSize:       padToSize,
		newRequest:      make(chan struct{}, 1),
		daemonCtx:       ctx,
		daemonCancelCtx: cls,
		daemonClosed:    make(chan struct{}),
	}
	p.initMetrics()
	go p.daemonPreparer()

	return p, nil
}
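
// A minimal wiring sketch. Illustrative only: the endpoint, Postgres URI,
// msgBroker, ctx, batchID, and dataCid values below are hypothetical.
//
//	addr, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
//	p, err := New("postgres://user:pass@localhost:5432/piecerd",
//		[]multiaddr.Multiaddr{addr}, msgBroker, time.Minute, 5*time.Minute, 32<<30)
//	if err != nil {
//		// handle error
//	}
//	defer func() { _ = p.Close() }()
//	_ = p.ReadyToPrepare(ctx, batchID, dataCid)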

// ReadyToPrepare signals the Piecer that a new batch is ready to be prepared.
func (p *Piecer) ReadyToPrepare(ctx context.Context, sdID broker.BatchID, dataCid cid.Cid) error {
	if sdID == "" {
		return fmt.Errorf("batch id is empty")
	}
	if !dataCid.Defined() {
		return fmt.Errorf("data-cid is undefined")
	}

	if err := p.store.CreateUnpreparedBatch(ctx, sdID, dataCid); err != nil {
		return fmt.Errorf("creating unprepared-batch %s %s: %w", sdID, dataCid, err)
	}
	log.Debugf("saved unprepared-batch with batch %s and data-cid %s", sdID, dataCid)
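
	// Wake up the daemon without blocking; if a wake-up signal is already
	// queued in the buffered channel, dropping this one is harmless.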
	select {
	case p.newRequest <- struct{}{}:
	default:
	}

	return nil
}

// Close closes the piecer.
func (p *Piecer) Close() error {
	log.Info("closing piecer...")
	p.daemonCancelCtx()
	<-p.daemonClosed
	if err := p.store.Close(); err != nil {
		return fmt.Errorf("closing store: %s", err)
	}

	return nil
}
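
// daemonPreparer is the background loop that prepares pending batches. It
// wakes up when ReadyToPrepare signals a new batch or after every
// daemonFrequency tick, and exits when the daemon context is canceled.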
func (p *Piecer) daemonPreparer() {
	defer close(p.daemonClosed)
	p.newRequest <- struct{}{}
	for {
		select {
		case <-p.daemonCtx.Done():
			log.Info("piecer closed")
			return
		case <-p.newRequest:
		case <-time.After(p.daemonFrequency):
		}
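
		// Drain every pending unprepared batch; stop on the first error or
		// when nothing is left to do.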
		for {
			usd, ok, err := p.store.GetNextPending(p.daemonCtx)
			if err != nil {
				log.Errorf("get next unprepared batch: %s", err)
				break
			}
			if !ok {
				break
			}

			err = p.prepare(p.daemonCtx, usd)
			if err != nil {
				p.metricNewPrepare.Add(p.daemonCtx, 1, metrics.AttrError)
				log.Errorf("preparing batch %s, data-cid %s: %s", usd.BatchID, usd.DataCid, err)
				if err := p.store.MoveToStatus(p.daemonCtx, usd.BatchID, p.retryDelay, store.StatusPending); err != nil {
					log.Errorf("moving again to pending: %s", err)
				}
				break
			}
			if err := p.store.MoveToStatus(p.daemonCtx, usd.BatchID, 0, store.StatusDone); err != nil {
				p.metricNewPrepare.Add(p.daemonCtx, 1, metrics.AttrError)
				log.Errorf("deleting batch %s, data-cid %s: %s", usd.BatchID, usd.DataCid, err)
				if err := p.store.MoveToStatus(p.daemonCtx, usd.BatchID, p.retryDelay, store.StatusPending); err != nil {
					log.Errorf("moving again to pending: %s", err)
				}
				break
			}

			p.metricNewPrepare.Add(p.daemonCtx, 1, metrics.AttrOK)
		}
	}
}
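
// prepare generates a CAR stream for the batch's data-cid, computes its piece
// commitment (commP) and padded piece size on the fly, and publishes the
// result to the message broker.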
func (p *Piecer) prepare(ctx context.Context, usd store.UnpreparedBatch) error {
	start := time.Now()
	log.Debugf("preparing batch %s with data-cid %s", usd.BatchID, usd.DataCid)

	nodeGetter, found := ipfsutil.GetNodeGetterForCid(p.ipfsApis, usd.DataCid)
	if !found {
		return fmt.Errorf("node getter for data cid %s not found", usd.DataCid)
	}
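
	// The CAR file is never materialized on disk: one goroutine streams it
	// into a pipe while a second goroutine consumes the stream to compute
	// the piece commitment.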
	prCAR, pwCAR := io.Pipe()
	var errCarGen error
	go func() {
		defer func() {
			if err := pwCAR.Close(); err != nil {
				errCarGen = err
			}
		}()
		if err := car.WriteCar(ctx, nodeGetter, []cid.Cid{usd.DataCid}, pwCAR); err != nil {
			errCarGen = err
			return
		}
	}()

	var (
		errCommP error
		wg       sync.WaitGroup
		dpr      broker.DataPreparationResult
	)
	wg.Add(1)
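	// Consumer side: hash the CAR stream to obtain the raw commP digest and
	// its piece size.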
	go func() {
		defer wg.Done()
		cp := &commP.Calc{}
		_, err := io.Copy(cp, prCAR)
		if err != nil {
			errCommP = fmt.Errorf("copying data to aggregator: %s", err)
			return
		}

		rawCommP, ps, err := cp.Digest()
		if err != nil {
			errCommP = fmt.Errorf("calculating final digest: %s", err)
			return
		}
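
		// If the resulting piece is smaller than the configured target size,
		// pad the commP up to padToSize.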
		if ps < p.padToSize {
			log.Debugf("padding commP from %d to %d", ps, p.padToSize)
			rawCommP, err = commP.PadCommP(rawCommP, ps, p.padToSize)
			if err != nil {
				errCommP = fmt.Errorf("padding commp: %s", err)
				return
			}
			ps = p.padToSize
		}

		pcid, err := commcid.DataCommitmentV1ToCID(rawCommP)
		if err != nil {
			errCommP = fmt.Errorf("converting commP to cid: %s", err)
			return
		}

		dpr = broker.DataPreparationResult{
			PieceSize: ps,
			PieceCid:  pcid,
		}
	}()
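
	// Wait for the commP calculation to finish; io.Copy returns once the CAR
	// writer has closed its end of the pipe.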
	wg.Wait()
	if errCarGen != nil || errCommP != nil {
		return fmt.Errorf("write car err: %s, commP err: %s", errCarGen, errCommP)
	}

	duration := time.Since(start).Seconds()
	log.Debugf("prepared batch %s, data-cid %s, piece-size %s, piece-cid %s took %.2f seconds",
		usd.BatchID, usd.DataCid, humanize.IBytes(dpr.PieceSize), dpr.PieceCid, duration)

	if err := mbroker.PublishMsgNewBatchPrepared(ctx, p.mb, usd.BatchID, dpr.PieceCid, dpr.PieceSize); err != nil {
		return fmt.Errorf("publish message to message broker: %s", err)
	}

	p.statLastPrepared = time.Now()
	p.statLastSize = int64(dpr.PieceSize)
	p.statLastDurationSeconds = int64(duration)

	return nil
}