Skip to content

Commit

Permalink
Merge branch 'develop' into miner/parametrize-grades-threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak committed Sep 1, 2023
2 parents 1111227 + f1c1dd8 commit 73051a5
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The network will start using the new algorithm at layer 18_000 (2023-09-14 20:00
* [#4923](https://github.com/spacemeshos/go-spacemesh/pull/4923) Faster ballot eligibility validation. Improves sync speed.
* [#4934](https://github.com/spacemeshos/go-spacemesh/pull/4934) Ensure state is synced before participating in tortoise consensus.
* [#4939](https://github.com/spacemeshos/go-spacemesh/pull/4939) Make sure to fetch data from peers that are already connected.
* [#4936](https://github.com/spacemeshos/go-spacemesh/pull/4936) Use correct hare active set after node was synced. Otherwise applied layer may lag slightly behind the rest.

## v1.1.2

Expand Down
4 changes: 2 additions & 2 deletions hare/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (ps *delayedPubSub) Publish(ctx context.Context, protocol string, msg []byt
return nil
}

func (ps *delayedPubSub) Register(protocol string, handler pubsub.GossipHandler) {
func (ps *delayedPubSub) Register(protocol string, handler pubsub.GossipHandler, opts ...pubsub.ValidatorOpt) {
if ps.recvDelay != 0 {
handler = func(ctx context.Context, pid p2p.Peer, msg []byte) error {
rng := time.Duration(rand.Uint32()) * time.Second % ps.recvDelay
Expand Down Expand Up @@ -609,6 +609,6 @@ func (eps *equivocatePubSub) Publish(ctx context.Context, protocol string, data
return nil
}

func (eps *equivocatePubSub) Register(protocol string, handler pubsub.GossipHandler) {
func (eps *equivocatePubSub) Register(protocol string, handler pubsub.GossipHandler, opts ...pubsub.ValidatorOpt) {
eps.ps.Register(protocol, handler)
}
42 changes: 35 additions & 7 deletions hare/eligibility/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,20 @@ type cachedActiveSet struct {

// Oracle is the hare eligibility oracle.
type Oracle struct {
lock sync.Mutex
mu sync.Mutex
activesCache activeSetCache
fallback map[types.EpochID][]types.ATXID
sync system.SyncStateProvider
// NOTE(dshulyak) on switch from synced to not synced reset the cache
// to cope with https://github.com/spacemeshos/go-spacemesh/issues/4552
// until graded oracle is implemented
synced bool

beacons system.BeaconGetter
cdb *datastore.CachedDB
vrfSigner *signing.VRFSigner
vrfVerifier vrfVerifier
layersPerEpoch uint32
activesCache activeSetCache
fallback map[types.EpochID][]types.ATXID
cfg config.Config
log.Log
}
Expand Down Expand Up @@ -110,6 +116,27 @@ type VrfMessage struct {
Layer types.LayerID
}

func (o *Oracle) SetSync(sync system.SyncStateProvider) {
o.mu.Lock()
defer o.mu.Unlock()
o.sync = sync
}

func (o *Oracle) resetCacheOnSynced(ctx context.Context) {
if o.sync == nil {
return
}
synced := o.synced
o.synced = o.sync.IsSynced(ctx)
if !synced && o.synced {
ac, err := lru.New[types.EpochID, *cachedActiveSet](activesCacheSize)
if err != nil {
o.Log.With().Fatal("failed to create lru cache for active set", log.Err(err))
}
o.activesCache = ac
}
}

// buildVRFMessage builds the VRF message used as input for the BLS (msg=Beacon##Layer##Round).
func (o *Oracle) buildVRFMessage(ctx context.Context, layer types.LayerID, round uint32) ([]byte, error) {
beacon, err := o.beacons.GetBeacon(layer.GetEpoch())
Expand Down Expand Up @@ -341,8 +368,9 @@ func (o *Oracle) actives(ctx context.Context, targetLayer types.LayerID) (*cache
log.Stringer("target_epoch", targetEpoch),
)

o.lock.Lock()
defer o.lock.Unlock()
o.mu.Lock()
defer o.mu.Unlock()
o.resetCacheOnSynced(ctx)
if value, exists := o.activesCache.Get(targetEpoch); exists {
return value, nil
}
Expand Down Expand Up @@ -462,8 +490,8 @@ func (o *Oracle) UpdateActiveSet(epoch types.EpochID, activeSet []types.ATXID) {
epoch,
log.Int("size", len(activeSet)),
)
o.lock.Lock()
defer o.lock.Unlock()
o.mu.Lock()
defer o.mu.Unlock()
if _, ok := o.fallback[epoch]; ok {
o.Log.With().Debug("fallback active set already exists", epoch)
return
Expand Down
29 changes: 29 additions & 0 deletions hare/eligibility/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,35 @@ func TestActiveSetDD(t *testing.T) {
}
}

func TestResetCache(t *testing.T) {
oracle := defaultOracle(t)
ctrl := gomock.NewController(t)

prev := oracle.activesCache
prev.Add(1, nil)

oracle.resetCacheOnSynced(context.Background())
require.Equal(t, prev, oracle.activesCache)

sync := mocks.NewMockSyncStateProvider(ctrl)
oracle.SetSync(sync)

sync.EXPECT().IsSynced(gomock.Any()).Return(false)
oracle.resetCacheOnSynced(context.Background())
require.Equal(t, prev, oracle.activesCache)

sync.EXPECT().IsSynced(gomock.Any()).Return(true)
oracle.resetCacheOnSynced(context.Background())
require.NotEqual(t, prev, oracle.activesCache)

prev = oracle.activesCache
prev.Add(1, nil)

sync.EXPECT().IsSynced(gomock.Any()).Return(true)
oracle.resetCacheOnSynced(context.Background())
require.Equal(t, prev, oracle.activesCache)
}

func FuzzVrfMessageConsistency(f *testing.F) {
tester.FuzzConsistency[VrfMessage](f)
}
Expand Down
2 changes: 1 addition & 1 deletion hare/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type p2pManipulator struct {
err error
}

func (m *p2pManipulator) Register(protocol string, handler pubsub.GossipHandler) {
func (m *p2pManipulator) Register(protocol string, handler pubsub.GossipHandler, opts ...pubsub.ValidatorOpt) {
m.nd.Register(protocol, handler)
}

Expand Down
2 changes: 1 addition & 1 deletion hare3/hare.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (h *Hare) Coins() <-chan WeakCoinOutput {
}

func (h *Hare) Start() {
h.pubsub.Register(h.config.ProtocolName, h.Handler)
h.pubsub.Register(h.config.ProtocolName, h.Handler, pubsub.WithValidatorInline(true))
current := h.nodeclock.CurrentLayer() + 1
enabled := types.MaxLayer(current, h.config.EnableLayer)
enabled = types.MaxLayer(enabled, types.GetEffectiveGenesis()+1)
Expand Down
2 changes: 1 addition & 1 deletion hare3/hare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (n *node) withOracle() *node {

func (n *node) withPublisher() *node {
n.mpublisher = pmocks.NewMockPublishSubsciber(n.ctrl)
n.mpublisher.EXPECT().Register(gomock.Any(), gomock.Any()).AnyTimes()
n.mpublisher.EXPECT().Register(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
return n
}

Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ func (app *App) initServices(ctx context.Context) error {
)
// TODO(dshulyak) this needs to be improved, but dependency graph is a bit complicated
beaconProtocol.SetSyncState(newSyncer)
app.hOracle.SetSync(newSyncer)

hareOutputCh := make(chan hare.LayerOutput, app.Config.HARE.LimitConcurrent)
app.blockGen = blocks.NewGenerator(app.cachedDB, executor, msh, fetcherWrapped, app.certifier, patrol,
Expand Down
26 changes: 18 additions & 8 deletions p2p/pubsub/mocks/publisher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion p2p/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ type Publisher interface {

// Subscriber is an interface for subcribing to messages.
type Subscriber interface {
Register(string, GossipHandler)
Register(string, GossipHandler, ...ValidatorOpt)
}

type ValidatorOpt = pubsub.ValidatorOpt

var WithValidatorInline = pubsub.WithValidatorInline

// PublishSubsciber common interface for publisher and subscribing.
type PublishSubsciber interface {
Publisher
Expand Down
4 changes: 2 additions & 2 deletions p2p/pubsub/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type PubSub struct {
}

// Register handler for topic.
func (ps *PubSub) Register(topic string, handler GossipHandler) {
func (ps *PubSub) Register(topic string, handler GossipHandler, opts ...ValidatorOpt) {
ps.mu.Lock()
defer ps.mu.Unlock()
if _, exist := ps.topics[topic]; exist {
Expand All @@ -47,7 +47,7 @@ func (ps *PubSub) Register(topic string, handler GossipHandler) {
default:
return pubsub.ValidationAccept
}
})
}, opts...)
topich, err := ps.pubsub.Join(topic)
if err != nil {
ps.logger.With().Panic("failed to join a topic", log.String("topic", topic), log.Err(err))
Expand Down

0 comments on commit 73051a5

Please sign in to comment.