From e63119b2547ed00173f6587fb123c17b6576fd50 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Mon, 3 Feb 2020 15:19:22 -0800 Subject: [PATCH] Better state locking (#4733) * Rearrange lock a bit * better locking without deadlock * reorder lock --- beacon-chain/state/getters.go | 92 ++++++++++++++++++ beacon-chain/state/setters.go | 177 +++++++++++++++++++++------------- beacon-chain/state/types.go | 1 + 3 files changed, 204 insertions(+), 66 deletions(-) diff --git a/beacon-chain/state/getters.go b/beacon-chain/state/getters.go index 1fbda137851..9e958025597 100644 --- a/beacon-chain/state/getters.go +++ b/beacon-chain/state/getters.go @@ -119,6 +119,10 @@ func (b *BeaconState) Fork() *pbp2p.Fork { if b.state.Fork == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + prevVersion := make([]byte, len(b.state.Fork.PreviousVersion)) copy(prevVersion, b.state.Fork.PreviousVersion) currVersion := make([]byte, len(b.state.Fork.PreviousVersion)) @@ -135,6 +139,10 @@ func (b *BeaconState) LatestBlockHeader() *ethpb.BeaconBlockHeader { if b.state.LatestBlockHeader == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + hdr := ðpb.BeaconBlockHeader{ Slot: b.state.LatestBlockHeader.Slot, } @@ -154,6 +162,9 @@ func (b *BeaconState) LatestBlockHeader() *ethpb.BeaconBlockHeader { // BlockRoots kept track of in the beacon state. func (b *BeaconState) BlockRoots() [][]byte { + b.lock.RLock() + defer b.lock.RUnlock() + if b.state.BlockRoots == nil { return nil } @@ -172,6 +183,10 @@ func (b *BeaconState) BlockRootAtIndex(idx uint64) ([]byte, error) { if b.state.BlockRoots == nil { return nil, nil } + + b.lock.RLock() + defer b.lock.RUnlock() + if len(b.state.BlockRoots) <= int(idx) { return nil, fmt.Errorf("index %d out of range", idx) } @@ -185,6 +200,10 @@ func (b *BeaconState) StateRoots() [][]byte { if b.state.StateRoots == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + roots := make([][]byte, len(b.state.StateRoots)) for i, r := range b.state.StateRoots { tmpRt := make([]byte, len(r)) @@ -199,6 +218,10 @@ func (b *BeaconState) HistoricalRoots() [][]byte { if b.state.HistoricalRoots == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + roots := make([][]byte, len(b.state.HistoricalRoots)) for i, r := range b.state.HistoricalRoots { tmpRt := make([]byte, len(r)) @@ -240,6 +263,10 @@ func (b *BeaconState) Validators() []*ethpb.Validator { if b.state.Validators == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + res := make([]*ethpb.Validator, len(b.state.Validators)) for i := 0; i < len(res); i++ { val := b.state.Validators[i] @@ -270,6 +297,10 @@ func (b *BeaconState) ValidatorsReadOnly() []*ReadOnlyValidator { if b.state.Validators == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + res := make([]*ReadOnlyValidator, len(b.state.Validators)) for i := 0; i < len(res); i++ { val := b.state.Validators[i] @@ -286,6 +317,10 @@ func (b *BeaconState) ValidatorAtIndex(idx uint64) (*ethpb.Validator, error) { if len(b.state.Validators) <= int(idx) { return nil, fmt.Errorf("index %d out of range", idx) } + + b.lock.RLock() + defer b.lock.RUnlock() + val := b.state.Validators[idx] pubKey := make([]byte, len(val.PublicKey)) copy(pubKey, val.PublicKey) @@ -309,6 +344,10 @@ func (b *BeaconState) ValidatorAtIndexReadOnly(idx uint64) (*ReadOnlyValidator, if b.state.Validators == nil { return &ReadOnlyValidator{}, nil } + + b.lock.RLock() + defer b.lock.RUnlock() + if len(b.state.Validators) <= int(idx) { return nil, fmt.Errorf("index %d out of range", idx) } @@ -324,6 +363,9 @@ func (b *BeaconState) ValidatorIndexByPubkey(key [48]byte) (uint64, bool) { } func (b *BeaconState) validatorIndexMap() map[[48]byte]uint64 { + b.lock.RLock() + defer b.lock.RUnlock() + m := make(map[[48]byte]uint64, len(b.valIdxMap)) for k, v := range b.valIdxMap { @@ -335,6 +377,9 @@ func (b *BeaconState) validatorIndexMap() map[[48]byte]uint64 { // PubkeyAtIndex returns the pubkey at the given // validator index. func (b *BeaconState) PubkeyAtIndex(idx uint64) [48]byte { + b.lock.RLock() + defer b.lock.RUnlock() + return bytesutil.ToBytes48(b.state.Validators[idx].PublicKey) } @@ -346,6 +391,9 @@ func (b *BeaconState) NumValidators() int { // ReadFromEveryValidator reads values from every validator and applies it to the provided function. // Warning: This method is potentially unsafe, as it exposes the actual validator registry. func (b *BeaconState) ReadFromEveryValidator(f func(idx int, val *ReadOnlyValidator) error) error { + b.lock.RLock() + defer b.lock.RUnlock() + for i, v := range b.state.Validators { err := f(i, &ReadOnlyValidator{validator: v}) if err != nil { @@ -360,6 +408,9 @@ func (b *BeaconState) Balances() []uint64 { if b.state.Balances == nil { return nil } + b.lock.RLock() + defer b.lock.RUnlock() + res := make([]uint64, len(b.state.Balances)) copy(res, b.state.Balances) return res @@ -370,6 +421,10 @@ func (b *BeaconState) BalanceAtIndex(idx uint64) (uint64, error) { if b.state.Balances == nil { return 0, nil } + + b.lock.RLock() + defer b.lock.RUnlock() + if len(b.state.Balances) <= int(idx) { return 0, fmt.Errorf("index of %d does not exist", idx) } @@ -381,6 +436,10 @@ func (b *BeaconState) RandaoMixes() [][]byte { if b.state.RandaoMixes == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + mixes := make([][]byte, len(b.state.RandaoMixes)) for i, r := range b.state.RandaoMixes { tmpRt := make([]byte, len(r)) @@ -396,6 +455,10 @@ func (b *BeaconState) RandaoMixAtIndex(idx uint64) ([]byte, error) { if b.state.RandaoMixes == nil { return nil, nil } + + b.lock.RLock() + defer b.lock.RUnlock() + if len(b.state.RandaoMixes) <= int(idx) { return nil, fmt.Errorf("index %d out of range", idx) } @@ -409,6 +472,10 @@ func (b *BeaconState) Slashings() []uint64 { if b.state.Slashings == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + res := make([]uint64, len(b.state.Slashings)) copy(res, b.state.Slashings) return res @@ -419,6 +486,10 @@ func (b *BeaconState) PreviousEpochAttestations() []*pbp2p.PendingAttestation { if b.state.PreviousEpochAttestations == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + res := make([]*pbp2p.PendingAttestation, len(b.state.PreviousEpochAttestations)) for i := 0; i < len(res); i++ { res[i] = CopyPendingAttestation(b.state.PreviousEpochAttestations[i]) @@ -431,6 +502,10 @@ func (b *BeaconState) CurrentEpochAttestations() []*pbp2p.PendingAttestation { if b.state.CurrentEpochAttestations == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + res := make([]*pbp2p.PendingAttestation, len(b.state.CurrentEpochAttestations)) for i := 0; i < len(res); i++ { res[i] = CopyPendingAttestation(b.state.CurrentEpochAttestations[i]) @@ -443,6 +518,10 @@ func (b *BeaconState) JustificationBits() bitfield.Bitvector4 { if b.state.JustificationBits == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + res := make([]byte, len(b.state.JustificationBits.Bytes())) copy(res, b.state.JustificationBits.Bytes()) return res @@ -453,6 +532,10 @@ func (b *BeaconState) PreviousJustifiedCheckpoint() *ethpb.Checkpoint { if b.state.PreviousJustifiedCheckpoint == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + return CopyCheckpoint(b.state.PreviousJustifiedCheckpoint) } @@ -461,6 +544,10 @@ func (b *BeaconState) CurrentJustifiedCheckpoint() *ethpb.Checkpoint { if b.state.CurrentJustifiedCheckpoint == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + return CopyCheckpoint(b.state.CurrentJustifiedCheckpoint) } @@ -469,6 +556,10 @@ func (b *BeaconState) FinalizedCheckpoint() *ethpb.Checkpoint { if b.state.FinalizedCheckpoint == nil { return nil } + + b.lock.RLock() + defer b.lock.RUnlock() + return CopyCheckpoint(b.state.FinalizedCheckpoint) } @@ -477,6 +568,7 @@ func CopyETH1Data(data *ethpb.Eth1Data) *ethpb.Eth1Data { if data == nil { return ðpb.Eth1Data{} } + newETH1 := ðpb.Eth1Data{ DepositCount: data.DepositCount, } diff --git a/beacon-chain/state/setters.go b/beacon-chain/state/setters.go index c429305033b..ed3eaab393b 100644 --- a/beacon-chain/state/setters.go +++ b/beacon-chain/state/setters.go @@ -43,47 +43,52 @@ const ( // SetGenesisTime for the beacon state. func (b *BeaconState) SetGenesisTime(val uint64) error { - b.state.GenesisTime = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.GenesisTime = val b.markFieldAsDirty(genesisTime) - b.lock.Unlock() return nil } // SetSlot for the beacon state. func (b *BeaconState) SetSlot(val uint64) error { - b.state.Slot = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Slot = val b.markFieldAsDirty(slot) - b.lock.Unlock() return nil } // SetFork version for the beacon chain. func (b *BeaconState) SetFork(val *pbp2p.Fork) error { - b.state.Fork = proto.Clone(val).(*pbp2p.Fork) b.lock.Lock() + defer b.lock.Unlock() + + b.state.Fork = proto.Clone(val).(*pbp2p.Fork) b.markFieldAsDirty(fork) - b.lock.Unlock() return nil } // SetLatestBlockHeader in the beacon state. func (b *BeaconState) SetLatestBlockHeader(val *ethpb.BeaconBlockHeader) error { - b.state.LatestBlockHeader = proto.Clone(val).(*ethpb.BeaconBlockHeader) b.lock.Lock() + defer b.lock.Unlock() + + b.state.LatestBlockHeader = proto.Clone(val).(*ethpb.BeaconBlockHeader) b.markFieldAsDirty(latestBlockHeader) - b.lock.Unlock() return nil } // SetBlockRoots for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetBlockRoots(val [][]byte) error { - b.state.BlockRoots = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.BlockRoots = val b.markFieldAsDirty(blockRoots) - b.lock.Unlock() return nil } @@ -96,22 +101,26 @@ func (b *BeaconState) UpdateBlockRootAtIndex(idx uint64, blockRoot [32]byte) err // Copy on write since this is a shared array. r := b.BlockRoots() + + // Must secure lock after copy or hit a deadlock. + b.lock.Lock() + defer b.lock.Unlock() + r[idx] = blockRoot[:] b.state.BlockRoots = r - b.lock.Lock() b.markFieldAsDirty(blockRoots) - b.lock.Unlock() return nil } // SetStateRoots for the beacon state. This PR updates the entire // to a new value by overwriting the previous one. func (b *BeaconState) SetStateRoots(val [][]byte) error { - b.state.StateRoots = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.StateRoots = val b.markFieldAsDirty(stateRoots) - b.lock.Unlock() return nil } @@ -124,70 +133,79 @@ func (b *BeaconState) UpdateStateRootAtIndex(idx uint64, stateRoot [32]byte) err // Copy on write since this is a shared array. r := b.StateRoots() + + // Must secure lock after copy or hit a deadlock. + b.lock.Lock() + defer b.lock.Unlock() + r[idx] = stateRoot[:] b.state.StateRoots = r - b.lock.Lock() b.markFieldAsDirty(stateRoots) - b.lock.Unlock() return nil } // SetHistoricalRoots for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetHistoricalRoots(val [][]byte) error { - b.state.HistoricalRoots = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.HistoricalRoots = val b.markFieldAsDirty(historicalRoots) - b.lock.Unlock() return nil } // SetEth1Data for the beacon state. func (b *BeaconState) SetEth1Data(val *ethpb.Eth1Data) error { - b.state.Eth1Data = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Eth1Data = val b.markFieldAsDirty(eth1Data) - b.lock.Unlock() return nil } // SetEth1DataVotes for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetEth1DataVotes(val []*ethpb.Eth1Data) error { - b.state.Eth1DataVotes = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Eth1DataVotes = val b.markFieldAsDirty(eth1DataVotes) - b.lock.Unlock() return nil } // AppendEth1DataVotes for the beacon state. This PR appends the new value // to the the end of list. func (b *BeaconState) AppendEth1DataVotes(val *ethpb.Eth1Data) error { - b.state.Eth1DataVotes = append(b.state.Eth1DataVotes, val) b.lock.Lock() + defer b.lock.Unlock() + + b.state.Eth1DataVotes = append(b.state.Eth1DataVotes, val) b.markFieldAsDirty(eth1DataVotes) - b.lock.Unlock() return nil } // SetEth1DepositIndex for the beacon state. func (b *BeaconState) SetEth1DepositIndex(val uint64) error { - b.state.Eth1DepositIndex = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Eth1DepositIndex = val b.markFieldAsDirty(eth1DepositIndex) - b.lock.Unlock() return nil } // SetValidators for the beacon state. This PR updates the entire // to a new value by overwriting the previous one. func (b *BeaconState) SetValidators(val []*ethpb.Validator) error { - b.state.Validators = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Validators = val b.markFieldAsDirty(validators) - b.lock.Unlock() return nil } @@ -196,16 +214,19 @@ func (b *BeaconState) SetValidators(val []*ethpb.Validator) error { func (b *BeaconState) ApplyToEveryValidator(f func(idx int, val *ethpb.Validator) error) error { // Copy on write since this is a shared array. v := b.Validators() + for i, val := range v { err := f(i, val) if err != nil { return err } } - b.state.Validators = v + b.lock.Lock() + defer b.lock.Unlock() + + b.state.Validators = v b.markFieldAsDirty(validators) - b.lock.Unlock() return nil } @@ -217,11 +238,13 @@ func (b *BeaconState) UpdateValidatorAtIndex(idx uint64, val *ethpb.Validator) e } // Copy on write since this is a shared array. v := b.Validators() + + b.lock.Lock() + defer b.lock.Unlock() + v[idx] = val b.state.Validators = v - b.lock.Lock() b.markFieldAsDirty(validators) - b.lock.Unlock() return nil } @@ -230,6 +253,10 @@ func (b *BeaconState) UpdateValidatorAtIndex(idx uint64, val *ethpb.Validator) e func (b *BeaconState) SetValidatorIndexByPubkey(pubKey [48]byte, validatorIdx uint64) { // Copy on write since this is a shared map. m := b.validatorIndexMap() + + b.lock.Lock() + defer b.lock.Unlock() + m[pubKey] = validatorIdx b.valIdxMap = m } @@ -237,10 +264,11 @@ func (b *BeaconState) SetValidatorIndexByPubkey(pubKey [48]byte, validatorIdx ui // SetBalances for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetBalances(val []uint64) error { - b.state.Balances = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Balances = val b.markFieldAsDirty(balances) - b.lock.Unlock() return nil } @@ -250,20 +278,22 @@ func (b *BeaconState) UpdateBalancesAtIndex(idx uint64, val uint64) error { if len(b.state.Balances) <= int(idx) { return errors.Errorf("invalid index provided %d", idx) } - b.state.Balances[idx] = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Balances[idx] = val b.markFieldAsDirty(balances) - b.lock.Unlock() return nil } // SetRandaoMixes for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetRandaoMixes(val [][]byte) error { - b.state.RandaoMixes = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.RandaoMixes = val b.markFieldAsDirty(randaoMixes) - b.lock.Unlock() return nil } @@ -276,22 +306,25 @@ func (b *BeaconState) UpdateRandaoMixesAtIndex(val []byte, idx uint64) error { // Copy on write since this is a shared array. mixes := b.RandaoMixes() + + b.lock.Lock() + defer b.lock.Unlock() + mixes[idx] = val b.state.RandaoMixes = mixes - b.lock.Lock() b.markFieldAsDirty(randaoMixes) - b.lock.Unlock() return nil } // SetSlashings for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetSlashings(val []uint64) error { - b.state.Slashings = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Slashings = val b.markFieldAsDirty(slashings) - b.lock.Unlock() return nil } @@ -301,116 +334,128 @@ func (b *BeaconState) UpdateSlashingsAtIndex(idx uint64, val uint64) error { if len(b.state.Slashings) <= int(idx) { return errors.Errorf("invalid index provided %d", idx) } - b.state.Slashings[idx] = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.Slashings[idx] = val b.markFieldAsDirty(slashings) - b.lock.Unlock() return nil } // SetPreviousEpochAttestations for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetPreviousEpochAttestations(val []*pbp2p.PendingAttestation) error { - b.state.PreviousEpochAttestations = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.PreviousEpochAttestations = val b.markFieldAsDirty(previousEpochAttestations) - b.lock.Unlock() return nil } // SetCurrentEpochAttestations for the beacon state. This PR updates the entire // list to a new value by overwriting the previous one. func (b *BeaconState) SetCurrentEpochAttestations(val []*pbp2p.PendingAttestation) error { - b.state.CurrentEpochAttestations = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.CurrentEpochAttestations = val b.markFieldAsDirty(currentEpochAttestations) - b.lock.Unlock() return nil } // AppendHistoricalRoots for the beacon state. This PR appends the new value // to the the end of list. func (b *BeaconState) AppendHistoricalRoots(root [32]byte) error { - b.state.HistoricalRoots = append(b.state.HistoricalRoots, root[:]) b.lock.Lock() + defer b.lock.Unlock() + + b.state.HistoricalRoots = append(b.state.HistoricalRoots, root[:]) b.markFieldAsDirty(historicalRoots) - b.lock.Unlock() return nil } // AppendCurrentEpochAttestations for the beacon state. This PR appends the new value // to the the end of list. func (b *BeaconState) AppendCurrentEpochAttestations(val *pbp2p.PendingAttestation) error { - b.state.CurrentEpochAttestations = append(b.state.CurrentEpochAttestations, val) b.lock.Lock() + defer b.lock.Unlock() + + b.state.CurrentEpochAttestations = append(b.state.CurrentEpochAttestations, val) b.markFieldAsDirty(currentEpochAttestations) - b.lock.Unlock() return nil } // AppendPreviousEpochAttestations for the beacon state. This PR appends the new value // to the the end of list. func (b *BeaconState) AppendPreviousEpochAttestations(val *pbp2p.PendingAttestation) error { - b.state.PreviousEpochAttestations = append(b.state.PreviousEpochAttestations, val) b.lock.Lock() + defer b.lock.Unlock() + + b.state.PreviousEpochAttestations = append(b.state.PreviousEpochAttestations, val) b.markFieldAsDirty(previousEpochAttestations) - b.lock.Unlock() return nil } // AppendValidator for the beacon state. This PR appends the new value // to the the end of list. func (b *BeaconState) AppendValidator(val *ethpb.Validator) error { - b.state.Validators = append(b.state.Validators, val) b.lock.Lock() + defer b.lock.Unlock() + + b.state.Validators = append(b.state.Validators, val) b.markFieldAsDirty(validators) - b.lock.Unlock() return nil } // AppendBalance for the beacon state. This PR appends the new value // to the the end of list. func (b *BeaconState) AppendBalance(bal uint64) error { - b.state.Balances = append(b.state.Balances, bal) b.lock.Lock() + defer b.lock.Unlock() + + b.state.Balances = append(b.state.Balances, bal) b.markFieldAsDirty(balances) - b.lock.Unlock() return nil } // SetJustificationBits for the beacon state. func (b *BeaconState) SetJustificationBits(val bitfield.Bitvector4) error { - b.state.JustificationBits = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.JustificationBits = val b.markFieldAsDirty(justificationBits) - b.lock.Unlock() return nil } // SetPreviousJustifiedCheckpoint for the beacon state. func (b *BeaconState) SetPreviousJustifiedCheckpoint(val *ethpb.Checkpoint) error { - b.state.PreviousJustifiedCheckpoint = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.PreviousJustifiedCheckpoint = val b.markFieldAsDirty(previousJustifiedCheckpoint) - b.lock.Unlock() return nil } // SetCurrentJustifiedCheckpoint for the beacon state. func (b *BeaconState) SetCurrentJustifiedCheckpoint(val *ethpb.Checkpoint) error { - b.state.CurrentJustifiedCheckpoint = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.CurrentJustifiedCheckpoint = val b.markFieldAsDirty(currentJustifiedCheckpoint) - b.lock.Unlock() return nil } // SetFinalizedCheckpoint for the beacon state. func (b *BeaconState) SetFinalizedCheckpoint(val *ethpb.Checkpoint) error { - b.state.FinalizedCheckpoint = val b.lock.Lock() + defer b.lock.Unlock() + + b.state.FinalizedCheckpoint = val b.markFieldAsDirty(finalizedCheckpoint) - b.lock.Unlock() return nil } diff --git a/beacon-chain/state/types.go b/beacon-chain/state/types.go index 7a6a0ffa782..f8d7299207a 100644 --- a/beacon-chain/state/types.go +++ b/beacon-chain/state/types.go @@ -51,6 +51,7 @@ func InitializeFromProtoUnsafe(st *pbp2p.BeaconState) (*BeaconState, error) { func (b *BeaconState) Copy() *BeaconState { b.lock.RLock() defer b.lock.RUnlock() + dst := &BeaconState{ state: &pbp2p.BeaconState{ // Primitive types, safe to copy.