-
Notifications
You must be signed in to change notification settings - Fork 178
/
factory.go
113 lines (88 loc) · 3.47 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package corruptnet
import (
"context"
"fmt"
"github.com/onflow/flow-go/network/channels"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/insecure"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/network"
)
const networkingProtocolTCP = "tcp"
// ConduitFactory implements a corrupt conduit factory, that creates corrupt conduits.
type ConduitFactory struct {
logger zerolog.Logger
adapter network.Adapter
egressController insecure.EgressController
}
var _ insecure.CorruptConduitFactory = &ConduitFactory{}
func NewCorruptConduitFactory(logger zerolog.Logger, chainId flow.ChainID) *ConduitFactory {
if chainId != flow.BftTestnet {
panic("illegal chain id for using corrupt conduit factory")
}
factory := &ConduitFactory{
logger: logger.With().Str("module", "corrupt-conduit-factory").Logger(),
}
return factory
}
// RegisterAdapter sets the Adapter component of the factory.
// The Adapter is a wrapper around the Network layer that only exposes the set of methods
// that are needed by a conduit.
func (c *ConduitFactory) RegisterAdapter(adapter network.Adapter) error {
if c.adapter != nil {
return fmt.Errorf("could not register a new network adapter, one already exists")
}
c.adapter = adapter
return nil
}
// RegisterEgressController sets the EgressController component of the factory.
func (c *ConduitFactory) RegisterEgressController(controller insecure.EgressController) error {
if c.egressController != nil {
return fmt.Errorf("could not register a new egress controller, one already exists")
}
c.egressController = controller
return nil
}
// NewConduit creates a conduit on the specified channel.
// Prior to creating any conduit, the factory requires an Adapter to be registered with it.
func (c *ConduitFactory) NewConduit(ctx context.Context, channel channels.Channel) (network.Conduit, error) {
if c.adapter == nil {
return nil, fmt.Errorf("could not create a new conduit, missing a registered network adapter")
}
if c.egressController == nil {
return nil, fmt.Errorf("could not create a new conduit, missing a registered egress controller")
}
child, cancel := context.WithCancel(ctx)
con := &Conduit{
ctx: child,
cancel: cancel,
channel: channel,
egressController: c.egressController,
}
return con, nil
}
// UnregisterChannel is called by the slave conduits of this factory to let it know that the corresponding engine of the
// conduit is not going to use it anymore, so the channel can be closed safely.
func (c *ConduitFactory) UnregisterChannel(channel channels.Channel) error {
return c.adapter.UnRegisterChannel(channel)
}
// SendOnFlowNetwork dispatches the given event to the networking layer of the node in order to be delivered
// through the specified protocol to the target identifiers.
func (c *ConduitFactory) SendOnFlowNetwork(event interface{},
channel channels.Channel,
protocol insecure.Protocol,
num uint, targetIds ...flow.Identifier) error {
switch protocol {
case insecure.Protocol_UNICAST:
if len(targetIds) > 1 {
return fmt.Errorf("more than one target ids for unicast: %v", targetIds)
}
return c.adapter.UnicastOnChannel(channel, event, targetIds[0])
case insecure.Protocol_PUBLISH:
return c.adapter.PublishOnChannel(channel, event, targetIds...)
case insecure.Protocol_MULTICAST:
return c.adapter.MulticastOnChannel(channel, event, num, targetIds...)
default:
return fmt.Errorf("unknown protocol for sending on network: %d", protocol)
}
}