Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Startpost rpc #1312

Merged
merged 16 commits into from
Aug 15, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 73 additions & 10 deletions activation/activation.go
Expand Up @@ -7,7 +7,9 @@ import (
"github.com/spacemeshos/go-spacemesh/common"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/types"
"sync"
"sync/atomic"
"time"
)

const AtxProtocol = "AtxGossip"
Expand Down Expand Up @@ -38,7 +40,7 @@ func (provider *PoETNumberOfTickProvider) NumOfTicks() uint64 {
type NipstBuilder interface {
BuildNIPST(challenge *common.Hash) (*types.NIPST, error)
IsPostInitialized() bool
InitializePost() (*types.PostProof, error)
InitializePost(logicalDrive string, commitmentSize uint64) (*types.PostProof, error)
}

type IdStore interface {
Expand All @@ -62,6 +64,12 @@ type BytesStore interface {
Get(key []byte) ([]byte, error)
}

const (
Idle = 1 + iota
InProgress
Done
)

type Builder struct {
nodeId types.NodeId
coinbaseAccount address.Address
Expand All @@ -82,6 +90,10 @@ type Builder struct {
started uint32
store BytesStore
isSynced func() bool
accountLock sync.RWMutex
postInitLock sync.RWMutex
initStatus int
errorTimeoutMs time.Duration
log log.Log
}

Expand All @@ -101,6 +113,8 @@ func NewBuilder(nodeId types.NodeId, coinbaseAccount address.Address, db ATXDBPr
finished: make(chan struct{}),
isSynced: isSyncedFunc,
store: store,
initStatus: Idle,
errorTimeoutMs: 100,
log: log,
}
}
Expand All @@ -121,14 +135,6 @@ func (b *Builder) Stop() {

// loop is the main loop that tries to create an atx per tick received from the global clock
func (b *Builder) loop() {
// post is initialized here, consider moving it to another location.
if !b.nipstBuilder.IsPostInitialized() {
_, err := b.nipstBuilder.InitializePost() // TODO: add proof to first ATX
if err != nil {
b.log.Error("PoST initialization failed: %v", err)
return
}
}
err := b.loadChallenge()
if err != nil {
log.Info("challenge not loaded: %s", err)
Expand All @@ -142,6 +148,13 @@ func (b *Builder) loop() {
b.log.Info("cannot create atx : not synced")
break
}
b.postInitLock.RLock()
initStat := b.initStatus
b.postInitLock.RUnlock()
if initStat != Done || !b.nipstBuilder.IsPostInitialized() {
moshababo marked this conversation as resolved.
Show resolved Hide resolved
b.log.Info("post is not initialized yet, not building nipst")
break
}
if b.working {
break
}
Expand Down Expand Up @@ -228,6 +241,56 @@ func (b *Builder) buildNipstChallenge(epoch types.EpochId) error {
return nil
}

func (b *Builder) StartPost(rewardAddress address.Address, dataDir string, space uint64) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You changed logicalDrive to dataDir and commitmentSize to space here but not in many other places. It should all be the same, and if the context is lost, you can do postDatadir & postSpace, or something similar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many more leftovers.

b.log.Info("Starting post, reward address: %x", rewardAddress)
b.SetCoinbaseAccount(rewardAddress)
b.postInitLock.Lock()
if b.initStatus == Done {
return nil
}
if b.initStatus != Idle {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last time I wrote:

It might be worth to check for InProgress status directly, instead of assuming that's the case if we're not on Idle && IsPostInitialized() returns false. Whatever you think.

Now after IsPostInitialized() removal, you're not covering the Done case.
So this need to be fixed, and also you should set the Done statue value if you're getting "already initialized" err from the initializer. Also, IMO "already initialized" should not be appended to "PoST initialization failed: " string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not to do this now as if b.initStatus == InProgress?

b.postInitLock.Unlock()
return fmt.Errorf("attempted to start post when post already initiated")
}
b.initStatus = InProgress
errChan := make(chan error, 1)
go func() {
_, err := b.nipstBuilder.InitializePost(dataDir, space) // TODO: add proof to first ATX
b.postInitLock.Lock()
if err != nil {
b.initStatus = Idle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're not inspecting "already initialized" to set a Done status.
If you don't want to do it now, leave a TODO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no "already initialized error, we use the initStatus flag to know this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to an error which comes from go-spacemesh/post initializer. And my mistake - it will return "already completed" and not "already initialized" (that was the old version).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then I think we should not return an error in such a case, and simply return the proof, I think It's not the best idea to keep track for specific error messages in between repos, since this can easily break

b.postInitLock.Unlock()
b.log.Error(err.Error())
errChan <- err
return
}
b.initStatus = Done
b.postInitLock.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for not doing the unlock before launching the goroutine?

}()
b.postInitLock.Unlock()
// todo: refactor initializePost to be concurrent, returning error in a blocking manner
t := time.NewTimer(b.errorTimeoutMs * time.Millisecond)
select {
case <-t.C:
return nil
case err := <-errChan:
return err
}
}

func (b *Builder) SetCoinbaseAccount(rewardAddress address.Address) {
b.accountLock.Lock()
b.coinbaseAccount = rewardAddress
b.accountLock.Unlock()
}

func (b *Builder) getCoinbaseAccount() address.Address {
b.accountLock.RLock()
acc := b.coinbaseAccount
b.accountLock.RUnlock()
return acc
}

func (b Builder) getNipstKey() []byte {
return []byte("nipst")
}
Expand Down Expand Up @@ -332,7 +395,7 @@ func (b *Builder) PublishActivationTx(epoch types.EpochId) error {
pubEpoch, len(view), view)
}

atx := types.NewActivationTxWithChallenge(*b.challenge, b.coinbaseAccount, activeSetSize, view, b.nipst)
atx := types.NewActivationTxWithChallenge(*b.challenge, b.getCoinbaseAccount(), activeSetSize, view, b.nipst)

b.log.With().Info("active ids seen for epoch", log.Uint64("pos_atx_epoch", uint64(posEpoch)),
log.Uint32("view_cnt", activeSetSize))
Expand Down
58 changes: 52 additions & 6 deletions activation/activation_test.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"sort"
"testing"
"time"
)

// ========== Vars / Consts ==========
Expand Down Expand Up @@ -71,15 +72,24 @@ func (n *NetMock) Broadcast(id string, d []byte) error {
}

type NipstBuilderMock struct {
poetRef []byte
buildNipstFunc func(challenge *common.Hash) (*types.NIPST, error)
poetRef []byte
buildNipstFunc func(challenge *common.Hash) (*types.NIPST, error)
initPostFunc func(logicalDrive string, commitmentSize uint64) (*types.PostProof, error)
SleepTime int
PostInitialized bool
}

func (np *NipstBuilderMock) IsPostInitialized() bool {
return true
return np.PostInitialized
}

func (np *NipstBuilderMock) InitializePost() (*types.PostProof, error) {
func (np *NipstBuilderMock) InitializePost(logicalDrive string, commitmentSize uint64) (*types.PostProof, error) {
if np.initPostFunc != nil {
return np.initPostFunc(logicalDrive, commitmentSize)
}
if np.SleepTime != 0 {
time.Sleep(time.Duration(np.SleepTime) * time.Millisecond)
}
return nil, nil
}

Expand All @@ -97,7 +107,7 @@ func (np *NipstErrBuilderMock) IsPostInitialized() bool {
return true
}

func (np *NipstErrBuilderMock) InitializePost() (*types.PostProof, error) {
func (np *NipstErrBuilderMock) InitializePost(logicalDrive string, commitmentSize uint64) (*types.PostProof, error) {
return nil, nil
}

Expand Down Expand Up @@ -432,7 +442,7 @@ func TestBuilder_NipstPublishRecovery(t *testing.T) {
coinbase := address.HexToAddress("0xaaa")
net := &NetMock{}
layers := &MeshProviderMock{}
nipstBuilder := &NipstBuilderMock{}
nipstBuilder := &NipstBuilderMock{PostInitialized: true}
layersPerEpoch := uint16(10)
lg := log.NewDefault(id.Key[:5])
db := NewMockDB()
Expand Down Expand Up @@ -494,3 +504,39 @@ func TestBuilder_NipstPublishRecovery(t *testing.T) {
assert.EqualError(t, err, "cannot find pos atx in epoch 3: cannot find pos atx id: current posAtx (epoch 1) does not belong to the requested epoch (3)")
assert.True(t, db.hadNone)
}

func TestStartPost(t *testing.T) {
id := types.NodeId{"aaaaaa", []byte("bbbbb")}
coinbase := address.HexToAddress("0xaaa")
//net := &NetMock{}
layers := &MeshProviderMock{}
nipstBuilder := &NipstBuilderMock{PostInitialized: false}
layersPerEpoch := uint16(10)
lg := log.NewDefault(id.Key[:5])

drive := "/tmp/anton"
coinbase2 := address.HexToAddress("0xabb")
db := NewMockDB()
activationDb := NewActivationDb(database.NewMemDatabase(), &MockIdStore{}, mesh.NewMemMeshDB(lg.WithName("meshDB")), layersPerEpoch, &ValidatorMock{}, lg.WithName("atxDB1"))
b := NewBuilder(id, coinbase, activationDb, &FaultyNetMock{}, layers, layersPerEpoch, nipstBuilder, nil, func() bool { return true }, db, lg.WithName("atxBuilder"))

nipstBuilder.SleepTime = 10000
err := b.StartPost(coinbase2, drive, 1000)
assert.NoError(t, err)

// negative test to run
err = b.StartPost(coinbase2, drive, 1000)
assert.EqualError(t, err, "attempted to start post when post already initiated")

nipstBuilder.initPostFunc = func(logicalDrive string, commitmentSize uint64) (*types.PostProof, error) {
return nil, fmt.Errorf("error")
}

// negative test for error in buildNipst - this might not work in travis so I've commented it out
b = NewBuilder(id, coinbase, activationDb, &FaultyNetMock{}, layers, layersPerEpoch, nipstBuilder, nil, func() bool { return true }, db, lg.WithName("atxBuilder"))
b.errorTimeoutMs = 1000
nipstBuilder.PostInitialized = false
err = b.StartPost(coinbase2, drive, 1000)
assert.EqualError(t, err, "error")

}
16 changes: 16 additions & 0 deletions address/address.go
Expand Up @@ -30,6 +30,22 @@ func BigToAddress(b *big.Int) Address { return BytesToAddress(b.Bytes()) }
// If s is larger than len(h), s will be cropped from the left.
func HexToAddress(s string) Address { return BytesToAddress(common.FromHex(s)) }

func StringToAddress(s string) (Address, error) {
if len(s) > 1 {
if s[0:2] == "0x" || s[0:2] == "0X" {
s = s[2:]
}
}
if len(s)%2 == 1 {
s = "0" + s
}
bt, err := hex.DecodeString(s)
if err != nil {
return Address{}, err
}
return BytesToAddress(bt), nil
}

// Bytes gets the string representation of the underlying address.
func (a Address) Bytes() []byte { return a[:] }

Expand Down