/
p2p.go
103 lines (86 loc) · 2.95 KB
/
p2p.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package committee
import (
"context"
"fmt"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/worker/common/p2p"
p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error"
)
// committeeMsgHandler decodes, authorizes and handles p2p.CommitteeMessage
// payloads through its DecodeMessage, AuthorizeMessage and HandleMessage methods.
type committeeMsgHandler struct {
// n is the node whose commonNode state (epoch snapshot, current block) is
// consulted by the handler and to which verified proposals are queued.
n *Node
}
// DecodeMessage unmarshals a CBOR-encoded payload into a p2p.CommitteeMessage.
//
// On success a pointer to the decoded message is returned. If unmarshalling
// fails, the error from cbor.Unmarshal is returned as-is together with a nil
// message.
func (h *committeeMsgHandler) DecodeMessage(msg []byte) (interface{}, error) {
	decoded := new(p2p.CommitteeMessage)

	err := cbor.Unmarshal(msg, decoded)
	if err != nil {
		return nil, err
	}

	return decoded, nil
}
// AuthorizeMessage decides whether a decoded committee message from peerID may
// be accepted on this topic.
//
// The message epoch is compared against the epoch number of the current epoch
// snapshot:
//   - same epoch: the check continues with the committee membership test;
//   - older epoch: rejected with a permanent error;
//   - more than one epoch ahead: rejected with a permanent error;
//   - exactly the next epoch: rejected with a plain (non-permanent) error.
//
// Plain errors are also returned while the epoch snapshot is not valid or the
// executor committee is nil. A peer that is not listed in the executor
// committee's Peers set is rejected with a permanent error. nil is returned
// when every check passes.
func (h *committeeMsgHandler) AuthorizeMessage(ctx context.Context, peerID signature.PublicKey, msg interface{}) error {
	cm := msg.(*p2p.CommitteeMessage) // Ensured by DecodeMessage.

	snapshot := h.n.commonNode.Group.GetEpochSnapshot()
	if !snapshot.IsValid() {
		return fmt.Errorf("epoch is not yet known")
	}

	// Anything that is not for the current epoch is rejected; the only
	// question is whether the rejection is permanent.
	if current := snapshot.GetEpochNumber(); cm.Epoch != current {
		switch {
		case cm.Epoch < current:
			// Past messages will never become valid.
			return p2pError.Permanent(fmt.Errorf("epoch in the past"))
		case cm.Epoch > current+1:
			// Messages too far off should be dropped.
			return p2pError.Permanent(fmt.Errorf("epoch in the future"))
		default:
			// Future messages may become valid.
			return fmt.Errorf("epoch in the future")
		}
	}

	// Only known committee members are allowed to submit messages on this topic.
	executors := snapshot.GetExecutorCommittee()
	if executors == nil {
		return fmt.Errorf("executor committee is not yet known")
	}
	if isMember := executors.Peers[peerID]; !isMember {
		return p2pError.Permanent(fmt.Errorf("peer is not authorized to publish committee messages"))
	}

	return nil
}
// HandleMessage processes a decoded committee message received from peerID.
//
// Only messages that carry a proposal are handled; any other message yields
// p2pError.ErrUnhandledMessage. Proposals flagged as isOwn are skipped and nil
// is returned. For any other proposal, the method:
//  1. checks via the epoch snapshot that the proposal's NodeID is the
//     transaction scheduler for the current block round, returning
//     errMsgFromNonTxnSched otherwise;
//  2. verifies the proposal against the runtime ID, returning a permanent
//     error on failure;
//  3. queues the proposal with queueBatchBlocking and returns its error, if any.
func (h *committeeMsgHandler) HandleMessage(ctx context.Context, peerID signature.PublicKey, msg interface{}, isOwn bool) error {
	cm := msg.(*p2p.CommitteeMessage) // Ensured by DecodeMessage.

	proposal := cm.Proposal
	if proposal == nil {
		return p2pError.ErrUnhandledMessage
	}

	// Ignore own messages as those are handled separately.
	if isOwn {
		return nil
	}

	crash.Here(crashPointBatchReceiveAfter)

	snapshot := h.n.commonNode.Group.GetEpochSnapshot()

	// Read the current round while holding the cross-node lock.
	h.n.commonNode.CrossNode.Lock()
	currentRound := h.n.commonNode.CurrentBlock.Header.Round
	h.n.commonNode.CrossNode.Unlock()

	// Before opening the signed dispatch message, verify that it was actually signed by the
	// current transaction scheduler.
	if verifyErr := snapshot.VerifyTxnSchedulerSigner(proposal.NodeID, currentRound); verifyErr != nil {
		// Not signed by the current txn scheduler.
		return errMsgFromNonTxnSched
	}

	// Transaction scheduler checks out, verify signature.
	if sigErr := proposal.Verify(h.n.commonNode.Runtime.ID()); sigErr != nil {
		return p2pError.Permanent(sigErr)
	}

	if queueErr := h.n.queueBatchBlocking(ctx, proposal); queueErr != nil {
		return queueErr
	}
	return nil
}
// HandlePeerTx implements NodeHooks.
//
// The call is a no-op for this node: ctx and tx are not used and nil is always
// returned.
func (n *Node) HandlePeerTx(ctx context.Context, tx []byte) error {
// Nothing to do here.
return nil
}