-
Notifications
You must be signed in to change notification settings - Fork 179
/
upstream_connector.go
115 lines (95 loc) · 3.33 KB
/
upstream_connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package upstream
import (
"context"
"sync"
"time"
"github.com/onflow/flow-go/network/p2p"
"github.com/rs/zerolog"
"github.com/sethvargo/go-retry"
"go.uber.org/atomic"
"github.com/onflow/flow-go/network/p2p/utils"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/lifecycle"
)
// upstreamConnector tries to connect the unstaked AN with atleast one of the configured bootstrap access nodes
type upstreamConnector struct {
lm *lifecycle.LifecycleManager
bootstrapIdentities flow.IdentityList
logger zerolog.Logger
unstakedNode p2p.LibP2PNode
cancel context.CancelFunc
retryInitialTimeout time.Duration
maxRetries uint64
}
func NewUpstreamConnector(bootstrapIdentities flow.IdentityList, unstakedNode p2p.LibP2PNode, logger zerolog.Logger) *upstreamConnector {
return &upstreamConnector{
lm: lifecycle.NewLifecycleManager(),
bootstrapIdentities: bootstrapIdentities,
unstakedNode: unstakedNode,
logger: logger,
retryInitialTimeout: time.Second,
maxRetries: 5,
}
}
func (connector *upstreamConnector) Ready() <-chan struct{} {
connector.lm.OnStart(func() {
// eventually, context will be passed in to Start method: https://github.com/dapperlabs/flow-go/issues/5730
ctx, cancel := context.WithCancel(context.TODO())
connector.cancel = cancel
workerCtx, cancel := context.WithCancel(ctx)
defer cancel()
success := atomic.NewBool(false)
var wg sync.WaitGroup
// spawn a connect worker for each bootstrap node
for _, b := range connector.bootstrapIdentities {
id := *b
wg.Add(1)
go func() {
defer wg.Done()
lg := connector.logger.With().Str("bootstrap_node", id.NodeID.String()).Logger()
backoff := retry.NewFibonacci(connector.retryInitialTimeout)
backoff = retry.WithMaxRetries(connector.maxRetries, backoff)
if err := retry.Do(workerCtx, backoff, func(ctx context.Context) error {
return retry.RetryableError(connector.connect(ctx, id))
}); err != nil {
lg.Err(err).Msg("failed to connect")
} else {
lg.Info().Msg("successfully connected to bootstrap node")
success.Store(true)
}
}()
}
wg.Wait()
if !success.Load() {
// log fatal as there is no point continuing further, the unstaked AN cannot connect to any of the bootstrap peers
connector.logger.Fatal().
Msg("Failed to connect to a bootstrap node. " +
"Please ensure the network address and public key of the bootstrap access node are correct " +
"and that the node is running and reachable.")
}
})
return connector.lm.Started()
}
// connect is run to connect to an boostrap peer
func (connector *upstreamConnector) connect(ctx context.Context, bootstrapPeer flow.Identity) error {
select {
// check for a cancelled/expired context
case <-ctx.Done():
return ctx.Err()
default:
}
peerAddrInfo, err := utils.PeerAddressInfo(bootstrapPeer)
if err != nil {
return err
}
// try and connect to the bootstrap server
return connector.unstakedNode.ConnectToPeer(ctx, peerAddrInfo)
}
func (connector *upstreamConnector) Done() <-chan struct{} {
connector.lm.OnStop(func() {
// this function will only be executed if connector.lm.OnStart was previously called,
// in which case connector.cancel != nil
connector.cancel()
})
return connector.lm.Stopped()
}