Skip to content

Commit

Permalink
Try #5221:
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemesh-bors[bot] committed May 22, 2024
2 parents 3ca48df + 62ea324 commit 66a61f1
Show file tree
Hide file tree
Showing 46 changed files with 2,422 additions and 622 deletions.
37 changes: 29 additions & 8 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/localsql/nipost"
)

var ErrNotFound = errors.New("not found")

// PoetConfig is the configuration to interact with the poet server.
type PoetConfig struct {
PhaseShift time.Duration `mapstructure:"phase-shift"`
Expand Down Expand Up @@ -82,6 +84,7 @@ type Builder struct {
syncer syncer
log *zap.Logger
parentCtx context.Context
poets []PoetClient
poetCfg PoetConfig
poetRetryInterval time.Duration
// delay before PoST in ATX is considered valid (counting from the time it was received)
Expand Down Expand Up @@ -138,6 +141,12 @@ func WithPoetConfig(c PoetConfig) BuilderOption {
}
}

func WithPoets(poets ...PoetClient) BuilderOption {
return func(b *Builder) {
b.poets = poets
}
}

func WithValidator(v nipostValidator) BuilderOption {
return func(b *Builder) {
b.validator = v
Expand Down Expand Up @@ -323,7 +332,7 @@ func (b *Builder) buildInitialPost(ctx context.Context, nodeID types.NodeID) err
return nil
}
// ...and if we haven't stored an initial post yet.
_, err := nipost.InitialPost(b.localDB, nodeID)
_, err := nipost.GetPost(b.localDB, nodeID)
switch {
case err == nil:
b.log.Info("load initial post from db")
Expand All @@ -347,9 +356,10 @@ func (b *Builder) buildInitialPost(ctx context.Context, nodeID types.NodeID) err
return errors.New("nil VRF nonce")
}
initialPost := nipost.Post{
Nonce: post.Nonce,
Indices: post.Indices,
Pow: post.Pow,
Nonce: post.Nonce,
Indices: post.Indices,
Pow: post.Pow,
Challenge: shared.ZeroChallenge,

NumUnits: postInfo.NumUnits,
CommitmentATX: postInfo.CommitmentATX,
Expand All @@ -361,7 +371,7 @@ func (b *Builder) buildInitialPost(ctx context.Context, nodeID types.NodeID) err
}, postInfo.NumUnits)
if err != nil {
b.log.Error("initial POST is invalid", log.ZShortStringer("smesherID", nodeID), zap.Error(err))
if err := nipost.RemoveInitialPost(b.localDB, nodeID); err != nil {
if err := nipost.RemovePost(b.localDB, nodeID); err != nil {
b.log.Fatal("failed to remove initial post", log.ZShortStringer("smesherID", nodeID), zap.Error(err))
}
return fmt.Errorf("initial POST is invalid: %w", err)
Expand All @@ -371,7 +381,7 @@ func (b *Builder) buildInitialPost(ctx context.Context, nodeID types.NodeID) err
public.PostSeconds.Set(float64(time.Since(startTime)))
b.log.Info("created the initial post")

return nipost.AddInitialPost(b.localDB, nodeID, initialPost)
return nipost.AddPost(b.localDB, nodeID, initialPost)
}

func (b *Builder) run(ctx context.Context, sig *signing.EdSigner) {
Expand All @@ -390,6 +400,17 @@ func (b *Builder) run(ctx context.Context, sig *signing.EdSigner) {
case <-b.layerClock.AwaitLayer(currentLayer.Add(1)):
}
}
var eg errgroup.Group
for _, poet := range b.poets {
eg.Go(func() error {
_, err := poet.Certify(ctx, sig.NodeID())
if err != nil {
b.log.Warn("failed to certify poet", zap.Error(err), log.ZShortStringer("smesherID", sig.NodeID()))
}
return nil
})
}
eg.Wait()

for {
err := b.PublishActivationTx(ctx, sig)
Expand Down Expand Up @@ -515,7 +536,7 @@ func (b *Builder) BuildNIPostChallenge(ctx context.Context, nodeID types.NodeID)
switch {
case errors.Is(err, sql.ErrNotFound):
logger.Info("no previous ATX found, creating an initial nipost challenge")
post, err := nipost.InitialPost(b.localDB, nodeID)
post, err := nipost.GetPost(b.localDB, nodeID)
if err != nil {
return nil, fmt.Errorf("get initial post: %w", err)
}
Expand All @@ -531,7 +552,7 @@ func (b *Builder) BuildNIPostChallenge(ctx context.Context, nodeID types.NodeID)
}, post.NumUnits)
if err != nil {
logger.Error("initial POST is invalid", zap.Error(err))
if err := nipost.RemoveInitialPost(b.localDB, nodeID); err != nil {
if err := nipost.RemovePost(b.localDB, nodeID); err != nil {
logger.Fatal("failed to remove initial post", zap.Error(err))
}
return nil, fmt.Errorf("initial POST is invalid: %w", err)
Expand Down
10 changes: 7 additions & 3 deletions activation/activation_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,12 @@ func Test_Builder_Multi_InitialPost(t *testing.T) {
},
nil,
)
require.NoError(t, tab.buildInitialPost(context.Background(), sig.NodeID()))
err := tab.buildInitialPost(context.Background(), sig.NodeID())
require.NoError(t, err)

// postClient.Proof() should not be called again
require.NoError(t, tab.buildInitialPost(context.Background(), sig.NodeID()))
err = tab.buildInitialPost(context.Background(), sig.NodeID())
require.NoError(t, err)
return nil
})
}
Expand Down Expand Up @@ -397,7 +399,9 @@ func Test_Builder_Multi_HappyPath(t *testing.T) {
VRFNonce: types.VRFPostIndex(rand.Uint64()),
}
nipostState[sig.NodeID()] = state
tab.mnipost.EXPECT().BuildNIPost(gomock.Any(), sig, ref.PublishEpoch, ref.Hash()).Return(state, nil)
tab.mnipost.EXPECT().
BuildNIPost(gomock.Any(), sig, ref.PublishEpoch, ref.Hash()).
Return(state, nil)

