/
options.go
235 lines (216 loc) · 7.32 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package mirror
import (
"crypto/rand"
"errors"
"fmt"
"time"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
hamt "github.com/ipld/go-ipld-adl-hamt"
"github.com/ipld/go-ipld-prime/schema"
stischema "github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/index-provider/engine/chunker"
"github.com/libp2p/go-libp2p"
p2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multicodec"
)
// Option configures the mirror. Options are applied in order by newOptions;
// the first non-nil error stops option processing and is returned.
type Option func(*options) error

// options holds the mirror configuration assembled by newOptions.
type options struct {
	// h is the libp2p host the mirror is exposed on.
	h host.Host

	// ds persists mirrored advertisements, their entries and internal data.
	ds datastore.Batching

	// syncInterval is how often the original provider is checked for new
	// advertisements.
	syncInterval time.Duration

	// httpListenAddr is the address:port the HTTP publisher listens on.
	httpListenAddr string

	// initAdRecurLimit bounds the initial advertisement sync; the zero value
	// (unset) means no limit.
	initAdRecurLimit int64

	// entriesRecurLimit bounds syncing of advertisement entries; the zero
	// value (unset) means no limit.
	entriesRecurLimit int64

	// chunkerFunc builds the chunker used to remap entries. A non-nil value
	// is what enables remapping.
	chunkerFunc chunker.NewChunkerFunc

	// chunkCacheCap is the capacity, in complete entries DAGs, of the cache
	// holding remapped entries.
	chunkCacheCap int

	// chunkCachePurge controls whether cached entries are deleted on start-up.
	chunkCachePurge bool

	// privKey is the identity key of h; its peer ID must match h.ID().
	privKey p2pcrypto.PrivKey

	// topic is the topic on which mirrored advertisements are announced.
	topic string

	// skipRemapOnEntriesTypeMatch skips remapping when the original entries
	// already match entriesRemapPrototype.
	skipRemapOnEntriesTypeMatch bool

	// entriesRemapPrototype is the structure entries are remapped into.
	entriesRemapPrototype schema.TypedPrototype

	// alwaysReSignAds forces every mirrored advertisement to be re-signed.
	alwaysReSignAds bool
}
// TODO: add options to restructure advertisements.
// The nft.storage advertisement chain is a good use case: remapping its entries to, say, a HAMT
// probably won't make much difference, but combining ads into a shorter chain will most
// likely improve end-to-end ingestion latency.
// newOptions builds the mirror configuration in three steps:
//  1. It starts from the defaults.
//  2. It applies every Option in the given order, stopping at the first error.
//  3. It fills in whatever is still missing.
//
// Host handling depends on whether a host was supplied:
//   - With no host, a libp2p host is created from the configured private key,
//     or from a freshly generated Ed25519 key if none was set.
//   - With a host, its peer ID must match the ID derived from the configured
//     private key.
//
// A missing datastore defaults to a mutex-wrapped in-memory map datastore. A
// missing HTTP listen address defaults to "127.0.0.1:0".
func newOptions(o ...Option) (*options, error) {
	opts := &options{
		topic:           "/indexer/ingest/mainnet",
		syncInterval:    10 * time.Minute,
		chunkCacheCap:   1024,
		chunkCachePurge: false,
	}
	for _, opt := range o {
		if err := opt(opts); err != nil {
			return nil, err
		}
	}

	switch {
	case opts.h != nil:
		// A caller-supplied host must be backed by the caller-supplied key.
		wantID, err := peer.IDFromPrivateKey(opts.privKey)
		if err != nil {
			return nil, fmt.Errorf("could not get peer ID from private key: %w", err)
		}
		if wantID != opts.h.ID() {
			return nil, errors.New("host ID does not match ID from private key")
		}
	default:
		if opts.privKey == nil {
			key, _, err := p2pcrypto.GenerateEd25519Key(rand.Reader)
			if err != nil {
				return nil, err
			}
			opts.privKey = key
		}
		h, err := libp2p.New(libp2p.Identity(opts.privKey))
		if err != nil {
			return nil, err
		}
		opts.h = h
	}

	if opts.ds == nil {
		opts.ds = dssync.MutexWrap(datastore.NewMapDatastore())
	}
	// TODO: Do not set this when libp2phttp available.
	if opts.httpListenAddr == "" {
		opts.httpListenAddr = "127.0.0.1:0"
	}
	return opts, nil
}
// remapEntriesEnabled reports whether advertisement entries are to be remapped.
func (o *options) remapEntriesEnabled() bool {
	// There is no dedicated flag: a configured chunker func is what turns
	// remapping on.
	remap := o.chunkerFunc != nil
	return remap
}
// WithDatastore sets the datastore in which the mirror persists mirrored
// advertisements, their entries and other internal data. When this option is
// not used, or ds is nil, an in-memory, non-persistent datastore is used.
func WithDatastore(ds datastore.Batching) Option {
	return func(opts *options) error {
		opts.ds = ds
		return nil
	}
}
// WithHost sets the libp2p host on which the mirror is exposed, together with
// the private key of that host's identity.
//
// A non-nil host requires a non-nil private key; otherwise applying the option
// returns an error. The key must also match the host's peer ID; this is checked
// once all options have been applied.
//
// If no host is specified, a host with default options is created. It uses
// privKey as its identity when one is given, and a random identity otherwise.
func WithHost(h host.Host, privKey p2pcrypto.PrivKey) Option {
	return func(opts *options) error {
		if h == nil || privKey != nil {
			opts.h, opts.privKey = h, privKey
			return nil
		}
		return errors.New("if host is specified then private key must be specified")
	}
}
// WithEntryChunkRemapper makes the mirror remap entries from the original
// provider into the schema.EntryChunkPrototype structure, using chunkSize as
// the chunk size. Applying it replaces any previously applied remapper option.
// Without a remapper option, entries are mirrored with their original
// structure.
//
// See: WithSkipRemapOnEntriesTypeMatch, WithHamtRemapper.
func WithEntryChunkRemapper(chunkSize int) Option {
	return func(opts *options) error {
		opts.chunkerFunc = chunker.NewChainChunkerFunc(chunkSize)
		opts.entriesRemapPrototype = stischema.EntryChunkPrototype
		return nil
	}
}
// WithHamtRemapper makes the mirror remap entries from the original provider
// into the hamt.HashMapRootPrototype structure. The HAMT is built with the
// given hash algorithm, bit-width and bucket size. Applying it replaces any
// previously applied remapper option. Without a remapper option, entries are
// mirrored with their original structure.
//
// See: WithSkipRemapOnEntriesTypeMatch, WithEntryChunkRemapper.
func WithHamtRemapper(hashAlg multicodec.Code, bitwidth, bucketSize int) Option {
	return func(opts *options) error {
		opts.chunkerFunc = chunker.NewHamtChunkerFunc(hashAlg, bitwidth, bucketSize)
		opts.entriesRemapPrototype = hamt.HashMapRootPrototype
		return nil
	}
}
// WithHTTPListenAddr sets the address:port on which the HTTP publisher
// listens. If unset or empty, "127.0.0.1:0" is used.
func WithHTTPListenAddr(addr string) Option {
	return func(opts *options) error {
		opts.httpListenAddr = addr
		return nil
	}
}
// WithSkipRemapOnEntriesTypeMatch specifies whether remapping of entries is
// skipped when the original entries structure already matches the prototype of
// the configured remapper. Setting it without a remapper option has no effect.
//
// See: WithEntryChunkRemapper, WithHamtRemapper.
func WithSkipRemapOnEntriesTypeMatch(s bool) Option {
	return func(opts *options) error {
		opts.skipRemapOnEntriesTypeMatch = s
		return nil
	}
}
// WithSyncInterval specifies the time interval at which the original provider
// is checked for new advertisements. The interval must be positive; a zero or
// negative interval causes newOptions to return an error.
// If unset, the default time interval of 10 minutes is used.
func WithSyncInterval(interval time.Duration) Option {
	return func(o *options) error {
		// A periodic check needs a positive period (time.NewTicker, for
		// example, panics on a non-positive one). Reject bad values when
		// options are applied, so the caller gets an error rather than a
		// failure later on.
		if interval <= 0 {
			return fmt.Errorf("sync interval must be positive, got %s", interval)
		}
		o.syncInterval = interval
		return nil
	}
}
// WithInitialAdRecursionLimit sets the recursion limit for the initial sync,
// i.e. the sync performed when the mirror has not yet mirrored any
// advertisements.
//
// When unset, the initial sync has no recursion limit.
func WithInitialAdRecursionLimit(limit int64) Option {
	return func(opts *options) error {
		opts.initAdRecurLimit = limit
		return nil
	}
}
// WithEntriesRecursionLimit sets the recursion limit used when syncing the
// entries of an advertisement.
//
// When unset, entries are synced with no recursion limit.
func WithEntriesRecursionLimit(limit int64) Option {
	return func(opts *options) error {
		opts.entriesRecurLimit = limit
		return nil
	}
}
// WithRemappedEntriesCacheCapacity sets the capacity of the LRU cache that
// holds remapped advertisement entries. Capacity is counted in complete
// entries DAGs, so the storage actually used by the cache depends on the shape
// of those DAGs.
// See: chunker.CachedEntriesChunker.
//
// It has no effect unless an entries remapper option is set.
// The default capacity is 1024.
func WithRemappedEntriesCacheCapacity(c int) Option {
	return func(opts *options) error {
		opts.chunkCacheCap = c
		return nil
	}
}
// WithPurgeCachedEntries specifies whether cached entries are deleted on
// start-up. Purging is off by default. It has no effect unless an entries
// remapper option is set.
func WithPurgeCachedEntries(b bool) Option {
	return func(opts *options) error {
		opts.chunkCachePurge = b
		return nil
	}
}
// WithTopicName sets the name of the topic on which mirrored advertisements
// are announced. Defaults to "/indexer/ingest/mainnet".
func WithTopicName(t string) Option {
	return func(opts *options) error {
		opts.topic = t
		return nil
	}
}
// WithAlwaysReSignAds specifies whether every mirrored advertisement is
// re-signed with the mirror identity, regardless of whether mirroring changed
// the advertisement content.
//
// By default, an advertisement is re-signed only when mirroring changes it,
// that is, when at least one of the following holds:
//  1. The link to the previous advertisement is changed.
//  2. The link to its entries is changed.
func WithAlwaysReSignAds(r bool) Option {
	return func(o *options) error {
		o.alwaysReSignAds = r
		return nil
	}
}