Skip to content

Commit

Permalink
Startpost rpc (#1312)
Browse files Browse the repository at this point in the history
* Added GRPC for initialization of post, submittransaction now returns tx id alongside ok message
closing #1247

* added support for next layer rewards RPC

* added reset to nipst in UT

* added flag for init post on node init, added this flag to automation

* after fmt

* added prints and flag for init post on node init

* after CR

* after fmt

* fixed merge error

* more changes after cr

* added todos in grpc server api calls

* renaming commitment size to space and logical drive to dataDir

* after more fixes

* renaming UT as well

* go mod tidy
  • Loading branch information
antonlerner committed Aug 15, 2019
1 parent 9e4830a commit 547510f
Show file tree
Hide file tree
Showing 25 changed files with 479 additions and 102 deletions.
83 changes: 73 additions & 10 deletions activation/activation.go
Expand Up @@ -7,7 +7,9 @@ import (
"github.com/spacemeshos/go-spacemesh/common"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/types"
"sync"
"sync/atomic"
"time"
)

const AtxProtocol = "AtxGossip"
Expand Down Expand Up @@ -38,7 +40,7 @@ func (provider *PoETNumberOfTickProvider) NumOfTicks() uint64 {
type NipstBuilder interface {
BuildNIPST(challenge *common.Hash) (*types.NIPST, error)
IsPostInitialized() bool
InitializePost() (*types.PostProof, error)
InitializePost(logicalDrive string, commitmentSize uint64) (*types.PostProof, error)
}

type IdStore interface {
Expand All @@ -62,6 +64,12 @@ type BytesStore interface {
Get(key []byte) ([]byte, error)
}

const (
Idle = 1 + iota
InProgress
Done
)

type Builder struct {
nodeId types.NodeId
coinbaseAccount address.Address
Expand All @@ -82,6 +90,10 @@ type Builder struct {
started uint32
store BytesStore
isSynced func() bool
accountLock sync.RWMutex
postInitLock sync.RWMutex
initStatus int
errorTimeoutMs time.Duration
log log.Log
}

Expand All @@ -101,6 +113,8 @@ func NewBuilder(nodeId types.NodeId, coinbaseAccount address.Address, db ATXDBPr
finished: make(chan struct{}),
isSynced: isSyncedFunc,
store: store,
initStatus: Idle,
errorTimeoutMs: 100,
log: log,
}
}
Expand All @@ -121,14 +135,6 @@ func (b *Builder) Stop() {

// loop is the main loop that tries to create an atx per tick received from the global clock
func (b *Builder) loop() {
// post is initialized here, consider moving it to another location.
if !b.nipstBuilder.IsPostInitialized() {
_, err := b.nipstBuilder.InitializePost() // TODO: add proof to first ATX
if err != nil {
b.log.Error("PoST initialization failed: %v", err)
return
}
}
err := b.loadChallenge()
if err != nil {
log.Info("challenge not loaded: %s", err)
Expand All @@ -142,6 +148,13 @@ func (b *Builder) loop() {
b.log.Info("cannot create atx : not synced")
break
}
b.postInitLock.RLock()
initStat := b.initStatus
b.postInitLock.RUnlock()
if initStat != Done || !b.nipstBuilder.IsPostInitialized() {
b.log.Info("post is not initialized yet, not building nipst")
break
}
if b.working {
break
}
Expand Down Expand Up @@ -228,6 +241,56 @@ func (b *Builder) buildNipstChallenge(epoch types.EpochId) error {
return nil
}

func (b *Builder) StartPost(rewardAddress address.Address, dataDir string, space uint64) error {
b.log.Info("Starting post, reward address: %x", rewardAddress)
b.SetCoinbaseAccount(rewardAddress)
b.postInitLock.Lock()
if b.initStatus == Done {
return nil
}
if b.initStatus == InProgress {
b.postInitLock.Unlock()
return fmt.Errorf("attempted to start post when post already initiated")
}
b.initStatus = InProgress
errChan := make(chan error, 1)
go func() {
_, err := b.nipstBuilder.InitializePost(dataDir, space) // TODO: add proof to first ATX
b.postInitLock.Lock()
if err != nil {
b.initStatus = Idle
b.postInitLock.Unlock()
b.log.Error(err.Error())
errChan <- err
return
}
b.initStatus = Done
b.postInitLock.Unlock()
}()
b.postInitLock.Unlock()
// todo: refactor initializePost to be concurrent, returning error in a blocking manner
t := time.NewTimer(b.errorTimeoutMs * time.Millisecond)
select {
case <-t.C:
return nil
case err := <-errChan:
return err
}
}

func (b *Builder) SetCoinbaseAccount(rewardAddress address.Address) {
b.accountLock.Lock()
b.coinbaseAccount = rewardAddress
b.accountLock.Unlock()
}

func (b *Builder) getCoinbaseAccount() address.Address {
b.accountLock.RLock()
acc := b.coinbaseAccount
b.accountLock.RUnlock()
return acc
}

func (b Builder) getNipstKey() []byte {
return []byte("nipst")
}
Expand Down Expand Up @@ -332,7 +395,7 @@ func (b *Builder) PublishActivationTx(epoch types.EpochId) error {
pubEpoch, len(view), view)
}

atx := types.NewActivationTxWithChallenge(*b.challenge, b.coinbaseAccount, activeSetSize, view, b.nipst)
atx := types.NewActivationTxWithChallenge(*b.challenge, b.getCoinbaseAccount(), activeSetSize, view, b.nipst)

b.log.With().Info("active ids seen for epoch", log.Uint64("pos_atx_epoch", uint64(posEpoch)),
log.Uint32("view_cnt", activeSetSize))
Expand Down
58 changes: 52 additions & 6 deletions activation/activation_test.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"sort"
"testing"
"time"
)

// ========== Vars / Consts ==========
Expand Down Expand Up @@ -71,15 +72,24 @@ func (n *NetMock) Broadcast(id string, d []byte) error {
}

type NipstBuilderMock struct {
poetRef []byte
buildNipstFunc func(challenge *common.Hash) (*types.NIPST, error)
poetRef []byte
buildNipstFunc func(challenge *common.Hash) (*types.NIPST, error)
initPostFunc func(logicalDrive string, commitmentSize uint64) (*types.PostProof, error)
SleepTime int
PostInitialized bool
}

func (np *NipstBuilderMock) IsPostInitialized() bool {
return true
return np.PostInitialized
}

func (np *NipstBuilderMock) InitializePost() (*types.PostProof, error) {
func (np *NipstBuilderMock) InitializePost(datadir string, space uint64) (*types.PostProof, error) {
if np.initPostFunc != nil {
return np.initPostFunc(datadir, space)
}
if np.SleepTime != 0 {
time.Sleep(time.Duration(np.SleepTime) * time.Millisecond)
}
return nil, nil
}

Expand All @@ -97,7 +107,7 @@ func (np *NipstErrBuilderMock) IsPostInitialized() bool {
return true
}

func (np *NipstErrBuilderMock) InitializePost() (*types.PostProof, error) {
func (np *NipstErrBuilderMock) InitializePost(dataDir string, space uint64) (*types.PostProof, error) {
return nil, nil
}

Expand Down Expand Up @@ -432,7 +442,7 @@ func TestBuilder_NipstPublishRecovery(t *testing.T) {
coinbase := address.HexToAddress("0xaaa")
net := &NetMock{}
layers := &MeshProviderMock{}
nipstBuilder := &NipstBuilderMock{}
nipstBuilder := &NipstBuilderMock{PostInitialized: true}
layersPerEpoch := uint16(10)
lg := log.NewDefault(id.Key[:5])
db := NewMockDB()
Expand Down Expand Up @@ -494,3 +504,39 @@ func TestBuilder_NipstPublishRecovery(t *testing.T) {
assert.EqualError(t, err, "cannot find pos atx in epoch 3: cannot find pos atx id: current posAtx (epoch 1) does not belong to the requested epoch (3)")
assert.True(t, db.hadNone)
}

func TestStartPost(t *testing.T) {
id := types.NodeId{"aaaaaa", []byte("bbbbb")}
coinbase := address.HexToAddress("0xaaa")
//net := &NetMock{}
layers := &MeshProviderMock{}
nipstBuilder := &NipstBuilderMock{PostInitialized: false}
layersPerEpoch := uint16(10)
lg := log.NewDefault(id.Key[:5])

drive := "/tmp/anton"
coinbase2 := address.HexToAddress("0xabb")
db := NewMockDB()
activationDb := NewActivationDb(database.NewMemDatabase(), &MockIdStore{}, mesh.NewMemMeshDB(lg.WithName("meshDB")), layersPerEpoch, &ValidatorMock{}, lg.WithName("atxDB1"))
b := NewBuilder(id, coinbase, activationDb, &FaultyNetMock{}, layers, layersPerEpoch, nipstBuilder, nil, func() bool { return true }, db, lg.WithName("atxBuilder"))

nipstBuilder.SleepTime = 10000
err := b.StartPost(coinbase2, drive, 1000)
assert.NoError(t, err)

// negative test to run
err = b.StartPost(coinbase2, drive, 1000)
assert.EqualError(t, err, "attempted to start post when post already initiated")

nipstBuilder.initPostFunc = func(logicalDrive string, commitmentSize uint64) (*types.PostProof, error) {
return nil, fmt.Errorf("error")
}

// negative test for error in buildNipst - this might not work in travis so I've commented it out
b = NewBuilder(id, coinbase, activationDb, &FaultyNetMock{}, layers, layersPerEpoch, nipstBuilder, nil, func() bool { return true }, db, lg.WithName("atxBuilder"))
b.errorTimeoutMs = 1000
nipstBuilder.PostInitialized = false
err = b.StartPost(coinbase2, drive, 1000)
assert.EqualError(t, err, "error")

}
16 changes: 16 additions & 0 deletions address/address.go
Expand Up @@ -30,6 +30,22 @@ func BigToAddress(b *big.Int) Address { return BytesToAddress(b.Bytes()) }
// If s is larger than len(h), s will be cropped from the left.
func HexToAddress(s string) Address { return BytesToAddress(common.FromHex(s)) }

func StringToAddress(s string) (Address, error) {
if len(s) > 1 {
if s[0:2] == "0x" || s[0:2] == "0X" {
s = s[2:]
}
}
if len(s)%2 == 1 {
s = "0" + s
}
bt, err := hex.DecodeString(s)
if err != nil {
return Address{}, err
}
return BytesToAddress(bt), nil
}

// Bytes gets the string representation of the underlying address.
func (a Address) Bytes() []byte { return a[:] }

Expand Down

0 comments on commit 547510f

Please sign in to comment.