/
signing.go
396 lines (344 loc) · 12.5 KB
/
signing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
package tbtc
import (
"context"
"fmt"
"math/big"
"strings"
"sync"
"github.com/keep-network/keep-core/pkg/generator"
"github.com/keep-network/keep-core/pkg/net"
"github.com/keep-network/keep-core/pkg/protocol/announcer"
"github.com/keep-network/keep-core/pkg/protocol/group"
"github.com/keep-network/keep-core/pkg/tecdsa"
"github.com/keep-network/keep-core/pkg/tecdsa/signing"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
)
const (
// signingBatchInterludeBlocks determines the block duration of the
// interlude preserved between subsequent signings in a signing batch.
// If the signing of the previous message completed at block X, the signing
// of the next message starts at `X + signingBatchInterludeBlocks`.
// This is the additional time signers have to realize that the signing is
// done by receiving the signingDoneMessage. Note that the end block of the
// previous signing used to establish the start block of the next signing
// comes from signingDoneMessage received and there is no guarantee all
// signing group members received signingDoneMessage before the highest
// endBlock is reached on the chain. The interlude is an additional time for
// the broadcast channel to spread information about signing successfully
// completed by the slowest signing group member (the one who sends the
// signingDoneMessage as the last one).
signingBatchInterludeBlocks = 2
)
// errSigningExecutorBusy is an error returned when the signing executor
// cannot execute the requested signature due to an ongoing signing.
var errSigningExecutorBusy = fmt.Errorf("signing executor is busy")
// signingExecutor is a component responsible for executing signing related to
// a specific wallet whose part is controlled by this node.
type signingExecutor struct {
lock *semaphore.Weighted
signers []*signer
broadcastChannel net.BroadcastChannel
membershipValidator *group.MembershipValidator
groupParameters *GroupParameters
protocolLatch *generator.ProtocolLatch
// getCurrentBlockFn is a function used to get the current block.
getCurrentBlockFn getCurrentBlockFn
// waitForBlockFn is a function used to wait for the given block.
waitForBlockFn waitForBlockFn
// signingAttemptsLimit determines the maximum attempts count that will
// be made by a single signer for the given message. Once the attempts
// limit is hit the signer gives up.
signingAttemptsLimit uint
}
func newSigningExecutor(
signers []*signer,
broadcastChannel net.BroadcastChannel,
membershipValidator *group.MembershipValidator,
groupParameters *GroupParameters,
protocolLatch *generator.ProtocolLatch,
getCurrentBlockFn getCurrentBlockFn,
waitForBlockFn waitForBlockFn,
signingAttemptsLimit uint,
) *signingExecutor {
return &signingExecutor{
lock: semaphore.NewWeighted(1),
signers: signers,
broadcastChannel: broadcastChannel,
membershipValidator: membershipValidator,
groupParameters: groupParameters,
protocolLatch: protocolLatch,
getCurrentBlockFn: getCurrentBlockFn,
waitForBlockFn: waitForBlockFn,
signingAttemptsLimit: signingAttemptsLimit,
}
}
// signBatch performs the signing process for each message from the given
// messages batch, one after another. If at least one message cannot be signed,
// this function returns an error. If all messages were signed successfully,
// a slice of signatures is returned. Order of the returned signatures matches
// the order of the messages in the batch, i.e. the first signature corresponds
// to the first message, and so on.
func (se *signingExecutor) signBatch(
ctx context.Context,
messages []*big.Int,
startBlock uint64,
) ([]*tecdsa.Signature, error) {
wallet := se.wallet()
walletPublicKeyBytes, err := marshalPublicKey(wallet.publicKey)
if err != nil {
return nil, fmt.Errorf("cannot marshal wallet public key: [%v]", err)
}
messagesDigests := make([]string, len(messages))
for i, message := range messages {
bytes := message.Bytes()
// Real-world messages are usually 32-byte however, test ones can be
// much shorter. The distinction of displaying whole messages up
// to 8 bytes is arbitrary though can be justified as digesting shorter
// values is rather an overkill. Having full messages while inspecting
// test logs may help while debugging.
var messageDigest string
if len(bytes) > 8 {
messageDigest = fmt.Sprintf(
"0x%x...%x",
bytes[:2],
bytes[len(bytes)-2:],
)
} else {
messageDigest = fmt.Sprintf("0x%x", bytes)
}
messagesDigests[i] = messageDigest
}
signingBatchLogger := logger.With(
zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)),
zap.String("signedMessages", strings.Join(messagesDigests, ", ")),
)
signingStartBlock := startBlock // start block for the first signing
signatures := make([]*tecdsa.Signature, len(messages))
endBlocks := make([]uint64, len(messages))
for i, message := range messages {
signingBatchMessageLogger := signingBatchLogger.With(
zap.String("signedMessage", fmt.Sprintf("0x%x", message)),
zap.String("index", fmt.Sprintf("%v/%v", i+1, len(messages))),
)
signingBatchMessageLogger.Infof("generating signature for message")
if i > 0 {
signingStartBlock = endBlocks[i-1] + signingBatchInterludeBlocks
}
signature, endBlock, err := se.sign(ctx, message, signingStartBlock)
if err != nil {
return nil, err
}
signingBatchMessageLogger.Infof(
"generated signature [%v] for message at block [%v]",
signature,
endBlock,
)
signatures[i] = signature
endBlocks[i] = endBlock
}
return signatures, nil
}
// sign performs the signing process for the given message. The process is
// triggered according to the given start block. If the message cannot be signed
// within a limited time window, an error is returned. If the message was
// signed successfully, this function returns the signature along with the
// block at which the signature was calculated. This end block is common for
// all wallet signers so can be used as a synchronization point.
func (se *signingExecutor) sign(
ctx context.Context,
message *big.Int,
startBlock uint64,
) (*tecdsa.Signature, uint64, error) {
if lockAcquired := se.lock.TryAcquire(1); !lockAcquired {
return nil, 0, errSigningExecutorBusy
}
defer se.lock.Release(1)
wallet := se.wallet()
walletPublicKeyBytes, err := marshalPublicKey(wallet.publicKey)
if err != nil {
return nil, 0, fmt.Errorf("cannot marshal wallet public key: [%v]", err)
}
loopTimeoutBlock := startBlock +
uint64(se.signingAttemptsLimit*signingAttemptMaximumBlocks())
signingLogger := logger.With(
zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)),
zap.String("signedMessage", fmt.Sprintf("0x%x", message)),
zap.Uint64("signingStartBlock", startBlock),
zap.Uint64("signingTimeoutBlock", loopTimeoutBlock),
)
type signingOutcome struct {
signature *tecdsa.Signature
endBlock uint64
}
wg := sync.WaitGroup{}
wg.Add(len(se.signers))
signingOutcomeChan := make(chan *signingOutcome, len(se.signers))
for _, currentSigner := range se.signers {
go func(signer *signer) {
se.protocolLatch.Lock()
defer se.protocolLatch.Unlock()
defer wg.Done()
announcer := announcer.New(
fmt.Sprintf("%v-%v", ProtocolName, "signing"),
se.broadcastChannel,
se.membershipValidator,
)
doneCheck := newSigningDoneCheck(
se.groupParameters.GroupSize,
se.broadcastChannel,
se.membershipValidator,
)
retryLoop := newSigningRetryLoop(
signingLogger,
message,
startBlock,
signer.signingGroupMemberIndex,
wallet.signingGroupOperators,
se.groupParameters,
announcer,
doneCheck,
)
// Set up the loop timeout signal. This context is associated with
// all attempts and gets canceled in three situations:
// - one of the attempts failed with an error,
// - we gave up after executing multiple attempts,
// - one of the attempts succeeded.
// In the last case, the context is not canceled immediately but,
// we wait until the timeout for the successful attempt is done.
// This lets the inner context to retransmit the signing done
// messages for a longer period of time than just for the actual
// execution time from the perspective of the current member.
// This is important to ensure everyone has a chance to receive
// the signing done message broadcasted by the current member.
loopCtx, cancelLoopCtx := withCancelOnBlock(
ctx,
loopTimeoutBlock,
se.waitForBlockFn,
)
loopResult, err := retryLoop.start(
loopCtx,
se.waitForBlockFn,
se.getCurrentBlockFn,
func(attempt *signingAttemptParams) (*signing.Result, uint64, error) {
signingAttemptLogger := signingLogger.With(
zap.Uint("attemptNumber", attempt.number),
zap.Uint64("attemptStartBlock", attempt.startBlock),
zap.Uint64("attemptTimeoutBlock", attempt.timeoutBlock),
)
signingAttemptLogger.Infof(
"[member:%v] starting signing protocol "+
"with [%v] group members (excluded: [%v])",
signer.signingGroupMemberIndex,
wallet.groupSize()-len(attempt.excludedMembersIndexes),
attempt.excludedMembersIndexes,
)
// Set up the attempt timeout signal.
// This context is associated with the current attempt and
// gets canceled when the timeout for the current attempt is
// hit. The context is not canceled earlier, even if the
// execution succeeded. This is needed to ensure all
// protocol participants, even the slowest one, have
// a chance to receive all messages sent by this member
// and complete the protocol.
attemptCtx, _ := withCancelOnBlock(
loopCtx,
attempt.timeoutBlock,
se.waitForBlockFn,
)
sessionID := fmt.Sprintf(
"%v-%v",
message.Text(16),
attempt.number,
)
result, err := signing.Execute(
attemptCtx,
signingAttemptLogger,
message,
sessionID,
signer.signingGroupMemberIndex,
signer.privateKeyShare,
wallet.groupSize(),
wallet.groupDishonestThreshold(
se.groupParameters.HonestThreshold,
),
attempt.excludedMembersIndexes,
se.broadcastChannel,
se.membershipValidator,
)
if err != nil {
return nil, 0, err
}
endBlock, err := se.getCurrentBlockFn()
if err != nil {
return nil, 0, err
}
return result, endBlock, nil
},
)
if err != nil {
// Signer failed so there is no point to hold the loopCtx.
// Cancel it regardless of their timeout.
cancelLoopCtx()
signingLogger.Errorf(
"[member:%v] all retries for the signing failed; "+
"giving up: [%v]",
signer.signingGroupMemberIndex,
err,
)
return
}
// Just as mentioned in the comment above the definition of loopCtx,
// do not cancel the loopCtx upon function exit immediately and
// continue to broadcast signing done checks until the successful
// attempt timeout. This way we maximize the chance that other
// members, especially the ones not participating in the successful
// signature attempt, receive the done checks as well.
go func() {
defer cancelLoopCtx()
err := se.waitForBlockFn(
loopCtx,
loopResult.attemptTimeoutBlock,
)
if err != nil {
signingLogger.Warnf(
"[member:%v] failed waiting for signing "+
"loop stop signal: [%v]",
signer.signingGroupMemberIndex,
err,
)
}
}()
signingLogger.Infof(
"[member:%v] generated signature [%v] at block [%v]",
signer.signingGroupMemberIndex,
loopResult.result.Signature,
loopResult.latestEndBlock,
)
signingOutcomeChan <- &signingOutcome{
signature: loopResult.result.Signature,
endBlock: loopResult.latestEndBlock,
}
}(currentSigner)
}
// Wait until all controlled signers complete their signing routines,
// regardless of their result.
wg.Wait()
// Take the first outcome from the channel as the outcome of all members.
// This assumption is totally valid because the signing loop produces a
// result only if all signers who participated in signing confirmed they
// are done by sending a valid `signingDoneMessage` during the signing done
// check phase. If the result was not inserted to the channel by any
// signer, that means all signers failed and have not produced a signature.
select {
case outcome := <-signingOutcomeChan:
return outcome.signature, outcome.endBlock, nil
default:
return nil, 0, fmt.Errorf("all signers failed")
}
}
func (se *signingExecutor) wallet() wallet {
// All signers belong to one wallet. Take that wallet from the
// first signer.
return se.signers[0].wallet
}