10 changes: 8 additions & 2 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,10 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
case mi = <-cs.internalMsgQueue:
cs.wal.WriteSync(mi) // NOTE: fsync
err := cs.wal.WriteSync(mi) // NOTE: fsync
if err != nil {
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err))
}

if _, ok := mi.Msg.(*VoteMessage); ok {
// we actually want to simulate failing during
Expand Down Expand Up @@ -1313,7 +1316,10 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// Either way, the ConsensusState should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync
me := EndHeightMessage{height}
if err := cs.wal.WriteSync(me); err != nil { // NOTE: fsync
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", me, err))
}

fail.Fail() // XXX

Expand Down
46 changes: 28 additions & 18 deletions consensus/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
)

const (
// must be greater than types.BlockPartSizeBytes + a few bytes
maxMsgSizeBytes = 1024 * 1024 // 1MB
// amino overhead + time.Time + max consensus msg size
maxMsgSizeBytes = maxMsgSize + 24

// how often the WAL should be sync'd during period sync'ing
walDefaultFlushInterval = 2 * time.Second
Expand All @@ -29,8 +29,9 @@ const (
//--------------------------------------------------------
// types and functions for savings consensus messages

// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
Time time.Time `json:"time"` // for debugging purposes
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
}

Expand All @@ -55,8 +56,8 @@ func RegisterWALMessages(cdc *amino.Codec) {

// WAL is an interface for any write-ahead logger.
type WAL interface {
Write(WALMessage)
WriteSync(WALMessage)
Write(WALMessage) error
WriteSync(WALMessage) error
FlushAndSync() error

SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
Expand Down Expand Up @@ -174,29 +175,39 @@ func (wal *baseWAL) Wait() {
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *baseWAL) Write(msg WALMessage) {
func (wal *baseWAL) Write(msg WALMessage) error {
if wal == nil {
return
return nil
}

// Write the wal message
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
panic(fmt.Sprintf("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
"err", err, "msg", msg)
return err
}

return nil
}

// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *baseWAL) WriteSync(msg WALMessage) {
func (wal *baseWAL) WriteSync(msg WALMessage) error {
if wal == nil {
return
return nil
}

if err := wal.Write(msg); err != nil {
return err
}

wal.Write(msg)
if err := wal.FlushAndSync(); err != nil {
panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err))
wal.Logger.Error("WriteSync failed to flush consensus wal. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted",
"err", err)
return err
}

return nil
}

// WALSearchOptions are optional arguments to SearchForEndHeight.
Expand Down Expand Up @@ -285,7 +296,7 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
crc := crc32.Checksum(data, crc32c)
length := uint32(len(data))
if length > maxMsgSizeBytes {
return fmt.Errorf("Msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
}
totalLength := 8 + int(length)

Expand All @@ -295,7 +306,6 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
copy(msg[8:], data)

_, err := enc.wr.Write(msg)

return err
}

Expand Down Expand Up @@ -383,9 +393,9 @@ type nilWAL struct{}

var _ WAL = nilWAL{}

func (nilWAL) Write(m WALMessage) {}
func (nilWAL) WriteSync(m WALMessage) {}
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) Write(m WALMessage) error { return nil }
func (nilWAL) WriteSync(m WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}
Expand Down
12 changes: 7 additions & 5 deletions consensus/wal_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
// Save writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Write(m WALMessage) {
func (w *byteBufferWAL) Write(m WALMessage) error {
if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return
return nil
}

if endMsg, ok := m.(EndHeightMessage); ok {
Expand All @@ -179,7 +179,7 @@ func (w *byteBufferWAL) Write(m WALMessage) {
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
w.signalWhenStopsTo <- struct{}{}
w.stopped = true
return
return nil
}
}

Expand All @@ -188,10 +188,12 @@ func (w *byteBufferWAL) Write(m WALMessage) {
if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m))
}

return nil
}

func (w *byteBufferWAL) WriteSync(m WALMessage) {
w.Write(m)
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
return w.Write(m)
}

func (w *byteBufferWAL) FlushAndSync() error { return nil }
Expand Down
28 changes: 23 additions & 5 deletions consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/libs/autofile"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestWALEncoderDecoder(t *testing.T) {
}
}

func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
func TestWALWrite(t *testing.T) {
walDir, err := ioutil.TempDir("", "wal")
require.NoError(t, err)
defer os.RemoveAll(walDir)
Expand All @@ -118,7 +119,24 @@ func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
wal.Wait()
}()

assert.Panics(t, func() { wal.Write(make([]byte, maxMsgSizeBytes+1)) })
// 1) Write returns an error if msg is too big
msg := &BlockPartMessage{
Height: 1,
Round: 1,
Part: &tmtypes.Part{
Index: 1,
Bytes: make([]byte, 1),
Proof: merkle.SimpleProof{
Total: 1,
Index: 1,
LeafHash: make([]byte, maxMsgSizeBytes-30),
},
},
}
err = wal.Write(msg)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "msg is too big")
}
}

