/
handler.go
79 lines (71 loc) · 2.33 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package weakcoin
import (
"context"
"errors"
"fmt"
"time"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/metrics"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
)
// HandleProposal is the gossip handler for Beacon Weak Coin messages.
//
// msg is decoded into a Message; if decoding fails a warning is logged and
// pubsub.ErrValidationReject is returned. A decoded message is handed to
// receiveMessage, and any error it produces is returned to the caller
// unchanged. Such errors are logged at debug level together with the sending
// peer, except for errNotSmallest, which is returned without logging.
// On success the message latency is reported to metrics and nil is returned.
func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byte) error {
	logger := wc.logger.WithContext(ctx)

	var proposal Message
	if decodeErr := codec.Decode(msg, &proposal); decodeErr != nil {
		logger.With().Warning("malformed weak coin message", log.Err(decodeErr))
		return pubsub.ErrValidationReject
	}

	recvErr := wc.receiveMessage(ctx, proposal)
	if recvErr == nil {
		// Latency is measured against the send time that msgTime reports
		// for this epoch/round.
		sentAt := wc.msgTime.WeakCoinProposalSendTime(proposal.Epoch, proposal.Round)
		metrics.ReportMessageLatency(
			pubsub.BeaconProtocol,
			pubsub.BeaconWeakCoinProtocol,
			time.Since(sentAt),
		)
		return nil
	}

	// errNotSmallest is the only rejection that is not logged.
	if !errors.Is(recvErr, errNotSmallest) {
		logger.With().Debug("invalid proposal",
			proposal.Epoch,
			proposal.Round,
			log.Stringer("peer", peer),
			log.Err(recvErr),
		)
	}
	return recvErr
}
// receiveMessage validates a decoded proposal and routes it by round.
//
// A proposal whose VRF signature is reported by aboveThreshold is rejected
// before the lock is taken. The remaining checks run under wc.mu:
//   - if both the epoch and the round have started and the message targets
//     exactly that epoch/round, it is forwarded to updateProposal and that
//     call's result is returned;
//   - otherwise, if isNextRound accepts the message and nextRoundBuffer has
//     spare capacity, the message is appended to the buffer and nil is
//     returned;
//   - otherwise a "wrong round" error is returned.
func (wc *WeakCoin) receiveMessage(ctx context.Context, message Message) error {
	if wc.aboveThreshold(message.VRFSignature) {
		return fmt.Errorf("proposal %s is above threshold", message.VRFSignature)
	}

	wc.mu.Lock()
	defer wc.mu.Unlock()

	forCurrentRound := wc.epochStarted && wc.roundStarted &&
		wc.epoch == message.Epoch && wc.round == message.Round
	if forCurrentRound {
		return wc.updateProposal(ctx, message)
	}

	// Not the active round: the message can only be kept if it is for the
	// upcoming round and the buffer has not reached its capacity.
	if !wc.isNextRound(message.Epoch, message.Round) || len(wc.nextRoundBuffer) >= cap(wc.nextRoundBuffer) {
		return fmt.Errorf("message for the wrong round %v/%v", message.Epoch, message.Round)
	}
	wc.nextRoundBuffer = append(wc.nextRoundBuffer, message)
	return nil
}
// isNextRound reports whether the given epoch/round pair should be treated
// as the round following the one currently tracked in wc.epoch/wc.round.
// It only reads fields of wc and performs no locking of its own.
func (wc *WeakCoin) isNextRound(epoch types.EpochID, round types.RoundID) bool {
	sameEpoch := wc.epoch == epoch
	followingEpoch := wc.epoch+1 == epoch

	switch {
	case sameEpoch && wc.round+1 == round && round <= wc.config.MaxRound:
		// the very next round of the same epoch, within MaxRound
		return true
	case followingEpoch && wc.round == wc.config.MaxRound:
		// tracked round is MaxRound and the message is for the following epoch
		return true
	case followingEpoch && !wc.roundStarted && !wc.epochStarted:
		// epoch completed, but the new one has not been started yet
		return true
	case sameEpoch && wc.epochStarted && !wc.roundStarted:
		// epoch started, but the round has not been started yet
		return true
	default:
		return false
	}
}