Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Return ATX version along with blob #5922

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func (b *Builder) Regossip(ctx context.Context, nodeID types.NodeID) error {
return err
}
var blob sql.Blob
if err := atxs.LoadBlob(ctx, b.db, atx.Bytes(), &blob); err != nil {
if _, err := atxs.LoadBlob(ctx, b.db, atx.Bytes(), &blob); err != nil {
return fmt.Errorf("get blob %s: %w", atx.ShortString(), err)
}
if len(blob.Bytes) == 0 {
Expand Down
4 changes: 3 additions & 1 deletion activation/activation_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func TestRegossip(t *testing.T) {
}

var blob sql.Blob
require.NoError(t, atxs.LoadBlob(context.Background(), tab.db, refAtx.ID().Bytes(), &blob))
ver, err := atxs.LoadBlob(context.Background(), tab.db, refAtx.ID().Bytes(), &blob)
require.NoError(t, err)
require.Equal(t, types.AtxV1, ver)

// atx will be regossiped once (by the smesher)
tab.mclock.EXPECT().CurrentLayer().Return(layer)
Expand Down
17 changes: 11 additions & 6 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@
// Obtain the atxSignature of the given ATX.
func atxSignature(ctx context.Context, db sql.Executor, id types.ATXID) (types.EdSignature, error) {
var blob sql.Blob
if err := atxs.LoadBlob(ctx, db, id.Bytes(), &blob); err != nil {
v, err := atxs.LoadBlob(ctx, db, id.Bytes(), &blob)
if err != nil {
return types.EmptyEdSignature, err
}

Expand All @@ -306,10 +307,14 @@
return types.EmptyEdSignature, fmt.Errorf("can't get signature for a golden (checkpointed) ATX: %s", id)
}

// TODO: decide how to decode based on the `version` column.
var prev wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &prev); err != nil {
return types.EmptyEdSignature, fmt.Errorf("decoding previous atx: %w", err)
// TODO: implement for ATX V2
switch v {
case types.AtxV1:
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return types.EmptyEdSignature, fmt.Errorf("decoding atx v1: %w", err)

Check warning on line 315 in activation/handler.go

View check run for this annotation

Codecov / codecov/patch

activation/handler.go#L315

Added line #L315 was not covered by tests
}
return atx.Signature, nil
}
return prev.Signature, nil
return types.EmptyEdSignature, fmt.Errorf("unsupported ATX version: %v", v)

Check warning on line 319 in activation/handler.go

View check run for this annotation

Codecov / codecov/patch

activation/handler.go#L319

Added line #L319 was not covered by tests
}
6 changes: 5 additions & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@
// to use the effective num units.
func (h *HandlerV1) previous(ctx context.Context, atx *wire.ActivationTxV1) (*types.ActivationTx, error) {
var blob sql.Blob
if err := atxs.LoadBlob(ctx, h.cdb, atx.PrevATXID[:], &blob); err != nil {
v, err := atxs.LoadBlob(ctx, h.cdb, atx.PrevATXID[:], &blob)
if err != nil {
return nil, err
}

Expand All @@ -150,6 +151,9 @@
}
return atx, nil
}
if v != types.AtxV1 {
return nil, fmt.Errorf("previous atx %s is not of version 1", atx.PrevATXID)

Check warning on line 155 in activation/handler_v1.go

View check run for this annotation

Codecov / codecov/patch

activation/handler_v1.go#L155

Added line #L155 was not covered by tests
}

var prev wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &prev); err != nil {
Expand Down
47 changes: 26 additions & 21 deletions activation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,34 +373,39 @@

func (v *Validator) getAtxDeps(ctx context.Context, db sql.Executor, id types.ATXID) (*atxDeps, error) {
var blob sql.Blob
if err := atxs.LoadBlob(ctx, v.db, id.Bytes(), &blob); err != nil {
version, err := atxs.LoadBlob(ctx, v.db, id.Bytes(), &blob)
if err != nil {
return nil, fmt.Errorf("getting blob for %s: %w", id, err)
}

// TODO: decide about version based on `version` column
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return nil, fmt.Errorf("decoding ATX blob: %w", err)
}
var commitment types.ATXID
if atx.CommitmentATXID != nil {
commitment = *atx.CommitmentATXID
} else {
catx, err := atxs.CommitmentATX(v.db, atx.SmesherID)
if err != nil {
return nil, fmt.Errorf("getting commitment ATX: %w", err)
// TODO: implement ATX V2
switch version {
case types.AtxV1:
var commitment types.ATXID
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return nil, fmt.Errorf("decoding ATX blob: %w", err)

Check warning on line 387 in activation/validation.go

View check run for this annotation

Codecov / codecov/patch

activation/validation.go#L387

Added line #L387 was not covered by tests
}
if atx.CommitmentATXID != nil {
commitment = *atx.CommitmentATXID
} else {
catx, err := atxs.CommitmentATX(v.db, atx.SmesherID)
if err != nil {
return nil, fmt.Errorf("getting commitment ATX: %w", err)

Check warning on line 394 in activation/validation.go

View check run for this annotation

Codecov / codecov/patch

activation/validation.go#L394

Added line #L394 was not covered by tests
}
commitment = catx
}
commitment = catx
}

deps := &atxDeps{
nipost: *wire.NiPostFromWireV1(atx.NIPost),
positioning: atx.PositioningATXID,
previous: atx.PrevATXID,
commitment: commitment,
deps := &atxDeps{
nipost: *wire.NiPostFromWireV1(atx.NIPost),
positioning: atx.PositioningATXID,
previous: atx.PrevATXID,
commitment: commitment,
}
return deps, nil
}

return deps, nil
return nil, fmt.Errorf("unsupported ATX version: %v", version)

Check warning on line 408 in activation/validation.go

View check run for this annotation

Codecov / codecov/patch

activation/validation.go#L408

Added line #L408 was not covered by tests
}

func (v *Validator) verifyChainWithOpts(
Expand Down
2 changes: 1 addition & 1 deletion checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func collect(
return err
}
var blob sql.Blob
err = atxs.LoadBlob(context.Background(), db, ref.Bytes(), &blob)
_, err = atxs.LoadBlob(context.Background(), db, ref.Bytes(), &blob)
if err != nil {
return fmt.Errorf("load atx blob %v: %w", ref, err)
}
Expand Down
37 changes: 23 additions & 14 deletions checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,29 +168,38 @@

func positioningATX(ctx context.Context, db sql.Executor, id types.ATXID) (types.ATXID, error) {
var blob sql.Blob
if err := atxs.LoadBlob(ctx, db, id.Bytes(), &blob); err != nil {
version, err := atxs.LoadBlob(ctx, db, id.Bytes(), &blob)
if err != nil {
return types.EmptyATXID, fmt.Errorf("get blob %s: %w", id, err)
}
// TODO: decide how to decode based on the `version` column
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return types.EmptyATXID, fmt.Errorf("decode %s: %w", id, err)
// TODO: implement for ATX V2
switch version {
case types.AtxV1:
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return types.EmptyATXID, fmt.Errorf("decode %s: %w", id, err)

Check warning on line 180 in checkpoint/util.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/util.go#L180

Added line #L180 was not covered by tests
}
return atx.PositioningATXID, nil
}

return atx.PositioningATXID, nil
return types.EmptyATXID, fmt.Errorf("unsupported ATX version: %v", version)

Check warning on line 184 in checkpoint/util.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/util.go#L184

Added line #L184 was not covered by tests
}

func poetProofRef(ctx context.Context, db sql.Executor, id types.ATXID) (types.PoetProofRef, error) {
var blob sql.Blob
if err := atxs.LoadBlob(ctx, db, id.Bytes(), &blob); err != nil {
version, err := atxs.LoadBlob(ctx, db, id.Bytes(), &blob)
if err != nil {
return types.PoetProofRef{}, fmt.Errorf("getting blob for %s: %w", id, err)
}

// TODO: decide about version based the `version` column in `atx_blobs`
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return types.PoetProofRef{}, fmt.Errorf("decoding ATX blob: %w", err)
}
// TODO: implement for ATX V2
switch version {
case types.AtxV1:
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return types.PoetProofRef{}, fmt.Errorf("decoding ATX blob: %w", err)

Check warning on line 199 in checkpoint/util.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/util.go#L199

Added line #L199 was not covered by tests
}

return types.PoetProofRef(atx.NIPost.PostMetadata.Challenge), nil
return types.PoetProofRef(atx.NIPost.PostMetadata.Challenge), nil
}
return types.PoetProofRef{}, fmt.Errorf("unsupported ATX version: %v", version)

Check warning on line 204 in checkpoint/util.go

View check run for this annotation

Codecov / codecov/patch

checkpoint/util.go#L204

Added line #L204 was not covered by tests
}
5 changes: 4 additions & 1 deletion datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ type (
)

var loadBlobDispatch = map[Hint]loadBlobFunc{
ATXDB: atxs.LoadBlob,
ATXDB: func(ctx context.Context, db sql.Executor, key []byte, blob *sql.Blob) error {
_, err := atxs.LoadBlob(ctx, db, key, blob)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need blob version handling in fetch client / server, but that can be done later, of course.
Initial syncv2 version will reuse current blob fetching mechanism

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would it need to care about the version here? The fetched blobs are pushed through the ATX handler which can figure out the version while decoding on its own.

return err
},
BallotDB: ballots.LoadBlob,
BlockDB: blocks.LoadBlob,
TXDB: transactions.LoadBlob,
Expand Down
72 changes: 47 additions & 25 deletions sql/atxs/atxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"time"

sqlite "github.com/go-llsqlite/crawshaw"
Expand Down Expand Up @@ -324,37 +325,58 @@ func GetBlobSizes(db sql.Executor, ids [][]byte) (sizes []int, err error) {
}

// LoadBlob loads ATX as an encoded blob, ready to be sent over the wire.
func LoadBlob(ctx context.Context, db sql.Executor, id []byte, blob *sql.Blob) error {
func LoadBlob(ctx context.Context, db sql.Executor, id []byte, blob *sql.Blob) (types.AtxVersion, error) {
if sql.IsCached(db) {
b, err := getBlob(ctx, db, id)
type cachedBlob struct {
version types.AtxVersion
buf []byte
}
cacheKey := sql.QueryCacheKey(CacheKindATXBlob, string(id))
cached, err := sql.WithCachedValue(ctx, db, cacheKey, func(context.Context) (*cachedBlob, error) {
// We don't use the provided blob in this case to avoid
// storing references to the underlying slice (subsequent calls would modify it).
var blob sql.Blob
v, err := getBlob(ctx, db, id, &blob)
if err != nil {
return nil, err
}
return &cachedBlob{version: v, buf: blob.Bytes}, nil
})
if err != nil {
return err
return 0, err
}
blob.Bytes = b
return nil

n := len(cached.buf)
blob.Bytes = slices.Grow(blob.Bytes[:0], n)[:n]
copy(blob.Bytes, cached.buf)
return cached.version, nil
}
return sql.LoadBlob(db, "select atx from atx_blobs where id = ?1", id, blob)

return getBlob(ctx, db, id, blob)
}

func getBlob(ctx context.Context, db sql.Executor, id []byte) (buf []byte, err error) {
cacheKey := sql.QueryCacheKey(CacheKindATXBlob, string(id))
return sql.WithCachedValue(ctx, db, cacheKey, func(context.Context) ([]byte, error) {
if rows, err := db.Exec("select atx from atx_blobs where id = ?1",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id)
}, func(stmt *sql.Statement) bool {
if stmt.ColumnLen(0) > 0 {
buf = make([]byte, stmt.ColumnLen(0))
stmt.ColumnBytes(0, buf)
}
return true
}); err != nil {
return nil, fmt.Errorf("get %s: %w", types.BytesToHash(id), err)
} else if rows == 0 {
return nil, fmt.Errorf("%w: atx %s", sql.ErrNotFound, types.BytesToHash(id))
}
return buf, nil
})
func getBlob(ctx context.Context, db sql.Executor, id []byte, blob *sql.Blob) (types.AtxVersion, error) {
var version types.AtxVersion
rows, err := db.Exec("select atx, version from atx_blobs where id = ?1",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id)
}, func(stmt *sql.Statement) bool {
n := stmt.ColumnLen(0)
blob.Bytes = slices.Grow(blob.Bytes[0:], n)[:n]
stmt.ColumnReader(0).Read(blob.Bytes)
poszu marked this conversation as resolved.
Show resolved Hide resolved

version = types.AtxVersion(stmt.ColumnInt(1))
return true
},
)
if err != nil {
return 0, fmt.Errorf("get %v: %w", types.BytesToHash(id), err)
}
if rows == 0 {
return 0, fmt.Errorf("%w: atx %s", sql.ErrNotFound, types.BytesToHash(id))
}

return version, nil
}

// NonceByID retrieves VRFNonce corresponding to the specified ATX ID.
Expand Down
Loading
Loading