func TestWALSearchForEndHeight(t *testing.T) {
Expand Down
32 changes: 31 additions & 1 deletion crypto/merkle/simple_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ package merkle

import (
"bytes"
"errors"
"fmt"

"github.com/pkg/errors"

"github.com/tendermint/tendermint/crypto/tmhash"
cmn "github.com/tendermint/tendermint/libs/common"
)

const (
// given maxMsgSizeBytes in consensus wal is 1MB
maxAunts = 30000
)

// SimpleProof represents a simple Merkle proof.
// NOTE: The convention for proofs is to include leaf hashes but to
// exclude the root hash.
Expand Down Expand Up @@ -109,6 +116,29 @@ func (sp *SimpleProof) StringIndented(indent string) string {
indent)
}

// ValidateBasic performs basic validation.
// NOTE: it expects LeafHash and Aunts of tmhash.Size size.
func (sp *SimpleProof) ValidateBasic() error {
if sp.Total < 0 {
return errors.New("negative Total")
}
if sp.Index < 0 {
return errors.New("negative Index")
}
if len(sp.LeafHash) != tmhash.Size {
return errors.Errorf("expected LeafHash size to be %d, got %d", tmhash.Size, len(sp.LeafHash))
}
if len(sp.Aunts) > maxAunts {
return errors.Errorf("expected no more than %d aunts, got %d", maxAunts, len(sp.Aunts))
}
for i, auntHash := range sp.Aunts {
if len(auntHash) != tmhash.Size {
return errors.Errorf("expected Aunts#%d size to be %d, got %d", i, tmhash.Size, len(auntHash))
}
}
return nil
}

// Use the leafHash and innerHashes to get the root merkle hash.
// If the length of the innerHashes slice isn't exactly correct, the result is nil.
// Recursive impl.
Expand Down
38 changes: 38 additions & 0 deletions crypto/merkle/simple_proof_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package merkle

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSimpleProofValidateBasic(t *testing.T) {
testCases := []struct {
testName string
malleateProof func(*SimpleProof)
errStr string
}{
{"Good", func(sp *SimpleProof) {}, ""},
{"Negative Total", func(sp *SimpleProof) { sp.Total = -1 }, "negative Total"},
{"Negative Index", func(sp *SimpleProof) { sp.Index = -1 }, "negative Index"},
{"Invalid LeafHash", func(sp *SimpleProof) { sp.LeafHash = make([]byte, 10) }, "expected LeafHash size to be 32, got 10"},
{"Too many Aunts", func(sp *SimpleProof) { sp.Aunts = make([][]byte, maxAunts+1) }, "expected no more than 100 aunts, got 101"},
{"Invalid Aunt", func(sp *SimpleProof) { sp.Aunts[0] = make([]byte, 10) }, "expected Aunts#0 size to be 32, got 10"},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
_, proofs := SimpleProofsFromByteSlices([][]byte{
[]byte("apple"),
[]byte("watermelon"),
[]byte("kiwi"),
})
tc.malleateProof(proofs[0])
err := proofs[0].ValidateBasic()
if tc.errStr != "" {
assert.Contains(t, err.Error(), tc.errStr)
}
})
}
}
3 changes: 3 additions & 0 deletions types/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ const (

// BlockPartSizeBytes is the size of one block part.
BlockPartSizeBytes = 65536 // 64kB

// MaxBlockPartsCount is the maximum count of block parts.
MaxBlockPartsCount = MaxBlockSizeBytes / BlockPartSizeBytes
)

// ConsensusParams contains consensus critical parameters that determine the
Expand Down
7 changes: 5 additions & 2 deletions types/part_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ type Part struct {
// ValidateBasic performs basic validation.
func (part *Part) ValidateBasic() error {
if part.Index < 0 {
return errors.New("Negative Index")
return errors.New("negative Index")
}
if len(part.Bytes) > BlockPartSizeBytes {
return fmt.Errorf("Too big (max: %d)", BlockPartSizeBytes)
return errors.Errorf("too big: %d bytes, max: %d", len(part.Bytes), BlockPartSizeBytes)
}
if err := part.Proof.ValidateBasic(); err != nil {
return errors.Wrap(err, "wrong Proof")
}
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions types/part_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/crypto/merkle"
cmn "github.com/tendermint/tendermint/libs/common"
)

Expand Down Expand Up @@ -114,6 +115,13 @@ func TestPartValidateBasic(t *testing.T) {
{"Good Part", func(pt *Part) {}, false},
{"Negative index", func(pt *Part) { pt.Index = -1 }, true},
{"Too big part", func(pt *Part) { pt.Bytes = make([]byte, BlockPartSizeBytes+1) }, true},
{"Too big proof", func(pt *Part) {
pt.Proof = merkle.SimpleProof{
Total: 1,
Index: 1,
LeafHash: make([]byte, 1024*1024),
}
}, true},
}

for _, tc := range testCases {
Expand Down
6 changes: 6 additions & 0 deletions types/vote_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
)

const (
// MaxVotesCount is the maximum votes count. Used in ValidateBasic funcs for
// protection against DOS attacks.
MaxVotesCount = 10000
)

// UNSTABLE
// XXX: duplicate of p2p.ID to avoid dependence between packages.
// Perhaps we can have a minimal types package containing this (and other things?)
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// Must be a string because scripts like dist.sh read this file.
// XXX: Don't change the name of this variable or you will break
// automation :)
TMCoreSemVer = "0.31.10"
TMCoreSemVer = "0.31.11"

// ABCISemVer is the semantic version of the ABCI library
ABCISemVer = "0.16.0"
Expand Down