Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
huitseeker committed Aug 10, 2021
1 parent 4d3f256 commit 7c3987e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
49 changes: 31 additions & 18 deletions network/test/meshengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -49,10 +50,10 @@ func (suite *MeshEngineTestSuite) SetupTest() {
logger := zerolog.New(os.Stderr).Level(zerolog.ErrorLevel)
log.SetAllLoggers(log.LevelError)

// set up a channl to receive pubsub tags from connManagers of the nodes
// set up a channel to receive pubsub tags from connManagers of the nodes
var obs []observable.Observable
peerChannel := make(chan string)
ob := TagsObserver{
ob := tagsObserver{
tags: peerChannel,
}

Expand Down Expand Up @@ -163,10 +164,13 @@ func (suite *MeshEngineTestSuite) allToAllScenario(send ConduitSendWrapperFunc)
}

// allow nodes to heartbeat and discover each other
// each node will register ~D protect messages, where D is the default out-degree (6)
// see https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/gossipsub.go#L33
for i := 0; i < 6*count; i++ {
<-suite.obs
// each node will register ~D protect messages, where D is the default out-degree
for i := 0; i < pubsub.GossipSubD*count; i++ {
select {
case <-suite.obs:
case <-time.After(2 * time.Second):
assert.FailNow(suite.T(), "could not receive pubsub tag indicating mesh formed")
}
}

// Each node broadcasting a message to all others
Expand Down Expand Up @@ -239,10 +243,13 @@ func (suite *MeshEngineTestSuite) targetValidatorScenario(send ConduitSendWrappe
}

// allow nodes to heartbeat and discover each other
// each node will register ~D protect messages, where D is the default out-degree (6)
// see https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/gossipsub.go#L33
for i := 0; i < 6*count; i++ {
<-suite.obs
// each node will register ~D protect messages, where D is the default out-degree
for i := 0; i < pubsub.GossipSubD*count; i++ {
select {
case <-suite.obs:
case <-time.After(2 * time.Second):
assert.FailNow(suite.T(), "could not receive pubsub tag indicating mesh formed")
}
}

// choose half of the nodes as target
Expand Down Expand Up @@ -296,10 +303,13 @@ func (suite *MeshEngineTestSuite) messageSizeScenario(send ConduitSendWrapperFun
}

// allow nodes to heartbeat and discover each other
// each node will register ~D protect messages per mesh setup, where D is the default out-degree (6)
// see https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/gossipsub.go#L33
for i := 0; i < 6*count; i++ {
<-suite.obs
// each node will register ~D protect messages per mesh setup, where D is the default out-degree
for i := 0; i < pubsub.GossipSubD*count; i++ {
select {
case <-suite.obs:
case <-time.After(2 * time.Second):
assert.FailNow(suite.T(), "could not receive pubsub tag indicating mesh formed")
}
}
// others keeps the identifier of all nodes except node that is sender.
others := suite.ids.Filter(filter.Not(filter.HasNodeID(suite.ids[0].NodeID))).NodeIDs()
Expand Down Expand Up @@ -346,10 +356,13 @@ func (suite *MeshEngineTestSuite) conduitCloseScenario(send ConduitSendWrapperFu
}

// allow nodes to heartbeat and discover each other
// each node will register ~D protect messages, where D is the default out-degree (6)
// see https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/gossipsub.go#L33
for i := 0; i < 6*count; i++ {
<-suite.obs
// each node will register ~D protect messages, where D is the default out-degree
for i := 0; i < pubsub.GossipSubD*count; i++ {
select {
case <-suite.obs:
case <-time.After(2 * time.Second):
assert.FailNow(suite.T(), "could not receive pubsub tag indicating mesh formed")
}
}

// unregister a random engine from the test topic by calling close on it's conduit
Expand Down
16 changes: 10 additions & 6 deletions network/test/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ import (

const testChannel = "test-channel"

type TagsObserver struct {
type tagsObserver struct {
tags chan string
}

func (co *TagsObserver) OnNext(peertag interface{}) {
func (co *tagsObserver) OnNext(peertag interface{}) {
pt, ok := peertag.(PeerTag)

if ok {
co.tags <- fmt.Sprintf("peer: %v tag: %v", pt.peer, pt.tag)
}

}
func (co *TagsObserver) OnError(err error) {
func (co *tagsObserver) OnError(err error) {
close(co.tags)
}
func (co *TagsObserver) OnComplete() {
func (co *tagsObserver) OnComplete() {
close(co.tags)
}

Expand Down Expand Up @@ -73,7 +73,7 @@ func (m *MiddlewareTestSuite) SetupTest() {
// create and start the middlewares and inject a connection observer
var obs []observable.Observable
peerChannel := make(chan string)
ob := TagsObserver{
ob := tagsObserver{
tags: peerChannel,
}

Expand Down Expand Up @@ -390,7 +390,11 @@ func (m *MiddlewareTestSuite) TestUnsubscribe() {

// set up waiting for m.size pubsub tags indicating a mesh has formed
for i := 0; i < m.size; i++ {
<-m.obs
select {
case <-m.obs:
case <-time.After(2 * time.Second):
assert.FailNow(m.T(), "could not receive pubsub tag indicating mesh formed")
}
}

origin := 0
Expand Down
6 changes: 3 additions & 3 deletions network/test/testUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ func GenerateNetworks(t *testing.T,
return nets
}

// returns nodeIDs, middlewares, and observables which can be subscirbed to in order to witness connect events
// returns nodeIDs, middlewares, and observables which can be subscirbed to in order to witness protect events from pubsub
func GenerateIDsAndMiddlewares(t *testing.T,
n int,
dryRunMode bool,
logger zerolog.Logger, opts ...func(*flow.Identity)) (flow.IdentityList, []*p2p.Middleware, []observable.Observable) {

ids, libP2PNodes, connectObservables := GenerateIDs(t, logger, n, dryRunMode, opts...)
ids, libP2PNodes, protectObservables := GenerateIDs(t, logger, n, dryRunMode, opts...)
mws := GenerateMiddlewares(t, logger, ids, libP2PNodes)
return ids, mws, connectObservables
return ids, mws, protectObservables
}

func GenerateIDsMiddlewaresNetworks(t *testing.T,
Expand Down

0 comments on commit 7c3987e

Please sign in to comment.