Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Snapshot] Snapshot recovery handler added #2074

Merged
merged 11 commits into from
Jan 19, 2024
2 changes: 2 additions & 0 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ type backend struct {

// Node type
nodetype common.ConnType

isRestoringSnapshots atomic.Bool
}

func (sb *backend) NodeType() common.ConnType {
Expand Down
63 changes: 63 additions & 0 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,73 @@ func (sb *backend) snapshot(chain consensus.ChainReader, number uint64, hash com
logger.Trace("Stored voting snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}

sb.regen(chain, headers)

sb.recents.Add(snap.Hash, snap)
return snap, err
}

// regen commits snapshot data to database
// regen is triggered if there is any checkpoint block in the `headers`.
// For each checkpoint block, this function verifies the existence of its snapshot in DB and stores one if missing.
/*
Triggered:
| ^ ^ ^ ^ ...|
SI SI*(last snapshot) SI SI
| header1, .. headerN |
Not triggered: (Guaranteed SI* was committed before )
| ^ ^ ^ ^ ...|
SI SI*(last snapshot) SI SI
| header1, .. headerN |
*/
func (sb *backend) regen(chain consensus.ChainReader, headers []*types.Header) {
// Prevent nested call. Ignore header length one
// because it was handled before the `regen` called.
if !sb.isRestoringSnapshots.CompareAndSwap(false, true) || len(headers) <= 1 {
return
}
defer func() {
sb.isRestoringSnapshots.Store(false)
}()

var (
from = headers[0].Number.Uint64()
to = headers[len(headers)-1].Number.Uint64()
start = time.Now()
commitTried = false
)

// Shortcut: No missing snapshot data to be processed.
if to-(to%uint64(params.CheckpointInterval)) < from {
return
}

for _, header := range headers {
var (
hn = header.Number.Uint64()
hh = header.Hash()
)
if params.IsCheckpointInterval(hn) {
// Store snapshot data if it was not committed before
if loadSnap, _ := sb.db.ReadIstanbulSnapshot(hh); loadSnap != nil {
continue
}
snap, err := sb.snapshot(chain, hn, hh, nil, false)
if err != nil {
logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
continue
}
if err = snap.store(sb.db); err != nil {
logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
}
commitTried = true
}
}
if commitTried { // This prevents pushing too many logs by potential DoS attack
logger.Trace("[Snapshot] Snapshot restoring completed", "len(headers)", len(headers), "from", from, "to", to, "elapsed", time.Since(start))
}
}

// FIXME: Need to update this for Istanbul
// sigHash returns the hash which is used as input for the Istanbul
// signing. It is the hash of the entire header apart from the 65 byte signature
Expand Down