Skip to content

Commit

Permalink
Merge pull request #45 from renproject/fix/storage-deadlock
Browse files Browse the repository at this point in the history
Delegate saving/restoring state to process
  • Loading branch information
loongy committed Mar 19, 2020
2 parents c8b494f + 4033fe3 commit ec389c6
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 48 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.13

require (
github.com/ethereum/go-ethereum v1.9.5
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/onsi/ginkgo v1.11.0
github.com/onsi/gomega v1.8.1
github.com/renproject/abi v0.4.1
Expand Down
48 changes: 35 additions & 13 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type Blockchain interface {
BlockExistsAtHeight(block.Height) bool
}

// A SaveRestorer defines a storage interface for the State.
type SaveRestorer interface {
Save(*State)
Restore(*State)
}

// A Proposer builds a `block.Block` for proposals.
type Proposer interface {
BlockProposal(block.Height, block.Round) block.Block
Expand Down Expand Up @@ -97,16 +103,17 @@ type Process struct {
blockchain Blockchain
state State

proposer Proposer
validator Validator
scheduler Scheduler
broadcaster Broadcaster
timer Timer
observer Observer
saveRestorer SaveRestorer
proposer Proposer
validator Validator
scheduler Scheduler
broadcaster Broadcaster
timer Timer
observer Observer
}

// New Process initialised to the default state, starting in the first round.
func New(logger logrus.FieldLogger, signatory id.Signatory, blockchain Blockchain, state State, proposer Proposer, validator Validator, observer Observer, broadcaster Broadcaster, scheduler Scheduler, timer Timer) *Process {
func New(logger logrus.FieldLogger, signatory id.Signatory, blockchain Blockchain, state State, saveRestorer SaveRestorer, proposer Proposer, validator Validator, observer Observer, broadcaster Broadcaster, scheduler Scheduler, timer Timer) *Process {
p := &Process{
logger: logger,
mu: new(sync.Mutex),
Expand All @@ -115,16 +122,31 @@ func New(logger logrus.FieldLogger, signatory id.Signatory, blockchain Blockchai
blockchain: blockchain,
state: state,

proposer: proposer,
validator: validator,
observer: observer,
broadcaster: broadcaster,
scheduler: scheduler,
timer: timer,
saveRestorer: saveRestorer,
proposer: proposer,
validator: validator,
observer: observer,
broadcaster: broadcaster,
scheduler: scheduler,
timer: timer,
}
return p
}

// Save the current state of the process using the saveRestorer.
func (p *Process) Save() {
p.mu.Lock()
defer p.mu.Unlock()
p.saveRestorer.Save(&p.state)
}

// Restore the current state of the process using the saveRestorer.
func (p *Process) Restore() {
p.mu.Lock()
defer p.mu.Unlock()
p.saveRestorer.Restore(&p.state)
}

// SizeHint returns the number of bytes required to store this process in
// binary.
func (p *Process) SizeHint() int {
Expand Down
32 changes: 25 additions & 7 deletions replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type Message struct {
}

// ProcessStorage saves and restores `process.State` to persistent memory. This
// guarantess that in the event of an unexpected shutdown, the Replica will only
// guarantees that in the event of an unexpected shutdown, the Replica will only
// drop the `process.Message` that was currently being handling.
type ProcessStorage interface {
SaveProcess(p *process.Process, shard Shard)
RestoreProcess(p *process.Process, shard Shard)
SaveState(state *process.State, shard Shard)
RestoreState(state *process.State, shard Shard)
}

// Options define a set of properties that can be used to parameterise the
Expand Down Expand Up @@ -96,7 +96,6 @@ type Replica struct {
options Options
shard Shard
p *process.Process
pStorage ProcessStorage
blockStorage BlockStorage

scheduler *roundRobinScheduler
Expand All @@ -121,20 +120,20 @@ func New(options Options, pStorage ProcessStorage, blockStorage BlockStorage, bl
id.NewSignatory(privKey.PublicKey),
blockStorage.Blockchain(shard),
process.DefaultState((len(latestBase.Header().Signatories())-1)/3),
newSaveRestorer(pStorage, shard),
shardRebaser,
shardRebaser,
shardRebaser,
newSigner(broadcaster, shard, privKey),
scheduler,
newBackOffTimer(options.BackOffExp, options.BackOffBase, options.BackOffMax),
)
pStorage.RestoreProcess(p, shard)
p.Restore()

return Replica{
options: options,
shard: shard,
p: p,
pStorage: pStorage,
blockStorage: blockStorage,

scheduler: scheduler,
Expand Down Expand Up @@ -173,14 +172,33 @@ func (replica *Replica) HandleMessage(m Message) {
// Handle the underlying `process.Message` and immediately save the
// `process.Process` afterwards to protect against unexpected crashes
replica.p.HandleMessage(m.Message)
replica.pStorage.SaveProcess(replica.p, replica.shard)
replica.p.Save()
}

func (replica *Replica) Rebase(sigs id.Signatories) {
replica.scheduler.rebase(sigs)
replica.rebaser.rebase(sigs)
}

type saveRestorer struct {
pStorage ProcessStorage
shard Shard
}

func newSaveRestorer(pStorage ProcessStorage, shard Shard) *saveRestorer {
return &saveRestorer{
pStorage: pStorage,
shard: shard,
}
}

func (saveRestorer *saveRestorer) Save(state *process.State) {
saveRestorer.pStorage.SaveState(state, saveRestorer.shard)
}
func (saveRestorer *saveRestorer) Restore(state *process.State) {
saveRestorer.pStorage.RestoreState(state, saveRestorer.shard)
}

type baseBlockCache struct {
lastBaseBlockHeight block.Height
lastBaseBlockHash id.Hash
Expand Down
4 changes: 2 additions & 2 deletions replica/replica_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func (m mockObserver) DidReceiveSufficientNilPrevotes(process.Messages, int) {
type mockProcessStorage struct {
}

func (m mockProcessStorage) SaveProcess(p *process.Process, shard Shard) {
func (m mockProcessStorage) SaveState(state *process.State, shard Shard) {
}

func (m mockProcessStorage) RestoreProcess(p *process.Process, shard Shard) {
func (m mockProcessStorage) RestoreState(state *process.State, shard Shard) {
}
41 changes: 28 additions & 13 deletions testutil/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,13 @@ type ProcessOrigin struct {
BroadcastMessages chan process.Message
CastMessages chan process.Message

Proposer process.Proposer
Validator process.Validator
Scheduler process.Scheduler
Broadcaster process.Broadcaster
Timer process.Timer
Observer process.Observer
SaveRestorer process.SaveRestorer
Proposer process.Proposer
Validator process.Validator
Scheduler process.Scheduler
Broadcaster process.Broadcaster
Timer process.Timer
Observer process.Observer
}

func NewProcessOrigin(f int) ProcessOrigin {
Expand Down Expand Up @@ -204,12 +205,13 @@ func NewProcessOrigin(f int) ProcessOrigin {
BroadcastMessages: broadcastMessages,
CastMessages: castMessages,

Proposer: NewMockProposer(privateKey),
Validator: NewMockValidator(nil),
Scheduler: NewMockScheduler(sig),
Broadcaster: NewMockBroadcaster(broadcastMessages, castMessages),
Timer: NewMockTimer(1 * time.Second),
Observer: MockObserver{},
SaveRestorer: NewMockSaveRestorer(),
Proposer: NewMockProposer(privateKey),
Validator: NewMockValidator(nil),
Scheduler: NewMockScheduler(sig),
Broadcaster: NewMockBroadcaster(broadcastMessages, castMessages),
Timer: NewMockTimer(1 * time.Second),
Observer: MockObserver{},
}
}

Expand All @@ -223,6 +225,7 @@ func (p ProcessOrigin) ToProcess() *process.Process {
p.Signatory,
p.Blockchain,
p.State,
p.SaveRestorer,
p.Proposer,
p.Validator,
p.Observer,
Expand Down Expand Up @@ -261,7 +264,7 @@ func (bc *MockBlockchain) InsertBlockAtHeight(height block.Height, block block.B
bc.blocks[height] = block
}

func (bc *MockBlockchain) InsertBlockStatAtHeight(height block.Height, state block.State) {
func (bc *MockBlockchain) InsertBlockStateAtHeight(height block.Height, state block.State) {
bc.mu.Lock()
defer bc.mu.Unlock()

Expand Down Expand Up @@ -309,6 +312,18 @@ func (bc *MockBlockchain) LatestBlock(kind block.Kind) block.Block {
return b
}

type MockSaveRestorer struct {
}

func NewMockSaveRestorer() process.SaveRestorer {
return &MockSaveRestorer{}
}

func (m *MockSaveRestorer) Save(state *process.State) {
}
func (m *MockSaveRestorer) Restore(state *process.State) {
}

type MockProposer struct {
Key *ecdsa.PrivateKey
}
Expand Down
4 changes: 2 additions & 2 deletions testutil/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ func (m MockObserver) DidCommitBlock(height block.Height, shard replica.Shard) {
panic("DidCommitBlock should be called only when the block has been added to storage")
}
digest := sha256.Sum256(b.Txs())
blockchain.InsertBlockStatAtHeight(height, digest[:])
blockchain.InsertBlockStateAtHeight(height, digest[:])

// Insert executed state of the previous height
prevBlock, ok := blockchain.BlockAtHeight(height - 1)
if !ok {
panic(fmt.Sprintf("cannot find block of height %v, %v", height-1, prevBlock))
}
blockchain.InsertBlockStatAtHeight(height-1, prevBlock.PreviousState())
blockchain.InsertBlockStateAtHeight(height-1, prevBlock.PreviousState())
}

func (observer *MockObserver) IsSignatory(replica.Shard) bool {
Expand Down
21 changes: 10 additions & 11 deletions testutil/replica/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,27 @@ func NewMockPersistentStorage(shards replica.Shards) *MockPersistentStorage {
}
}

func (store *MockPersistentStorage) SaveProcess(p *process.Process, shard replica.Shard) {
store.mu.Lock()
defer store.mu.Unlock()

data, err := surge.ToBinary(p)
func (store *MockPersistentStorage) SaveState(state *process.State, shard replica.Shard) {
data, err := surge.ToBinary(state)
if err != nil {
panic(fmt.Sprintf("fail to marshal the process, err = %v", err))
panic(fmt.Sprintf("failed to marshal state: %v", err))

}
store.mu.Lock()
defer store.mu.Unlock()
store.processes[shard] = data
}

func (store *MockPersistentStorage) RestoreProcess(p *process.Process, shard replica.Shard) {
func (store *MockPersistentStorage) RestoreState(state *process.State, shard replica.Shard) {
store.mu.RLock()
defer store.mu.RUnlock()

data, ok := store.processes[shard]
if !ok {
return
}
err := surge.FromBinary(data, p)
if err != nil {
panic(err)
if err := surge.FromBinary(data, state); err != nil {
panic(fmt.Sprintf("failed to unmarshal state: %v", err))
}
}

Expand Down Expand Up @@ -101,6 +100,6 @@ func (store *MockPersistentStorage) Init(gb block.Block) {

for _, bc := range store.blockchains {
bc.InsertBlockAtHeight(block.Height(0), gb)
bc.InsertBlockStatAtHeight(block.Height(0), nil)
bc.InsertBlockStateAtHeight(block.Height(0), nil)
}
}

0 comments on commit ec389c6

Please sign in to comment.