Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db migration to reset blocks #4389

Merged
merged 19 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,10 +1332,13 @@ func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
return err
}
}
if err := tx.Delete(kv.Headers, k, nil); err != nil {
// Copying k because otherwise the same memory will be reused
// for the next key and Delete below will end up deleting 1 more record than required
kCopy := common.CopyBytes(k)
if err := tx.Delete(kv.Headers, kCopy, nil); err != nil {
return err
}
if err := tx.Delete(kv.BlockBody, k, nil); err != nil {
if err := tx.Delete(kv.BlockBody, kCopy, nil); err != nil {
return err
}

Expand Down
9 changes: 9 additions & 0 deletions eth/stagedsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ func BodiesForward(
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
firstCycle bool,
) error {
var doUpdate bool
if cfg.snapshots != nil && s.BlockNumber < cfg.snapshots.BlocksAvailable() {
s.BlockNumber = cfg.snapshots.BlocksAvailable()
doUpdate = true
}

var d1, d2, d3, d4, d5, d6 time.Duration
Expand All @@ -74,6 +76,13 @@ func BodiesForward(
}
timeout := cfg.timeout

// this update is required, because cfg.bd.UpdateFromDb(tx) below reads it and initialises requestedLow accordingly
// if not done, it will cause downloading from block 1
if doUpdate {
if err := s.Update(tx, s.BlockNumber); err != nil {
return err
}
}
// This will update bd.maxProgress
if _, _, _, err = cfg.bd.UpdateFromDb(tx); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func promoteCallTraces(logPrefix string, tx kv.RwTx, startBlock, endBlock uint64
case <-logEvery.C:
var m runtime.MemStats
libcommon.ReadMemStats(&m)
log.Info(fmt.Sprintf("[%s] Pruning call trace intermediate table", logPrefix), "number", blockNum,
log.Info(fmt.Sprintf("[%s] Pruning call trace table", logPrefix), "number", blockNum,
"alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys))
}
if err = traceCursor.DeleteCurrentDuplicates(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion migrations/db_schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
)

var dbSchemaVersion5 = Migration{
Name: "db_schema_version5",
Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
Expand Down
10 changes: 7 additions & 3 deletions migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/log/v3"
"github.com/ugorji/go/codec"
)
Expand All @@ -33,6 +34,7 @@ var migrations = map[kv.Label][]Migration{
kv.ChainDB: {
dbSchemaVersion5,
txsBeginEnd,
resetBlocks,
},
kv.TxPoolDB: {},
kv.SentryDB: {},
Expand All @@ -41,7 +43,7 @@ var migrations = map[kv.Label][]Migration{
type Callback func(tx kv.RwTx, progress []byte, isDone bool) error
type Migration struct {
Name string
Up func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) error
Up func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) error
}

var (
Expand Down Expand Up @@ -151,10 +153,11 @@ func (m *Migrator) VerifyVersion(db kv.RwDB) error {
return nil
}

func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
func (m *Migrator) Apply(db kv.RwDB, dataDir string) error {
if len(m.Migrations) == 0 {
return nil
}
dirs := datadir.New(dataDir)

var applied map[string][]byte
if err := db.View(context.Background(), func(tx kv.Tx) error {
Expand Down Expand Up @@ -198,7 +201,8 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
return fmt.Errorf("migrator.Apply: %w", err)
}

if err := v.Up(db, filepath.Join(datadir, "migrations", v.Name), progress, func(tx kv.RwTx, key []byte, isDone bool) error {
dirs.Tmp = filepath.Join(dirs.DataDir, "migrations", v.Name)
if err := v.Up(db, dirs, progress, func(tx kv.RwTx, key []byte, isDone bool) error {
if !isDone {
if key != nil {
if err := tx.Put(kv.Migrations, []byte("_progress_"+v.Name), key); err != nil {
Expand Down
19 changes: 10 additions & 9 deletions migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"

"github.com/stretchr/testify/require"
)
Expand All @@ -17,7 +18,7 @@ func TestApplyWithInit(t *testing.T) {
m := []Migration{
{
"one",
func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
Expand All @@ -32,7 +33,7 @@ func TestApplyWithInit(t *testing.T) {
},
{
"two",
func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
Expand Down Expand Up @@ -81,14 +82,14 @@ func TestApplyWithoutInit(t *testing.T) {
m := []Migration{
{
"one",
func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
t.Fatal("shouldn't been executed")
return nil
},
},
{
"two",
func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) {
m := []Migration{
{
"one",
func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
Expand All @@ -160,7 +161,7 @@ func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) {
},
{
"two",
func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
t.Fatal("shouldn't been executed")
return nil
},
Expand Down Expand Up @@ -226,7 +227,7 @@ func TestValidation(t *testing.T) {
m := []Migration{
{
Name: "repeated_name",
Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
Expand All @@ -241,7 +242,7 @@ func TestValidation(t *testing.T) {
},
{
Name: "repeated_name",
Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
Expand Down Expand Up @@ -275,7 +276,7 @@ func TestCommitCallRequired(t *testing.T) {
m := []Migration{
{
Name: "one",
Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
//don't call BeforeCommit
return nil
},
Expand Down
69 changes: 69 additions & 0 deletions migrations/reset_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package migrations

import (
"context"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/rawdbreset"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/ledgerwatch/log/v3"
)

var resetBlocks = Migration{
Name: "reset_blocks_3",
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()

enabled, err := snap.Enabled(tx)
if err != nil {
return err
}

if !enabled {
if err := BeforeCommit(tx, nil, true); err != nil {
return err
}
return
}
genesisBlock := rawdb.ReadHeaderByNumber(tx, 0)
if genesisBlock == nil {
if err := BeforeCommit(tx, nil, true); err != nil {
return err
}
return nil
}
chainConfig, err := rawdb.ReadChainConfig(tx, genesisBlock.Hash())
if err != nil {
return err
}
log.Warn("NOTE: this migration will remove recent blocks (and senders) to fix several recent bugs. Your node will re-download last ~400K blocks, should not take very long")

if err := snap.RemoveNonPreverifiedFiles(chainConfig.ChainName, dirs.Snap); err != nil {
return err
}

if err := rawdbreset.ResetBlocks(tx); err != nil {
return err
}

if err := rawdbreset.ResetSenders(tx); err != nil {
return err
}

if err := rawdbreset.ResetTxLookup(tx); err != nil {
return err
}

// This migration is no-op, but it forces the migration mechanism to apply it and thus write the DB schema version info
if err := BeforeCommit(tx, nil, true); err != nil {
return err
}
return tx.Commit()
},
}
3 changes: 2 additions & 1 deletion migrations/txs_begin_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/node/nodecfg/datadir"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/log/v3"
)
Expand All @@ -25,7 +26,7 @@ var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version DB format was

var txsBeginEnd = Migration{
Name: "txs_begin_end",
Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) {
logEvery := time.NewTicker(10 * time.Second)
defer logEvery.Stop()

Expand Down
6 changes: 3 additions & 3 deletions turbo/snapshotsync/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (s *RoSnapshots) Reopen() error {
s.Txs.lock.Lock()
defer s.Txs.lock.Unlock()
s.closeSegmentsLocked()
files, err := segments2(s.dir)
files, err := segments(s.dir)
if err != nil {
return err
}
Expand Down Expand Up @@ -483,7 +483,7 @@ func (s *RoSnapshots) ReopenSegments() error {
s.Txs.lock.Lock()
defer s.Txs.lock.Unlock()
s.closeSegmentsLocked()
files, err := segments2(s.dir)
files, err := segments(s.dir)
if err != nil {
return err
}
Expand Down Expand Up @@ -830,7 +830,7 @@ func noOverlaps(in []snap.FileInfo) (res []snap.FileInfo) {
return res
}

func segments2(dir string) (res []snap.FileInfo, err error) {
func segments(dir string) (res []snap.FileInfo, err error) {
list, err := snap.Segments(dir)
if err != nil {
return nil, err
Expand Down
42 changes: 42 additions & 0 deletions turbo/snapshotsync/snap/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"

"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -208,3 +209,44 @@ func ParseDir(dir string) (res []FileInfo, err error) {

return res, nil
}

func RemoveNonPreverifiedFiles(chainName, snapDir string) error {
preverified := snapshothashes.KnownConfig(chainName).Preverified
keep := map[string]struct{}{}
for _, p := range preverified {
ext := filepath.Ext(p.Name)
withoutExt := p.Name[0 : len(p.Name)-len(ext)]
keep[withoutExt] = struct{}{}
}
list, err := Segments(snapDir)
if err != nil {
return err
}
for _, f := range list {
_, fname := filepath.Split(f.Path)
ext := filepath.Ext(fname)
withoutExt := fname[0 : len(fname)-len(ext)]
if _, ok := keep[withoutExt]; !ok {
_ = os.Remove(f.Path)
} else {
if f.T == Transactions {
idxPath := IdxFileName(f.From, f.To, Transactions2Block.String())
idxExt := filepath.Ext(idxPath)
keep[idxPath[0:len(idxPath)-len(idxExt)]] = struct{}{}
}
}
}
list, err = IdxFiles(snapDir)
if err != nil {
return err
}
for _, f := range list {
_, fname := filepath.Split(f.Path)
ext := filepath.Ext(fname)
withoutExt := fname[0 : len(fname)-len(ext)]
if _, ok := keep[withoutExt]; !ok {
_ = os.Remove(f.Path)
}
}
return nil
}