/
options.go
119 lines (104 loc) · 2.82 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package w3rc
import (
"context"
"time"
datatransferi "github.com/filecoin-project/go-data-transfer"
datatransfer "github.com/filecoin-project/go-data-transfer/impl"
dtnetwork "github.com/filecoin-project/go-data-transfer/network"
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/ipfs/go-datastore"
gsimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
)
type config struct {
host host.Host
ds datastore.Batching
dt datatransferi.Manager
indexerURL string
}
// An Option allows opening a Session with configured options.
type Option func(*config) error
// WithDataTransfer runs the session using an existing data transfer manager.
func WithDataTransfer(dt datatransferi.Manager) Option {
return func(c *config) error {
c.dt = dt
return nil
}
}
// WithDS sets the datastore to use for the session
func WithDS(ds datastore.Batching) Option {
return func(c *config) error {
c.ds = ds
return nil
}
}
// WithHost sets a libp2p host for the client to use.
func WithHost(h host.Host) Option {
return func(c *config) error {
c.host = h
return nil
}
}
// WithIndexer sets a URL of the indexer to use.
func WithIndexer(url string) Option {
return func(c *config) error {
c.indexerURL = url
return nil
}
}
func apply(cfg *config, opts ...Option) error {
for _, opt := range opts {
if err := opt(cfg); err != nil {
return err
}
}
return nil
}
func applyDefaults(lsys ipld.LinkSystem, cfg *config) error {
if cfg.host == nil {
host, err := libp2p.New()
if err != nil {
return err
}
cfg.host = host
}
if cfg.ds == nil {
cfg.ds = datastore.NewMapDatastore()
}
if cfg.dt == nil {
gsNet := gsnet.NewFromLibp2pHost(cfg.host)
gs := gsimpl.New(context.Background(), gsNet, lsys)
dtNet := dtnetwork.NewFromLibp2pHost(cfg.host)
tp := gstransport.NewTransport(cfg.host.ID(), gs)
dtManager, err := datatransfer.NewDataTransfer(cfg.ds, dtNet, tp)
if err != nil {
log.Errorf("Failed to create data transfer subsystem: %s", err)
return err
}
// Tell datatransfer to notify when ready.
dtReady := make(chan error)
dtManager.OnReady(func(e error) {
dtReady <- e
close(dtReady)
})
// Start datatransfer. The context passed in allows Start to be canceled
// if fsm migration takes too long. Timeout for dtManager.Start() is not
// handled here, so pass context.Background().
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err = dtManager.Start(ctx); err != nil {
log.Errorf("Failed to start datatransfer: %s", err)
return err
}
// Wait for datatransfer to be ready.
err = <-dtReady
if err != nil {
return err
}
cfg.dt = dtManager
}
return nil
}