-
Notifications
You must be signed in to change notification settings - Fork 177
/
attackNetwork.go
196 lines (161 loc) · 6.51 KB
/
attackNetwork.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package attacknetwork
import (
"fmt"
"sync"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/insecure"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/utils/logging"
)
// AttackNetwork is a middleware that mounts an attack orchestrator and lets it communicate
// with the corrupted nodes.
//
// Messages observed from corrupted nodes are decoded and handed to the orchestrator one at a time;
// events produced by the orchestrator are encoded and sent over the connection of the corrupted
// node named in the event.
type AttackNetwork struct {
	// lifecycle management of the attack network.
	component.Component
	cm *component.ComponentManager

	logger zerolog.Logger
	codec  network.Codec // encodes and decodes the payloads exchanged with corrupted nodes.

	// orchestrator is the mounted attack logic; orchestratorMutex serializes the delivery of
	// observed events into it.
	orchestrator      insecure.AttackOrchestrator
	orchestratorMutex sync.Mutex

	corruptedNodeIds     flow.IdentityList                                    // identities of the corrupted nodes.
	corruptedConnector   insecure.CorruptedNodeConnector                      // connection generator to corrupted nodes.
	corruptedConnections map[flow.Identifier]insecure.CorruptedNodeConnection // established connections, keyed by node id.
}
// NewAttackNetwork builds an AttackNetwork around the given orchestrator and connector.
//
// The network's Observe method is registered on the connector as the handler of incoming messages.
// A single lifecycle worker is installed that starts the network, signals readiness, blocks until
// its context is done, and then stops the network; a failure of either step is thrown on the
// worker's signaler context.
//
// The returned error is currently always nil.
func NewAttackNetwork(
	logger zerolog.Logger,
	codec network.Codec,
	orchestrator insecure.AttackOrchestrator,
	connector insecure.CorruptedNodeConnector,
	corruptedNodeIds flow.IdentityList) (*AttackNetwork, error) {

	an := &AttackNetwork{
		logger:               logger,
		codec:                codec,
		orchestrator:         orchestrator,
		corruptedNodeIds:     corruptedNodeIds,
		corruptedConnector:   connector,
		corruptedConnections: make(map[flow.Identifier]insecure.CorruptedNodeConnection),
	}

	// messages arriving from corrupted nodes are routed to this attack network.
	connector.WithIncomingMessageHandler(an.Observe)

	// lifecycle worker: start, report ready, wait for shutdown, stop.
	worker := func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
		if err := an.start(ctx); err != nil {
			ctx.Throw(fmt.Errorf("could not start attackNetwork: %w", err))
		}

		ready()
		<-ctx.Done()

		if err := an.stop(); err != nil {
			ctx.Throw(fmt.Errorf("could not stop attackNetwork: %w", err))
		}
	}

	manager := component.NewComponentManagerBuilder().AddWorker(worker).Build()
	an.Component = manager
	an.cm = manager

	return an, nil
}
// start brings up the attack network: it connects to every corrupted node and then registers
// the attack network with the orchestrator.
//
// It returns an error as soon as a connection to one of the corrupted nodes cannot be
// established; in that case the orchestrator is not registered.
func (a *AttackNetwork) start(ctx irrecoverable.SignalerContext) error {
	for _, identity := range a.corruptedNodeIds {
		nodeID := identity.NodeID

		conn, err := a.corruptedConnector.Connect(ctx, nodeID)
		if err != nil {
			return fmt.Errorf("could not establish corruptible connection to node %x: %w", nodeID, err)
		}

		// keeps the connection so that Send can later reach this node.
		a.corruptedConnections[nodeID] = conn
		a.logger.Info().Hex("node_id", logging.ID(nodeID)).Msg("attacker successfully registered on corrupted node")
	}

	// hands this attack network to the orchestrator only once all connections are in place.
	a.orchestrator.WithAttackNetwork(a)

	return nil
}
// stop tears down the attack network by closing every connection to the corrupted nodes.
//
// A failure to close one connection does not prevent closing the others; all failures are
// collected and returned together. It returns nil when every connection closed cleanly.
func (a *AttackNetwork) stop() error {
	var closeErrs *multierror.Error

	for _, conn := range a.corruptedConnections {
		if err := conn.CloseConnection(); err != nil {
			closeErrs = multierror.Append(closeErrs, err)
		}
	}

	return closeErrs.ErrorOrNil()
}
// Observe is the inbound message handler of the attack network; NewAttackNetwork registers it
// on the corrupted node connector as the handler of incoming messages.
//
// The message is passed on for decoding and delivery to the orchestrator. A processing failure is
// logged at fatal level (which, in zerolog, terminates the process).
func (a *AttackNetwork) Observe(message *insecure.Message) {
	err := a.processMessageFromCorruptedNode(message)
	if err == nil {
		return
	}

	a.logger.Fatal().Err(err).Msg("could not process message of corrupted node")
}
// processMessageFromCorruptedNode translates a message received from a corrupted node into an
// insecure.Event and passes it to the orchestrator.
//
// It returns an error if the payload cannot be decoded, if the origin id or any target id cannot be
// converted into a flow identifier, or if the orchestrator fails to handle the event.
func (a *AttackNetwork) processMessageFromCorruptedNode(message *insecure.Message) error {
	decoded, err := a.codec.Decode(message.Payload)
	if err != nil {
		return fmt.Errorf("could not decode observed payload: %w", err)
	}

	originID, err := flow.ByteSliceToId(message.OriginID)
	if err != nil {
		return fmt.Errorf("could not convert origin id to flow identifier: %w", err)
	}

	targets, err := flow.ByteSlicesToIds(message.TargetIDs)
	if err != nil {
		return fmt.Errorf("could not convert target ids to flow identifiers: %w", err)
	}

	observed := &insecure.Event{
		CorruptedNodeId:   originID,
		Channel:           channels.Channel(message.ChannelID),
		Protocol:          message.Protocol,
		TargetNum:         message.TargetNum,
		TargetIds:         targets,
		FlowProtocolEvent: decoded,
	}

	// the lock makes sure events reach the orchestrator one at a time.
	a.orchestratorMutex.Lock()
	defer a.orchestratorMutex.Unlock()

	if err := a.orchestrator.HandleEventFromCorruptedNode(observed); err != nil {
		return fmt.Errorf("could not handle event by orchestrator: %w", err)
	}

	return nil
}
// Send delivers the given event to the corrupted node identified by event.CorruptedNodeId, which
// is then expected to disseminate it through its own networking layer.
//
// The event is converted into an insecure.Message and sent over the connection that was
// established to that corrupted node at startup.
//
// It returns an error if the event is nil, if no connection exists for the corrupted node, if the
// event cannot be converted into a message, or if sending over the connection fails.
func (a *AttackNetwork) Send(event *insecure.Event) error {
	// a nil event would otherwise cause a nil pointer dereference below.
	if event == nil {
		return fmt.Errorf("could not send nil event to corrupted node")
	}

	connection, ok := a.corruptedConnections[event.CorruptedNodeId]
	if !ok {
		return fmt.Errorf("no connection available for corrupted conduit factory to node %x", event.CorruptedNodeId)
	}

	msg, err := a.eventToMessage(event.CorruptedNodeId, event.FlowProtocolEvent, event.Channel, event.Protocol, event.TargetNum, event.TargetIds...)
	if err != nil {
		return fmt.Errorf("could not convert event to message: %w", err)
	}

	if err := connection.SendMessage(msg); err != nil {
		return fmt.Errorf("could not send event to corrupted node: %w", err)
	}

	return nil
}
// eventToMessage builds the protobuf message that carries an application layer event to a
// corrupted node.
//
// The event is encoded with the attack network's codec and becomes the message payload; the
// corrupted node id is set as the origin, and the channel, protocol, target number and target ids
// are copied over. It returns an error if the event cannot be encoded.
func (a *AttackNetwork) eventToMessage(corruptedId flow.Identifier,
	event interface{},
	channel channels.Channel,
	protocol insecure.Protocol,
	num uint32,
	targetIds ...flow.Identifier) (*insecure.Message, error) {

	encoded, err := a.codec.Encode(event)
	if err != nil {
		return nil, fmt.Errorf("could not encode event: %w", err)
	}

	msg := &insecure.Message{
		OriginID:  corruptedId[:],
		ChannelID: channel.String(),
		Protocol:  protocol,
		TargetNum: num,
		TargetIDs: flow.IdsToBytes(targetIds),
		Payload:   encoded,
	}

	return msg, nil
}