New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Vishal/refactor network #48
Conversation
cmd/scaffold.go
Outdated
@@ -195,12 +189,15 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() { | |||
return nil, fmt.Errorf("could not create topology: %w", err) | |||
} | |||
|
|||
net, err := libp2p.NewNetwork(fnb.Logger, codec, participants, fnb.Me, fnb.Middleware, 10e6, nodeTopology, fnb.Metrics.Network) | |||
net, err := libp2p.NewNetwork(fnb.Logger, codec, fnb.State, fnb.Me, fnb.Middleware, 10e6, nodeTopology, fnb.Metrics.Network) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
network now needs the State
instead of the participants list since the participant list changes epoch to epoch.
cmd/scaffold.go
Outdated
if err != nil { | ||
return nil, fmt.Errorf("could not initialize network: %w", err) | ||
} | ||
|
||
fnb.Network = net | ||
|
||
fnb.ProtocolEvents.AddConsumer(net) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the network as a consumer of the epoch events
@@ -34,6 +34,7 @@ require ( | |||
github.com/onflow/flow-go/crypto v0.9.4 | |||
github.com/onflow/flow/protobuf/go/flow v0.1.7 | |||
github.com/opentracing/opentracing-go v1.2.0 | |||
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the past, when the network unit test ran, the ports were automatically selected when the libp2p host started by specifying the listen address of 0.0.0.0:0
. While that was convenient in terms of getting a free local port to listen on, the test setup became overly complex. The test had to create a half baked network, start the libp2p host and go back and update the peer ids.
I changed it so that the test asks for open ports on a system using this third party library and uses those passing it down as the flow.Identity from network -> middleware -> libp2p.
} | ||
|
||
func NewDiscovery(log zerolog.Logger, overlay middleware.Overlay, me flow.Identifier, done chan struct{}) *Discovery { | ||
d := &Discovery{overlay: overlay, log: log, me: me, done: done} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
is replaced by a context
// Topology returns the identities of a uniform subset of nodes in protocol state | ||
Topology() (map[flow.Identifier]flow.Identity, error) | ||
// Topology returns an identity list of nodes which this node should be directly connected to as peers | ||
Topology() (flow.IdentityList, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returning a simple flow.IdentityList
instead of a map.
|
||
// TestNewNodeAdded tests that an additional node in a new epoch get connected to other nodes and can exchange messages | ||
func (ts *EpochTransitionTestSuite) TestNewNodeAdded() { | ||
ts.T().Skip("skipping till discovery is updated to discover newly added nodes") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these test fail since currently there is no way to trigger the discovery on demand. Once https://github.com/dapperlabs/flow-go/issues/4893 is implemented, these should pass.
@@ -0,0 +1,191 @@ | |||
package test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the new and improved helper
to setup the network for network related unit tests
network/gossip/libp2p/network.go
Outdated
@@ -450,3 +477,16 @@ func (n *Network) queueSubmitFunc(message interface{}) { | |||
Msg("failed to process message") | |||
} | |||
} | |||
|
|||
func (n *Network) EpochTransition(newEpoch uint64, first *flow.Header) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be listening to EpochSetupPhaseStarted
event instead of this one, so that nodes are valid network members prior to their epoch starting. We will also need to account for starting up after this event has been emitted.
// handle the case that we are running when this phase change occurs
func (n *Network) EpochSetupPhaseStarted(...) {
n.ids = n.state.Final.Identities().Union(n.state.Final().Epochs().Next().InitialIdentities())
}
// in the network initialization -- handle the case where we start up after the phase change has occurred
// since we won't get the event when this happens
phase := n.state.Final().Phase()
if phase == flow.EpochPhaseSetup || phase == flow.EpochPhaseCommitted {
// include next epoch identitites
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm doing something similar so implemented Union
in #51
network/gossip/libp2p/test/helper.go
Outdated
if dryrun { | ||
state = generateStateSnapshot(ids) | ||
} else { | ||
identities := make(flow.IdentityList, 0) | ||
state = generateStateSnapshot(identities) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When dryrun
is specified we use the passed-in ids
and otherwise we generate an empty list? Kind of seems backward to me 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so we first start the network with empty identities if dryrun is false, then when the network is up and running we modify the identities with the actual identities with real ip and port. But, as you pointed out - this is confusing and also does not depict the real usage where the ids will already have IP and port.
I have removed helper.go
all together now.
|
||
"github.com/phayes/freeport" | ||
"github.com/rs/zerolog" | ||
mock2 "github.com/stretchr/testify/mock" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Goland auto-rename? Probably want to alias /module/mock
to module
or mockmodule
Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
…och' into vishal/refactor_network
…nto vishal/refactor_network
…nto vishal/refactor_network
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice job! Just added a few comments, please consider them before merging.
func (d *Discovery) Advertise(ctx context.Context, _ string, _ ...discovery.Option) (time.Duration, error) { | ||
err := ctx.Err() | ||
if err != nil { | ||
return 0, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good to append some metadata information on the place where the error happens in both this and subsequent functions: return 0, fmt.Errorf("could not advertise the node: %w", err)
. Since if such an error happens we are sending a zero
delay for the next subsequent call. So it would be good to have a clue on it.
func (p *P2PNode) IsConnected(address NodeAddress) (bool, error) { | ||
pInfo, err := GetPeerInfo(address) | ||
if err != nil { | ||
return false, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same error appending here.
} | ||
// query libp2p for connectedness status of this peer | ||
isConnected := p.libP2PHost.Network().Connectedness(pInfo.ID) == network.Connected | ||
return isConnected, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe do this: return p.libP2PHost.Network().Connectedness(pInfo.ID) == network.Connected, nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think both of us think alike - I had exactly that way and Jordan suggested adding a new variable isConnected
. I think, the additional variable makes the code a little more readable.
n.Unlock() | ||
|
||
// update the allow list | ||
err := n.mw.UpdateAllowList() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we also cover this method with a Lock
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UpdateAllowList
calls back network.Identity
which in turn does a lock
. Hence, it would end up in deadlock if I covered UpdateAllowList
with it.
network/gossip/libp2p/network.go
Outdated
// current epoch phase | ||
phase, err := state.Final().Phase() | ||
if err != nil { | ||
log.Err(err).Msg("failed to retrieve epoch phase") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we return an error at this phase and do not continue?
var rootBlockID = unittest.IdentifierFixture().String() | ||
|
||
// generateIDs generate flow Identities with a valid port and networking key | ||
func generateIDs(n int) ([]*flow.Identity, []crypto.PrivateKey, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest passing a t *testing.T
here as a parameter. Then instead of returning the error, you can fail the test at the line it happens. It would be easier to track.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense..will change it to accept t and not return an error
} | ||
|
||
// generateMiddlewares creates and initializes middleware instances for all the identities | ||
func generateMiddlewares(log zerolog.Logger, identities []*flow.Identity, keys []crypto.PrivateKey) ([]*libp2p.Middleware, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here for *testing.T
and the rest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried to clear up how and when the node identities are added with a diagram. Maybe we can have a quick call to make sure we're on the same page and I can backfill more details into the notion doc.
The implementation looks correct to me, but the test case makes me think there's some miscommunication about the flow of nodes being added/removed.
newNetworks := append(ts.nets, nets...) | ||
|
||
// increment epoch | ||
ts.currentEpoch = ts.currentEpoch + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this test case is testing a circumstance we expect to happen. It's testing this flow:
- start in epoch k with identities
I
- transition to the next epoch (k+1), where we have one additional node identity
N
- trigger
EpochSetupPhaseStarted
in the next epoch - expect that
N
is added
This diagram shows when we add and remove nodes. We always add nodes for epoch N before epoch N begins. By necessity this means that all node identities added to the networking identity list are not yet in the current epoch identity list (state.Final().Identities()
). They're always in the epoch information for the next epoch state.Final().Epochs().Next().InitialIdentities()
.
Translating this test case to fit this scheme, it would look something like:
- start in epoch k with identities
I
- transition to epoch setup phase within epoch k
- this mimics the smart contract emitting the service event containing the identity list for the next (k+1) epoch
- we mock out the protocol state
Epochs().Next().InitialIdentities()
to includeN
- trigger
EpochSetupPhaseStarted
for epoch k - expect that
N
is added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test utility might be useful for the updated test case:
type EpochQuery struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jordanschalm thanks for those details. My understanding was the same as well but I haven't yet updated the test after my initial set of changes since the test fail as peer discovery cannot be triggered on demand right now. That is something what I am working on right now. Eventually thought, the test should do what you described - add a node from the next epoch and ensure that it is part of the network.
Having said that 😃 , I will update the test and still keep it marked as skipped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this change make sense though - https://github.com/onflow/flow-go/pull/48/files#diff-e313509e481ce199478c3001443850d7880761b01f8f93ba677fab52a2176812R497 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that part LGTM. It was just this test case that threw me off
Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was reading your code to get a bit more insight on how the networking could interface with the protocol state.
I have a couple of recommendations:
- I think
Network
should not implementprotocol.Consumer
:- There will be multiple conditions under which the identity table changes. Monitoring these conditions is IMHO not the business of the networking layer
- I like how the networking layer exposes
SetIDs
. I think this is sufficient. - I would recommend implementing
protocol.Consumer
as its own struct, which calls intoNetwork.SetIDs
. I think the networking layer cold be completely oblivious to the existence of state
- I found it very difficult to understand the used concurrency pattern
Network
, specifically non-atomicity ofSetIDs
andTopology
. I am worried there are potential edge-cases (commented on the ones I noticed).- In my opinion the reason for the complexity is that
Network
need to update theMiddleware
when the identity list changes, but theMiddleware
calls back intoNetwork
(through the Overlay interface). This kind of creates a circular information flow, which is very hard to understand. - I don't think
connGater
is concurrency safe in any way, is it? Yet, don't we update and read its fieldpeerIDAllowlist
concurrently⚠️ :flow-go/network/gossip/libp2p/libp2pNode.go
Line 525 in b18fafa
p.connGater.update(whilelistPInfos)
- In my opinion the reason for the complexity is that
I think both points would be addressed with a stronger hierarchical update
- Specifically, I would suggest to remove
Network.Topology()
andNetwork.Identity()
(including from theOverlay
interface). - Instead of the
Middleware
always queryingNetwork
, I would suggest thatNetwork
pushes updates to the identity list toMiddleware
:- For example,
Middleware
could expose a setterSetOverlay(authorizedNodes map[flow.Identifier]flow.Identity, topology flow.IdentityList) error
Middleware
can then cache the result until the nextSetOverlay
overrides the values. Furthermore,Middleware
can propagate
-allowListAddrs
(computed fromauthorizedNodes
) to theconnGater
component
- newtopology
to theDiscovery
component
- For example,
network/gossip/libp2p/network.go
Outdated
fanout := n.fanout() | ||
n.RLock() | ||
defer n.RUnlock() | ||
subset, err := n.top.Subset(n.ids, fanout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️ 🕷️ Uncovered Edge case
I think this is an uncovered edge case:
- there are two inputs to computing the Topology:
n.ids
andfanout
, wherefanout
also depends on the number of nodes, i.e.len(n.ids)
- The values are computed in independently locked code blocks! For example, consider the case where we compute
fanout
but before line 178, the internal values ofn.ids
are concurrently updated. We would then compute the wrong topology.
Instead, the topology needs to be computed in a atomic operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for that detailed review!
the fanout
currently has no effect. The topology ignores it. But I agree with you, I will change this to make the whole operation atomic.
…rency safe; removing fanout function
Updated the PR as per our discussion:
|
|
||
// create the engine for the new node | ||
newEngine := generateEngines(ts.T(), nets) | ||
newEngines := append(ts.engines, newEngine...) | ||
|
||
// increment epoch | ||
ts.currentEpoch = ts.currentEpoch + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you do ts.epochQuery.Transition()
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so I am still not testing transition from one epoch to the other, I am testing changes of epoch phases. If I call ts.epochQuery.Transition()
it will make the epochQuery
point to the next epoch while I am testing against the current epoch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, I think the variable name is throwing me a bit. If we aren't doing an epoch transition here, ts.currentEpoch
refers to the counter of the next epoch once we increment it. Maybe we can do:
// update epoch query to return new IDs for next epoch
ts.addEpoch(ts.currentEpoch+1, newIDs)
// setup state related mocks | ||
ts.state = new(protocol.ReadOnlyState) | ||
ts.snapshot = new(protocol.Snapshot) | ||
ts.snapshot.On("Identities", mock2.Anything).Return(ids, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
goland auto-rename
} | ||
} | ||
|
||
// IDsFromState returns the identities that the network should be using based on the current epoch phase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is worth adding a note here about how this works and the limitations, because it's a bit non-obvious.
For example:
- it will currently omit nodes from previous epoch if we restart during
StakingPhase
- it includes nodes from previous epoch otherwise because we wait to call this function until the transition to setup phase, before that transition we retain what was set during the previous epoch
if err != nil { | ||
return nil, fmt.Errorf("failed to retrieve epoch phase: %w", err) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually we will want something like:
if phase == flow.EpochPhaseStaking {
ids = ids.Union(state.Final().EPochs().Previous().InitialIdentities())
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated comment on this function to reflect that
@@ -46,7 +46,16 @@ func (listener *NodeIDRefresher) EpochSetupPhaseStarted(newEpoch uint64, first * | |||
} | |||
} | |||
|
|||
// IDsFromState returns the identities that the network should be using based on the current epoch phase | |||
// IDsFromState returns the identities that the network should be using based on the current epoch phase as following: | |||
// ---------------------------------------------------------------------------------------------------- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
||
// create the engine for the new node | ||
newEngine := generateEngines(ts.T(), nets) | ||
newEngines := append(ts.engines, newEngine...) | ||
|
||
// increment epoch | ||
ts.currentEpoch = ts.currentEpoch + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, I think the variable name is throwing me a bit. If we aren't doing an epoch transition here, ts.currentEpoch
refers to the counter of the next epoch once we increment it. Maybe we can do:
// update epoch query to return new IDs for next epoch
ts.addEpoch(ts.currentEpoch+1, newIDs)
…nto vishal/refactor_network
This PR does the following:
testUtil.go
which should eventually replacehandler.go
).