// awaiting atx publication epoch log
tab.mclock.EXPECT().CurrentLayer().DoAndReturn(
Expand Down
56 changes: 32 additions & 24 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,9 @@ func TestBuilder_PublishActivationTx_NoPrevATX(t *testing.T) {
NumUnits: uint32(12),
CommitmentATX: types.RandomATXID(),
VRFNonce: types.VRFPostIndex(rand.Uint64()),
Challenge: shared.ZeroChallenge,
}
require.NoError(t, nipost.AddInitialPost(tab.localDb, sig.NodeID(), post))
require.NoError(t, nipost.AddPost(tab.localDb, sig.NodeID(), post))
initialPost := &types.Post{
Nonce: post.Nonce,
Indices: post.Indices,
Expand Down Expand Up @@ -706,8 +707,9 @@ func TestBuilder_PublishActivationTx_NoPrevATX_PublishFails_InitialPost_preserve
NumUnits: uint32(12),
CommitmentATX: types.RandomATXID(),
VRFNonce: types.VRFPostIndex(rand.Uint64()),
Challenge: shared.ZeroChallenge,
}
require.NoError(t, nipost.AddInitialPost(tab.localDb, sig.NodeID(), refPost))
require.NoError(t, nipost.AddPost(tab.localDb, sig.NodeID(), refPost))
initialPost := &types.Post{
Nonce: refPost.Nonce,
Indices: refPost.Indices,
Expand All @@ -728,12 +730,10 @@ func TestBuilder_PublishActivationTx_NoPrevATX_PublishFails_InitialPost_preserve
genesis := time.Now().Add(-time.Duration(currLayer) * layerDuration)
return genesis.Add(layerDuration * time.Duration(got))
}).AnyTimes()

tab.mnipost.EXPECT().
BuildNIPost(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, ErrATXChallengeExpired)
tab.mnipost.EXPECT().ResetState(sig.NodeID()).Return(nil)

