Skip to content

Commit

Permalink
fix dirty data in peerset,resolve #3304 (#3359)
Browse files Browse the repository at this point in the history
* fix dirty data in peerset

startInitPeer before PeerSet add the peer,
once mconnection start and Receive of one
Reactor faild, will try to remove it from PeerSet
while PeerSet still not contain the peer. Fix
this by change the order.

* fix test FilterDuplicate

* fix start/stop race

* fix err
  • Loading branch information
zjubfd authored and ebuchman committed Mar 2, 2019
1 parent 37a5484 commit d958941
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 29 deletions.
2 changes: 2 additions & 0 deletions libs/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type Server struct {
cmds chan cmd
cmdsCap int

// check if we have subscription before
// subscribing or unsubscribing
mtx sync.RWMutex
subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct
}
Expand Down
43 changes: 18 additions & 25 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,43 +645,36 @@ func (sw *Switch) addPeer(p Peer) error {

// Handle the shut down case where the switch has stopped but we're
// concurrently trying to add a peer.
if sw.IsRunning() {
// All good. Start peer
if err := sw.startInitPeer(p); err != nil {
return err
}
} else {
if !sw.IsRunning() {
// XXX should this return an error or just log and terminate?
sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
return nil
}

// Add the peer to .peers.
// We start it first so that a peer in the list is safe to Stop.
// It should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
// Start the peer's send/recv routines.
// Must start it before adding it to the peer set
// to prevent Start and Stop from being called concurrently.
err := p.Start()
if err != nil {
// Should never happen
sw.Logger.Error("Error starting peer", "err", err, "peer", p)
return err
}

sw.Logger.Info("Added peer", "peer", p)
sw.metrics.Peers.Add(float64(1))

return nil
}

func (sw *Switch) startInitPeer(p Peer) error {
err := p.Start() // spawn send/recv routines
if err != nil {
// Should never happen
sw.Logger.Error(
"Error starting peer",
"err", err,
"peer", p,
)
// Add the peer to PeerSet. Do this before starting the reactors
// so that if Receive errors, we will find the peer and remove it.
// Add should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
return err
}
sw.metrics.Peers.Add(float64(1))

// Start all the reactor protocols on the peer.
for _, reactor := range sw.reactors {
reactor.AddPeer(p)
}

sw.Logger.Info("Added peer", "peer", p)

return nil
}
10 changes: 6 additions & 4 deletions p2p/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {

func TestSwitchPeerFilterDuplicate(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw.Start()
defer sw.Stop()

// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
Expand All @@ -293,12 +295,12 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) {
}

err = sw.addPeer(p)
if err, ok := err.(ErrRejected); ok {
if !err.IsDuplicate() {
t.Errorf("expected peer to be duplicate")
if errRej, ok := err.(ErrRejected); ok {
if !errRej.IsDuplicate() {
t.Errorf("expected peer to be duplicate. got %v", errRej)
}
} else {
t.Errorf("expected ErrRejected")
t.Errorf("expected ErrRejected, got %v", err)
}
}

Expand Down

0 comments on commit d958941

Please sign in to comment.