Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make order be decided first; Remove TMSP Commit/Rollback #176

Merged
merged 5 commits into from
Jan 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
.PHONY: get_deps build all list_deps install

all: install
all: test install

TMROOT = $${TMROOT:-$$HOME/.tendermint}

install:
install: get_deps
go install github.com/tendermint/tendermint/cmd/tendermint

build:
Expand Down Expand Up @@ -32,7 +32,7 @@ list_deps:
go list -f '{{join .Deps "\n"}}' github.com/tendermint/tendermint/... | xargs go list -f '{{if not .Standard}}{{.ImportPath}}{{end}}'

get_deps:
go get github.com/tendermint/tendermint/...
go get -d github.com/tendermint/tendermint/...

revision:
-echo `git rev-parse --verify HEAD` > $(TMROOT)/revision
Expand Down
48 changes: 25 additions & 23 deletions blockchain/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ type consensusReactor interface {
type BlockchainReactor struct {
p2p.BaseReactor

sw *p2p.Switch
state *sm.State
proxyAppCtx proxy.AppContext // same as consensus.proxyAppCtx
store *BlockStore
pool *BlockPool
sync bool
requestsCh chan BlockRequest
timeoutsCh chan string
lastBlock *types.Block
sw *p2p.Switch
state *sm.State
proxyAppConn proxy.AppConn // same as consensus.proxyAppConn
store *BlockStore
pool *BlockPool
sync bool
requestsCh chan BlockRequest
timeoutsCh chan string
lastBlock *types.Block

evsw *events.EventSwitch
}

func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor {
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, sync bool) *BlockchainReactor {
if state.LastBlockHeight == store.Height()-1 {
store.height -= 1 // XXX HACK, make this better
}
Expand All @@ -70,13 +70,13 @@ func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *
timeoutsCh,
)
bcR := &BlockchainReactor{
state: state,
proxyAppCtx: proxyAppCtx,
store: store,
pool: pool,
sync: sync,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
state: state,
proxyAppConn: proxyAppConn,
store: store,
pool: pool,
sync: sync,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
return bcR
Expand Down Expand Up @@ -231,16 +231,18 @@ FOR_LOOP:
break SYNC_LOOP
} else {
bcR.pool.PopRequest()
err := bcR.state.ExecBlock(bcR.proxyAppCtx, first, firstPartsHeader)
err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader)
if err != nil {
// TODO This is bad, are we zombie?
PanicQ(Fmt("Failed to process committed block: %v", err))
}
err = bcR.state.Commit(bcR.proxyAppCtx)
if err != nil {
// TODO Handle gracefully.
PanicQ(Fmt("Failed to commit block at application: %v", err))
}
/*
err = bcR.proxyAppConn.CommitSync()
if err != nil {
// TODO Handle gracefully.
PanicQ(Fmt("Failed to commit block at application: %v", err))
}
*/
bcR.store.SaveBlock(first, firstParts, second.LastValidation)
bcR.state.Save()
}
Expand Down
13 changes: 6 additions & 7 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -310,17 +311,15 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
blockStore := bc.NewBlockStore(blockDB)

// one for mempool, one for consensus
app := example.NewCounterApplication(false)
appCMem := app.Open()
appCCon := app.Open()
proxyAppCtxMem := proxy.NewLocalAppContext(appCMem)
proxyAppCtxCon := proxy.NewLocalAppContext(appCCon)
mtx, app := new(sync.Mutex), example.NewCounterApplication(false)
proxyAppConnMem := proxy.NewLocalAppConn(mtx, app)
proxyAppConnCon := proxy.NewLocalAppConn(mtx, app)

// Make Mempool
mempool := mempl.NewMempool(proxyAppCtxMem)
mempool := mempl.NewMempool(proxyAppConnMem)

// Make ConsensusReactor
cs := NewConsensusState(state, proxyAppCtxCon, blockStore, mempool)
cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool)
cs.SetPrivValidator(privVals[0])

