Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LVH support to memory overlay #4555

Merged
merged 4 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,28 @@ func startHandlingForkChoice(
}
}

if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() {
log.Info("Flushing in-memory state")
if err := cfg.hd.FlushNextForkState(tx); err != nil {
return nil, err
}
cfg.hd.BeaconRequestList.Remove(requestId)
rawdb.WriteForkchoiceHead(tx, forkChoice.HeadBlockHash)
canonical, err := safeAndFinalizedBlocksAreCanonical(forkChoice, s, tx, cfg)
if err != nil {
log.Warn(fmt.Sprintf("[%s] Fork choice err", s.LogPrefix()), "err", err)
return nil, err
}
if canonical {
cfg.hd.SetPendingPayloadHash(headerHash)
return nil, nil
} else {
return &privateapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}, nil
}
}

cfg.hd.UpdateTopSeenHeightPoS(headerNumber)
forkingPoint := uint64(0)
if headerNumber > 0 {
Expand All @@ -362,15 +384,6 @@ func startHandlingForkChoice(
}
}

if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() {
log.Info("Flushing in-memory state")
if err := cfg.hd.FlushNextForkState(tx); err != nil {
return nil, err
}
cfg.hd.SetPendingPayloadHash(headerHash)
return nil, nil
}

log.Info(fmt.Sprintf("[%s] Fork choice re-org", s.LogPrefix()), "headerNumber", headerNumber, "forkingPoint", forkingPoint)

if requestStatus == engineapi.New {
Expand Down Expand Up @@ -571,14 +584,14 @@ func verifyAndSaveNewPoSHeader(
// TODO(yperbasis): considered non-canonical because some missing headers were downloaded but not canonized
// Or it's not a problem because forkChoice is updated frequently?
if cfg.memoryOverlay {
status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload)
status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload)
if criticalError != nil {
return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError
}
success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED
return &privateapi.PayloadStatus{
Status: status,
LatestValidHash: currentHeadHash,
LatestValidHash: latestValidHash,
ValidationError: validationError,
}, success, nil
}
Expand All @@ -587,14 +600,14 @@ func verifyAndSaveNewPoSHeader(
}

if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) {
status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload)
status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload)
if criticalError != nil {
return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError
}
success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED
return &privateapi.PayloadStatus{
Status: status,
LatestValidHash: currentHeadHash,
LatestValidHash: latestValidHash,
ValidationError: validationError,
}, success, nil
}
Expand Down
40 changes: 29 additions & 11 deletions turbo/stages/headerdownload/header_algos.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,10 +1093,16 @@ func abs64(n int64) uint64 {
return uint64(n)
}

func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, validationError error, criticalError error) {
func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, latestValidHash common.Hash, validationError error, criticalError error) {
hd.lock.Lock()
defer hd.lock.Unlock()
maxDepth := uint64(16)

currentHeight := rawdb.ReadCurrentBlockNumber(tx)
if currentHeight == nil {
criticalError = fmt.Errorf("could not read block number.")
return
}
if store {
// If it is a continuation of the canonical chain we can stack it up.
if hd.nextForkState == nil {
Expand All @@ -1105,26 +1111,24 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body
hd.nextForkState.UpdateTxn(tx)
}
hd.nextForkHash = header.Hash()
status = remote.EngineStatus_VALID
// Let's assemble the side fork chain if we have others building.
validationError = execPayload(hd.nextForkState, header, body, 0, nil, nil)
if validationError != nil {
status = remote.EngineStatus_INVALID
latestValidHash = header.ParentHash
return
}
return
}
currentHeight := rawdb.ReadCurrentBlockNumber(tx)
if currentHeight == nil {
criticalError = fmt.Errorf("could not read block number.")
status = remote.EngineStatus_VALID
latestValidHash = header.Hash()
hd.sideForksBlock[latestValidHash] = sideForkBlock{header, body}
hd.cleanupOutdateSideForks(*currentHeight, maxDepth)
return
}
// if the block is not in range of MAX_DEPTH from head then we do not validate it.
if abs64(int64(*currentHeight)-header.Number.Int64()) > maxDepth {
status = remote.EngineStatus_ACCEPTED
return
}
// if it is not canonical we validate it as a side fork.
batch := memdb.NewMemoryBatch(tx)
// Let's assemble the side fork backwards
var foundCanonical bool
currentHash := header.ParentHash
Expand Down Expand Up @@ -1155,17 +1159,26 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body
}
hd.sideForksBlock[header.Hash()] = sideForkBlock{header, body}
status = remote.EngineStatus_VALID
// if it is not canonical we validate it as a side fork.
batch := memdb.NewMemoryBatch(tx)
defer batch.Close()
validationError = execPayload(batch, header, body, unwindPoint, headersChain, bodiesChain)
latestValidHash = header.Hash()
if validationError != nil {
latestValidHash = header.ParentHash
status = remote.EngineStatus_INVALID
}
// After the we finished executing, we clean up old forks
hd.cleanupOutdateSideForks(*currentHeight, maxDepth)
return
}

func (hd *HeaderDownload) cleanupOutdateSideForks(currentHeight uint64, maxDepth uint64) {
for hash, sb := range hd.sideForksBlock {
if abs64(int64(*currentHeight)-sb.header.Number.Int64()) > maxDepth {
if abs64(int64(currentHeight)-sb.header.Number.Int64()) > maxDepth {
delete(hd.sideForksBlock, hash)
}
}
return
}

func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error {
Expand All @@ -1174,6 +1187,11 @@ func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error {
if err := hd.nextForkState.Flush(tx); err != nil {
return err
}
// If the side fork hash is now becoming canonical we can clean up.
if _, ok := hd.sideForksBlock[hd.nextForkHash]; ok {
delete(hd.sideForksBlock, hd.nextForkHash)
}
hd.nextForkState.Close()
hd.nextForkHash = common.Hash{}
hd.nextForkState = nil
return nil
Expand Down