/
sync.go
386 lines (347 loc) · 11.7 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
package ipnisync
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/hashicorp/go-retryablehttp"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/maurl"
"github.com/ipni/go-libipni/mautil"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)
// log is the package-level logger for dagsync ipnisync operations.
var log = logging.Logger("dagsync/ipnisync")

// ErrNoHTTPServer is returned when a publisher has no HTTP addresses and its
// libp2p server cannot be used as an HTTP server.
var ErrNoHTTPServer = errors.New("publisher has libp2p server without HTTP")
// Sync provides sync functionality for use with all http syncs.
type Sync struct {
	// blockHook, if non-nil, is called once per synced block after traversal.
	blockHook func(peer.ID, cid.Cid)
	// client is the plain HTTP client used when a publisher does not support
	// libp2phttp; shared by all Syncers created from this Sync.
	client http.Client
	// lsys is the link system used to read and write synced blocks.
	lsys ipld.LinkSystem
	// httpTimeout is applied to every HTTP client created by NewSyncer.
	httpTimeout time.Duration
	// libp2phttp
	clientHost *libp2phttp.Host
	// clientHostMutex guards clientHost, which is shared across NewSyncer and
	// Close calls.
	clientHostMutex sync.Mutex
	// authPeerID, when true, requires the server to authenticate its peer ID.
	authPeerID bool
	// rclient, when non-nil, holds the retry configuration that NewSyncer
	// copies into per-syncer retryable HTTP clients.
	rclient *retryablehttp.Client
}
// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
	// client is the HTTP client used for all fetches in this sync.
	client *http.Client
	// peerInfo identifies the publisher; ID may be empty for plain HTTP.
	peerInfo peer.AddrInfo
	// rootURL is the URL currently used for fetches. urls holds remaining
	// alternate addresses to fall back to when a request fails.
	rootURL url.URL
	urls    []*url.URL
	// sync is the parent Sync that created this Syncer.
	sync *Sync
	// For legacy HTTP and external server support without IPNI path.
	noPath    bool
	plainHTTP bool
}
// NewSync creates a new Sync. The blockHook, if non-nil, is invoked for each
// block synced by Syncers created from this Sync.
func NewSync(lsys ipld.LinkSystem, blockHook func(peer.ID, cid.Cid), options ...ClientOption) *Sync {
	opts := getClientOpts(options)

	sc := &Sync{
		blockHook:   blockHook,
		lsys:        lsys,
		authPeerID:  opts.authPeerID,
		httpTimeout: opts.httpTimeout,
		client: http.Client{
			Timeout: opts.httpTimeout,
		},
		clientHost: &libp2phttp.Host{
			StreamHost: opts.streamHost,
		},
	}

	// A non-zero retry count enables retry behavior for the HTTP clients
	// created by calls to NewSyncer.
	if opts.httpRetryMax != 0 {
		sc.rclient = &retryablehttp.Client{
			RetryWaitMin: opts.httpRetryWaitMin,
			RetryWaitMax: opts.httpRetryWaitMax,
			RetryMax:     opts.httpRetryMax,
		}
	}
	return sc
}
// NewSyncer creates a new Syncer to use for a single sync operation against a
// peer. A value for peerInfo.ID is optional for the HTTP transport.
//
// If peerInfo has no addresses, addresses are looked up in the stream host's
// peerstore. If the peer is not a libp2phttp server, the syncer falls back to
// plain HTTP, provided the peer has at least one HTTP multiaddr.
func (s *Sync) NewSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
	var cli http.Client
	var httpClient *http.Client
	var err error

	var rtOpts []libp2phttp.RoundTripperOption
	if s.authPeerID {
		rtOpts = append(rtOpts, libp2phttp.ServerMustAuthenticatePeerID)
	}

	peerInfo = mautil.CleanPeerAddrInfo(peerInfo)
	// Without explicit addresses, fall back to the stream host's peerstore.
	if len(peerInfo.Addrs) == 0 {
		if s.clientHost.StreamHost == nil {
			return nil, errors.New("no peer addrs and no stream host")
		}
		peerStore := s.clientHost.StreamHost.Peerstore()
		if peerStore == nil {
			return nil, errors.New("no peer addrs and no stream host peerstore")
		}
		peerInfo.Addrs = peerStore.Addrs(peerInfo.ID)
		if len(peerInfo.Addrs) == 0 {
			// Fix: error message previously misspelled "peertore".
			return nil, errors.New("no peer addrs and none found in peerstore")
		}
	}

	s.clientHostMutex.Lock()
	cli, err = s.clientHost.NamespacedClient(ProtocolID, peerInfo, rtOpts...)
	s.clientHostMutex.Unlock()

	var plainHTTP bool
	if err != nil {
		// Not a libp2phttp server; fall back to plain HTTP if the peer has
		// any HTTP multiaddrs.
		httpAddrs := mautil.FindHTTPAddrs(peerInfo.Addrs)
		if len(httpAddrs) == 0 {
			return nil, ErrNoHTTPServer
		}
		log.Infow("Publisher is not a libp2phttp server. Using plain http", "err", err, "peer", peerInfo.ID)
		httpClient = &s.client
		plainHTTP = true
	} else {
		log.Infow("Publisher supports libp2phttp", "peer", peerInfo.ID)
		httpClient = &cli
	}
	httpClient.Timeout = s.httpTimeout

	if s.rclient != nil {
		// Wrap the chosen client with the retry configuration from NewSync.
		rclient := &retryablehttp.Client{
			HTTPClient:   httpClient,
			RetryWaitMin: s.rclient.RetryWaitMin,
			RetryWaitMax: s.rclient.RetryWaitMax,
			RetryMax:     s.rclient.RetryMax,
			CheckRetry:   retryablehttp.DefaultRetryPolicy,
			Backoff:      retryablehttp.DefaultBackoff,
		}
		httpClient = rclient.StandardClient()
	}

	// Convert every multiaddr to a URL with the IPNI path appended. The first
	// becomes the root URL; the rest are fallbacks used by fetch.
	urls := make([]*url.URL, len(peerInfo.Addrs))
	for i, addr := range peerInfo.Addrs {
		u, err := maurl.ToURL(addr)
		if err != nil {
			return nil, err
		}
		urls[i] = u.JoinPath(IPNIPath)
	}

	return &Syncer{
		client:    httpClient,
		peerInfo:  peerInfo,
		rootURL:   *urls[0],
		urls:      urls[1:],
		sync:      s,
		plainHTTP: plainHTTP,
	}, nil
}
// Close releases idle connections held by the shared HTTP client and closes
// the libp2p HTTP client host.
func (s *Sync) Close() {
	s.client.CloseIdleConnections()

	s.clientHostMutex.Lock()
	defer s.clientHostMutex.Unlock()
	s.clientHost.Close()
}
// GetHead fetches the head of the peer's advertisement chain.
//
// The returned head is validated against its signature. If the syncer has a
// peer ID, the signer must match it; without a peer ID the signature cannot
// be attributed and a warning is logged instead.
func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
	var signedHead *headschema.SignedHead
	decode := func(msg io.Reader) error {
		var decErr error
		signedHead, decErr = headschema.Decode(msg)
		return decErr
	}
	if err := s.fetch(ctx, "head", decode); err != nil {
		return cid.Undef, err
	}

	signerID, err := signedHead.Validate()
	if err != nil {
		return cid.Undef, err
	}

	switch {
	case s.peerInfo.ID == "":
		log.Warn("Cannot verify publisher signature without peer ID")
	case signerID != s.peerInfo.ID:
		return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerInfo.ID.String(), signerID.String())
	}

	// TODO: Check that the returned topic, if any, matches the expected topic.
	//if signedHead.Topic != nil && *signedHead.Topic != "" && expectedTopic != "" {
	//	if *signedHead.Topic != expectedTopic {
	//		return nil, ErrTopicMismatch
	//	}
	//}

	return signedHead.Head.(cidlink.Link).Cid, nil
}
// SameAddrs reports whether the given multiaddrs are the same as the
// addresses of this syncer's peer.
func (s *Syncer) SameAddrs(maddrs []multiaddr.Multiaddr) bool {
	return mautil.MultiaddrsEqual(s.peerInfo.Addrs, maddrs)
}
// Sync syncs the peer's advertisement chain or entries chain, walking the
// DAG rooted at nextCid as directed by the given selector node.
func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error {
	compiled, err := selector.CompileSelector(sel)
	if err != nil {
		return fmt.Errorf("failed to compile selector: %w", err)
	}

	cids, err := s.walkFetch(ctx, nextCid, compiled)
	if err != nil {
		return fmt.Errorf("failed to traverse requested dag: %w", err)
	}

	// The blockHook callback gets called for every synced block, even if block
	// is already stored locally.
	//
	// We are purposefully not doing this in the StorageReadOpener because the
	// hook can do anything, including deleting the block from the block store.
	// If it did that then we would not be able to continue our traversal. So
	// instead we remember the blocks seen during traversal and then call the
	// hook at the end when we no longer care what it does with the blocks.
	if hook := s.sync.blockHook; hook != nil {
		for _, c := range cids {
			hook(s.peerInfo.ID, c)
		}
	}

	s.client.CloseIdleConnections()
	return nil
}
// walkFetch is run by a traversal of the selector. For each block that the
// selector walks over, walkFetch will look to see if it can find it in the
// local data store. If it cannot, it will then go and get it over HTTP.
// It returns the cids of the visited blocks in traversal order.
func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector) ([]cid.Cid, error) {
	// Record cids in the order the traversal visits them, so the block hook
	// can later be invoked in that same order.
	var order []cid.Cid

	fetchLs := cidlink.DefaultLinkSystem()
	// Trusted because blocks are hashed/verified on the way into the link
	// system when fetched.
	fetchLs.TrustedStorage = true
	fetchLs.StorageReadOpener = func(lc ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
		c := lnk.(cidlink.Link).Cid
		// fetchBlock is a no-op when the block is already in local storage.
		if err := s.fetchBlock(ctx, c); err != nil {
			return nil, fmt.Errorf("failed to fetch block for cid %s: %w", c, err)
		}
		rdr, err := s.sync.lsys.StorageReadOpener(lc, lnk)
		if err == nil {
			order = append(order, c)
		}
		return rdr, err
	}

	// Load the root node directly before walking the selector from it.
	rootNode, err := fetchLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any)
	if err != nil {
		return nil, fmt.Errorf("failed to load node for root cid %s: %w", rootCid, err)
	}

	progress := traversal.Progress{
		Cfg: &traversal.Config{
			Ctx:                            ctx,
			LinkSystem:                     fetchLs,
			LinkTargetNodePrototypeChooser: basicnode.Chooser,
		},
		Path: datamodel.NewPath([]datamodel.PathSegment{}),
	}
	if err = progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, _ datamodel.Node) error { return nil }); err != nil {
		return nil, err
	}
	return order, nil
}
// fetch performs an HTTP GET of the named resource relative to the syncer's
// current root URL and passes the response body to cb on a 200 response.
//
// On request failure it rotates rootURL through the remaining alternate URLs;
// on a libp2p stream reset it retries the same URL once. For plain-HTTP
// publishers, a 404 or 403 triggers one retry with the IPNI path stripped, to
// support legacy HTTP servers that serve resources at the root path.
func (s *Syncer) fetch(ctx context.Context, rsrc string, cb func(io.Reader) error) error {
nextURL:
	fetchURL := s.rootURL.JoinPath(rsrc)
	var doneRetry bool
retry:
	req, err := http.NewRequestWithContext(ctx, "GET", fetchURL.String(), nil)
	if err != nil {
		return err
	}
	resp, err := s.client.Do(req)
	if err != nil {
		// Request failed entirely; switch to the next known address, if any.
		if len(s.urls) != 0 {
			log.Errorw("Fetch request failed, will retry with next address", "err", err)
			s.rootURL = *s.urls[0]
			s.urls = s.urls[1:]
			if s.noPath {
				// Preserve the legacy no-path mode for the new root URL.
				s.rootURL.Path = strings.TrimSuffix(s.rootURL.Path, strings.Trim(IPNIPath, "/"))
			}
			goto nextURL
		}
		if !doneRetry && errors.Is(err, network.ErrReset) {
			log.Errorw("stream reset err, retrying", "publisher", s.peerInfo.ID, "url", fetchURL.String())
			// Only retry the same fetch once.
			doneRetry = true
			goto retry
		}
		return fmt.Errorf("fetch request failed: %w", err)
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK:
		return cb(resp.Body)
	case http.StatusNotFound:
		// Drain the body so the underlying connection can be reused.
		_, _ = io.Copy(io.Discard, resp.Body)
		if s.plainHTTP && !s.noPath {
			// Try again with no path for legacy http.
			log.Warnw("Plain HTTP got not found response, retrying without IPNI path for legacy HTTP")
			s.rootURL.Path = strings.TrimSuffix(s.rootURL.Path, strings.Trim(IPNIPath, "/"))
			s.noPath = true
			goto nextURL
		}
		log.Errorw("Block not found from HTTP publisher", "resource", rsrc)
		// Include the string "content not found" so that indexers that have not
		// upgraded gracefully handle the error case. Because, this string is
		// being checked already.
		return fmt.Errorf("content not found: %w", ipld.ErrNotExists{})
	case http.StatusForbidden:
		// Drain the body so the underlying connection can be reused.
		_, _ = io.Copy(io.Discard, resp.Body)
		if s.plainHTTP && !s.noPath {
			// Try again with no path for legacy http.
			log.Warnw("Plain HTTP got forbidden response, retrying without IPNI path for legacy HTTP")
			s.rootURL.Path = strings.TrimSuffix(s.rootURL.Path, strings.Trim(IPNIPath, "/"))
			s.noPath = true
			goto nextURL
		}
		fallthrough
	default:
		// Drain the body so the underlying connection can be reused.
		_, _ = io.Copy(io.Discard, resp.Body)
		return fmt.Errorf("non success http fetch response at %s: %d", fetchURL.String(), resp.StatusCode)
	}
}
// fetchBlock fetches an item into the datastore at c if not locally available.
// The fetched data is hashed while being written into storage, and is only
// committed when its digest matches c.
func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid) error {
	// If the node loads successfully, it is already present in storage and
	// there is nothing to fetch.
	if n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, basicnode.Prototype.Any); err == nil && n != nil {
		return nil
	}
	return s.fetch(ctx, c.String(), func(data io.Reader) error {
		writer, committer, err := s.sync.lsys.StorageWriteOpener(ipld.LinkContext{Ctx: ctx})
		if err != nil {
			log.Errorw("Failed to get write opener", "err", err)
			return err
		}
		// Hash the data while streaming it into the storage writer.
		pref := c.Prefix()
		sum, err := multihash.SumStream(io.TeeReader(data, writer), pref.MhType, pref.MhLength)
		if err != nil {
			return err
		}
		if !bytes.Equal(c.Hash(), sum) {
			err := fmt.Errorf("hash digest mismatch; expected %s but got %s", c.Hash().B58String(), sum.B58String())
			log.Errorw("Failed to persist fetched block with mismatching digest", "cid", c, "err", err)
			return err
		}
		if err = committer(cidlink.Link{Cid: c}); err != nil {
			log.Errorw("Failed to commit", "err", err)
			return err
		}
		return nil
	})
}