Skip to content

Commit

Permalink
update network ids on epoch phase changes
Browse files Browse the repository at this point in the history
* adding NodeIDRefresher to update network ids on epoch phase change

* refactoring tests

* fixing concurrency issue with connGator
  • Loading branch information
vishalchangrani committed Oct 21, 2020
1 parent c362eac commit add3a38
Show file tree
Hide file tree
Showing 25 changed files with 719 additions and 820 deletions.
15 changes: 9 additions & 6 deletions cmd/scaffold.go
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/local"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -176,17 +175,17 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() {
}
fnb.Middleware = mw

participants, err := fnb.State.Final().Identities(filter.Any)
if err != nil {
return nil, fmt.Errorf("could not get network identities: %w", err)
}

nodeID, err := fnb.State.Final().Identity(fnb.Me.NodeID())
if err != nil {
return nil, fmt.Errorf("could not get node id: %w", err)
}
nodeRole := nodeID.Role

participants, err := libp2p.IDsFromState(fnb.State)
if err != nil {
return nil, fmt.Errorf("could not get network identities: %w", err)
}

var nodeTopology topology.Topology
if nodeRole == flow.RoleCollection {
nodeTopology, err = topology.NewCollectionTopology(nodeID.NodeID, fnb.State)
Expand All @@ -203,6 +202,10 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() {
}

fnb.Network = net

idRefresher := libp2p.NewNodeIDRefresher(fnb.Logger, fnb.State, net.SetIDs)
fnb.ProtocolEvents.AddConsumer(idRefresher)

return net, err
})
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -35,6 +35,7 @@ require (
github.com/onflow/flow-go/crypto v0.9.4
github.com/onflow/flow/protobuf/go/flow v0.1.7
github.com/opentracing/opentracing-go v1.2.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/tsdb v0.7.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -706,6 +706,8 @@ github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222/go.mod h1:VyrYX9gd7ir
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
1 change: 1 addition & 0 deletions integration/go.sum
Expand Up @@ -800,6 +800,7 @@ github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222/go.mod h1:VyrYX9gd7ir
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down
15 changes: 7 additions & 8 deletions module/chunks/publicAssign_test.go
Expand Up @@ -10,7 +10,6 @@ import (

chmodels "github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/network/gossip/libp2p/test"
protocolMock "github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -44,7 +43,7 @@ func TestAssignment(t *testing.T) {
func (a *PublicAssignmentTestSuite) TestByNodeID() {
size := 5
// creates ids and twice chunks of the ids
ids := test.CreateIDs(size)
ids := unittest.IdentityListFixture(size)
chunks := a.CreateChunks(2*size, a.T())
assignment := chmodels.NewAssignment()

Expand Down Expand Up @@ -83,7 +82,7 @@ func (a *PublicAssignmentTestSuite) TestByNodeID() {
func (a *PublicAssignmentTestSuite) TestAssignDuplicate() {
size := 5
// creates ids and twice chunks of the ids
var ids flow.IdentityList = test.CreateIDs(size)
var ids flow.IdentityList = unittest.IdentityListFixture(size)
chunks := a.CreateChunks(2, a.T())
assignment := chmodels.NewAssignment()

Expand Down Expand Up @@ -121,7 +120,7 @@ func (a *PublicAssignmentTestSuite) TestPermuteEntirely() {

// creates random ids
count := 10
var idList flow.IdentityList = test.CreateIDs(count)
var idList flow.IdentityList = unittest.IdentityListFixture(count)
var ids flow.IdentifierList = idList.NodeIDs()
original := make(flow.IdentifierList, count)
copy(original, ids)
Expand Down Expand Up @@ -166,7 +165,7 @@ func (a *PublicAssignmentTestSuite) TestPermuteSublist() {
count := 10
subset := 4

var idList flow.IdentityList = test.CreateIDs(count)
var idList flow.IdentityList = unittest.IdentityListFixture(count)
var ids flow.IdentifierList = idList.NodeIDs()
original := make([]flow.Identifier, count)
copy(original, ids)
Expand Down Expand Up @@ -200,7 +199,7 @@ func (a *PublicAssignmentTestSuite) TestDeterministicy() {
snapshot.On("Seed", mock.Anything, mock.Anything, mock.Anything).Return(seed, nil)

// creates two set of the same nodes
nodes1 := test.CreateIDs(n)
nodes1 := unittest.IdentityListFixture(n)
nodes2 := make([]*flow.Identity, n)
require.Equal(a.T(), copy(nodes2, nodes1), n)

Expand Down Expand Up @@ -267,7 +266,7 @@ func (a *PublicAssignmentTestSuite) ChunkAssignmentScenario(chunkNum, verNum, al
snapshot.On("Seed", mock.Anything, mock.Anything, mock.Anything).Return(seed, nil)

// creates nodes and keeps a copy of them
nodes := test.CreateIDs(verNum)
nodes := unittest.IdentityListFixture(verNum)
original := make([]*flow.Identity, verNum)
require.Equal(a.T(), copy(original, nodes), verNum)

Expand All @@ -294,7 +293,7 @@ func (a *PublicAssignmentTestSuite) TestCacheAssignment() {
snapshot.On("Seed", mock.Anything, mock.Anything, mock.Anything).Return(seed, nil)

// creates nodes and keeps a copy of them
nodes := test.CreateIDs(5)
nodes := unittest.IdentityListFixture(5)
assigner, err := NewPublicAssignment(3, state)
require.NoError(a.T(), err)

Expand Down
7 changes: 7 additions & 0 deletions network/gossip/libp2p/connGator.go
@@ -1,6 +1,8 @@
package libp2p

import (
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -14,6 +16,7 @@ var _ connmgr.ConnectionGater = (*connGater)(nil)
// connGater is the implementatiion of the libp2p connmgr.ConnectionGater interface
// It provides node allowlisting by libp2p peer.ID which is derived from the node public networking key
type connGater struct {
sync.RWMutex
peerIDAllowlist map[peer.ID]struct{} // the in-memory map of approved peer IDs
log zerolog.Logger
}
Expand All @@ -38,7 +41,9 @@ func (c *connGater) update(peerInfos []peer.AddrInfo) {
}

// cache the new map
c.Lock()
c.peerIDAllowlist = peerIDs
c.Unlock()

c.log.Info().Msg("approved list of peers updated")
}
Expand Down Expand Up @@ -85,6 +90,8 @@ func (c *connGater) InterceptUpgraded(network.Conn) (allow bool, reason control.
}

func (c *connGater) validPeerID(p peer.ID) bool {
c.RLock()
defer c.RUnlock()
_, ok := c.peerIDAllowlist[p]
return ok
}
4 changes: 2 additions & 2 deletions network/gossip/libp2p/connManager.go
Expand Up @@ -47,10 +47,10 @@ func (c ConnManager) ListenCloseNotifee(n network.Network, m multiaddr.Multiaddr

// called by libp2p when a connection opened
func (c ConnManager) Connected(n network.Network, con network.Conn) {
c.log.Debug().Str("remote_peer", con.RemotePeer().String()).Int("total_conns", len(n.Conns())).Msg("opened connection")
c.log.Debug().Str("remote_peer", con.RemoteMultiaddr().String()).Int("total_conns", len(n.Conns())).Msg("opened connection")
}

// called by libp2p when a connection closed
func (c ConnManager) Disconnected(n network.Network, con network.Conn) {
c.log.Debug().Str("remote_peer", con.RemotePeer().String()).Int("total_conns", len(n.Conns())).Msg("closed connection")
c.log.Debug().Str("remote_peer", con.RemoteMultiaddr().String()).Int("total_conns", len(n.Conns())).Msg("closed connection")
}
54 changes: 25 additions & 29 deletions network/gossip/libp2p/discovery.go
Expand Up @@ -3,84 +3,80 @@ package libp2p
import (
"context"
"fmt"
"net"
"time"

"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/network/gossip/libp2p/middleware"
)

// Discovery implements the discovery.Discovery interface to provide libp2p a way to discover other nodes
type Discovery struct {
ctx context.Context
log zerolog.Logger
overlay middleware.Overlay
me flow.Identifier
done chan struct{}
}

func NewDiscovery(log zerolog.Logger, overlay middleware.Overlay, me flow.Identifier, done chan struct{}) *Discovery {
d := &Discovery{overlay: overlay, log: log, me: me, done: done}
func NewDiscovery(ctx context.Context, log zerolog.Logger, overlay middleware.Overlay, me flow.Identifier) *Discovery {
d := &Discovery{
ctx: ctx,
overlay: overlay,
log: log,
me: me,
}
return d
}

// Advertise is suppose to advertise this node's interest in a topic to a discovery service. However, we are not using
// a discovery service hence this function just returns a long duration to reduce the frequency with which libp2p calls it.
func (d *Discovery) Advertise(_ context.Context, _ string, _ ...discovery.Option) (time.Duration, error) {
select {
case <-d.done:
return 0, fmt.Errorf("middleware stopped")
default:
return time.Hour, nil
func (d *Discovery) Advertise(ctx context.Context, _ string, _ ...discovery.Option) (time.Duration, error) {
err := ctx.Err()
if err != nil {
return 0, err
}
return time.Hour, nil
}

// FindPeers returns a channel providing all peers of the node. No parameters are needed as of now since the overlay.Identity
// provides all the information about the other nodes.
func (d *Discovery) FindPeers(_ context.Context, _ string, _ ...discovery.Option) (<-chan peer.AddrInfo, error) {
func (d *Discovery) FindPeers(ctx context.Context, topic string, _ ...discovery.Option) (<-chan peer.AddrInfo, error) {

// if middleware has been stopped, don't return any peers
select {
case <-d.done:
d.log.Debug().Str("topic", topic).Msg("initiating peer discovery")

err := ctx.Err()
if err != nil {
emptyCh := make(chan peer.AddrInfo)
close(emptyCh)
return emptyCh, fmt.Errorf("middleware stopped")
default:
return emptyCh, err
}

// query the overlay to get all the other nodes that should be directly connected to this node for 1-k messaging
// call the callback to get all the other nodes that should be directly connected to this node for 1-k messaging
ids, err := d.overlay.Topology()
if err != nil {
return nil, fmt.Errorf("failed to get ids: %w", err)
}

// remove self from list of nodes to avoid self dial
delete(ids, d.me)
ids = ids.Filter(filter.Not(filter.HasNodeID(d.me)))

// create a channel of peer.AddrInfo that needs to be returned
ch := make(chan peer.AddrInfo, len(ids))

// send each id to the channel after converting it to a peer.AddrInfo
for _, id := range ids {
// create a new NodeAddress
ip, port, err := net.SplitHostPort(id.Address)
if err != nil {
return nil, fmt.Errorf("could not parse address %s: %w", id.Address, err)
}

// convert the Flow key to a LibP2P key
lkey, err := PublicKey(id.NetworkPubKey)
nodeAddress, err := nodeAddressFromIdentity(*id)
if err != nil {
return nil, fmt.Errorf("could not convert flow public key to libp2p public key: %v", err)
return nil, fmt.Errorf(" invalid node address: %s, %w", id.String(), err)
}

nodeAddress := NodeAddress{Name: id.NodeID.String(), IP: ip, Port: port, PubKey: lkey}
addrInfo, err := GetPeerInfo(nodeAddress)
if err != nil {
return nil, fmt.Errorf(" invalid node address: %s, %w", nodeAddress.Name, err)
return nil, fmt.Errorf(" invalid node address: %s, %w", id.String(), err)
}

// add the address to the channel
Expand Down
11 changes: 11 additions & 0 deletions network/gossip/libp2p/libp2pNode.go
Expand Up @@ -526,6 +526,17 @@ func (p *P2PNode) UpdateAllowlist(allowListAddrs ...NodeAddress) error {
return nil
}

// IsConnected returns true is address is a direct peer of this node else false
func (p *P2PNode) IsConnected(address NodeAddress) (bool, error) {
pInfo, err := GetPeerInfo(address)
if err != nil {
return false, err
}
// query libp2p for connectedness status of this peer
isConnected := p.libP2PHost.Network().Connectedness(pInfo.ID) == network.Connected
return isConnected, nil
}

func generateProtocolID(rootBlockID string) protocol.ID {
return protocol.ID(FlowLibP2PProtocolIDPrefix + rootBlockID)
}
6 changes: 4 additions & 2 deletions network/gossip/libp2p/libp2pNode_test.go
Expand Up @@ -33,6 +33,8 @@ import (
// Workaround for https://github.com/stretchr/testify/pull/808
const tickForAssertEventually = 100 * time.Millisecond

var rootBlockID = unittest.IdentifierFixture().String()

type LibP2PNodeTestSuite struct {
suite.Suite
ctx context.Context
Expand Down Expand Up @@ -214,7 +216,7 @@ func (l *LibP2PNodeTestSuite) TestCreateStream() {

address2 := addrs[1]

flowProtocolID := generateProtocolID(rootID)
flowProtocolID := generateProtocolID(rootBlockID)
// Assert that there is no outbound stream to the target yet
require.Equal(l.T(), 0, CountStream(nodes[0].libP2PHost, nodes[1].libP2PHost.ID(), flowProtocolID, network.DirOutbound))

Expand Down Expand Up @@ -518,7 +520,7 @@ func (l *LibP2PNodeTestSuite) CreateNodes(count int, handler network.StreamHandl
require.NoError(l.Suite.T(), err)

// create a node on localhost with a random port assigned by the OS
n, nodeID := l.CreateNode(name, pkey, "0.0.0.0", "0", rootID, handler, allowList)
n, nodeID := l.CreateNode(name, pkey, "0.0.0.0", "0", rootBlockID, handler, allowList)
nodes = append(nodes, n)
nodeAddrs = append(nodeAddrs, nodeID)
}
Expand Down

0 comments on commit add3a38

Please sign in to comment.