Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vishal/refactor network #48

Merged
merged 46 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
55a7328
disconnecting from peers not part of the new topology
vishalchangrani Sep 30, 2020
60cf911
wip
vishalchangrani Oct 1, 2020
9e70c00
wip
vishalchangrani Oct 1, 2020
30f68c2
wip
vishalchangrani Oct 1, 2020
25249d8
using collection topology in test
vishalchangrani Oct 1, 2020
e87115b
wip
vishalchangrani Oct 2, 2020
2c14597
creating a new way to create network mocks
vishalchangrani Oct 2, 2020
f70458d
wip
vishalchangrani Oct 2, 2020
bac1694
wip
vishalchangrani Oct 3, 2020
391ad5c
adding network as protocol event listener in scaffold
vishalchangrani Oct 3, 2020
8be8148
subscribing to dummy topic to trigger discovery via libp2p discovery
vishalchangrani Oct 5, 2020
2723e55
wip
vishalchangrani Oct 5, 2020
88bfe16
reverting dummy topic subscription logic
vishalchangrani Oct 6, 2020
103c95f
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 6, 2020
433d305
Update network/gossip/libp2p/test/testUtil.go
vishalchangrani Oct 7, 2020
e0704c4
Update network/gossip/libp2p/libp2pNode.go
vishalchangrani Oct 7, 2020
585bdcc
fixing middleware_test
vishalchangrani Oct 7, 2020
eac83be
Merge remote-tracking branch 'origin/jordan/2594-broadcast-to-next-ep…
vishalchangrani Oct 7, 2020
8ddeb7b
removing helper.go; listening for EpochSetupPhaseStarted in the netwo…
vishalchangrani Oct 8, 2020
84c097b
lowering libp2p log level
vishalchangrani Oct 8, 2020
722a9d4
changing connection manager log messages
vishalchangrani Oct 8, 2020
d347421
add skip to setup of epoch transition tests
vishalchangrani Oct 8, 2020
c1828ee
add skip to setup of epoch transition tests
vishalchangrani Oct 8, 2020
70c484e
wip
vishalchangrani Oct 8, 2020
9b6345a
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 8, 2020
681e66c
lint
vishalchangrani Oct 8, 2020
4556b19
conditional skip to network epoch test
vishalchangrani Oct 8, 2020
71b1370
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 8, 2020
703d2e9
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 8, 2020
34b0c0e
changing how free ports are derived
vishalchangrani Oct 9, 2020
2366b67
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 9, 2020
4d2a5fc
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 9, 2020
525a320
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 9, 2020
b2da736
adding missing return
vishalchangrani Oct 10, 2020
bb9f48f
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 10, 2020
fe1f949
incorporating review comments
vishalchangrani Oct 13, 2020
cc04818
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 13, 2020
b18fafa
Update network/gossip/libp2p/network.go
vishalchangrani Oct 13, 2020
514e12c
fixing the epoch transition test
vishalchangrani Oct 14, 2020
a2d5e47
sepearting epoch consumer to NodeIDRefresher; making conngator concur…
vishalchangrani Oct 14, 2020
a2a8747
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 20, 2020
64be110
incorporating review comments
vishalchangrani Oct 20, 2020
21a4c7a
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 20, 2020
baafb04
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 21, 2020
20fa2f8
fixing test flakyness
vishalchangrani Oct 21, 2020
a0ff783
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 21, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions cmd/scaffold.go
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/local"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -176,17 +175,17 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() {
}
fnb.Middleware = mw

participants, err := fnb.State.Final().Identities(filter.Any)
if err != nil {
return nil, fmt.Errorf("could not get network identities: %w", err)
}

nodeID, err := fnb.State.Final().Identity(fnb.Me.NodeID())
if err != nil {
return nil, fmt.Errorf("could not get node id: %w", err)
}
nodeRole := nodeID.Role

participants, err := libp2p.IDsFromState(fnb.State)
if err != nil {
return nil, fmt.Errorf("could not get network identities: %w", err)
}

