Merge branch 'feat/fix-testclusterspeerrejoin' into feat/flush-adder-output

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
hsanjuan committed Aug 17, 2018
2 parents 045587b + c8e7131 commit adfc646
Showing 6 changed files with 28 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cluster.go
@@ -406,7 +406,9 @@ This might be due to one or several causes:
     }
 
     close(c.readyCh)
+    c.shutdownLock.Lock()
     c.readyB = true
+    c.shutdownLock.Unlock()
     logger.Info("** IPFS Cluster is READY **")
 }

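The two added lines close a data race on c.readyB: the flag is now written under shutdownLock, the same lock a concurrent shutdown path would hold while reading it. A minimal sketch of the pattern follows; names other than readyB and shutdownLock are assumptions for illustration, not ipfs-cluster's actual API:

    package cluster

    import "sync"

    // Sketch only: guarding a ready flag with the shutdown lock.
    type Cluster struct {
        shutdownLock sync.Mutex
        readyB       bool
    }

    // markReady is a hypothetical stand-in for the ready path in the diff.
    func (c *Cluster) markReady() {
        c.shutdownLock.Lock()
        c.readyB = true // write shares a lock with all readers, so -race stays quiet
        c.shutdownLock.Unlock()
    }

    // isReady is a hypothetical reader, e.g. what a Shutdown would consult.
    func (c *Cluster) isReady() bool {
        c.shutdownLock.Lock()
        defer c.shutdownLock.Unlock()
        return c.readyB
    }
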
29 changes: 16 additions & 13 deletions consensus/raft/consensus.go
Expand Up @@ -43,7 +43,7 @@ type Consensus struct {
rpcReady chan struct{}
readyCh chan struct{}

shutdownLock sync.Mutex
shutdownLock sync.RWMutex
shutdown bool
}

@@ -139,12 +139,10 @@ func (cc *Consensus) WaitForSync() error {
 // signal the component as Ready.
 func (cc *Consensus) finishBootstrap() {
     // wait until we have RPC to perform any actions.
-    if cc.rpcClient == nil {
-        select {
-        case <-cc.ctx.Done():
-            return
-        case <-cc.rpcReady:
-        }
+    select {
+    case <-cc.ctx.Done():
+        return
+    case <-cc.rpcReady:
     }
 
     // Sometimes bootstrap is a no-op. It only applies when
@@ -291,9 +289,9 @@ func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) er
     // Being here means we are the LEADER. We can commit.
 
     // now commit the changes to our state
-    cc.shutdownLock.Lock() // do not shut down while committing
+    cc.shutdownLock.RLock() // do not shut down while committing
     _, finalErr = cc.consensus.CommitOp(op)
-    cc.shutdownLock.Unlock()
+    cc.shutdownLock.RUnlock()
     if finalErr != nil {
         goto RETRY
     }
@@ -347,9 +345,9 @@ func (cc *Consensus) AddPeer(pid peer.ID) error {
         return err
     }
     // Being here means we are the leader and can commit
-    cc.shutdownLock.Lock() // do not shutdown while committing
+    cc.shutdownLock.RLock() // do not shutdown while committing
     finalErr = cc.raft.AddPeer(peer.IDB58Encode(pid))
-    cc.shutdownLock.Unlock()
+    cc.shutdownLock.RUnlock()
     if finalErr != nil {
         time.Sleep(cc.config.CommitRetryDelay)
         continue
@@ -374,9 +372,9 @@ func (cc *Consensus) RmPeer(pid peer.ID) error {
         return err
     }
     // Being here means we are the leader and can commit
-    cc.shutdownLock.Lock() // do not shutdown while committing
+    cc.shutdownLock.RLock() // do not shutdown while committing
     finalErr = cc.raft.RemovePeer(peer.IDB58Encode(pid))
-    cc.shutdownLock.Unlock()
+    cc.shutdownLock.RUnlock()
     if finalErr != nil {
         time.Sleep(cc.config.CommitRetryDelay)
         continue
@@ -414,6 +412,8 @@ func (cc *Consensus) Leader() (peer.ID, error) {
 // Clean removes all raft data from disk. Next time
 // a full new peer will be bootstrapped.
 func (cc *Consensus) Clean() error {
+    cc.shutdownLock.RLock()
+    defer cc.shutdownLock.RUnlock()
     if !cc.shutdown {
         return errors.New("consensus component is not shutdown")
     }
@@ -438,6 +438,9 @@ func (cc *Consensus) Rollback(state state.State) error {
 // Peers return the current list of peers in the consensus.
 // The list will be sorted alphabetically.
 func (cc *Consensus) Peers() ([]peer.ID, error) {
+    cc.shutdownLock.RLock() // prevent shutdown while here
+    defer cc.shutdownLock.RUnlock()
+
     if cc.shutdown { // things hang a lot in this case
         return nil, errors.New("consensus is shutdown")
     }
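Throughout consensus.go the change has one shape: shutdownLock becomes a sync.RWMutex, and the commit paths (commit, AddPeer, RmPeer) plus the read-only paths (Clean, Peers) take the read side, so they can overlap with each other but not with a shutdown holding the write side. A minimal sketch of that split, assuming a Shutdown shaped like the rest of the component (its body is not part of this commit's hunks):

    package consensus

    import (
        "errors"
        "sync"
    )

    // Sketch only: mirrors the field names in the diff, nothing more.
    type Consensus struct {
        shutdownLock sync.RWMutex
        shutdown     bool
    }

    // commitOp stands in for commit/AddPeer/RmPeer: read-lock while
    // committing, so commits may overlap but shutdown cannot start mid-commit.
    func (cc *Consensus) commitOp(do func() error) error {
        cc.shutdownLock.RLock()
        defer cc.shutdownLock.RUnlock()
        if cc.shutdown {
            return errors.New("consensus is shutdown")
        }
        return do()
    }

    // Shutdown takes the exclusive lock and therefore waits for all
    // in-flight read-lock holders (commits, Peers, Clean) to drain.
    func (cc *Consensus) Shutdown() {
        cc.shutdownLock.Lock()
        defer cc.shutdownLock.Unlock()
        cc.shutdown = true
    }
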
3 changes: 1 addition & 2 deletions ipfscluster_test.go
@@ -103,6 +103,7 @@ func init() {
             continue
         }
     }
+    ReadyTimeout = 11 * time.Second
 }
 
 func checkErr(t *testing.T, err error) {
@@ -149,8 +150,6 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (
     clusterCfg.LeaveOnShutdown = false
     clusterCfg.SetBaseDir("./e2eTestRaft/" + pid.Pretty())
 
-    ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second
-
     host, err := NewClusterHost(context.Background(), clusterCfg)
     checkErr(t, err)
 
6 changes: 5 additions & 1 deletion package.json
@@ -136,6 +136,11 @@
         "hash": "QmZAsayEQakfFbHyakgHRKHwBTWrwuSBTfaMyxJZUG97VC",
         "name": "go-libp2p-kad-dht",
         "version": "4.3.1"
+      },
+      {
+        "hash": "Qmbq7kGxgcpALGLPaWDyTa6KUq5kBUKdEvkvPZcBkJoLex",
+        "name": "go-log",
+        "version": "1.5.6"
       }
     ],
     "gxVersion": "0.11.0",
@@ -145,4 +150,3 @@
     "releaseCmd": "git commit -S -a -m \"gx publish $VERSION\"",
     "version": "0.4.0"
 }
-
2 changes: 1 addition & 1 deletion peer_manager_test.go
@@ -535,7 +535,7 @@ func TestClustersPeerRejoin(t *testing.T) {
 
     // add all clusters
     for i := 1; i < len(clusters); i++ {
-        _, err := clusters[0].PeerAdd(clusters[i].id)
+        err := clusters[i].Join(clusterAddr(clusters[0]))
         if err != nil {
             t.Fatal(err)
         }
3 changes: 3 additions & 0 deletions pintracker/stateless/stateless_test.go
@@ -211,6 +211,9 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
         t.Fatal(err)
     }
 
+    // Otherwise fails when running with -race
+    time.Sleep(300 * time.Millisecond)
+
     err = spt.Track(fastPin)
     if err != nil {
         t.Fatal(err)
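The sleep above is a timing workaround rather than a synchronization fix: per the comment in the diff, the test otherwise fails under the race detector. To exercise a change like this, running the package with the detector enabled is the relevant check (package path taken from the diff header):

    go test -race ./pintracker/stateless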
