package epochs

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/rs/zerolog"
	"github.com/sethvargo/go-retry"

	"github.com/onflow/flow-go/consensus/hotstuff"
	hotmodel "github.com/onflow/flow-go/consensus/hotstuff/model"
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/retrymiddleware"
	clusterstate "github.com/onflow/flow-go/state/cluster"
	"github.com/onflow/flow-go/state/protocol"
)

const (
	// retryDuration is the initial duration to wait between retries for all retryable
	// requests - increases exponentially for subsequent retries
	retryDuration = time.Second

	// retryMaxConsecutiveFailures is the number of consecutive failures after which
	// we fall back to the next QC contract client
	retryMaxConsecutiveFailures = 2

	// retryDurationMax is the maximum duration to wait between two consecutive requests
	retryDurationMax = 10 * time.Minute

	// retryJitterPercent is the percentage jitter to introduce to each retry interval
	retryJitterPercent = 25 // 25%
)

// RootQCVoter is responsible for generating and submitting votes for the
// root quorum certificate of the upcoming epoch for this node's cluster.
type RootQCVoter struct {
	log                       zerolog.Logger
	me                        module.Local
	signer                    hotstuff.Signer
	state                     protocol.State
	qcContractClients         []module.QCContractClient // priority-ordered array of clients for the QC aggregator smart contract
	lastSuccessfulClientIndex int                       // index of the contract client that was last successful during retries
	wait                      time.Duration             // how long to sleep in between vote attempts
	mu                        sync.Mutex
}

// NewRootQCVoter returns a new root QC voter, configured for a particular epoch.
func NewRootQCVoter(
	log zerolog.Logger,
	me module.Local,
	signer hotstuff.Signer,
	state protocol.State,
	contractClients []module.QCContractClient,
) *RootQCVoter {

	voter := &RootQCVoter{
		log:               log.With().Str("module", "root_qc_voter").Logger(),
		me:                me,
		signer:            signer,
		state:             state,
		qcContractClients: contractClients,
		wait:              time.Second * 10,
		mu:                sync.Mutex{},
	}
	return voter
}
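
// Usage sketch: one way a collection node could construct the voter. The
// identifiers myLocal, mySigner, myState, and accessClients are hypothetical
// placeholders for dependencies the caller is assumed to already hold; only
// NewRootQCVoter itself comes from this package.
//
//	voter := NewRootQCVoter(
//		zerolog.New(os.Stderr),
//		myLocal,       // module.Local identity of this node
//		mySigner,      // hotstuff.Signer for signing the cluster root block
//		myState,       // protocol.State, used for epoch phase checks
//		accessClients, // []module.QCContractClient, in priority order
//	)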

// Vote handles the full procedure of generating a vote, submitting it to the
// epoch smart contract, and verifying submission. Returns an error only if
// there is a critical error that would make it impossible for the vote to be
// submitted. Otherwise, exits when the vote has been successfully submitted.
//
// It is safe to run multiple times within a single setup phase.
func (voter *RootQCVoter) Vote(ctx context.Context, epoch protocol.Epoch) error {

	counter, err := epoch.Counter()
	if err != nil {
		return fmt.Errorf("could not get epoch counter: %w", err)
	}
	clusters, err := epoch.Clustering()
	if err != nil {
		return fmt.Errorf("could not get clustering: %w", err)
	}
	cluster, clusterIndex, ok := clusters.ByNodeID(voter.me.NodeID())
	if !ok {
		return fmt.Errorf("could not find self in clustering")
	}

	log := voter.log.With().
		Uint64("epoch", counter).
		Uint("cluster_index", clusterIndex).
		Logger()

	log.Info().Msg("preparing to generate vote for cluster root qc")

	// create the canonical root block for our cluster
	root := clusterstate.CanonicalRootBlock(counter, cluster)

	// create a signable hotstuff model
	signable := hotmodel.GenesisBlockFromFlow(root.Header)

	vote, err := voter.signer.CreateVote(signable)
	if err != nil {
		return fmt.Errorf("could not create vote for cluster root qc: %w", err)
	}

	// this backoff configuration will never terminate on its own, but the
	// request logic will exit when we exit the EpochSetup phase
	backoff := retry.NewExponential(retryDuration)
	backoff = retry.WithCappedDuration(retryDurationMax, backoff)
	backoff = retry.WithJitterPercent(retryJitterPercent, backoff)
	clientIndex, qcContractClient := voter.getInitialContractClient()
	onMaxConsecutiveRetries := func(totalAttempts int) {
		// fall back to the next contract client after repeated failures;
		// the returned values must be captured, otherwise the fallback
		// client would never actually be used
		clientIndex, qcContractClient = voter.updateContractClient(clientIndex)
		log.Warn().Msgf("retrying on attempt (%d) with fallback access node at index (%d)", totalAttempts, clientIndex)
	}
	backoff = retrymiddleware.AfterConsecutiveFailures(retryMaxConsecutiveFailures, backoff, onMaxConsecutiveRetries)
	err = retry.Do(ctx, backoff, func(ctx context.Context) error {

		// check that we're still in the setup phase, if we're not we can't
		// submit a vote anyway and must exit this process
		phase, err := voter.state.Final().Phase()
		if err != nil {
			log.Error().Err(err).Msg("could not get current phase")
		} else if phase != flow.EpochPhaseSetup {
			return fmt.Errorf("could not submit vote - no longer in setup phase")
		}

		// check whether we've already voted, if we have we can exit early
		voted, err := qcContractClient.Voted(ctx)
		if err != nil {
			log.Error().Err(err).Msg("could not check vote status")
			return retry.RetryableError(err)
		} else if voted {
			log.Info().Msg("already voted - exiting QC vote process...")
			// update our last successful client index for future calls
			voter.updateLastSuccessfulClient(clientIndex)
			return nil
		}

		// submit the vote - this call will block until the transaction has
		// either succeeded or we are able to retry
		log.Info().Msg("submitting vote...")
		err = qcContractClient.SubmitVote(ctx, vote)
		if err != nil {
			log.Error().Err(err).Msg("could not submit vote - retrying...")
			return retry.RetryableError(err)
		}

		log.Info().Msg("successfully submitted vote - exiting QC vote process...")
		// update our last successful client index for future calls
		voter.updateLastSuccessfulClient(clientIndex)
		return nil
	})

	return err
}
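
// Usage sketch: a caller would typically run Vote once the protocol state
// enters the EpochSetup phase, with a context that is cancelled when the
// phase ends. The names voter and nextEpoch are hypothetical placeholders;
// only Vote and its error contract come from this file.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel() // cancel when the setup phase ends
//	if err := voter.Vote(ctx, nextEpoch); err != nil {
//		// a critical error - the vote could not be submitted at all
//		log.Fatal().Err(err).Msg("failed to submit cluster root QC vote")
//	}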

// updateContractClient returns the next contract client to use. If the given
// client index differs from the last successful index, we return the last
// successful client; otherwise we advance to the next client in the priority
// list, wrapping around to the start when we reach the end.
func (voter *RootQCVoter) updateContractClient(clientIndex int) (int, module.QCContractClient) {
	voter.mu.Lock()
	defer voter.mu.Unlock()

	if clientIndex == voter.lastSuccessfulClientIndex {
		if clientIndex == len(voter.qcContractClients)-1 {
			clientIndex = 0
		} else {
			clientIndex++
		}
	} else {
		clientIndex = voter.lastSuccessfulClientIndex
	}

	return clientIndex, voter.qcContractClients[clientIndex]
}
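
// For example, with lastSuccessfulClientIndex == 0: a caller that failed on
// client 0 is advanced to client 1, while a caller that failed on any other
// client is sent back to client 0. The voter therefore alternates between the
// last successful client and its successor until a call succeeds and
// updateLastSuccessfulClient records a new index.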

// getInitialContractClient returns the last successful contract client, or the
// initial (highest-priority) client if none has succeeded yet.
func (voter *RootQCVoter) getInitialContractClient() (int, module.QCContractClient) {
	voter.mu.Lock()
	defer voter.mu.Unlock()
	return voter.lastSuccessfulClientIndex, voter.qcContractClients[voter.lastSuccessfulClientIndex]
}

// updateLastSuccessfulClient sets lastSuccessfulClientIndex in a concurrency-safe way.
func (voter *RootQCVoter) updateLastSuccessfulClient(clientIndex int) {
	voter.mu.Lock()
	defer voter.mu.Unlock()

	voter.lastSuccessfulClientIndex = clientIndex
}