evsw := events.NewEventSwitch()
Expand Down
150 changes: 57 additions & 93 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N
timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers.
timeoutPrecommitDelta = 0500 * time.Millisecond // timeoutPrecommitN is timeoutPrecommit0 + timeoutPrecommitDelta*N
timeoutCommit = 100 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight.
timeoutCommit = 1000 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight.

)

Expand Down Expand Up @@ -173,16 +173,14 @@ func (ti *timeoutInfo) String() string {
type ConsensusState struct {
QuitService

proxyAppCtx proxy.AppContext
proxyAppConn proxy.AppConn
blockStore *bc.BlockStore
mempool *mempl.Mempool
privValidator *types.PrivValidator

mtx sync.Mutex
RoundState
state *sm.State // State until height-1.
stagedBlock *types.Block // Cache last staged block.
stagedState *sm.State // Cache result of staged block.
state *sm.State // State until height-1.

peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes)
internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes
Expand All @@ -191,14 +189,13 @@ type ConsensusState struct {
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine

evsw *events.EventSwitch
evc *events.EventCache // set in stageBlock and passed into state

nSteps int // used for testing to limit the number of transitions the state makes
}

func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
cs := &ConsensusState{
proxyAppCtx: proxyAppCtx,
proxyAppConn: proxyAppConn,
blockStore: blockStore,
mempool: mempool,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
Expand Down Expand Up @@ -416,7 +413,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) {

// Reset fields based on state.
validators := state.Validators
height := state.LastBlockHeight + 1 // next desired block height
height := state.LastBlockHeight + 1 // Next desired block height
lastPrecommits := (*types.VoteSet)(nil)
if cs.CommitRound > -1 && cs.Votes != nil {
if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() {
Expand Down Expand Up @@ -452,8 +449,6 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
cs.LastValidators = state.LastValidators

cs.state = state
cs.stagedBlock = nil
cs.stagedState = nil

// Finally, broadcast RoundState
cs.newStep()
Expand Down Expand Up @@ -795,8 +790,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
return
}

// Mempool run transactions and the resulting hash
txs, hash, err := cs.mempool.Reap()
// Mempool validated transactions
txs, err := cs.mempool.Reap()
if err != nil {
log.Warn("createProposalBlock: Error getting proposal txs", "error", err)
return nil, nil
Expand All @@ -812,7 +807,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
LastBlockHash: cs.state.LastBlockHash,
LastBlockParts: cs.state.LastBlockParts,
ValidatorsHash: cs.state.Validators.Hash(),
AppHash: hash,
AppHash: cs.state.AppHash, // state merkle root of txs from the previous block.
},
LastValidation: validation,
Data: &types.Data{
Expand Down Expand Up @@ -878,8 +873,8 @@ func (cs *ConsensusState) doPrevote(height int, round int) {
return
}

// Try staging cs.ProposalBlock
err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts)
// Valdiate proposal block
err := cs.state.ValidateBlock(cs.ProposalBlock)
if err != nil {
// ProposalBlock is invalid, prevote nil.
log.Warn("enterPrevote: ProposalBlock is invalid", "error", err)
Expand Down Expand Up @@ -992,7 +987,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
if cs.ProposalBlock.HashesTo(hash) {
log.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash)
// Validate the block.
if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil {
if err := cs.state.ValidateBlock(cs.ProposalBlock); err != nil {
PanicConsensus(Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err))
}
cs.LockedRound = round
Expand Down Expand Up @@ -1120,27 +1115,64 @@ func (cs *ConsensusState) finalizeCommit(height int) {
}

hash, header, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority()
block, blockParts := cs.ProposalBlock, cs.ProposalBlockParts

if !ok {
PanicSanity(Fmt("Cannot finalizeCommit, commit does not have two thirds majority"))
}
if !cs.ProposalBlockParts.HasHeader(header) {
if !blockParts.HasHeader(header) {
PanicSanity(Fmt("Expected ProposalBlockParts header to be commit header"))
}
if !cs.ProposalBlock.HashesTo(hash) {
if !block.HashesTo(hash) {
PanicSanity(Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash"))
}
if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil {
if err := cs.state.ValidateBlock(block); err != nil {
PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err))
}

log.Notice("Finalizing commit of block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
log.Info(Fmt("%v", cs.ProposalBlock))
// We have the block, so stage/save/commit-vote.
cs.saveBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Votes.Precommits(cs.CommitRound))
log.Notice("Finalizing commit of block", "height", block.Height, "hash", block.Hash())
log.Info(Fmt("%v", block))

// Fire off event for new block.
// TODO: Handle app failure. See #177
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})

// Create a copy of the state for staging
stateCopy := cs.state.Copy()

// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := stateCopy.ExecBlock(cs.evsw, cs.proxyAppConn, block, blockParts.Header())
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Exec failed for application"))
}

// Save to blockStore.
if cs.blockStore.Height() < block.Height {
commits := cs.Votes.Precommits(cs.CommitRound)
seenValidation := commits.MakeValidation()
cs.blockStore.SaveBlock(block, blockParts, seenValidation)
}

/*
// Commit to proxyAppConn
err = cs.proxyAppConn.CommitSync()
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application"))
}
*/

// Save the state.
stateCopy.Save()

// Update mempool.
cs.mempool.Update(block.Height, block.Txs)

// NewHeightStep!
cs.updateToState(cs.stagedState)
cs.updateToState(stateCopy)

// cs.StartTime is already set.
// Schedule Round0 to start soon.
Expand Down Expand Up @@ -1352,39 +1384,6 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
return
}

func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartSet) error {
if block == nil {
PanicSanity("Cannot stage nil block")
}

// Already staged?
blockHash := block.Hash()
if cs.stagedBlock != nil && len(blockHash) != 0 && bytes.Equal(cs.stagedBlock.Hash(), blockHash) {
return nil
}

// Create a new event cache to cache all events.
cs.evc = events.NewEventCache(cs.evsw)

// Create a copy of the state for staging
stateCopy := cs.state.Copy()
stateCopy.SetEventCache(cs.evc)

// Run the block on the State:
// + update validator sets
// + first rolls back proxyAppCtx
// + run txs on the proxyAppCtx or rollback
err := stateCopy.ExecBlock(cs.proxyAppCtx, block, blockParts.Header())
if err != nil {
return err
}

// Everything looks good!
cs.stagedBlock = block
cs.stagedState = stateCopy
return nil
}

func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
vote := &types.Vote{
Height: cs.Height,
Expand Down Expand Up @@ -1415,41 +1414,6 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
}
}

// Save Block, save the +2/3 Commits we've seen
func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSet, commits *types.VoteSet) {

// The proposal must be valid.
if err := cs.stageBlock(block, blockParts); err != nil {
PanicSanity(Fmt("saveBlock() an invalid block: %v", err))
}

// Save to blockStore.
if cs.blockStore.Height() < block.Height {
seenValidation := commits.MakeValidation()
cs.blockStore.SaveBlock(block, blockParts, seenValidation)
}

// Commit to proxyAppCtx
err := cs.stagedState.Commit(cs.proxyAppCtx)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for applicaiton"))
}

// Save the state.
cs.stagedState.Save()

// Update mempool.
cs.mempool.Update(block)

// Fire off event
if cs.evsw != nil && cs.evc != nil {
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})
go cs.evc.Flush()
}

}

//---------------------------------------------------------

func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int {
Expand Down
2 changes: 1 addition & 1 deletion consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestBadProposal(t *testing.T) {
// wait for proposal
<-proposalCh

//wait for prevote
// wait for prevote
<-voteCh

validatePrevote(t, cs1, round, vss[0], nil)
Expand Down
Loading