Skip to content

Commit

Permalink
Use pre-encoded ATX blobs in checkpoint recovery (#5858)
Browse files Browse the repository at this point in the history
## Motivation

It avoids encoding the ATX again when putting into the new DB.



Co-authored-by: Matthias <5011972+fasmat@users.noreply.github.com>
  • Loading branch information
poszu and fasmat committed Apr 18, 2024
1 parent a516b29 commit 936f977
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 158 deletions.
71 changes: 48 additions & 23 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,14 @@ func copyToLocalFile(
return dst, nil
}

type AtxDep struct {
ID types.ATXID
PublishEpoch types.EpochID
Blob []byte
}

type PreservedData struct {
Deps []*types.VerifiedActivationTx
Deps []*AtxDep
Proofs []*types.PoetProofMessage
}

Expand Down Expand Up @@ -190,7 +196,7 @@ func recoverFromLocalFile(
log.Int("num accounts", len(data.accounts)),
log.Int("num atxs", len(data.atxs)),
)
deps := make(map[types.ATXID]*types.VerifiedActivationTx)
deps := make(map[types.ATXID]*AtxDep)
proofs := make(map[types.PoetProofRef]*types.PoetProofMessage)
if cfg.PreserveOwnAtx {
logger.With().Info("preserving own atx deps",
Expand All @@ -216,25 +222,30 @@ func recoverFromLocalFile(
maps.Copy(proofs, nodeProofs)
}
}
if err := db.Close(); err != nil {
return nil, fmt.Errorf("close old db: %w", err)
}

allDeps := maps.Values(deps)
// sort ATXs them by publishEpoch and then by ID
slices.SortFunc(allDeps, func(i, j *types.VerifiedActivationTx) int {
return bytes.Compare(i.ID().Bytes(), j.ID().Bytes())
slices.SortFunc(allDeps, func(i, j *AtxDep) int {
return bytes.Compare(i.ID.Bytes(), j.ID.Bytes())
})
slices.SortStableFunc(allDeps, func(i, j *types.VerifiedActivationTx) int {
slices.SortStableFunc(allDeps, func(i, j *AtxDep) int {
return int(i.PublishEpoch) - int(j.PublishEpoch)
})
allProofs := make([]*types.PoetProofMessage, 0, len(proofs))
for _, dep := range allDeps {
proof, ok := proofs[types.PoetProofRef(dep.GetPoetProofRef())]
poetProofRef, err := atxs.PoetProofRef(context.Background(), db, dep.ID)
if err != nil {
return nil, fmt.Errorf("get poet proof ref (%v): %w", dep.ID, err)
}
proof, ok := proofs[poetProofRef]
if !ok {
return nil, fmt.Errorf("missing poet proof for atx %v", dep.ID())
return nil, fmt.Errorf("missing poet proof for atx %v", dep.ID)
}
allProofs = append(allProofs, proof)
}
if err := db.Close(); err != nil {
return nil, fmt.Errorf("close old db: %w", err)
}

// all is ready. backup the old data and create new.
backupDir, err := backupOldDb(fs, cfg.DataDir, cfg.DbFile)
Expand Down Expand Up @@ -360,7 +371,7 @@ func collectOwnAtxDeps(
nodeID types.NodeID,
goldenATX types.ATXID,
data *recoveryData,
) (map[types.ATXID]*types.VerifiedActivationTx, map[types.PoetProofRef]*types.PoetProofMessage, error) {
) (map[types.ATXID]*AtxDep, map[types.PoetProofRef]*types.PoetProofMessage, error) {
atxid, err := atxs.GetLastIDByNodeID(db, nodeID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, nil, fmt.Errorf("query own last atx id: %w", err)
Expand Down Expand Up @@ -389,7 +400,7 @@ func collectOwnAtxDeps(
all[cAtx.ID] = struct{}{}
}
var (
deps map[types.ATXID]*types.VerifiedActivationTx
deps map[types.ATXID]*AtxDep
proofs map[types.PoetProofRef]*types.PoetProofMessage
)
if ref != types.EmptyATXID {
Expand Down Expand Up @@ -424,8 +435,8 @@ func collectDeps(
db *sql.Database,
ref types.ATXID,
all map[types.ATXID]struct{},
) (map[types.ATXID]*types.VerifiedActivationTx, map[types.PoetProofRef]*types.PoetProofMessage, error) {
deps := make(map[types.ATXID]*types.VerifiedActivationTx)
) (map[types.ATXID]*AtxDep, map[types.PoetProofRef]*types.PoetProofMessage, error) {
deps := make(map[types.ATXID]*AtxDep)
if err := collect(db, ref, all, deps); err != nil {
return nil, nil, err
}
Expand All @@ -440,7 +451,7 @@ func collect(
db *sql.Database,
ref types.ATXID,
all map[types.ATXID]struct{},
deps map[types.ATXID]*types.VerifiedActivationTx,
deps map[types.ATXID]*AtxDep,
) error {
if _, ok := all[ref]; ok {
return nil
Expand Down Expand Up @@ -471,26 +482,40 @@ func collect(
if err = collect(db, atx.PositioningATX, all, deps); err != nil {
return err
}
deps[ref] = atx
var blob sql.Blob
err = atxs.LoadBlob(context.Background(), db, ref.Bytes(), &blob)
if err != nil {
return fmt.Errorf("load atx blob %v: %w", ref, err)
}

deps[ref] = &AtxDep{
ID: ref,
PublishEpoch: atx.PublishEpoch,
Blob: blob.Bytes,
}
all[ref] = struct{}{}
return nil
}

func poetProofs(
db *sql.Database,
vAtxs map[types.ATXID]*types.VerifiedActivationTx,
atxIds map[types.ATXID]*AtxDep,
) (map[types.PoetProofRef]*types.PoetProofMessage, error) {
proofs := make(map[types.PoetProofRef]*types.PoetProofMessage, len(vAtxs))
for _, vatx := range vAtxs {
proof, err := poets.Get(db, types.PoetProofRef(vatx.GetPoetProofRef()))
proofs := make(map[types.PoetProofRef]*types.PoetProofMessage, len(atxIds))
for atx := range atxIds {
ref, err := atxs.PoetProofRef(context.Background(), db, atx)
if err != nil {
return nil, fmt.Errorf("get poet proof ref: %w", err)
}
proof, err := poets.Get(db, ref)
if err != nil {
return nil, fmt.Errorf("get poet proof (%v): %w", vatx.ID(), err)
return nil, fmt.Errorf("get poet proof (atx: %v): %w", atx, err)
}
var msg types.PoetProofMessage
if err := codec.Decode(proof, &msg); err != nil {
return nil, fmt.Errorf("decode poet proof (%v): %w", vatx.ID(), err)
return nil, fmt.Errorf("decode poet proof (%v): %w", atx, err)
}
proofs[types.PoetProofRef(vatx.GetPoetProofRef())] = &msg
proofs[ref] = &msg
}
return proofs, nil
}

0 comments on commit 936f977

Please sign in to comment.