Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix non deterministic test failures and race in privval socket #3258

Merged
merged 5 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions consensus/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,20 @@ func (wal *baseWAL) OnStart() error {
return err
}

// OnStop stops the underlying autofile group and then closes it.
// It does not itself wait for the group to finish shutting down;
// use Wait() to ensure it's finished before cleaning up files.
func (wal *baseWAL) OnStop() {
// Stop must come first: Group.Close expects the group to be stopped already.
wal.group.Stop()
wal.group.Close()
}

// Wait blocks until the underlying autofile group has finished shutting down,
// so it's safe to clean up its files afterwards.
func (wal *baseWAL) Wait() {
wal.group.Wait()
}

// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
Expand Down
7 changes: 6 additions & 1 deletion consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func TestWALTruncate(t *testing.T) {
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
defer wal.Stop()
defer func() {
wal.Stop()
// wait for the wal to finish shutting down so we
// can safely remove the directory
wal.Wait()
}()

// 60 blocks are nearly 70K in size, which is greater than the group's headBuf size (4096 * 10); when headBuf is full, the truncated content will be flushed to the file.
// At this time, RotateFile is called, so truncated content exists in each file.
Expand Down
12 changes: 12 additions & 0 deletions libs/autofile/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ type Group struct {
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to

// close this when the processTicks routine is done.
// this ensures we can cleanup the dir after calling Stop
// and the routine won't be trying to access it anymore
doneProcessTicks chan struct{}

// TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies.
}
Expand All @@ -90,6 +95,7 @@ func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err err
groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0,
maxIndex: 0,
doneProcessTicks: make(chan struct{}),
}

for _, option := range groupOptions {
Expand Down Expand Up @@ -140,6 +146,11 @@ func (g *Group) OnStop() {
g.Flush() // flush any uncommitted data
}

// Wait blocks until the processTicks routine has finished, which it signals
// by closing doneProcessTicks on return.
func (g *Group) Wait() {
// wait for processTicks routine to finish
<-g.doneProcessTicks
}

// Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() {
g.Flush() // flush any uncommitted data
Expand Down Expand Up @@ -211,6 +222,7 @@ func (g *Group) Flush() error {
}

func (g *Group) processTicks() {
defer close(g.doneProcessTicks)
for {
select {
case <-g.ticker.C:
Expand Down
19 changes: 9 additions & 10 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ func TestSplitAndTrimEmpty(t *testing.T) {
}
}

func TestNodeDelayedStop(t *testing.T) {
config := cfg.ResetTestRoot("node_delayed_node_test")
func TestNodeDelayedStart(t *testing.T) {
config := cfg.ResetTestRoot("node_delayed_start_test")
now := tmtime.Now()

// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
n.GenesisDoc().GenesisTime = now.Add(5 * time.Second)
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
require.NoError(t, err)

n.Start()
Expand Down Expand Up @@ -133,6 +133,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
types.NewMockPV(),
dialer,
)
privval.RemoteSignerConnDeadline(100 * time.Millisecond)(pvsc)

go func() {
err := pvsc.Start()
Expand Down Expand Up @@ -172,20 +173,18 @@ func TestNodeSetPrivValIPC(t *testing.T) {
types.NewMockPV(),
dialer,
)
privval.RemoteSignerConnDeadline(100 * time.Millisecond)(pvsc)

done := make(chan struct{})
go func() {
defer close(done)
n, err := DefaultNewNode(config, log.TestingLogger())
err := pvsc.Start()
require.NoError(t, err)
assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())
}()
defer pvsc.Stop()

err := pvsc.Start()
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
defer pvsc.Stop()
assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())

<-done
}

// testFreeAddr claims a free port so we don't block on listener being ready.
Expand Down
2 changes: 1 addition & 1 deletion p2p/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
return err
}

ni, err := handshake(conn, 50*time.Millisecond, sw.nodeInfo)
ni, err := handshake(conn, time.Second, sw.nodeInfo)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
Expand Down
24 changes: 13 additions & 11 deletions privval/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ type SocketVal struct {
// reset if the connection fails.
// failures are detected by a background
// ping routine.
// All messages are request/response, so we hold the mutex
// so only one request/response pair can happen at a time.
// Methods on the underlying net.Conn itself
// are already goroutine safe.
mtx sync.RWMutex
mtx sync.Mutex
signer *RemoteSignerClient
}

Expand All @@ -82,22 +84,22 @@ func NewSocketVal(

// GetPubKey implements PrivValidator.
func (sc *SocketVal) GetPubKey() crypto.PubKey {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.GetPubKey()
}

// SignVote implements PrivValidator.
func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.SignVote(chainID, vote)
}

// SignProposal implements PrivValidator.
func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.SignProposal(chainID, proposal)
}

Expand All @@ -106,15 +108,15 @@ func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) erro

// Ping is used to check connection health.
func (sc *SocketVal) Ping() error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.Ping()
}

// Close closes the underlying net.Conn.
func (sc *SocketVal) Close() {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
if sc.signer != nil {
if err := sc.signer.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
Expand Down