From 18a51d1efc79119419bcbead0c055216ceba01e4 Mon Sep 17 00:00:00 2001 From: Giulio Rebuffo Date: Mon, 27 Jun 2022 19:19:15 +0200 Subject: [PATCH 1/4] fixed fcu --- eth/stagedsync/stage_headers.go | 8 ++++---- turbo/stages/headerdownload/header_algos.go | 12 +++++++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index ad0f8f3dd47..71df3c38d57 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -571,14 +571,14 @@ func verifyAndSaveNewPoSHeader( // TODO(yperbasis): considered non-canonical because some missing headers were downloaded but not canonized // Or it's not a problem because forkChoice is updated frequently? if cfg.memoryOverlay { - status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload) + status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload) if criticalError != nil { return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError } success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED return &privateapi.PayloadStatus{ Status: status, - LatestValidHash: currentHeadHash, + LatestValidHash: latestValidHash, ValidationError: validationError, }, success, nil } @@ -587,14 +587,14 @@ func verifyAndSaveNewPoSHeader( } if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) { - status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload) + status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload) if criticalError != nil { return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError } success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED return &privateapi.PayloadStatus{ Status: status, - LatestValidHash: currentHeadHash, + LatestValidHash: latestValidHash, ValidationError: validationError, }, success, nil } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 2fe3b984551..32c775e3da5 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -1093,7 +1093,7 @@ func abs64(n int64) uint64 { return uint64(n) } -func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, validationError error, criticalError error) { +func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, latestValidHash common.Hash, validationError error, criticalError error) { hd.lock.Lock() defer hd.lock.Unlock() maxDepth := uint64(16) @@ -1108,8 +1108,10 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body status = remote.EngineStatus_VALID // Let's assemble the side fork chain if we have others building. validationError = execPayload(hd.nextForkState, header, body, 0, nil, nil) + latestValidHash = header.Hash() if validationError != nil { status = remote.EngineStatus_INVALID + latestValidHash = header.ParentHash } return } @@ -1123,8 +1125,6 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body status = remote.EngineStatus_ACCEPTED return } - // if it is not canonical we validate it as a side fork. - batch := memdb.NewMemoryBatch(tx) // Let's assemble the side fork backwards var foundCanonical bool currentHash := header.ParentHash @@ -1155,8 +1155,13 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body } hd.sideForksBlock[header.Hash()] = sideForkBlock{header, body} status = remote.EngineStatus_VALID + // if it is not canonical we validate it as a side fork. + batch := memdb.NewMemoryBatch(tx) + defer batch.Close() validationError = execPayload(batch, header, body, unwindPoint, headersChain, bodiesChain) + latestValidHash = header.Hash() if validationError != nil { + latestValidHash = header.ParentHash status = remote.EngineStatus_INVALID } // After the we finished executing, we clean up old forks @@ -1174,6 +1179,7 @@ func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error { if err := hd.nextForkState.Flush(tx); err != nil { return err } + hd.nextForkState.Close() hd.nextForkHash = common.Hash{} hd.nextForkState = nil return nil From db6b549b64fcbe979635e5bf255b39004b0a900f Mon Sep 17 00:00:00 2001 From: Giulio Rebuffo Date: Mon, 27 Jun 2022 23:57:49 +0200 Subject: [PATCH 2/4] fixed leak --- turbo/stages/headerdownload/header_algos.go | 30 ++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 32c775e3da5..759ba81be44 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -1097,6 +1097,12 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body hd.lock.Lock() defer hd.lock.Unlock() maxDepth := uint64(16) + + currentHeight := rawdb.ReadCurrentBlockNumber(tx) + if currentHeight == nil { + criticalError = fmt.Errorf("could not read block number.") + return + } if store { // If it is a continuation of the canonical chain we can stack it up. if hd.nextForkState == nil { @@ -1105,19 +1111,17 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body hd.nextForkState.UpdateTxn(tx) } hd.nextForkHash = header.Hash() - status = remote.EngineStatus_VALID // Let's assemble the side fork chain if we have others building. validationError = execPayload(hd.nextForkState, header, body, 0, nil, nil) - latestValidHash = header.Hash() if validationError != nil { status = remote.EngineStatus_INVALID latestValidHash = header.ParentHash + return } - return - } - currentHeight := rawdb.ReadCurrentBlockNumber(tx) - if currentHeight == nil { - criticalError = fmt.Errorf("could not read block number.") + status = remote.EngineStatus_VALID + latestValidHash = header.Hash() + hd.sideForksBlock[latestValidHash] = sideForkBlock{header, body} + hd.cleanupOutdateSideForks(*currentHeight, maxDepth) return } // if the block is not in range of MAX_DEPTH from head then we do not validate it. @@ -1165,12 +1169,16 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body status = remote.EngineStatus_INVALID } // After the we finished executing, we clean up old forks + hd.cleanupOutdateSideForks(*currentHeight, maxDepth) + return +} + +func (hd *HeaderDownload) cleanupOutdateSideForks(currentHeight uint64, maxDepth uint64) { for hash, sb := range hd.sideForksBlock { - if abs64(int64(*currentHeight)-sb.header.Number.Int64()) > maxDepth { + if abs64(int64(currentHeight)-sb.header.Number.Int64()) > maxDepth { delete(hd.sideForksBlock, hash) } } - return } func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error { @@ -1179,6 +1187,10 @@ func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error { if err := hd.nextForkState.Flush(tx); err != nil { return err } + // If the side fork hash is now becoming canonical we can clean up. + if _, ok := hd.sideForksBlock[hd.nextForkHash]; ok { + delete(hd.sideForksBlock, hd.nextForkHash) + } hd.nextForkState.Close() hd.nextForkHash = common.Hash{} hd.nextForkState = nil From 7b3caf87a4c871778780088cddc5b328a1a7432a Mon Sep 17 00:00:00 2001 From: Giulio Rebuffo Date: Tue, 28 Jun 2022 14:05:38 +0200 Subject: [PATCH 3/4] maybe now? --- eth/stagedsync/stage_headers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 71df3c38d57..14930c7cf1c 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -586,7 +586,7 @@ func verifyAndSaveNewPoSHeader( return &privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil } - if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) { + /*if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) { status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload) if criticalError != nil { return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError @@ -597,7 +597,7 @@ func verifyAndSaveNewPoSHeader( LatestValidHash: latestValidHash, ValidationError: validationError, }, success, nil - } + }*/ // OK, we're on the canonical chain if requestStatus == engineapi.New { From 142b82a8a5f4d86c5abe4330a31d4a3f6eb2ae43 Mon Sep 17 00:00:00 2001 From: Giulio Rebuffo Date: Tue, 28 Jun 2022 15:08:20 +0200 Subject: [PATCH 4/4] wrote forkchoice --- eth/stagedsync/stage_headers.go | 35 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 14930c7cf1c..be526ba04a9 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -349,6 +349,28 @@ func startHandlingForkChoice( } } + if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() { + log.Info("Flushing in-memory state") + if err := cfg.hd.FlushNextForkState(tx); err != nil { + return nil, err + } + cfg.hd.BeaconRequestList.Remove(requestId) + rawdb.WriteForkchoiceHead(tx, forkChoice.HeadBlockHash) + canonical, err := safeAndFinalizedBlocksAreCanonical(forkChoice, s, tx, cfg) + if err != nil { + log.Warn(fmt.Sprintf("[%s] Fork choice err", s.LogPrefix()), "err", err) + return nil, err + } + if canonical { + cfg.hd.SetPendingPayloadHash(headerHash) + return nil, nil + } else { + return &privateapi.PayloadStatus{ + CriticalError: &privateapi.InvalidForkchoiceStateErr, + }, nil + } + } + cfg.hd.UpdateTopSeenHeightPoS(headerNumber) forkingPoint := uint64(0) if headerNumber > 0 { @@ -362,15 +384,6 @@ func startHandlingForkChoice( } } - if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() { - log.Info("Flushing in-memory state") - if err := cfg.hd.FlushNextForkState(tx); err != nil { - return nil, err - } - cfg.hd.SetPendingPayloadHash(headerHash) - return nil, nil - } - log.Info(fmt.Sprintf("[%s] Fork choice re-org", s.LogPrefix()), "headerNumber", headerNumber, "forkingPoint", forkingPoint) if requestStatus == engineapi.New { @@ -586,7 +599,7 @@ func verifyAndSaveNewPoSHeader( return &privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil } - /*if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) { + if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) { status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload) if criticalError != nil { return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError @@ -597,7 +610,7 @@ func verifyAndSaveNewPoSHeader( LatestValidHash: latestValidHash, ValidationError: validationError, }, success, nil - }*/ + } // OK, we're on the canonical chain if requestStatus == engineapi.New {