-
Notifications
You must be signed in to change notification settings - Fork 179
/
engine.go
140 lines (119 loc) · 4.1 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package ping
import (
"context"
"encoding/binary"
"time"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/p2p"
)
// Timing parameters of the ping engine.
const (
	// PingTimeout is the maximum time to wait for a ping reply from a remote node.
	PingTimeout = 4 * time.Second

	// PingInterval is the period at which a new round of pings is scheduled.
	PingInterval = time.Minute
)
// Engine periodically pings the other nodes returned by the identity provider
// and reports the outcome (reachability, round-trip time and node details)
// through the ping metrics.
type Engine struct {
// unit runs the engine's functions (Launch, LaunchPeriodically, LaunchAfter)
// and supplies the Ready and Done channels
unit *engine.Unit
// log is the logger tagged with engine=ping
log zerolog.Logger
// idProvider supplies the identities of the nodes to ping
idProvider module.IdentityProvider
// idTranslator converts a node identifier into the peer ID handed to the ping service
idTranslator p2p.IDTranslator
// me is the local node; its node ID is used to exclude ourselves from the ping targets
me module.Local
// metrics receives the NodeReachable and NodeInfo reports
metrics module.PingMetrics
// pingEnabled controls whether Ready launches the periodic ping loop
pingEnabled bool
// pingService performs the actual ping of a peer
pingService network.PingService
nodeInfo map[flow.Identifier]string // additional details about a node such as operator name
}
// New creates a ping engine.
//
// If nodeInfoFile is non-empty it is read with readExtraNodeInfoJSON and the
// resulting per-node details are attached to the reported ping metrics. The
// file is optional: when it is not given, or cannot be read, the engine is
// still returned and simply reports no additional node details. In both of
// those cases nodeInfo is an empty (non-nil) map.
//
// The returned error is currently always nil.
func New(
	log zerolog.Logger,
	idProvider module.IdentityProvider,
	idTranslator p2p.IDTranslator,
	me module.Local,
	metrics module.PingMetrics,
	pingEnabled bool,
	nodeInfoFile string,
	pingService network.PingService,
) (*Engine, error) {
	eng := &Engine{
		unit:         engine.NewUnit(),
		log:          log.With().Str("engine", "ping").Logger(),
		idProvider:   idProvider,
		idTranslator: idTranslator,
		me:           me,
		metrics:      metrics,
		pingEnabled:  pingEnabled,
		pingService:  pingService,
		// start from an empty map so nodeInfo is never nil, whichever path is taken below
		nodeInfo: make(map[flow.Identifier]string),
	}

	if nodeInfoFile == "" {
		log.Trace().Msg("no node info file specified")
		return eng, nil
	}

	// if a node info file is provided, it is read and the additional node information is reported as part of the ping metric
	nodeInfo, err := readExtraNodeInfoJSON(nodeInfoFile)
	if err != nil {
		// the node info file is not mandatory and should not stop the Ping engine from running
		log.Error().Err(err).Str("node_info_file", nodeInfoFile).Msg("failed to read node info file")
		return eng, nil
	}

	eng.nodeInfo = nodeInfo
	log.Debug().Str("node_info_file", nodeInfoFile).Msg("using node info file")
	return eng, nil
}
// Ready returns the ready channel of the engine's unit. When pinging is
// enabled, the periodic ping loop is launched on the unit before the channel
// is returned; otherwise nothing is started. Whether pinging is enabled is
// logged either way.
func (e *Engine) Ready() <-chan struct{} {
	enabled := e.pingEnabled

	// the ping loop is only started on nodes where pinging is switched on
	if enabled {
		e.unit.Launch(e.startPing)
	}

	e.log.Info().Bool("ping enabled", enabled).Msg("ping enabled")

	ready := e.unit.Ready()
	return ready
}
// Done returns the done channel of the engine's unit, which is the unit
// that runs all functions launched by the ping engine.
func (e *Engine) Done() <-chan struct{} {
	done := e.unit.Done()
	return done
}
// startPing schedules a round of pings every PingInterval. In each round every
// identity except the local node is pinged once, after a per-peer delay that
// spreads the pings across the interval instead of sending them all at once.
func (e *Engine) startPing() {
	e.unit.LaunchPeriodically(func() {
		// everyone but ourselves
		peers := e.idProvider.Identities(filter.Not(filter.HasNodeID(e.me.NodeID())))

		// for each peer, send a ping every ping interval
		for _, peer := range peers {
			peer := peer // per-iteration copy captured by the closure below
			pid := peer.ID()

			// Derive a stable offset in [0, PingInterval) from the first two
			// bytes of the node ID. The raw value is a number of milliseconds,
			// so it must be scaled by time.Millisecond; without the scaling the
			// offset would be interpreted as nanoseconds and all pings would
			// fire practically simultaneously.
			offsetMillis := time.Duration(binary.BigEndian.Uint16(pid[:2]))
			delay := (offsetMillis * time.Millisecond) % PingInterval

			e.unit.LaunchAfter(delay, func() {
				e.pingNode(peer)
			})
		}
	}, PingInterval, 0)
}
// pingNode pings a single peer and records the outcome in the metrics.
//
// The peer's node ID is first translated to a peer ID; if that fails the error
// is logged and nothing is reported. Otherwise the peer is pinged with a
// deadline of PingTimeout. Reachability is always reported, with a negative
// round-trip time when the ping failed. The node info metric (version, block
// height, hotstuff view) is reported only for a successful ping.
func (e *Engine) pingNode(peer *flow.Identity) {
	target, err := e.idTranslator.GetPeerID(peer.ID())
	if err != nil {
		e.log.Error().Err(err).Str("peer", peer.String()).Msg("failed to get peer ID")
		return
	}

	// bound the ping by PingTimeout
	pingCtx, stop := context.WithTimeout(context.Background(), PingTimeout)
	defer stop()

	resp, rtt, pingErr := e.pingService.Ping(pingCtx, target)
	reachable := pingErr == nil
	if !reachable {
		e.log.Debug().Err(pingErr).Str("target", peer.ID().String()).Msg("failed to ping")
		// a negative rtt marks the node as non-pingable in the metric
		rtt = -1
	}

	// additional details about the node (empty string if none are known)
	extra := e.nodeInfo[peer.ID()]
	e.metrics.NodeReachable(peer, extra, rtt)

	// the response is only meaningful when the ping succeeded
	if !reachable {
		return
	}
	e.metrics.NodeInfo(peer, extra, resp.Version, resp.BlockHeight, resp.HotstuffView)
}