New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ATX Integration #820
ATX Integration #820
Changes from all commits
ff3ff00
d9b5694
3d8e2c7
209ceff
61411e5
eebe5cf
6159b6a
780f748
1047d14
907293c
79c4648
0305d22
2ccd84f
e795a3e
dd79313
1944d31
7bee9b7
9951478
c05ee67
027d780
34756d3
e58fb6d
39c0f9d
d61372e
f3c1571
5110960
8db2da8
b84ba82
8ec3ec6
8af6936
aabe61c
11bda41
e84c01f
58b829d
643b385
6a03f01
6636034
fc5a7bd
d342a82
5f0d2ed
21abc98
d7a74d7
0f04113
69f82c7
683a5ee
4a8cd3e
d25a58c
b48c8f4
83139be
51eea66
d67ccff
8930d05
3fcc9f3
2bebe94
316dbc1
d250c97
ffd23fe
bbe354d
2ea5c96
dcde9ea
e6704f9
da4eae6
683d213
ce21f4e
dfb0c33
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,25 +2,25 @@ package activation | |
|
||
import ( | ||
"fmt" | ||
"github.com/spacemeshos/go-spacemesh/database" | ||
"github.com/spacemeshos/go-spacemesh/common" | ||
"github.com/spacemeshos/go-spacemesh/log" | ||
"github.com/spacemeshos/go-spacemesh/mesh" | ||
"github.com/spacemeshos/go-spacemesh/nipst" | ||
"github.com/spacemeshos/go-spacemesh/rand" | ||
"github.com/spacemeshos/go-spacemesh/types" | ||
"sync/atomic" | ||
) | ||
|
||
const AtxProtocol = "AtxGossip" | ||
|
||
var activesetCache = NewActivesetCache(1000) | ||
|
||
type ActiveSetProvider interface { | ||
GetActiveSetSize(l types.LayerID) uint32 | ||
ActiveSetSize(l types.EpochId) uint32 | ||
} | ||
|
||
type MeshProvider interface { | ||
GetLatestView() []types.BlockID | ||
LatestLayerId() types.LayerID | ||
LatestLayer() types.LayerID | ||
} | ||
|
||
type EpochProvider interface { | ||
|
@@ -39,113 +39,218 @@ func (provider *PoETNumberOfTickProvider) NumOfTicks() uint64 { | |
} | ||
|
||
type NipstBuilder interface { | ||
BuildNIPST(challange []byte) (*nipst.NIPST, error) | ||
BuildNIPST(challenge *common.Hash) (*nipst.NIPST, error) | ||
IsPostInitialized() bool | ||
InitializePost() (*nipst.PostProof, error) | ||
} | ||
|
||
type IdStore interface { | ||
StoreNodeIdentity(id types.NodeId) error | ||
GetIdentity(id string) (types.NodeId, error) | ||
} | ||
|
||
type NipstValidator interface { | ||
Validate(nipst *nipst.NIPST, expectedChallenge common.Hash) error | ||
} | ||
|
||
type Builder struct { | ||
nodeId types.NodeId | ||
db *ActivationDb | ||
net Broadcaster | ||
activeSet ActiveSetProvider | ||
mesh MeshProvider | ||
epochProvider EpochProvider | ||
layersPerEpoch uint64 | ||
layersPerEpoch uint16 | ||
tickProvider PoETNumberOfTickProvider | ||
nipstBuilder NipstBuilder | ||
challenge *types.NIPSTChallenge | ||
nipst *nipst.NIPST | ||
posLayerID types.LayerID | ||
prevATX *types.ActivationTx | ||
timer chan types.LayerID | ||
stop chan struct{} | ||
finished chan struct{} | ||
working bool | ||
started uint32 | ||
log log.Log | ||
} | ||
|
||
type Processor struct { | ||
db *ActivationDb | ||
epochProvider EpochProvider | ||
} | ||
|
||
func NewBuilder(nodeId types.NodeId, db database.DB, meshdb *mesh.MeshDB, net Broadcaster, activeSet ActiveSetProvider, view MeshProvider, epochDuration EpochProvider, layersPerEpoch uint64, nipstBuilder NipstBuilder, identityStore IdStore) *Builder { | ||
func NewBuilder(nodeId types.NodeId, db *ActivationDb, net Broadcaster, activeSet ActiveSetProvider, view MeshProvider, | ||
layersPerEpoch uint16, nipstBuilder NipstBuilder, layerClock chan types.LayerID, log log.Log) *Builder { | ||
|
||
return &Builder{ | ||
nodeId, | ||
NewActivationDb(db, identityStore, meshdb, layersPerEpoch), | ||
net, | ||
activeSet, | ||
view, | ||
epochDuration, | ||
layersPerEpoch, | ||
PoETNumberOfTickProvider{}, | ||
nipstBuilder, | ||
nodeId: nodeId, | ||
db: db, | ||
net: net, | ||
activeSet: activeSet, | ||
mesh: view, | ||
layersPerEpoch: layersPerEpoch, | ||
nipstBuilder: nipstBuilder, | ||
timer: layerClock, | ||
stop: make(chan struct{}), | ||
finished: make(chan struct{}), | ||
log: log, | ||
} | ||
} | ||
|
||
func (b *Builder) PublishActivationTx(epoch types.EpochId) error { | ||
prevAtx, err := b.GetPrevAtxId(b.nodeId) | ||
seq := uint64(0) | ||
if err == nil { | ||
atx, err := b.db.GetAtx(*prevAtx) | ||
func (b *Builder) Start() { | ||
if atomic.LoadUint32(&b.started) == 1 { | ||
return | ||
} | ||
atomic.StoreUint32(&b.started, 1) | ||
go b.loop() | ||
} | ||
|
||
func (b *Builder) Stop() { | ||
b.finished <- struct{}{} | ||
} | ||
|
||
func (b *Builder) loop() { | ||
if !b.nipstBuilder.IsPostInitialized() { | ||
_, err := b.nipstBuilder.InitializePost() // TODO: add proof to first ATX | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks a bit weird to keep There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where do you think we should initialise the post? I see it as a neccsary operation before trying to create an ATX, if you have other ides do share There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The initialization call should stay there, but the implementation can be outside of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets do so in following refactoring |
||
if err != nil { | ||
return err | ||
b.log.Error("PoST initialization failed: %v", err) | ||
return | ||
} | ||
seq = atx.Sequence + 1 | ||
} | ||
for { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The loop starts here. Rather remove the post initialization to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. loop should not start unless post is initialised, and all of which should be done in a concurrent manner, should I change the name of loop? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can either move the initialization to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't want Start to block on post initialisation |
||
select { | ||
case <-b.stop: | ||
return | ||
case layer := <-b.timer: | ||
if b.working { | ||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the purpose of this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to not allow multiple ATXs to be created. we try to build an atx whenever a layer begind, if we are in the process of doint so we sleep until next layer |
||
} | ||
b.working = true | ||
go func() { | ||
epoch := layer.GetEpoch(b.layersPerEpoch) | ||
err := b.PublishActivationTx(epoch) | ||
if err != nil { | ||
b.log.Error("cannot create atx : %v in current epoch %v", err, epoch) | ||
} | ||
b.finished <- struct{}{} | ||
}() | ||
case <-b.finished: | ||
b.working = false | ||
} | ||
} | ||
} | ||
|
||
func (b *Builder) PublishActivationTx(epoch types.EpochId) error { | ||
if b.nipst != nil { | ||
b.log.Info("re-entering atx creation in epoch %v", epoch) | ||
|
||
} else { | ||
prevAtx = &types.EmptyAtx | ||
} | ||
posAtxId := &types.EmptyAtx | ||
endTick := uint64(0) | ||
LayerIdx := b.mesh.LatestLayerId() | ||
if epoch > 0 { | ||
//positioning atx is from the last epoch | ||
posAtxId, err = b.GetPositioningAtxId(epoch - 1) | ||
b.log.Info("starting build atx in epoch %v", epoch) | ||
if b.prevATX == nil { | ||
prevAtxId, err := b.GetPrevAtxId(b.nodeId) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you are correct, we are now assuming no adversary behaviour and will fix this once we merge this code |
||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should handle both "no prev ATX found" cases equally. At the moment the first one isn't logged. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is ok for a node not to have a previous atx, however is is not ok for a node to have and atx id and not have the actual atx in the db |
||
b.prevATX, err = b.db.GetAtx(*prevAtxId) | ||
if err != nil { | ||
b.log.Info("no prev ATX found, starting fresh") | ||
} | ||
} | ||
} | ||
seq := uint64(0) | ||
prevAtxId := *types.EmptyAtxId | ||
if b.prevATX != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to handle this condition in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't shink this simplifies the flow, the default behaviour is set first, and then when we have a previous atx we override the default with it. |
||
seq = b.prevATX.Sequence + 1 | ||
|
||
//check if this node hasn't published an activation already | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line space before this one. |
||
if b.prevATX.PubLayerIdx.GetEpoch(b.layersPerEpoch) == epoch+1 { | ||
return fmt.Errorf("atx already created for epoch %v", epoch) | ||
} | ||
prevAtxId = b.prevATX.Id() | ||
} | ||
posAtxEndTick := uint64(0) | ||
b.posLayerID = types.LayerID(0) | ||
|
||
//positioning atx is from this epoch as well, since we will be publishing the atx in the next epoch | ||
//todo: what if no other atx was received in this epoch yet? | ||
posAtxId := *types.EmptyAtxId | ||
posAtx, err := b.GetPositioningAtx(epoch) | ||
atxPubLayerID := types.LayerID(0) | ||
if err == nil { | ||
posAtxEndTick = posAtx.EndTick | ||
b.posLayerID = posAtx.PubLayerIdx | ||
atxPubLayerID = b.posLayerID.Add(b.layersPerEpoch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to check whether atx should commit to an epoch instead of a layer (If I remember correctly Barak said so). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not decided yet. this implementation relies on the spacemesh protocol as it is currently described. |
||
posAtxId = posAtx.Id() | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle the error first. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. error is handled by exiting flow, only when not in genesis epoch, otherwise default params are in place |
||
if !epoch.IsGenesis() { | ||
return fmt.Errorf("cannot find pos atx: %v", err) | ||
} | ||
} | ||
|
||
b.challenge = &types.NIPSTChallenge{ | ||
NodeId: b.nodeId, | ||
Sequence: seq, | ||
PrevATXId: prevAtxId, | ||
PubLayerIdx: atxPubLayerID, | ||
StartTick: posAtxEndTick, | ||
EndTick: b.tickProvider.NumOfTicks(), //todo: add provider when | ||
PositioningAtx: posAtxId, | ||
} | ||
|
||
hash, err := b.challenge.Hash() | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("getting challenge hash failed: %v", err) | ||
} | ||
posAtx, err := b.db.GetAtx(*posAtxId) | ||
b.nipst, err = b.nipstBuilder.BuildNIPST(hash) | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("cannot create nipst " + err.Error()) | ||
} | ||
endTick = posAtx.EndTick | ||
} | ||
|
||
challenge := types.PoETChallenge{ | ||
NodeId: b.nodeId, | ||
Sequence: seq, | ||
PrevATXId: *prevAtx, | ||
LayerIdx: types.LayerID(uint64(LayerIdx) + b.layersPerEpoch), | ||
StartTick: endTick, | ||
EndTick: b.tickProvider.NumOfTicks(), //todo: add provider when | ||
PositioningAtx: *posAtxId, | ||
if b.mesh.LatestLayer().GetEpoch(b.layersPerEpoch) < b.challenge.PubLayerIdx.GetEpoch(b.layersPerEpoch) { | ||
return fmt.Errorf("an epoch has not passed during nipst creation, current: %v wanted: %v", b.mesh.LatestLayer().GetEpoch(b.layersPerEpoch), b.challenge.PubLayerIdx.GetEpoch(b.layersPerEpoch)) | ||
} | ||
|
||
data, err := challenge.ToBytes() | ||
if err != nil { | ||
return err | ||
// when we reach here an epoch has passed | ||
// we've completed the sequential work, now before publishing the atx, | ||
// we need to provide number of atx seen in the epoch of the positioning atx. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Organize this comment clause, it looks weird/funny. |
||
activeIds := uint32(b.activeSet.ActiveSetSize(b.posLayerID.GetEpoch(b.layersPerEpoch))) | ||
b.log.Info("active ids seen for epoch of the pos atx (epoch: %v) is %v", b.posLayerID.GetEpoch(b.layersPerEpoch), activeIds) | ||
atx := types.NewActivationTxWithChallenge(*b.challenge, activeIds, b.mesh.GetLatestView(), b.nipst, true) | ||
activeSetSize, err := b.db.CalcActiveSetFromView(atx) | ||
if epoch != 0 && (activeSetSize == 0 || activeSetSize != atx.ActiveSetSize) { | ||
b.log.Panic("empty active set size found! len(view): %d, view: %v", len(atx.View), atx.View) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you want to crash the node here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. currently, yes |
||
} | ||
npst, err := b.nipstBuilder.BuildNIPST(data) | ||
buf, err := types.AtxAsBytes(atx) | ||
if err != nil { | ||
return err | ||
} | ||
//todo: check if view should be latest layer -1 | ||
atx := types.NewActivationTxWithcChallenge(challenge, b.activeSet.GetActiveSetSize(b.mesh.LatestLayerId()-1), b.mesh.GetLatestView(), npst) | ||
b.prevATX = atx | ||
|
||
buf, err := types.AtxAsBytes(atx) | ||
// cleanup state | ||
b.nipst = nil | ||
b.challenge = nil | ||
b.posLayerID = 0 | ||
|
||
err = b.net.Broadcast(AtxProtocol, buf) | ||
if err != nil { | ||
return err | ||
} | ||
//todo: should we do something about it? wait for something? | ||
return b.net.Broadcast(AtxProtocol, buf) | ||
|
||
b.log.Info("atx published! id: %v, prevATXID: %v, posATXID: %v, layer: %v, published in epoch: %v, active set: %v miner: %v view %v", | ||
atx.Id().String()[2:7], atx.PrevATXId.String()[2:7], atx.PositioningAtx.String()[2:7], atx.PubLayerIdx, | ||
atx.PubLayerIdx.GetEpoch(b.layersPerEpoch), atx.ActiveSetSize, b.nodeId.Key[:5], len(atx.View)) | ||
return nil | ||
} | ||
|
||
func (b *Builder) Persist(c *types.PoETChallenge) { | ||
func (b *Builder) Persist(c *types.NIPSTChallenge) { | ||
//todo: implement storing to persistent media | ||
} | ||
|
||
func (b *Builder) Load() *types.PoETChallenge { | ||
func (b *Builder) Load() *types.NIPSTChallenge { | ||
//todo: implement loading from persistent media | ||
return nil | ||
} | ||
|
||
func (b *Builder) GetPrevAtxId(node types.NodeId) (*types.AtxId, error) { | ||
//todo: make sure atx ids are ordered and valid | ||
ids, err := b.db.GetNodeAtxIds(node) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -174,8 +279,25 @@ func (b *Builder) GetLastSequence(node types.NodeId) uint64 { | |
} | ||
atx, err := b.db.GetAtx(*atxId) | ||
if err != nil { | ||
log.Error("wtf no atx in db %v", *atxId) | ||
b.log.Error("wtf no atx in db %v", *atxId) | ||
return 0 | ||
} | ||
return atx.Sequence | ||
} | ||
|
||
func (b *Builder) GetPositioningAtx(epochId types.EpochId) (*types.ActivationTx, error) { | ||
posAtxId, err := b.GetPositioningAtxId(epochId) | ||
if err != nil { | ||
if b.prevATX != nil { | ||
//if the atx was created by this miner but have not propagated as an atx to the notwork yet, use the cached atx | ||
return b.prevATX, nil | ||
} else { | ||
return nil, fmt.Errorf("cannot find pos atx id: %v", err) | ||
} | ||
} | ||
posAtx, err := b.db.GetAtx(*posAtxId) | ||
if err != nil { | ||
return nil, fmt.Errorf("cannot find pos atx: %v", err.Error()) | ||
} | ||
return posAtx, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you start a builder multiple times ? if not add a lock or a comment