generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 83
/
reprovide.go
255 lines (216 loc) · 6.1 KB
/
reprovide.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
package simple
import (
"context"
"errors"
"fmt"
"time"
"github.com/cenkalti/backoff"
blocks "github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/fetcher"
fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers"
"github.com/ipfs/boxo/verifcid"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil"
logging "github.com/ipfs/go-log"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p/core/routing"
)
// logR is the logger used throughout this package; it is created under the
// name "reprovider.simple".
var logR = logging.Logger("reprovider.simple")
// ErrClosed is returned by Trigger when operating on a closed reprovider.
// Run also reports it, in place of the reprovide result, to a waiting Trigger
// caller once the reprovider context has been cancelled.
var ErrClosed = errors.New("reprovider service stopped")
// KeyChanFunc is a function that streams the CIDs to pass to content routing.
// It takes a context and returns a receive-only channel of CIDs, or an error.
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
// Reprovider re-announces blocks to the network through a content routing
// system, either on a fixed interval or on demand.
type Reprovider struct {
	// ctx is the reprovider's lifetime context. Calling cancel stops the
	// reprovider; closedCh is closed once Run has returned.
	ctx      context.Context
	cancel   context.CancelFunc
	closedCh chan struct{}

	// trigger hands manual reprovide requests to Run. Each request is the
	// channel on which the outcome of that reprovide is reported.
	trigger chan chan<- error

	// rsys is the routing system that keys are provided through.
	rsys routing.ContentRouting

	// keyProvider produces the stream of keys announced by each reprovide.
	keyProvider KeyChanFunc

	// tick is the interval between automatic reprovides. Run schedules no
	// automatic reprovide when it is not positive.
	tick time.Duration
}
// NewReprovider builds a Reprovider that announces the keys produced by
// keyProvider through rsys, using reprovideInterval as the interval between
// automatic reprovides.
//
// The reprovider's lifetime is tied to a cancellable child of ctx: it stops
// when ctx is cancelled or when Close is called. Nothing is announced until
// Run is started.
func NewReprovider(ctx context.Context, reprovideInterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
	rp := new(Reprovider)

	// Lifetime management.
	rp.ctx, rp.cancel = context.WithCancel(ctx)
	rp.closedCh = make(chan struct{})

	// Unbuffered on purpose: a request is only accepted while Run is
	// actually waiting for one.
	rp.trigger = make(chan chan<- error)

	rp.rsys = rsys
	rp.keyProvider = keyProvider
	rp.tick = reprovideInterval
	return rp
}
// Close stops the reprovider: it cancels the reprovider context and then
// blocks until Run has returned. It always returns nil.
//
// Close waits on the channel that Run closes when it exits, so it blocks
// indefinitely if Run was never started.
func (rp *Reprovider) Close() error {
	// Ask the Run loop (and any reprovide in flight) to stop.
	rp.cancel()

	// Wait for Run's deferred close of closedCh.
	<-rp.closedCh

	return nil
}
// Run re-provides keys every 'tick' interval, or on demand when Trigger is
// called, until the reprovider context is cancelled. It blocks for the whole
// lifetime of the reprovider and closes closedCh on return, which is what
// unblocks Close.
//
// When tick is zero or negative no periodic reprovide is scheduled and only
// Trigger starts one. When tick is larger than one minute, an additional
// initial reprovide runs one minute after Run starts.
func (rp *Reprovider) Run() {
	defer close(rp.closedCh)

	// Both channels stay nil (and therefore never fire in the select below)
	// unless periodic reproviding is enabled.
	var initialReprovideCh, reprovideCh <-chan time.Time

	// If reproviding is enabled (non-zero)
	if rp.tick > 0 {
		reprovideTicker := time.NewTicker(rp.tick)
		defer reprovideTicker.Stop()
		reprovideCh = reprovideTicker.C

		// If the reprovide ticker is larger than a minute (likely),
		// provide once after we've been up a minute.
		//
		// Don't provide _immediately_ as we might be just about to stop.
		if rp.tick > time.Minute {
			initialReprovideTimer := time.NewTimer(time.Minute)
			defer initialReprovideTimer.Stop()
			initialReprovideCh = initialReprovideTimer.C
		}
	}

	for rp.ctx.Err() == nil {
		// done is the result channel of the Trigger call that started this
		// iteration, if any. It must be scoped to the iteration: a channel
		// left over from an earlier Trigger has already been closed, and a
		// later timer-driven iteration sending on it or closing it again
		// would panic.
		var done chan<- error

		select {
		case <-initialReprovideCh:
		case <-reprovideCh:
		case done = <-rp.trigger:
		case <-rp.ctx.Done():
			return
		}

		err := rp.Reprovide()

		// only log if we've hit an actual error, otherwise just tell the client we're shutting down
		if rp.ctx.Err() != nil {
			err = ErrClosed
		} else if err != nil {
			logR.Errorf("failed to reprovide: %s", err)
		}

		if done != nil {
			// Trigger supplies a channel with a buffer of one, so this
			// send does not block even if the caller stopped waiting.
			if err != nil {
				done <- err
			}
			// Closing without a send reports success: the receiver gets
			// a nil error.
			close(done)
		}
	}
}
// Reprovide registers all keys given by rp.keyProvider to libp2p content
// routing.
//
// Keys that fail CID validation are logged and skipped. Each remaining key is
// provided with exponential backoff; the first key that still fails once the
// backoff gives up aborts the run and its error is returned. An error from
// the key provider itself is returned wrapped, so callers can inspect it with
// errors.Is / errors.As.
func (rp *Reprovider) Reprovide() error {
	// Scope the key stream to this call. If the loop below returns early,
	// cancelling this context releases whatever is still producing keys,
	// instead of leaving it blocked until the whole reprovider is closed.
	ctx, cancel := context.WithCancel(rp.ctx)
	defer cancel()

	keychan, err := rp.keyProvider(ctx)
	if err != nil {
		return fmt.Errorf("failed to get key chan: %w", err)
	}

	for c := range keychan {
		// hash security
		if err := verifcid.ValidateCid(c); err != nil {
			logR.Errorf("insecure hash in reprovider, %s (%s)", c, err)
			continue
		}

		// op is retried by backoff.Retry until it succeeds, the backoff
		// policy gives up, or the context is cancelled.
		op := func() error {
			err := rp.rsys.Provide(ctx, c, true)
			if err != nil {
				logR.Debugf("Failed to provide key: %s", err)
			}
			return err
		}

		err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
		if err != nil {
			logR.Debugf("Providing failed after number of retries: %s", err)
			return err
		}
	}
	return nil
}
// Trigger starts the reprovision process in rp.Run and waits for it to finish.
//
// It returns:
//   - ErrClosed if the reprovider has been stopped, either before the call or
//     while waiting for the result;
//   - an error if Run is not currently waiting for work, which is the case
//     while a reprovide is already in progress;
//   - ctx.Err() if ctx is cancelled while waiting. The reprovide that was
//     started is not interrupted by this, since it runs on the reprovider's
//     own context;
//   - otherwise, the result of the reprovide.
func (rp *Reprovider) Trigger(ctx context.Context) error {
	// Once the reprovider is stopped nothing receives on rp.trigger any
	// more, so the non-blocking send below would misreport the state as
	// "already running". Report the real reason instead.
	if rp.ctx.Err() != nil {
		return ErrClosed
	}

	// Buffered so that Run can deliver the result without blocking even if
	// this call has already returned.
	resultCh := make(chan error, 1)

	// Non-blocking hand-off: it only succeeds while Run is idle in its
	// select.
	select {
	case rp.trigger <- resultCh:
	default:
		return fmt.Errorf("reprovider is already running")
	}

	select {
	case err := <-resultCh:
		// A closed channel without a value yields nil, i.e. success.
		return err
	case <-rp.ctx.Done():
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Strategies: constructors for KeyChanFunc values that decide which keys get reprovided.
// NewBlockstoreProvider returns a key provider that streams the keys reported
// by bstore.AllKeysChan, using the context given to the provider.
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
	allKeys := func(ctx context.Context) (<-chan cid.Cid, error) {
		keys, err := bstore.AllKeysChan(ctx)
		return keys, err
	}
	return allKeys
}
// Pinner is the subset of a pinning service that the simple.Reprovider
// relies on to enumerate pinned keys.
type Pinner interface {
	// DirectKeys returns the directly pinned keys.
	DirectKeys(ctx context.Context) ([]cid.Cid, error)

	// RecursiveKeys returns the root keys of the recursive pins.
	RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
}
// NewPinnedProvider returns a key provider that supplies pinned keys.
//
// The keys are gathered by pinSet from pinning (using fetchConfig and
// onlyRoots) and forwarded on an unbuffered channel. That channel is closed
// once the pin set is exhausted or the provider's context is cancelled.
func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.Factory) KeyChanFunc {
	return func(ctx context.Context) (<-chan cid.Cid, error) {
		pins, err := pinSet(ctx, pinning, fetchConfig, onlyRoots)
		if err != nil {
			return nil, err
		}

		keys := make(chan cid.Cid)

		// forward relays newly discovered pins to the consumer until the
		// source is drained or the context ends.
		forward := func() {
			defer close(keys)
			for k := range pins.New {
				select {
				case keys <- k:
				case <-ctx.Done():
					return
				}
			}
		}
		go forward()

		return keys, nil
	}
}
// pinSet starts a background walk over the pins reported by pinning and
// returns the streaming set that the walk feeds.
//
// The walk visits the direct pins first, then each recursive pin root. Unless
// onlyRoots is set, it also traverses every recursive root with a fetcher
// session from fetchConfig and visits each block link that is a CID link. Any
// error is logged and ends the walk early. The set's New channel is closed
// when the walk finishes. The returned error is always nil.
func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) {
	out := cidutil.NewStreamingSet()

	walk := func() {
		walkCtx, stop := context.WithCancel(ctx)
		defer stop()
		defer close(out.New)

		// Direct pins.
		direct, err := pinning.DirectKeys(walkCtx)
		if err != nil {
			logR.Errorf("reprovide direct pins: %s", err)
			return
		}
		for _, k := range direct {
			out.Visitor(walkCtx)(k)
		}

		// Recursive pins: roots first, then (optionally) their DAGs.
		roots, err := pinning.RecursiveKeys(walkCtx)
		if err != nil {
			logR.Errorf("reprovide indirect pins: %s", err)
			return
		}

		session := fetchConfig.NewSession(walkCtx)

		// visitLink records the CID of each traversed block; results whose
		// link is not a CID link are ignored.
		visitLink := func(res fetcher.FetchResult) error {
			if link, isCid := res.LastBlockLink.(cidlink.Link); isCid {
				out.Visitor(walkCtx)(link.Cid)
			}
			return nil
		}

		for _, root := range roots {
			out.Visitor(walkCtx)(root)
			if onlyRoots {
				continue
			}
			if err := fetcherhelpers.BlockAll(walkCtx, session, cidlink.Link{Cid: root}, visitLink); err != nil {
				logR.Errorf("reprovide indirect pins: %s", err)
				return
			}
		}
	}
	go walk()

	return out, nil
}