Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vishal/refactor network #48

Merged
merged 46 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
55a7328
disconnecting from peers not part of the new topology
vishalchangrani Sep 30, 2020
60cf911
wip
vishalchangrani Oct 1, 2020
9e70c00
wip
vishalchangrani Oct 1, 2020
30f68c2
wip
vishalchangrani Oct 1, 2020
25249d8
using collection topology in test
vishalchangrani Oct 1, 2020
e87115b
wip
vishalchangrani Oct 2, 2020
2c14597
creating a new way to create network mocks
vishalchangrani Oct 2, 2020
f70458d
wip
vishalchangrani Oct 2, 2020
bac1694
wip
vishalchangrani Oct 3, 2020
391ad5c
adding network as protocol event listener in scaffold
vishalchangrani Oct 3, 2020
8be8148
subscribing to dummy topic to trigger discovery via libp2p discovery
vishalchangrani Oct 5, 2020
2723e55
wip
vishalchangrani Oct 5, 2020
88bfe16
reverting dummy topic subscription logic
vishalchangrani Oct 6, 2020
103c95f
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 6, 2020
433d305
Update network/gossip/libp2p/test/testUtil.go
vishalchangrani Oct 7, 2020
e0704c4
Update network/gossip/libp2p/libp2pNode.go
vishalchangrani Oct 7, 2020
585bdcc
fixing middleware_test
vishalchangrani Oct 7, 2020
eac83be
Merge remote-tracking branch 'origin/jordan/2594-broadcast-to-next-ep…
vishalchangrani Oct 7, 2020
8ddeb7b
removing helper.go; listening for EpochSetupPhaseStarted in the netwo…
vishalchangrani Oct 8, 2020
84c097b
lowering libp2p log level
vishalchangrani Oct 8, 2020
722a9d4
changing connection manager log messages
vishalchangrani Oct 8, 2020
d347421
add skip to setup of epoch transition tests
vishalchangrani Oct 8, 2020
c1828ee
add skip to setup of epoch transition tests
vishalchangrani Oct 8, 2020
70c484e
wip
vishalchangrani Oct 8, 2020
9b6345a
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 8, 2020
681e66c
lint
vishalchangrani Oct 8, 2020
4556b19
conditional skip to network epoch test
vishalchangrani Oct 8, 2020
71b1370
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 8, 2020
703d2e9
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 8, 2020
34b0c0e
changing how free ports are derived
vishalchangrani Oct 9, 2020
2366b67
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 9, 2020
4d2a5fc
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 9, 2020
525a320
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 9, 2020
b2da736
adding missing return
vishalchangrani Oct 10, 2020
bb9f48f
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 10, 2020
fe1f949
incorporating review comments
vishalchangrani Oct 13, 2020
cc04818
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 13, 2020
b18fafa
Update network/gossip/libp2p/network.go
vishalchangrani Oct 13, 2020
514e12c
fixing the epoch transition test
vishalchangrani Oct 14, 2020
a2d5e47
sepearting epoch consumer to NodeIDRefresher; making conngator concur…
vishalchangrani Oct 14, 2020
a2a8747
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 20, 2020
64be110
incorporating review comments
vishalchangrani Oct 20, 2020
21a4c7a
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 20, 2020
baafb04
Merge branch 'master' into vishal/refactor_network
vishalchangrani Oct 21, 2020
20fa2f8
fixing test flakyness
vishalchangrani Oct 21, 2020
a0ff783
Merge branch 'vishal/refactor_network' of github.com:onflow/flow-go i…
vishalchangrani Oct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 15 additions & 1 deletion network/gossip/libp2p/nodeIDRefresher.go
Expand Up @@ -46,7 +46,16 @@ func (listener *NodeIDRefresher) EpochSetupPhaseStarted(newEpoch uint64, first *
}
}

