New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Vishal/refactor network #48
Changes from 13 commits
55a7328
60cf911
9e70c00
30f68c2
25249d8
e87115b
2c14597
f70458d
bac1694
391ad5c
8be8148
2723e55
88bfe16
103c95f
433d305
e0704c4
585bdcc
eac83be
8ddeb7b
84c097b
722a9d4
d347421
c1828ee
70c484e
9b6345a
681e66c
4556b19
71b1370
703d2e9
34b0c0e
2366b67
4d2a5fc
525a320
b2da736
bb9f48f
fe1f949
cc04818
b18fafa
514e12c
a2d5e47
a2a8747
64be110
21a4c7a
baafb04
20fa2f8
a0ff783
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,6 @@ import ( | |
"github.com/onflow/flow-go/crypto" | ||
"github.com/onflow/flow-go/model/bootstrap" | ||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/model/flow/filter" | ||
"github.com/onflow/flow-go/module" | ||
"github.com/onflow/flow-go/module/local" | ||
"github.com/onflow/flow-go/module/metrics" | ||
|
@@ -174,11 +173,6 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() { | |
} | ||
fnb.Middleware = mw | ||
|
||
participants, err := fnb.State.Final().Identities(filter.Any) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not get network identities: %w", err) | ||
} | ||
|
||
nodeID, err := fnb.State.Final().Identity(fnb.Me.NodeID()) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not get node id: %w", err) | ||
|
@@ -195,12 +189,15 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() { | |
return nil, fmt.Errorf("could not create topology: %w", err) | ||
} | ||
|
||
net, err := libp2p.NewNetwork(fnb.Logger, codec, participants, fnb.Me, fnb.Middleware, 10e6, nodeTopology, fnb.Metrics.Network) | ||
net, err := libp2p.NewNetwork(fnb.Logger, codec, fnb.State, fnb.Me, fnb.Middleware, 10e6, nodeTopology, fnb.Metrics.Network) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not initialize network: %w", err) | ||
} | ||
|
||
fnb.Network = net | ||
|
||
fnb.ProtocolEvents.AddConsumer(net) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added the network as a consumer of the epoch events |
||
|
||
return net, err | ||
}) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ require ( | |
github.com/onflow/flow-go/crypto v0.9.4 | ||
github.com/onflow/flow/protobuf/go/flow v0.1.7 | ||
github.com/opentracing/opentracing-go v1.2.0 | ||
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the past, when the network unit test ran, the ports were automatically selected when the libp2p host started by specifying the listen address of |
||
github.com/pkg/errors v0.9.1 | ||
github.com/prometheus/client_golang v1.5.1 | ||
github.com/prometheus/tsdb v0.7.1 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,84 +3,80 @@ package libp2p | |
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/discovery" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
"github.com/rs/zerolog" | ||
|
||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/model/flow/filter" | ||
"github.com/onflow/flow-go/network/gossip/libp2p/middleware" | ||
) | ||
|
||
// Discovery implements the discovery.Discovery interface to provide libp2p a way to discover other nodes | ||
type Discovery struct { | ||
ctx context.Context | ||
log zerolog.Logger | ||
overlay middleware.Overlay | ||
me flow.Identifier | ||
done chan struct{} | ||
} | ||
|
||
func NewDiscovery(log zerolog.Logger, overlay middleware.Overlay, me flow.Identifier, done chan struct{}) *Discovery { | ||
d := &Discovery{overlay: overlay, log: log, me: me, done: done} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func NewDiscovery(ctx context.Context, log zerolog.Logger, overlay middleware.Overlay, me flow.Identifier) *Discovery { | ||
d := &Discovery{ | ||
ctx: ctx, | ||
overlay: overlay, | ||
log: log, | ||
me: me, | ||
} | ||
return d | ||
} | ||
|
||
// Advertise is suppose to advertise this node's interest in a topic to a discovery service. However, we are not using | ||
// a discovery service hence this function just returns a long duration to reduce the frequency with which libp2p calls it. | ||
func (d *Discovery) Advertise(_ context.Context, _ string, _ ...discovery.Option) (time.Duration, error) { | ||
select { | ||
case <-d.done: | ||
return 0, fmt.Errorf("middleware stopped") | ||
default: | ||
return time.Hour, nil | ||
func (d *Discovery) Advertise(ctx context.Context, _ string, _ ...discovery.Option) (time.Duration, error) { | ||
err := ctx.Err() | ||
if err != nil { | ||
return 0, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be good to append some metadata information on the place where the error happens in both this and subsequent functions: |
||
} | ||
return time.Hour, nil | ||
} | ||
|
||
// FindPeers returns a channel providing all peers of the node. No parameters are needed as of now since the overlay.Identity | ||
// provides all the information about the other nodes. | ||
func (d *Discovery) FindPeers(_ context.Context, _ string, _ ...discovery.Option) (<-chan peer.AddrInfo, error) { | ||
func (d *Discovery) FindPeers(ctx context.Context, topic string, _ ...discovery.Option) (<-chan peer.AddrInfo, error) { | ||
|
||
// if middleware has been stopped, don't return any peers | ||
select { | ||
case <-d.done: | ||
d.log.Debug().Str("topic", topic).Msg("initiating peer discovery") | ||
|
||
err := ctx.Err() | ||
if err != nil { | ||
emptyCh := make(chan peer.AddrInfo) | ||
close(emptyCh) | ||
return emptyCh, fmt.Errorf("middleware stopped") | ||
default: | ||
return emptyCh, err | ||
} | ||
|
||
// query the overlay to get all the other nodes that should be directly connected to this node for 1-k messaging | ||
// call the callback to get all the other nodes that should be directly connected to this node for 1-k messaging | ||
ids, err := d.overlay.Topology() | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to get ids: %w", err) | ||
} | ||
|
||
// remove self from list of nodes to avoid self dial | ||
delete(ids, d.me) | ||
ids = ids.Filter(filter.Not(filter.HasNodeID(d.me))) | ||
|
||
// create a channel of peer.AddrInfo that needs to be returned | ||
ch := make(chan peer.AddrInfo, len(ids)) | ||
|
||
// send each id to the channel after converting it to a peer.AddrInfo | ||
for _, id := range ids { | ||
// create a new NodeAddress | ||
ip, port, err := net.SplitHostPort(id.Address) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not parse address %s: %w", id.Address, err) | ||
} | ||
|
||
// convert the Flow key to a LibP2P key | ||
lkey, err := PublicKey(id.NetworkPubKey) | ||
nodeAddress, err := nodeAddressFromIdentity(*id) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not convert flow public key to libp2p public key: %v", err) | ||
return nil, fmt.Errorf(" invalid node address: %s, %w", id.String(), err) | ||
} | ||
|
||
nodeAddress := NodeAddress{Name: id.NodeID.String(), IP: ip, Port: port, PubKey: lkey} | ||
addrInfo, err := GetPeerInfo(nodeAddress) | ||
if err != nil { | ||
return nil, fmt.Errorf(" invalid node address: %s, %w", nodeAddress.Name, err) | ||
return nil, fmt.Errorf(" invalid node address: %s, %w", id.String(), err) | ||
} | ||
|
||
// add the address to the channel | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -526,6 +526,17 @@ func (p *P2PNode) UpdateAllowlist(allowListAddrs ...NodeAddress) error { | |
return nil | ||
} | ||
|
||
// IsConnected returns true is address is a direct peer of this node else false | ||
func (p *P2PNode) IsConnected(address NodeAddress) (bool, error) { | ||
pInfo, err := GetPeerInfo(address) | ||
if err != nil { | ||
return false, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same error appending here. |
||
} | ||
// query libp2p for connectedness status of this peer | ||
connectedness := p.libP2PHost.Network().Connectedness(pInfo.ID) | ||
return network.Connected == connectedness, nil | ||
vishalchangrani marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func generateProtocolID(rootBlockID string) protocol.ID { | ||
return protocol.ID(FlowLibP2PProtocolIDPrefix + rootBlockID) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,8 +58,8 @@ type Middleware interface { | |
// Overlay represents the interface that middleware uses to interact with the | ||
// overlay network layer. | ||
type Overlay interface { | ||
// Topology returns the identities of a uniform subset of nodes in protocol state | ||
Topology() (map[flow.Identifier]flow.Identity, error) | ||
// Topology returns an identity list of nodes which this node should be directly connected to as peers | ||
Topology() (flow.IdentityList, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. returning a simple |
||
// Identity returns a map of all identifier to flow identity | ||
Identity() (map[flow.Identifier]flow.Identity, error) | ||
Receive(nodeID flow.Identifier, msg *message.Message) error | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
network now needs the
State
instead of the participants list since the participant list changes epoch to epoch.