Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bor reduce logging verbosity #8401

Merged
merged 2 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions eth/borfinality/whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
defer ticker.Stop()

for {
defer func() {
r := recover()
if r != nil {
log.Warn(fmt.Sprintf("service %s- run failed with panic", fnName), "err", r)
}
}()

select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand Down Expand Up @@ -176,6 +183,7 @@ func handleMilestone(ctx context.Context, heimdallClient heimdall.IHeimdallClien
// add that milestone to the future milestone list.
if errors.Is(err, errMissingBlocks) || errors.Is(err, errHashMismatch) {
service.ProcessFutureMilestone(num, hash)
return nil
}

if errors.Is(err, heimdall.ErrServiceUnavailable) {
Expand Down
2 changes: 1 addition & 1 deletion eth/borfinality/whitelist_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func fetchWhitelistMilestone(ctx context.Context, heimdallClient heimdall.IHeimd
return num, hash, errMilestone
}

config.logger.Info("Got new milestone from heimdall", "start", milestone.StartBlock.Uint64(), "end", milestone.EndBlock.Uint64(), "hash", milestone.Hash.String())
config.logger.Debug("Got new milestone from heimdall", "start", milestone.StartBlock.Uint64(), "end", milestone.EndBlock.Uint64(), "hash", milestone.Hash.String())

num = milestone.EndBlock.Uint64()
hash = milestone.Hash
Expand Down
2 changes: 0 additions & 2 deletions eth/stagedsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func DefaultStages(ctx context.Context,
if badBlockUnwind {
return nil
}
//fmt.Println("BorHeimdallForward", "validate")
//defer fmt.Println("BorHeimdallForward", "validate", "DONE")
return BorHeimdallForward(s, u, ctx, tx, borHeimdallCfg, false, logger)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx, logger log.Logger) error {
Expand Down
106 changes: 55 additions & 51 deletions eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func BorHeimdallForward(
mine bool,
logger log.Logger,
) (err error) {
processStart := time.Now()

if cfg.chainConfig.Bor == nil {
return
}
Expand Down Expand Up @@ -121,25 +123,6 @@ func BorHeimdallForward(
} else {
return
}

/*
This is the code from the original PR - it has been removed in favour of having the outer
stage loop perform the unwind which is the standard erigon operating model

err := s.state.RunUnwind(nil, tx)
if err != nil {
log.Warn(fmt.Sprintf("Milestone block mismatch, automatic rewind failed due to err: %v. Please manually rewind the chain to block num: %d", err, generics.BorMilestoneRewind.Load()))
return err
}

var reset uint64 = 0
generics.BorMilestoneRewind.Store(&reset)

// Update highest in db field after the rewind
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
*/
}

if mine {
Expand Down Expand Up @@ -196,7 +179,25 @@ func BorHeimdallForward(
lastBlockNum = cfg.blockReader.FrozenBorBlocks()
}

for blockNum := lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
if !mine {
logger.Info("["+s.LogPrefix()+"] Processng sync events...", "from", lastBlockNum+1)
}

var blockNum uint64
var fetchTime time.Duration
var eventRecords int
var lastSpanId uint64

logTimer := time.NewTicker(30 * time.Second)
defer logTimer.Stop()

for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default:
case <-logTimer.C:
logger.Info("["+s.LogPrefix()+"] StateSync Progress", "progress", blockNum, "lastSpanId", lastSpanId, "lastEventId", lastEventId, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))
}

if !mine {
header, err = cfg.blockReader.HeaderByNumber(ctx, tx, blockNum)
if err != nil {
Expand All @@ -207,7 +208,7 @@ func BorHeimdallForward(
// on the cannonical chain according to milestones
if service != nil {
if !service.IsValidChain(blockNum, []*types.Header{header}) {
logger.Debug("[BorHeimdall] Verification failed for header", "height", blockNum, "hash", header.Hash())
logger.Debug("["+s.LogPrefix()+"] Verification failed for header", "height", blockNum, "hash", header.Hash())
cfg.penalize(ctx, []headerdownload.PenaltyItem{
{Penalty: headerdownload.BadBlockPenalty, PeerID: cfg.hd.SourcePeerId(header.Hash())}})
dataflow.HeaderDownloadStates.AddChange(blockNum, dataflow.HeaderInvalidated)
Expand All @@ -218,12 +219,18 @@ func BorHeimdallForward(
}

if blockNum%cfg.chainConfig.Bor.CalculateSprint(blockNum) == 0 {
if lastEventId, err = fetchAndWriteBorEvents(ctx, cfg.blockReader, cfg.chainConfig.Bor, header, lastEventId, cfg.chainConfig.ChainID.String(), tx, cfg.heimdallClient, cfg.stateReceiverABI, s.LogPrefix(), logger); err != nil {
var callTime time.Duration
var records int
if lastEventId, records, callTime, err = fetchAndWriteBorEvents(ctx, cfg.blockReader, cfg.chainConfig.Bor, header, lastEventId, cfg.chainConfig.ChainID.String(), tx, cfg.heimdallClient, cfg.stateReceiverABI, s.LogPrefix(), logger); err != nil {
return err
}

eventRecords += records
fetchTime += callTime
}

if blockNum == 1 || (blockNum > zerothSpanEnd && ((blockNum-zerothSpanEnd-1)%spanLength) == 0) {
if err = fetchAndWriteSpans(ctx, blockNum, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
if lastSpanId, err = fetchAndWriteSpans(ctx, blockNum, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
return err
}
}
Expand All @@ -238,6 +245,9 @@ func BorHeimdallForward(
return err
}
}

logger.Info("["+s.LogPrefix()+"] Sync events processed", "progress", blockNum-1, "lastSpanId", lastSpanId, "lastEventId", lastEventId, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))

return
}

Expand All @@ -253,7 +263,7 @@ func fetchAndWriteBorEvents(
stateReceiverABI abi.ABI,
logPrefix string,
logger log.Logger,
) (uint64, error) {
) (uint64, int, time.Duration, error) {
fetchStart := time.Now()

// Find out the latest eventId
Expand All @@ -270,22 +280,23 @@ func fetchAndWriteBorEvents(
} else {
pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprint(blockNum))
if err != nil {
return lastEventId, err
return lastEventId, 0, time.Since(fetchStart), err
}
to = time.Unix(int64(pHeader.Time), 0)
}

from = lastEventId + 1

logger.Info(
logger.Debug(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"to", to.Format(time.RFC3339),
)

eventRecords, err := heimdallClient.StateSyncEvents(ctx, from, to.Unix())

if err != nil {
return lastEventId, err
return lastEventId, 0, time.Since(fetchStart), err
}

if config.OverrideStateSyncRecords != nil {
Expand All @@ -294,9 +305,6 @@ func fetchAndWriteBorEvents(
}
}

fetchTime := time.Since(fetchStart)
processStart := time.Now()

if len(eventRecords) > 0 {
var key, val [8]byte
binary.BigEndian.PutUint64(key[:], blockNum)
Expand All @@ -305,49 +313,45 @@ func fetchAndWriteBorEvents(
const method = "commitState"

wroteIndex := false
for _, eventRecord := range eventRecords {
for i, eventRecord := range eventRecords {
if eventRecord.ID <= lastEventId {
continue
}
if lastEventId+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) {
return lastEventId, fmt.Errorf("invalid event record received blockNum=%d, eventId=%d (exp %d), chainId=%s (exp %s), time=%s (exp to %s)", blockNum, eventRecord.ID, lastEventId+1, eventRecord.ChainID, chainID, eventRecord.Time, to)
return lastEventId, i, time.Since(fetchStart), fmt.Errorf("invalid event record received blockNum=%d, eventId=%d (exp %d), chainId=%s (exp %s), time=%s (exp to %s)", blockNum, eventRecord.ID, lastEventId+1, eventRecord.ChainID, chainID, eventRecord.Time, to)
}

eventRecordWithoutTime := eventRecord.BuildEventRecord()

recordBytes, err := rlp.EncodeToBytes(eventRecordWithoutTime)
if err != nil {
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}

data, err := stateReceiverABI.Pack(method, big.NewInt(eventRecord.Time.Unix()), recordBytes)
if err != nil {
logger.Error(fmt.Sprintf("[%s] Unable to pack tx for commitState", logPrefix), "err", err)
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}
var eventIdBuf [8]byte
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEvents, eventIdBuf[:], data); err != nil {
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}
if !wroteIndex {
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], blockNum)
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEventNums, blockNumBuf[:], eventIdBuf[:]); err != nil {
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}
wroteIndex = true
}

lastEventId++
}

processTime := time.Since(processStart)

logger.Info(fmt.Sprintf("[%s] StateSyncData", logPrefix), "number", blockNum, "lastEventID", lastEventId, "total records", len(eventRecords), "fetch time", fetchTime, "process time", processTime)

return lastEventId, nil
return lastEventId, len(eventRecords), time.Since(fetchStart), nil
}

func fetchAndWriteSpans(
Expand All @@ -357,26 +361,26 @@ func fetchAndWriteSpans(
heimdallClient heimdall.IHeimdallClient,
logPrefix string,
logger log.Logger,
) error {
var spanID uint64
) (uint64, error) {
var spanId uint64
if blockNum > zerothSpanEnd {
spanID = 1 + (blockNum-zerothSpanEnd-1)/spanLength
spanId = 1 + (blockNum-zerothSpanEnd-1)/spanLength
}
logger.Info(fmt.Sprintf("[%s] Fetching span", logPrefix), "id", spanID)
response, err := heimdallClient.Span(ctx, spanID)
logger.Debug(fmt.Sprintf("[%s] Fetching span", logPrefix), "id", spanId)
response, err := heimdallClient.Span(ctx, spanId)
if err != nil {
return err
return 0, err
}
spanBytes, err := json.Marshal(response)
if err != nil {
return err
return 0, err
}
var spanIDBytes [8]byte
binary.BigEndian.PutUint64(spanIDBytes[:], spanID)
binary.BigEndian.PutUint64(spanIDBytes[:], spanId)
if err = tx.Put(kv.BorSpans, spanIDBytes[:], spanBytes); err != nil {
return err
return 0, err
}
return nil
return spanId, nil
}

func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
Expand Down
Loading