// IDsFromState returns the identities that the network should be using based on the current epoch phase
// IDsFromState returns the identities that the network should be using based on the current epoch phase as following:
// ----------------------------------------------------------------------------------------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// | Epoch Phase | IDs included |
// |---------------------------------------------------------------------------------------------------
// | EpochPhaseStaking | All IDs from current epoch are included (TODO: add ids from preivous state) |
// |---------------------------------------------------------------------------------------------------|
// | EpochPhaseSetup | All IDs from current and next epoch are included |
// |---------------------------------------------------------------------------------------------------|
// | EpochPhaseCommitted | All IDs from current and next epoch are included |
// |---------------------------------------------------------------------------------------------------|
func IDsFromState(state protocol.ReadOnlyState) (flow.IdentityList, error) {

// epoch ids from this epoch
Expand All @@ -61,6 +70,11 @@ func IDsFromState(state protocol.ReadOnlyState) (flow.IdentityList, error) {
return nil, fmt.Errorf("failed to retrieve epoch phase: %w", err)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually we will want something like:

if phase == flow.EpochPhaseStaking {
    ids = ids.Union(state.Final().EPochs().Previous().InitialIdentities())
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated comment on this function to reflect that

//TODO: Add epoch ids from previous phase
//if phase == flow.EpochPhaseStaking {
// ids = ids.Union(state.Final().Epochs().Previous().InitialIdentities())
//}

// if node is in epoch setup or epoch committed phase, include the next epoch identities as well
if phase == flow.EpochPhaseSetup || phase == flow.EpochPhaseCommitted {
nextEpochIDs, err := state.Final().Epochs().Next().InitialIdentities()
Expand Down
100 changes: 47 additions & 53 deletions network/gossip/libp2p/test/epochtransition_test.go
Expand Up @@ -2,6 +2,7 @@ package test

import (
"fmt"
"math"
"math/rand"
"os"
"sync"
Expand All @@ -12,7 +13,7 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
mock2 "github.com/stretchr/testify/mock"
testifymock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

Expand Down Expand Up @@ -42,13 +43,11 @@ type EpochTransitionTestSuite struct {
}

func TestEpochTransitionTestSuite(t *testing.T) {
if _, found := os.LookupEnv("AllNetworkTest"); !found {
t.Skip("skipping till discovery is updated to add and remove nodes on-demand")
}
suite.Run(t, new(EpochTransitionTestSuite))
}

func (ts *EpochTransitionTestSuite) SetupTest() {
rand.Seed(time.Now().UnixNano())
nodeCount := 10
golog.SetAllLoggers(golog.LevelError)
ts.logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}).With().Caller().Logger()
Expand All @@ -67,7 +66,7 @@ func (ts *EpochTransitionTestSuite) SetupTest() {
// setup state related mocks
ts.state = new(protocol.ReadOnlyState)
ts.snapshot = new(protocol.Snapshot)
ts.snapshot.On("Identities", mock2.Anything).Return(ids, nil)
ts.snapshot.On("Identities", testifymock.Anything).Return(ids, nil)
ts.snapshot.On("Epochs").Return(ts.epochQuery)
ts.snapshot.On("Phase").Return(
func() flow.EpochPhase { return ts.currentEpochPhase },
Expand Down Expand Up @@ -108,7 +107,8 @@ func (ts *EpochTransitionTestSuite) TearDownTest() {
}
}

// TestNewNodeAdded tests that an additional node in a new epoch get connected to other nodes and can exchange messages
// TestNewNodeAdded tests that an additional node in the next epoch gets connected to other nodes and can exchange messages
// in the current epoch
func (ts *EpochTransitionTestSuite) TestNewNodeAdded() {

// create the id, middleware and network for a new node
Expand All @@ -125,78 +125,71 @@ func (ts *EpochTransitionTestSuite) TestNewNodeAdded() {
newEngine := generateEngines(ts.T(), nets)
newEngines := append(ts.engines, newEngine...)

// increment epoch
ts.currentEpoch = ts.currentEpoch + 1
// update epoch query mock to return new IDs for the next epoch
nextEpoch := ts.currentEpoch + 1
ts.addEpoch(nextEpoch, newIDs)

// switch the epoch phase to Setup
// adjust the epoch phase
ts.currentEpochPhase = flow.EpochPhaseSetup

// update epoch query mock to return new IDs for this epoch
ts.addEpoch(ts.currentEpoch, newIDs)

// trigger an epoch transition for all networks
// trigger an epoch phase change for all networks going from flow.EpochPhaseStaking to flow.EpochPhaseSetup
for _, n := range newIDRefreshers {
n.EpochSetupPhaseStarted(ts.currentEpoch, nil)
n.EpochSetupPhaseStarted(nextEpoch, nil)
}

threshold := len(ts.ids) / 2

// check if the new node has at least threshold connections with the existing nodes
// check if the new node has sufficient connections with the existing nodes
// if it does, then it has been inducted successfully in the network
assert.Eventually(ts.T(), func() bool {
connections := 0
for _, id := range ts.ids {
connected, err := newMiddleware.IsConnected(*id)
require.NoError(ts.T(), err)
if connected {
connections++
}
}
return connections >= threshold
}, 5*time.Second, time.Millisecond)
checkConnectivity(ts.T(), newMiddleware, ids)

// check that all the engines on this new epoch can talk to each other
sendMessagesAndVerify(ts.T(), newIDs, newEngines, ts.Publish)
}

// TestNodeRemoved tests that a node that is removed in a new epoch gets disconnected from other nodes
// TestNodeRemoved tests that a node that is removed in the next epoch remains connected for the current epoch
func (ts *EpochTransitionTestSuite) TestNodeRemoved() {

// choose a random index
// choose a random node to remove
removeIndex := rand.Intn(len(ts.ids))
removedID := ts.ids[removeIndex]
removedMW := ts.mws[removeIndex]

// remove the identity at that index from the ids
newIDs := ts.ids.Filter(filter.Not(filter.HasNodeID(ts.ids[removeIndex].NodeID)))
newIDs := ts.ids.Filter(filter.Not(filter.HasNodeID(removedID.NodeID)))

// increment epoch
ts.currentEpoch = ts.currentEpoch + 1
// update epoch query mock to return new IDs for the next epoch
nextEpoch := ts.currentEpoch + 1
ts.addEpoch(nextEpoch, newIDs)

// switch the epoch phase to Setup
// adjust the epoch phase
ts.currentEpochPhase = flow.EpochPhaseSetup

// update epoch query mock to return new IDs for this epoch
ts.addEpoch(ts.currentEpoch, newIDs)

// trigger an epoch transition for all nodes except the evicted one
for i, n := range ts.idRefreshers {
if i == removeIndex {
continue
}
n.EpochSetupPhaseStarted(uint64(ts.currentEpoch), nil)
// trigger an epoch phase change for all nodes
// from flow.EpochPhaseStaking to flow.EpochPhaseSetup
for _, n := range ts.idRefreshers {
n.EpochSetupPhaseStarted(nextEpoch, nil)
}

removedID := ts.ids[removeIndex]
// check that the evicted node has no connections
assert.Eventually(ts.T(), func() bool {
for i := range newIDs {
connected, err := ts.mws[i].IsConnected(*removedID)
require.NoError(ts.T(), err)
// check if the evicted node still has sufficient connections with the existing nodes
checkConnectivity(ts.T(), removedMW, newIDs)

// check that all the engines on this new epoch can still talk to each other
sendMessagesAndVerify(ts.T(), ts.ids, ts.engines, ts.Publish)
}

// checkConnectivity checks that the middleware of a node is directly connected to atleast half of the other nodes
func checkConnectivity(t *testing.T, mw *libp2p.Middleware, ids flow.IdentityList) {
threshold := len(ids) / 2
assert.Eventually(t, func() bool {
connections := 0
for _, id := range ids {
connected, err := mw.IsConnected(*id)
require.NoError(t, err)
if connected {
return false
connections++
}
}
return true
}, 30*time.Second, 10*time.Millisecond)
return connections >= threshold
}, 5*time.Second, time.Millisecond)
}

// sendMessagesAndVerify sends a message from each engine to the other engines and verifies that all the messages are
Expand All @@ -211,8 +204,9 @@ func sendMessagesAndVerify(t *testing.T, ids flow.IdentityList, engs []*MeshEngi

// each node broadcasting a message to all others
for i, eng := range engs {
nonce := rand.Intn(math.MaxInt64)
event := &message.TestMessage{
Text: fmt.Sprintf("hello from node %d to %d other nodes", i, count),
Text: fmt.Sprintf("%d: hello from node %d", nonce, i),
}
others := ids.Filter(filter.Not(filter.HasNodeID(ids[i].NodeID))).NodeIDs()
require.NoError(t, send(event, eng.con, others...))
Expand Down