var nodeTopology topology.Topology
if nodeRole == flow.RoleCollection {
nodeTopology, err = topology.NewCollectionTopology(nodeID.NodeID, fnb.State)
Expand All @@ -203,6 +202,10 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() {
}

fnb.Network = net

idRefresher := libp2p.NewNodeIDRefresher(fnb.Logger, fnb.State, net.SetIDs)
fnb.ProtocolEvents.AddConsumer(idRefresher)

return net, err
})
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -35,6 +35,7 @@ require (
github.com/onflow/flow-go/crypto v0.9.4
github.com/onflow/flow/protobuf/go/flow v0.1.7
github.com/opentracing/opentracing-go v1.2.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past, when the network unit test ran, the ports were automatically selected when the libp2p host started by specifying the listen address of 0.0.0.0:0. While that was convenient in terms of getting a free local port to listen on, the test setup became overly complex. The test had to create a half baked network, start the libp2p host and go back and update the peer ids.
I changed it so that the test asks for open ports on a system using this third party library and uses those passing it down as the flow.Identity from network -> middleware -> libp2p.

github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/tsdb v0.7.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -706,6 +706,8 @@ github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222/go.mod h1:VyrYX9gd7ir
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
1 change: 1 addition & 0 deletions integration/go.sum
Expand Up @@ -800,6 +800,7 @@ github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222/go.mod h1:VyrYX9gd7ir
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down
15 changes: 7 additions & 8 deletions module/chunks/publicAssign_test.go
Expand Up @@ -10,7 +10,6 @@ import (

chmodels "github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/network/gossip/libp2p/test"
protocolMock "github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -44,7 +43,7 @@ func TestAssignment(t *testing.T) {
func (a *PublicAssignmentTestSuite) TestByNodeID() {
size := 5
// creates ids and twice chunks of the ids
ids := test.CreateIDs(size)
ids := unittest.IdentityListFixture(size)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using the more standard unittest.IdentityListFixture call.

chunks := a.CreateChunks(2*size, a.T())
assignment := chmodels.NewAssignment()

Expand Down Expand Up @@ -83,7 +82,7 @@ func (a *PublicAssignmentTestSuite) TestByNodeID() {
func (a *PublicAssignmentTestSuite) TestAssignDuplicate() {
size := 5
// creates ids and twice chunks of the ids
var ids flow.IdentityList = test.CreateIDs(size)
var ids flow.IdentityList = unittest.IdentityListFixture(size)
chunks := a.CreateChunks(2, a.T())
assignment := chmodels.NewAssignment()

Expand Down Expand Up @@ -121,7 +120,7 @@ func (a *PublicAssignmentTestSuite) TestPermuteEntirely() {

// creates random ids
count := 10
var idList flow.IdentityList = test.CreateIDs(count)
var idList flow.IdentityList = unittest.IdentityListFixture(count)
var ids flow.IdentifierList = idList.NodeIDs()
original := make(flow.IdentifierList, count)
copy(original, ids)
Expand Down Expand Up @@ -166,7 +165,7 @@ func (a *PublicAssignmentTestSuite) TestPermuteSublist() {
count := 10
subset := 4

var idList flow.IdentityList = test.CreateIDs(count)
var idList flow.IdentityList = unittest.IdentityListFixture(count)
var ids flow.IdentifierList = idList.NodeIDs()
original := make([]flow.Identifier, count)
copy(original, ids)
Expand Down Expand Up @@ -200,7 +199,7 @@ func (a *PublicAssignmentTestSuite) TestDeterministicy() {
snapshot.On("Seed", mock.Anything, mock.Anything, mock.Anything).Return(seed, nil)

// creates two set of the same nodes
nodes1 := test.CreateIDs(n)
nodes1 := unittest.IdentityListFixture(n)
nodes2 := make([]*flow.Identity, n)
require.Equal(a.T(), copy(nodes2, nodes1), n)

Expand Down Expand Up @@ -267,7 +266,7 @@ func (a *PublicAssignmentTestSuite) ChunkAssignmentScenario(chunkNum, verNum, al
snapshot.On("Seed", mock.Anything, mock.Anything, mock.Anything).Return(seed, nil)

// creates nodes and keeps a copy of them
nodes := test.CreateIDs(verNum)
nodes := unittest.IdentityListFixture(verNum)
original := make([]*flow.Identity, verNum)
require.Equal(a.T(), copy(original, nodes), verNum)

Expand All @@ -294,7 +293,7 @@ func (a *PublicAssignmentTestSuite) TestCacheAssignment() {
snapshot.On("Seed", mock.Anything, mock.Anything, mock.Anything).Return(seed, nil)

// creates nodes and keeps a copy of them
nodes := test.CreateIDs(5)
nodes := unittest.IdentityListFixture(5)
assigner, err := NewPublicAssignment(3, state)
require.NoError(a.T(), err)

Expand Down
7 changes: 7 additions & 0 deletions network/gossip/libp2p/connGator.go
@@ -1,6 +1,8 @@
package libp2p

import (
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -14,6 +16,7 @@ var _ connmgr.ConnectionGater = (*connGater)(nil)
// connGater is the implementatiion of the libp2p connmgr.ConnectionGater interface
// It provides node allowlisting by libp2p peer.ID which is derived from the node public networking key
type connGater struct {
sync.RWMutex
peerIDAllowlist map[peer.ID]struct{} // the in-memory map of approved peer IDs
log zerolog.Logger
}
Expand All @@ -38,7 +41,9 @@ func (c *connGater) update(peerInfos []peer.AddrInfo) {
}

// cache the new map
c.Lock()
c.peerIDAllowlist = peerIDs
c.Unlock()

c.log.Info().Msg("approved list of peers updated")
}
Expand Down Expand Up @@ -85,6 +90,8 @@ func (c *connGater) InterceptUpgraded(network.Conn) (allow bool, reason control.
}

func (c *connGater) validPeerID(p peer.ID) bool {
c.RLock()
defer c.RUnlock()
_, ok := c.peerIDAllowlist[p]
return ok
}
4 changes: 2 additions & 2 deletions network/gossip/libp2p/connManager.go
Expand Up @@ -47,10 +47,10 @@ func (c ConnManager) ListenCloseNotifee(n network.Network, m multiaddr.Multiaddr

// called by libp2p when a connection opened
func (c ConnManager) Connected(n network.Network, con network.Conn) {
c.log.Debug().Str("remote_peer", con.RemotePeer().String()).Int("total_conns", len(n.Conns())).Msg("opened connection")
c.log.Debug().Str("remote_peer", con.RemoteMultiaddr().String()).Int("total_conns", len(n.Conns())).Msg("opened connection")
}

// called by libp2p when a connection closed
func (c ConnManager) Disconnected(n network.Network, con network.Conn) {
c.log.Debug().Str("remote_peer", con.RemotePeer().String()).Int("total_conns", len(n.Conns())).Msg("closed connection")
c.log.Debug().Str("remote_peer", con.RemoteMultiaddr().String()).Int("total_conns", len(n.Conns())).Msg("closed connection")
}
54 changes: 25 additions & 29 deletions network/gossip/libp2p/discovery.go
Expand Up @@ -3,84 +3,80 @@ package libp2p
import (
"context"
"fmt"
"net"
"time"

"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/network/gossip/libp2p/middleware"
)

// Discovery implements the discovery.Discovery interface to provide libp2p a way to discover other nodes
type Discovery struct {
ctx context.Context
log zerolog.Logger
overlay middleware.Overlay
me flow.Identifier
done chan struct{}
}

func NewDiscovery(log zerolog.Logger, overlay middleware.Overlay, me flow.Identifier, done chan struct{}) *Discovery {
d := &Discovery{overlay: overlay, log: log, me: me, done: done}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done is replaced by a context

func NewDiscovery(ctx context.Context, log zerolog.Logger, overlay middleware.Overlay, me flow.Identifier) *Discovery {
d := &Discovery{
ctx: ctx,
overlay: overlay,
log: log,
me: me,
}
return d
}

// Advertise is suppose to advertise this node's interest in a topic to a discovery service. However, we are not using
// a discovery service hence this function just returns a long duration to reduce the frequency with which libp2p calls it.
func (d *Discovery) Advertise(_ context.Context, _ string, _ ...discovery.Option) (time.Duration, error) {
select {
case <-d.done:
return 0, fmt.Errorf("middleware stopped")
default:
return time.Hour, nil
func (d *Discovery) Advertise(ctx context.Context, _ string, _ ...discovery.Option) (time.Duration, error) {
err := ctx.Err()
if err != nil {
return 0, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to append some metadata information on the place where the error happens in both this and subsequent functions: return 0, fmt.Errorf("could not advertise the node: %w", err). Since if such an error happens we are sending a zero delay for the next subsequent call. So it would be good to have a clue on it.

}
return time.Hour, nil
}

// FindPeers returns a channel providing all peers of the node. No parameters are needed as of now since the overlay.Identity
// provides all the information about the other nodes.
func (d *Discovery) FindPeers(_ context.Context, _ string, _ ...discovery.Option) (<-chan peer.AddrInfo, error) {
func (d *Discovery) FindPeers(ctx context.Context, topic string, _ ...discovery.Option) (<-chan peer.AddrInfo, error) {

// if middleware has been stopped, don't return any peers
select {
case <-d.done:
d.log.Debug().Str("topic", topic).Msg("initiating peer discovery")

err := ctx.Err()
if err != nil {
emptyCh := make(chan peer.AddrInfo)
close(emptyCh)
return emptyCh, fmt.Errorf("middleware stopped")
default:
return emptyCh, err
}

// query the overlay to get all the other nodes that should be directly connected to this node for 1-k messaging
// call the callback to get all the other nodes that should be directly connected to this node for 1-k messaging
ids, err := d.overlay.Topology()
if err != nil {
return nil, fmt.Errorf("failed to get ids: %w", err)
}

// remove self from list of nodes to avoid self dial
delete(ids, d.me)
ids = ids.Filter(filter.Not(filter.HasNodeID(d.me)))

// create a channel of peer.AddrInfo that needs to be returned
ch := make(chan peer.AddrInfo, len(ids))

// send each id to the channel after converting it to a peer.AddrInfo
for _, id := range ids {
// create a new NodeAddress
ip, port, err := net.SplitHostPort(id.Address)
if err != nil {
return nil, fmt.Errorf("could not parse address %s: %w", id.Address, err)
}

// convert the Flow key to a LibP2P key
lkey, err := PublicKey(id.NetworkPubKey)
nodeAddress, err := nodeAddressFromIdentity(*id)
if err != nil {
return nil, fmt.Errorf("could not convert flow public key to libp2p public key: %v", err)
return nil, fmt.Errorf(" invalid node address: %s, %w", id.String(), err)
}

nodeAddress := NodeAddress{Name: id.NodeID.String(), IP: ip, Port: port, PubKey: lkey}
addrInfo, err := GetPeerInfo(nodeAddress)
if err != nil {
return nil, fmt.Errorf(" invalid node address: %s, %w", nodeAddress.Name, err)
return nil, fmt.Errorf(" invalid node address: %s, %w", id.String(), err)
}

// add the address to the channel
Expand Down
11 changes: 11 additions & 0 deletions network/gossip/libp2p/libp2pNode.go
Expand Up @@ -526,6 +526,17 @@ func (p *P2PNode) UpdateAllowlist(allowListAddrs ...NodeAddress) error {
return nil
}

// IsConnected returns true is address is a direct peer of this node else false
func (p *P2PNode) IsConnected(address NodeAddress) (bool, error) {
pInfo, err := GetPeerInfo(address)
if err != nil {
return false, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same error appending here.

}
// query libp2p for connectedness status of this peer
isConnected := p.libP2PHost.Network().Connectedness(pInfo.ID) == network.Connected
return isConnected, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe do this: return p.libP2PHost.Network().Connectedness(pInfo.ID) == network.Connected, nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both of us think alike - I had exactly that way and Jordan suggested adding a new variable isConnected. I think, the additional variable makes the code a little more readable.

}

func generateProtocolID(rootBlockID string) protocol.ID {
return protocol.ID(FlowLibP2PProtocolIDPrefix + rootBlockID)
}
6 changes: 4 additions & 2 deletions network/gossip/libp2p/libp2pNode_test.go
Expand Up @@ -33,6 +33,8 @@ import (
// Workaround for https://github.com/stretchr/testify/pull/808
const tickForAssertEventually = 100 * time.Millisecond

var rootBlockID = unittest.IdentifierFixture().String()

type LibP2PNodeTestSuite struct {
suite.Suite
ctx context.Context
Expand Down Expand Up @@ -214,7 +216,7 @@ func (l *LibP2PNodeTestSuite) TestCreateStream() {

address2 := addrs[1]

flowProtocolID := generateProtocolID(rootID)
flowProtocolID := generateProtocolID(rootBlockID)
// Assert that there is no outbound stream to the target yet
require.Equal(l.T(), 0, CountStream(nodes[0].libP2PHost, nodes[1].libP2PHost.ID(), flowProtocolID, network.DirOutbound))

Expand Down Expand Up @@ -518,7 +520,7 @@ func (l *LibP2PNodeTestSuite) CreateNodes(count int, handler network.StreamHandl
require.NoError(l.Suite.T(), err)

// create a node on localhost with a random port assigned by the OS
n, nodeID := l.CreateNode(name, pkey, "0.0.0.0", "0", rootID, handler, allowList)
n, nodeID := l.CreateNode(name, pkey, "0.0.0.0", "0", rootBlockID, handler, allowList)
nodes = append(nodes, n)
nodeAddrs = append(nodeAddrs, nodeID)
}
Expand Down