/
option.go
341 lines (302 loc) · 9.77 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
package dagsync
import (
"errors"
"fmt"
"time"
"github.com/ipfs/go-cid"
"github.com/ipni/go-libipni/announce"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
// defaultAddrTTL is the default amount of time that addresses discovered
// from pubsub messages will remain in the peerstore. This is twice the
// default provider poll interval.
defaultAddrTTL = 48 * time.Hour
// defaultIdleHandlerTTL is the default time after which idle publisher
// handlers are removed.
defaultIdleHandlerTTL = time.Hour
// defaultSegDepthLimit disables (-1) segmented sync by default.
defaultSegDepthLimit = -1
// defaultHttpTimeout is time limit for requests made by the HTTP client.
defaultHttpTimeout = 10 * time.Second
)
type LastKnownSyncFunc func(peer.ID) (cid.Cid, bool)
// config contains all options for configuring Subscriber.
type config struct {
addrTTL time.Duration
topic *pubsub.Topic
blockHook BlockHookFunc
idleHandlerTTL time.Duration
lastKnownSync LastKnownSyncFunc
maxAsyncSyncs int
hasRcvr bool
rcvrOpts []announce.Option
rcvrTopic string
adsDepthLimit int64
entriesDepthLimit int64
firstSyncDepth int64
segDepthLimit int64
strictAdsSelSeq bool
httpTimeout time.Duration
httpRetryMax int
httpRetryWaitMin time.Duration
httpRetryWaitMax time.Duration
}
// Option is a function that sets a value in a config.
type Option func(*config) error
// getOpts creates a config and applies Options to it.
func getOpts(opts []Option) (config, error) {
cfg := config{
addrTTL: defaultAddrTTL,
httpTimeout: defaultHttpTimeout,
idleHandlerTTL: defaultIdleHandlerTTL,
segDepthLimit: defaultSegDepthLimit,
strictAdsSelSeq: true,
}
for i, opt := range opts {
if err := opt(&cfg); err != nil {
return config{}, fmt.Errorf("option %d failed: %s", i, err)
}
}
return cfg, nil
}
// AddrTTL sets the peerstore address time-to-live for addresses discovered
// from pubsub messages.
func AddrTTL(addrTTL time.Duration) Option {
return func(c *config) error {
c.addrTTL = addrTTL
return nil
}
}
// Topic provides an existing pubsub topic.
func Topic(topic *pubsub.Topic) Option {
return func(c *config) error {
c.topic = topic
return nil
}
}
// HttpTimeout specifies a time limit for HTTP requests made by the sync
// HTTP client. A value of zero means no timeout.
func HttpTimeout(to time.Duration) Option {
return func(c *config) error {
c.httpTimeout = to
return nil
}
}
// RetryableHTTPClient configures a retriable HTTP client. Setting retryMax to
// zero, the default, disables the retriable client.
func RetryableHTTPClient(retryMax int, waitMin, waitMax time.Duration) Option {
return func(c *config) error {
if waitMin > waitMax {
return errors.New("minimum retry wait time cannot be greater than maximum")
}
if retryMax < 0 {
retryMax = 0
}
c.httpRetryMax = retryMax
c.httpRetryWaitMin = waitMin
c.httpRetryWaitMax = waitMax
return nil
}
}
// BlockHook adds a hook that is run when a block is received via
// Subscriber.Sync along with a SegmentSyncActions to control the sync flow if
// segmented sync is enabled. Note that if segmented sync is disabled, calls on
// SegmentSyncActions will have no effect. See: SegmentSyncActions,
// SegmentDepthLimit, ScopedBlockHook.
func BlockHook(blockHook BlockHookFunc) Option {
return func(c *config) error {
c.blockHook = blockHook
return nil
}
}
// IdleHandlerTTL configures the time after which idle handlers are removed.
func IdleHandlerTTL(ttl time.Duration) Option {
return func(c *config) error {
c.idleHandlerTTL = ttl
return nil
}
}
// Checks that advertisement blocks contain a "PreviousID" field. This can be
// set to false to not do the check if there is no reason to do so.
func StrictAdsSelector(strict bool) Option {
return func(c *config) error {
c.strictAdsSelSeq = strict
return nil
}
}
// AdsDepthLimit sets the maximum number of advertisements in a chain to sync.
// Defaults to unlimited if not specified or set < 1.
func AdsDepthLimit(limit int64) Option {
return func(c *config) error {
c.adsDepthLimit = limit
return nil
}
}
// EntriesDepthLimit sets the maximum number of multihash entries blocks to
// sync per advertisement. Defaults to unlimited if not set or set to < 1.
func EntriesDepthLimit(depth int64) Option {
return func(c *config) error {
c.entriesDepthLimit = depth
return nil
}
}
// FirstSyncDepth sets the advertisement chain depth to sync on the first sync
// with a new provider. A value of 0, the default, means unlimited depth.
func FirstSyncDepth(depth int64) Option {
return func(c *config) error {
if depth < 0 {
depth = 0
}
c.firstSyncDepth = depth
return nil
}
}
// SegmentDepthLimit sets the maximum recursion depth limit for a segmented sync.
// Setting the depth to a value less than zero disables segmented sync completely.
// Disabled by default.
//
// For segmented sync to function at least one of BlockHook or ScopedBlockHook
// must be set.
func SegmentDepthLimit(depth int64) Option {
return func(c *config) error {
c.segDepthLimit = depth
return nil
}
}
// RecvAnnounce enables an announcement message receiver.
func RecvAnnounce(topic string, opts ...announce.Option) Option {
return func(c *config) error {
c.hasRcvr = true
c.rcvrOpts = opts
c.rcvrTopic = topic
return nil
}
}
// MaxAsyncConcurrency sets the maximum number of concurrent asynchrouous syncs
// (started by announce messages). This only takes effect if there is an
// announcement receiver configured by the RecvAnnounce option.
func MaxAsyncConcurrency(n int) Option {
return func(c *config) error {
if n != 0 {
if n < 0 {
n = 0
}
c.maxAsyncSyncs = n
}
return nil
}
}
// WithLastKnownSync sets a function that returns the last known sync, when it
// is not already known to dagsync. This will generally be some CID that is
// known to have already been seen, so that there is no need to fetch portions
// of the dag before this.
func WithLastKnownSync(f LastKnownSyncFunc) Option {
return func(c *config) error {
c.lastKnownSync = f
return nil
}
}
type syncCfg struct {
headAdCid cid.Cid
stopAdCid cid.Cid
blockHook BlockHookFunc
depthLimit int64
segDepthLimit int64
resync bool
}
type SyncOption func(*syncCfg)
// getSyncOpts creates a syncCfg and applies SyncOptions to it.
func getSyncOpts(opts []SyncOption) syncCfg {
var cfg syncCfg
for _, opt := range opts {
opt(&cfg)
}
return cfg
}
// WithHeadAdCid explicitly specifies an advertisement CID to sync to, instead
// of getting this by querying the publisher.
func WithHeadAdCid(headAd cid.Cid) SyncOption {
return func(sc *syncCfg) {
sc.headAdCid = headAd
}
}
// WithStopAdCid explicitly specifies an advertisement CID to stop at, instead
// of using the latest synced advertisement CID..
func WithStopAdCid(stopAd cid.Cid) SyncOption {
return func(sc *syncCfg) {
sc.stopAdCid = stopAd
}
}
// WithResyncAds causes the current sync to ignore advertisements that have been
// previously synced. When true, sync does not record the latest synced CID or
// send sync finished notification.
func WithAdsResync(resync bool) SyncOption {
return func(sc *syncCfg) {
sc.resync = resync
}
}
// ScopedDepthLimit provides a sync depth limit for the current sync. This
// applies to both advertisement and entries chains. If zero or not specified,
// the Subscriber ads or entries depth limit is used. Set to -1 for no limits.
func ScopedDepthLimit(limit int64) SyncOption {
return func(sc *syncCfg) {
sc.depthLimit = limit
}
}
// ScopedSegmentDepthLimit is the equivalent of SegmentDepthLimit option but
// only applied to a single sync. If zero or not specified, the Subscriber
// SegmentDepthLimit option is used instead. Set to -1 for no limits.
//
// For segmented sync to function at least one of BlockHook or ScopedBlockHook
// must be set. See: SegmentDepthLimit.
func ScopedSegmentDepthLimit(depth int64) SyncOption {
return func(sc *syncCfg) {
sc.segDepthLimit = depth
}
}
// ScopedBlockHook is the equivalent of BlockHook option but only applied to a
// single sync. If not specified, the Subscriber BlockHook option is used
// instead. Specifying the ScopedBlockHook will override the Subscriber level
// BlockHook for the current sync.
//
// Calls to SegmentSyncActions from bloc hook will have no impact if segmented
// sync is disabled. See: BlockHook, SegmentDepthLimit,
// ScopedSegmentDepthLimit.
func ScopedBlockHook(hook BlockHookFunc) SyncOption {
return func(sc *syncCfg) {
sc.blockHook = hook
}
}
// MakeGeneralBlockHook creates a block hook function that sets the next sync
// action based on whether the specified advertisement has a previous
// advertisement in the chain..
//
// Use this when segmented sync is enabled and no other blockhook is defined.
//
// The supplied prevAdCid function takes the CID of the current advertisement
// and returns the CID of the previous advertisement in the chain. This would
// typically be done my loading the specified advertisement from the
// ipld.LinkSystem and getting the previous CID.
func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockHookFunc {
return func(_ peer.ID, adCid cid.Cid, actions SegmentSyncActions) {
// The only kind of block we should get by loading CIDs here should be
// Advertisement.
//
// Because:
// - The default subscription selector only selects advertisements.
// - Entries are synced with an explicit selector separate from
// advertisement syncs and should use dagsync.ScopedBlockHook to
// override this hook and decode chunks instead.
//
// Therefore, we only attempt to load advertisements here and signal
// failure if the load fails.
prevCid, err := prevAdCid(adCid)
if err != nil {
actions.FailSync(err)
} else {
actions.SetNextSyncCid(prevCid)
}
}
}