Skip to content

Commit

Permalink
Bor reduce logging verbosity (#8401)
Browse files Browse the repository at this point in the history
Don't log every sync event action, but gather stats and print every 30
secs
  • Loading branch information
mh0lt committed Oct 7, 2023
1 parent 58379a3 commit d4d5cb9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 54 deletions.
8 changes: 8 additions & 0 deletions eth/borfinality/whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func retryHeimdallHandler(fn heimdallHandler, config *config, tickerDuration tim
defer ticker.Stop()

for {
defer func() {
r := recover()
if r != nil {
log.Warn(fmt.Sprintf("service %s- run failed with panic", fnName), "err", r)
}
}()

select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand Down Expand Up @@ -176,6 +183,7 @@ func handleMilestone(ctx context.Context, heimdallClient heimdall.IHeimdallClien
// add that milestone to the future milestone list.
if errors.Is(err, errMissingBlocks) || errors.Is(err, errHashMismatch) {
service.ProcessFutureMilestone(num, hash)
return nil
}

if errors.Is(err, heimdall.ErrServiceUnavailable) {
Expand Down
2 changes: 1 addition & 1 deletion eth/borfinality/whitelist_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func fetchWhitelistMilestone(ctx context.Context, heimdallClient heimdall.IHeimd
return num, hash, errMilestone
}

config.logger.Info("Got new milestone from heimdall", "start", milestone.StartBlock.Uint64(), "end", milestone.EndBlock.Uint64(), "hash", milestone.Hash.String())
config.logger.Debug("Got new milestone from heimdall", "start", milestone.StartBlock.Uint64(), "end", milestone.EndBlock.Uint64(), "hash", milestone.Hash.String())

num = milestone.EndBlock.Uint64()
hash = milestone.Hash
Expand Down
2 changes: 0 additions & 2 deletions eth/stagedsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func DefaultStages(ctx context.Context,
if badBlockUnwind {
return nil
}
//fmt.Println("BorHeimdallForward", "validate")
//defer fmt.Println("BorHeimdallForward", "validate", "DONE")
return BorHeimdallForward(s, u, ctx, tx, borHeimdallCfg, false, logger)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx, logger log.Logger) error {
Expand Down
106 changes: 55 additions & 51 deletions eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func BorHeimdallForward(
mine bool,
logger log.Logger,
) (err error) {
processStart := time.Now()

if cfg.chainConfig.Bor == nil {
return
}
Expand Down Expand Up @@ -121,25 +123,6 @@ func BorHeimdallForward(
} else {
return
}

/*
This is the code from the original PR - it has been removed in favour of having the outer
stage loop perform the unwind which is the standard erigon operating model
err := s.state.RunUnwind(nil, tx)
if err != nil {
log.Warn(fmt.Sprintf("Milestone block mismatch, automatic rewind failed due to err: %v. Please manually rewind the chain to block num: %d", err, generics.BorMilestoneRewind.Load()))
return err
}
var reset uint64 = 0
generics.BorMilestoneRewind.Store(&reset)
// Update highest in db field after the rewind
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
*/
}

if mine {
Expand Down Expand Up @@ -196,7 +179,25 @@ func BorHeimdallForward(
lastBlockNum = cfg.blockReader.FrozenBorBlocks()
}

for blockNum := lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
if !mine {
logger.Info("["+s.LogPrefix()+"] Processng sync events...", "from", lastBlockNum+1)
}

var blockNum uint64
var fetchTime time.Duration
var eventRecords int
var lastSpanId uint64

logTimer := time.NewTicker(30 * time.Second)
defer logTimer.Stop()

for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default:
case <-logTimer.C:
logger.Info("["+s.LogPrefix()+"] StateSync Progress", "progress", blockNum, "lastSpanId", lastSpanId, "lastEventId", lastEventId, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))
}

if !mine {
header, err = cfg.blockReader.HeaderByNumber(ctx, tx, blockNum)
if err != nil {
Expand All @@ -207,7 +208,7 @@ func BorHeimdallForward(
// on the cannonical chain according to milestones
if service != nil {
if !service.IsValidChain(blockNum, []*types.Header{header}) {
logger.Debug("[BorHeimdall] Verification failed for header", "height", blockNum, "hash", header.Hash())
logger.Debug("["+s.LogPrefix()+"] Verification failed for header", "height", blockNum, "hash", header.Hash())
cfg.penalize(ctx, []headerdownload.PenaltyItem{
{Penalty: headerdownload.BadBlockPenalty, PeerID: cfg.hd.SourcePeerId(header.Hash())}})
dataflow.HeaderDownloadStates.AddChange(blockNum, dataflow.HeaderInvalidated)
Expand All @@ -218,12 +219,18 @@ func BorHeimdallForward(
}

if blockNum%cfg.chainConfig.Bor.CalculateSprint(blockNum) == 0 {
if lastEventId, err = fetchAndWriteBorEvents(ctx, cfg.blockReader, cfg.chainConfig.Bor, header, lastEventId, cfg.chainConfig.ChainID.String(), tx, cfg.heimdallClient, cfg.stateReceiverABI, s.LogPrefix(), logger); err != nil {
var callTime time.Duration
var records int
if lastEventId, records, callTime, err = fetchAndWriteBorEvents(ctx, cfg.blockReader, cfg.chainConfig.Bor, header, lastEventId, cfg.chainConfig.ChainID.String(), tx, cfg.heimdallClient, cfg.stateReceiverABI, s.LogPrefix(), logger); err != nil {
return err
}

eventRecords += records
fetchTime += callTime
}

if blockNum == 1 || (blockNum > zerothSpanEnd && ((blockNum-zerothSpanEnd-1)%spanLength) == 0) {
if err = fetchAndWriteSpans(ctx, blockNum, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
if lastSpanId, err = fetchAndWriteSpans(ctx, blockNum, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
return err
}
}
Expand All @@ -238,6 +245,9 @@ func BorHeimdallForward(
return err
}
}

logger.Info("["+s.LogPrefix()+"] Sync events processed", "progress", blockNum-1, "lastSpanId", lastSpanId, "lastEventId", lastEventId, "total records", eventRecords, "fetch time", fetchTime, "process time", time.Since(processStart))

return
}

Expand All @@ -253,7 +263,7 @@ func fetchAndWriteBorEvents(
stateReceiverABI abi.ABI,
logPrefix string,
logger log.Logger,
) (uint64, error) {
) (uint64, int, time.Duration, error) {
fetchStart := time.Now()

// Find out the latest eventId
Expand All @@ -270,22 +280,23 @@ func fetchAndWriteBorEvents(
} else {
pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprint(blockNum))
if err != nil {
return lastEventId, err
return lastEventId, 0, time.Since(fetchStart), err
}
to = time.Unix(int64(pHeader.Time), 0)
}

from = lastEventId + 1

logger.Info(
logger.Debug(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"to", to.Format(time.RFC3339),
)

eventRecords, err := heimdallClient.StateSyncEvents(ctx, from, to.Unix())

if err != nil {
return lastEventId, err
return lastEventId, 0, time.Since(fetchStart), err
}

if config.OverrideStateSyncRecords != nil {
Expand All @@ -294,9 +305,6 @@ func fetchAndWriteBorEvents(
}
}

fetchTime := time.Since(fetchStart)
processStart := time.Now()

if len(eventRecords) > 0 {
var key, val [8]byte
binary.BigEndian.PutUint64(key[:], blockNum)
Expand All @@ -305,49 +313,45 @@ func fetchAndWriteBorEvents(
const method = "commitState"

wroteIndex := false
for _, eventRecord := range eventRecords {
for i, eventRecord := range eventRecords {
if eventRecord.ID <= lastEventId {
continue
}
if lastEventId+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) {
return lastEventId, fmt.Errorf("invalid event record received blockNum=%d, eventId=%d (exp %d), chainId=%s (exp %s), time=%s (exp to %s)", blockNum, eventRecord.ID, lastEventId+1, eventRecord.ChainID, chainID, eventRecord.Time, to)
return lastEventId, i, time.Since(fetchStart), fmt.Errorf("invalid event record received blockNum=%d, eventId=%d (exp %d), chainId=%s (exp %s), time=%s (exp to %s)", blockNum, eventRecord.ID, lastEventId+1, eventRecord.ChainID, chainID, eventRecord.Time, to)
}

eventRecordWithoutTime := eventRecord.BuildEventRecord()

recordBytes, err := rlp.EncodeToBytes(eventRecordWithoutTime)
if err != nil {
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}

data, err := stateReceiverABI.Pack(method, big.NewInt(eventRecord.Time.Unix()), recordBytes)
if err != nil {
logger.Error(fmt.Sprintf("[%s] Unable to pack tx for commitState", logPrefix), "err", err)
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}
var eventIdBuf [8]byte
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEvents, eventIdBuf[:], data); err != nil {
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}
if !wroteIndex {
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], blockNum)
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEventNums, blockNumBuf[:], eventIdBuf[:]); err != nil {
return lastEventId, err
return lastEventId, i, time.Since(fetchStart), err
}
wroteIndex = true
}

lastEventId++
}

processTime := time.Since(processStart)

logger.Info(fmt.Sprintf("[%s] StateSyncData", logPrefix), "number", blockNum, "lastEventID", lastEventId, "total records", len(eventRecords), "fetch time", fetchTime, "process time", processTime)

return lastEventId, nil
return lastEventId, len(eventRecords), time.Since(fetchStart), nil
}

func fetchAndWriteSpans(
Expand All @@ -357,26 +361,26 @@ func fetchAndWriteSpans(
heimdallClient heimdall.IHeimdallClient,
logPrefix string,
logger log.Logger,
) error {
var spanID uint64
) (uint64, error) {
var spanId uint64
if blockNum > zerothSpanEnd {
spanID = 1 + (blockNum-zerothSpanEnd-1)/spanLength
spanId = 1 + (blockNum-zerothSpanEnd-1)/spanLength
}
logger.Info(fmt.Sprintf("[%s] Fetching span", logPrefix), "id", spanID)
response, err := heimdallClient.Span(ctx, spanID)
logger.Debug(fmt.Sprintf("[%s] Fetching span", logPrefix), "id", spanId)
response, err := heimdallClient.Span(ctx, spanId)
if err != nil {
return err
return 0, err
}
spanBytes, err := json.Marshal(response)
if err != nil {
return err
return 0, err
}
var spanIDBytes [8]byte
binary.BigEndian.PutUint64(spanIDBytes[:], spanID)
binary.BigEndian.PutUint64(spanIDBytes[:], spanId)
if err = tx.Put(kv.BorSpans, spanIDBytes[:], spanBytes); err != nil {
return err
return 0, err
}
return nil
return spanId, nil
}

func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
Expand Down

0 comments on commit d4d5cb9

Please sign in to comment.