ch := make(chan struct{})
tab.mclock.EXPECT().AwaitLayer(currLayer.Add(1)).Do(func(got types.LayerID) <-chan struct{} {
close(ch)
Expand All @@ -758,7 +758,7 @@ func TestBuilder_PublishActivationTx_NoPrevATX_PublishFails_InitialPost_preserve
}

// initial post is preserved
post, err := nipost.InitialPost(tab.localDB, sig.NodeID())
post, err := nipost.GetPost(tab.localDB, sig.NodeID())
require.NoError(t, err)
require.NotNil(t, post)
require.Equal(t, refPost, *post)
Expand Down Expand Up @@ -824,11 +824,13 @@ func TestBuilder_PublishActivationTx_PrevATXWithoutPrevATX(t *testing.T) {
LabelsPerUnit: DefaultPostConfig().LabelsPerUnit,
}, nil).AnyTimes()

tab.mnipost.EXPECT().BuildNIPost(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ *signing.EdSigner, _ types.EpochID, _ types.Hash32) (*nipost.NIPostState, error) {
currentLayer = currentLayer.Add(5)
return newNIPostWithPoet(t, poetBytes), nil
})
tab.mnipost.EXPECT().
BuildNIPost(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(
func(_ context.Context, _ *signing.EdSigner, _ types.EpochID, _ types.Hash32) (*nipost.NIPostState, error) {
currentLayer = currentLayer.Add(5)
return newNIPostWithPoet(t, poetBytes), nil
})

tab.mValidator.EXPECT().VerifyChain(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())

Expand Down Expand Up @@ -938,8 +940,9 @@ func TestBuilder_PublishActivationTx_TargetsEpochBasedOnPosAtx(t *testing.T) {
NumUnits: uint32(12),
CommitmentATX: types.RandomATXID(),
VRFNonce: types.VRFPostIndex(rand.Uint64()),
Challenge: shared.ZeroChallenge,
}
require.NoError(t, nipost.AddInitialPost(tab.localDb, sig.NodeID(), post))
require.NoError(t, nipost.AddPost(tab.localDb, sig.NodeID(), post))
initialPost := &types.Post{
Nonce: post.Nonce,
Indices: post.Indices,
Expand Down Expand Up @@ -979,7 +982,9 @@ func TestBuilder_PublishActivationTx_FailsWhenNIPostBuilderFails(t *testing.T) {
return genesis.Add(layerDuration * time.Duration(got))
}).AnyTimes()
nipostErr := errors.New("NIPost builder error")
tab.mnipost.EXPECT().BuildNIPost(gomock.Any(), sig, gomock.Any(), gomock.Any()).Return(nil, nipostErr)
tab.mnipost.EXPECT().
BuildNIPost(gomock.Any(), sig, gomock.Any(), gomock.Any()).
Return(nil, nipostErr)
tab.mValidator.EXPECT().VerifyChain(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
require.ErrorIs(t, tab.PublishActivationTx(context.Background(), sig), nipostErr)

Expand Down Expand Up @@ -1027,7 +1032,6 @@ func TestBuilder_RetryPublishActivationTx(t *testing.T) {
require.NoError(t, atxs.Add(tab.db, toAtx(t, prevAtx)))
tab.atxsdata.AddFromAtx(toAtx(t, prevAtx), false)

publishEpoch := prevAtx.PublishEpoch + 1
currLayer := prevAtx.PublishEpoch.FirstLayer()
tab.mclock.EXPECT().CurrentLayer().DoAndReturn(func() types.LayerID { return currLayer }).AnyTimes()
tab.mclock.EXPECT().LayerToTime(gomock.Any()).DoAndReturn(
Expand All @@ -1038,7 +1042,7 @@ func TestBuilder_RetryPublishActivationTx(t *testing.T) {
}).AnyTimes()
done := make(chan struct{})
close(done)
tab.mclock.EXPECT().AwaitLayer(publishEpoch.FirstLayer()).DoAndReturn(
tab.mclock.EXPECT().AwaitLayer(prevAtx.PublishEpoch.Add(1).FirstLayer()).DoAndReturn(
func(got types.LayerID) <-chan struct{} {
// advance to publish layer
if currLayer.Before(got) {
Expand Down Expand Up @@ -1125,9 +1129,6 @@ func TestBuilder_RetryPublishActivationTx(t *testing.T) {
}

// state is cleaned up
_, err = nipost.InitialPost(tab.localDB, sig.NodeID())
require.ErrorIs(t, err, sql.ErrNotFound)

_, err = nipost.Challenge(tab.localDB, sig.NodeID())
require.ErrorIs(t, err, sql.ErrNotFound)
}
Expand Down Expand Up @@ -1169,7 +1170,8 @@ func TestBuilder_InitialProofGeneratedOnce(t *testing.T) {
tab.mValidator.EXPECT().Post(gomock.Any(), sig.NodeID(), post.CommitmentATX, initialPost, metadata, post.NumUnits)
require.NoError(t, tab.buildInitialPost(context.Background(), sig.NodeID()))
// postClient.Proof() should not be called again
require.NoError(t, tab.buildInitialPost(context.Background(), sig.NodeID()))
err := tab.buildInitialPost(context.Background(), sig.NodeID())
require.NoError(t, err)
}

func TestBuilder_InitialPostIsPersisted(t *testing.T) {
Expand Down Expand Up @@ -1201,10 +1203,12 @@ func TestBuilder_InitialPostIsPersisted(t *testing.T) {
LabelsPerUnit: tab.conf.LabelsPerUnit,
}
tab.mValidator.EXPECT().Post(gomock.Any(), sig.NodeID(), commitmentATX, initialPost, metadata, numUnits)
require.NoError(t, tab.buildInitialPost(context.Background(), sig.NodeID()))
err := tab.buildInitialPost(context.Background(), sig.NodeID())
require.NoError(t, err)

// postClient.Proof() should not be called again
require.NoError(t, tab.buildInitialPost(context.Background(), sig.NodeID()))
err = tab.buildInitialPost(context.Background(), sig.NodeID())
require.NoError(t, err)
}

func TestBuilder_InitialPostLogErrorMissingVRFNonce(t *testing.T) {
Expand Down Expand Up @@ -1234,7 +1238,8 @@ func TestBuilder_InitialPostLogErrorMissingVRFNonce(t *testing.T) {
LabelsPerUnit: tab.conf.LabelsPerUnit,
}
tab.mValidator.EXPECT().Post(gomock.Any(), sig.NodeID(), commitmentATX, initialPost, metadata, numUnits)
require.ErrorContains(t, tab.buildInitialPost(context.Background(), sig.NodeID()), "nil VRF nonce")
err := tab.buildInitialPost(context.Background(), sig.NodeID())
require.ErrorContains(t, err, "nil VRF nonce")

observedLogs := tab.observedLogs.FilterLevelExact(zapcore.ErrorLevel)
require.Equal(t, 1, observedLogs.Len(), "expected 1 log message")
Expand All @@ -1256,7 +1261,8 @@ func TestBuilder_InitialPostLogErrorMissingVRFNonce(t *testing.T) {
},
nil,
)
require.NoError(t, tab.buildInitialPost(context.Background(), sig.NodeID()))
err = tab.buildInitialPost(context.Background(), sig.NodeID())
require.NoError(t, err)
}

func TestWaitPositioningAtx(t *testing.T) {
Expand Down Expand Up @@ -1287,7 +1293,8 @@ func TestWaitPositioningAtx(t *testing.T) {
// everything else are stubs that are irrelevant for the test
tab.mpostClient.EXPECT().Info(gomock.Any()).Return(&types.PostInfo{}, nil).AnyTimes()
tab.mnipost.EXPECT().ResetState(sig.NodeID()).Return(nil)
tab.mnipost.EXPECT().BuildNIPost(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
tab.mnipost.EXPECT().
BuildNIPost(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&nipost.NIPostState{}, nil)
closed := make(chan struct{})
close(closed)
Expand All @@ -1309,8 +1316,9 @@ func TestWaitPositioningAtx(t *testing.T) {
NumUnits: uint32(12),
CommitmentATX: types.RandomATXID(),
VRFNonce: types.VRFPostIndex(rand.Uint64()),
Challenge: shared.ZeroChallenge,
}
require.NoError(t, nipost.AddInitialPost(tab.localDb, sig.NodeID(), post))
require.NoError(t, nipost.AddPost(tab.localDb, sig.NodeID(), post))
initialPost := &types.Post{
Nonce: post.Nonce,
Indices: post.Indices,
Expand Down
Loading

0 comments on commit 66a61f1

Please sign in to comment.