
Commit

Merge branch 'develop' into hare3/switch-to-sync-validation
dshulyak committed Sep 1, 2023
2 parents 71a8e79 + 5d25947 commit 3d62477
Showing 18 changed files with 238 additions and 173 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,8 @@ to set lower expected latency in the network, eventually reducing layer time.
* [#4879](https://github.com/spacemeshos/go-spacemesh/pull/4795) Makes majority calculation weighted for optimistic filtering.
The network will start using the new algorithm at layer 18_000 (2023-09-14 20:00:00 +0000 UTC)
* [#4923](https://github.com/spacemeshos/go-spacemesh/pull/4923) Faster ballot eligibility validation. Improves sync speed.
* [#4934](https://github.com/spacemeshos/go-spacemesh/pull/4934) Ensure state is synced before participating in tortoise consensus.
* [#4939](https://github.com/spacemeshos/go-spacemesh/pull/4939) Make sure to fetch data from peers that are already connected.

## v1.1.2

1 change: 1 addition & 0 deletions config/mainnet.go
@@ -137,6 +137,7 @@ func MainnetConfig() Config {
MaxStaleDuration: time.Hour,
UseNewProtocol: true,
Standalone: false,
GossipDuration: 50 * time.Second,
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
1 change: 1 addition & 0 deletions config/presets/fastnet.go
@@ -51,6 +51,7 @@ func fastnet() config.Config {
conf.LayerAvgSize = 50
conf.LayerDuration = 15 * time.Second
conf.Sync.Interval = 5 * time.Second
conf.Sync.GossipDuration = 10 * time.Second
conf.LayersPerEpoch = 4

conf.Tortoise.Hdist = 4
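The new Sync.GossipDuration knob is 50s on mainnet and 10s under the fastnet preset. As a rough, hypothetical illustration of the kind of gate such a duration typically drives (waitGossip and its parameters are invented for this sketch, not the actual syncer code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitGossip blocks until the gossip window after layerStart has elapsed
// (or ctx is cancelled). Hypothetical helper: the idea is that data heard
// over gossip within the window does not need to be fetched explicitly.
func waitGossip(ctx context.Context, layerStart time.Time, gossipDuration time.Duration) error {
	select {
	case <-time.After(time.Until(layerStart.Add(gossipDuration))):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	start := time.Now()
	_ = waitGossip(context.Background(), start, 100*time.Millisecond)
	fmt.Println("gossip window elapsed after", time.Since(start).Round(10*time.Millisecond))
}
```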
22 changes: 7 additions & 15 deletions fetch/cache.go
@@ -5,6 +5,7 @@ import (
"sync"

lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
@@ -71,24 +72,15 @@ func (hpc *HashPeersCache) Add(hash types.Hash32, peer p2p.Peer) {
hpc.add(hash, peer)
}

// GetRandom returns a random peer for a given hash.
func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) (p2p.Peer, bool) {
// GetRandom returns a randomized list of peers for a given hash.
func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) []p2p.Peer {
hpc.mu.Lock()
defer hpc.mu.Unlock()

hashPeersMap, exists := hpc.getWithStats(hash, hint)
if !exists {
return p2p.NoPeer, false
}
n := rng.Intn(len(hashPeersMap)) + 1
i := 0
for peer := range hashPeersMap {
i++
if i == n {
return peer, true
}
}
return p2p.NoPeer, false
pm, _ := hpc.getWithStats(hash, hint)
peers := maps.Keys(pm)
rng.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
return peers
}

// RegisterPeerHashes registers provided peer for a list of hashes.
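GetRandom used to reservoir-pick a single peer out of the map; it now hands back every known peer for the hash in shuffled order so the caller can try alternatives. A self-contained sketch of the maps.Keys + rng.Shuffle pattern it relies on:

```go
package main

import (
	"fmt"
	"math/rand"

	"golang.org/x/exp/maps"
)

// shuffledKeys mirrors the new GetRandom body: collect the map's keys with
// maps.Keys, then shuffle them in place with the caller-supplied rng.
func shuffledKeys[K comparable, V any](m map[K]V, rng *rand.Rand) []K {
	keys := maps.Keys(m) // an empty or nil map yields an empty slice
	rng.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] })
	return keys
}

func main() {
	peers := map[string]struct{}{"p1": {}, "p2": {}, "p3": {}}
	rng := rand.New(rand.NewSource(1))
	fmt.Println(shuffledKeys(peers, rng)) // all three peers, random order
}
```

An empty result simply means no peer is known for the hash, which is what the new "no hash peers" test below asserts with require.Empty.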
22 changes: 13 additions & 9 deletions fetch/cache_test.go
@@ -72,6 +72,13 @@ func TestAdd(t *testing.T) {

func TestGetRandom(t *testing.T) {
t.Parallel()
t.Run("no hash peers", func(t *testing.T) {
cache := NewHashPeersCache(10)
hash := types.RandomHash()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peers := cache.GetRandom(hash, datastore.TXDB, rng)
require.Empty(t, peers)
})
t.Run("1Hash3Peers", func(t *testing.T) {
cache := NewHashPeersCache(10)
hash := types.RandomHash()
@@ -94,9 +101,8 @@ func TestGetRandom(t *testing.T) {
}()
wg.Wait()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peer, exists := cache.GetRandom(hash, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Contains(t, []p2p.Peer{peer1, peer2, peer3}, peer)
peers := cache.GetRandom(hash, datastore.TXDB, rng)
require.ElementsMatch(t, []p2p.Peer{peer1, peer2, peer3}, peers)
})
t.Run("2Hashes1Peer", func(t *testing.T) {
cache := NewHashPeersCache(10)
@@ -115,12 +121,10 @@ func TestGetRandom(t *testing.T) {
}()
wg.Wait()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
randomPeer, exists := cache.GetRandom(hash1, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Equal(t, peer, randomPeer)
randomPeer, exists = cache.GetRandom(hash2, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Equal(t, peer, randomPeer)
randomPeers := cache.GetRandom(hash1, datastore.TXDB, rng)
require.Equal(t, []p2p.Peer{peer}, randomPeers)
randomPeers = cache.GetRandom(hash2, datastore.TXDB, rng)
require.Equal(t, []p2p.Peer{peer}, randomPeers)
})
}

21 changes: 13 additions & 8 deletions fetch/fetch.go
@@ -482,18 +482,23 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
}
return nil
}

for _, req := range requests {
p, exists := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
if !exists {
p = randomPeer(peers)
target := p2p.NoPeer
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
for _, p := range hashPeers {
if f.host.Connected(p) {
target = p
break
}
}

_, ok := peer2requests[p]
if target == p2p.NoPeer {
target = randomPeer(peers)
}
_, ok := peer2requests[target]
if !ok {
peer2requests[p] = []RequestMessage{req}
peer2requests[target] = []RequestMessage{req}
} else {
peer2requests[p] = append(peer2requests[p], req)
peer2requests[target] = append(peer2requests[target], req)
}
}

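organizeRequests now scans the shuffled candidates for the first peer that is still connected and only falls back to a random peer when none qualify. A minimal sketch of that selection, with string standing in for p2p.Peer (pickTarget is illustrative, not the real function):

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickTarget sketches the new selection: take the first still-connected
// peer from the (already shuffled) hash-peer list, else fall back to a
// random peer from the full peer set.
func pickTarget(hashPeers, all []string, connected func(string) bool, rng *rand.Rand) string {
	for _, p := range hashPeers {
		if connected(p) {
			return p // advertised the hash and has a live connection
		}
	}
	return all[rng.Intn(len(all))] // assumes all is non-empty
}

func main() {
	up := map[string]bool{"b": true, "c": true}
	rng := rand.New(rand.NewSource(1))
	target := pickTarget([]string{"a", "b"}, []string{"c"}, func(p string) bool { return up[p] }, rng)
	fmt.Println(target) // "b": "a" knows the hash too, but is not connected
}
```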
38 changes: 38 additions & 0 deletions fetch/fetch_test.go
@@ -136,6 +136,43 @@ func TestFetch_GetHash(t *testing.T) {
require.NotEqual(t, p1.completed, p2.completed)
}

func TestFetch_GetHashPeerNotConnected(t *testing.T) {
f := createFetch(t)
f.cfg.MaxRetriesForRequest = 0
f.cfg.MaxRetriesForPeer = 0
peer := p2p.Peer("buddy")
awol := p2p.Peer("notConnected")
f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer})
f.mh.EXPECT().ID().Return(p2p.Peer("self"))
f.mh.EXPECT().Connected(awol).Return(false)
hsh := types.RandomHash()
f.RegisterPeerHashes(awol, []types.Hash32{hsh})

res := ResponseMessage{
Hash: hsh,
Data: []byte("a"),
}
f.mHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error {
var rb RequestBatch
err := codec.Decode(req, &rb)
require.NoError(t, err)
resBatch := ResponseBatch{
ID: rb.ID,
Responses: []ResponseMessage{res},
}
bts, err := codec.Encode(&resBatch)
require.NoError(t, err)
okFunc(bts)
return nil
})

p, err := f.getHash(context.TODO(), hsh, datastore.BlockDB, goodReceiver)
require.NoError(t, err)
f.requestHashBatchFromPeers()
<-p.completed
}

func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
tt := []struct {
name string
@@ -164,6 +201,7 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
f.cfg.MaxRetriesForPeer = 0
peer := p2p.Peer("buddy")
f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer})
f.mh.EXPECT().Connected(peer).Return(true).AnyTimes()

hsh0 := types.RandomHash()
res0 := ResponseMessage{
1 change: 1 addition & 0 deletions fetch/interface.go
@@ -43,6 +43,7 @@ type meshProvider interface {
type host interface {
ID() p2p.Peer
GetPeers() []p2p.Peer
Connected(p2p.Peer) bool
PeerProtocols(p2p.Peer) ([]protocol.ID, error)
Close() error
}
1 change: 1 addition & 0 deletions fetch/mesh_data_test.go
@@ -146,6 +146,7 @@ func TestFetch_getHashes(t *testing.T) {
f.cfg.BatchSize = 2
f.cfg.MaxRetriesForRequest = 0
f.cfg.MaxRetriesForPeer = 0
f.mh.EXPECT().Connected(gomock.Any()).Return(true).AnyTimes()
peers := []p2p.Peer{p2p.Peer("buddy 0"), p2p.Peer("buddy 1")}
f.mh.EXPECT().GetPeers().Return(peers)
f.mh.EXPECT().ID().Return(p2p.Peer("self")).AnyTimes()
14 changes: 14 additions & 0 deletions fetch/mocks/mocks.go

Some generated files are not rendered by default.

27 changes: 17 additions & 10 deletions miner/proposal_builder.go
@@ -373,6 +373,17 @@ func (pb *ProposalBuilder) handleLayer(ctx context.Context, layerID types.LayerI
if !pb.syncer.IsSynced(ctx) {
return errNotSynced
}

// make sure the miner is eligible first
nonce, err := pb.nonceFetcher.VRFNonce(pb.signer.NodeID(), layerID.GetEpoch())
if err != nil {
if errors.Is(err, sql.ErrNotFound) {
pb.logger.WithContext(ctx).With().Info("miner has no valid vrf nonce, not building proposal", layerID)
return nil
}
return err
}

if beacon, err = pb.beaconProvider.GetBeacon(epoch); err != nil {
return errNoBeacon
}
@@ -386,14 +397,6 @@ func (pb *ProposalBuilder) handleLayer(ctx context.Context, layerID types.LayerI
return errDuplicateLayer
}

nonce, err := pb.nonceFetcher.VRFNonce(pb.signer.NodeID(), layerID.GetEpoch())
if err != nil {
if errors.Is(err, sql.ErrNotFound) {
pb.logger.WithContext(ctx).With().Info("miner has no valid vrf nonce, not building proposal", layerID)
return nil
}
return err
}
epochEligibility, err := pb.proposalOracle.ProposalEligibility(layerID, beacon, nonce)
if err != nil {
if errors.Is(err, errMinerHasNoATXInPreviousEpoch) {
@@ -470,8 +473,12 @@ func (pb *ProposalBuilder) createProposalLoop(ctx context.Context) {
}
next = current.Add(1)
lyrCtx := log.WithNewSessionID(ctx)
if err := pb.handleLayer(lyrCtx, current); err != nil && !errors.Is(err, errGenesis) {
pb.logger.WithContext(lyrCtx).With().Warning("failed to build proposal", current, log.Err(err))
if err := pb.handleLayer(lyrCtx, current); err != nil {
switch {
case errors.Is(err, errGenesis), errors.Is(err, errNotSynced):
default:
pb.logger.WithContext(lyrCtx).With().Warning("failed to build proposal", current, log.Err(err))
}
}
}
}
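Two behavioral changes above: the VRF nonce is now checked before the beacon so an ineligible miner bails out early, and createProposalLoop stops warning on errNotSynced in addition to errGenesis. A small sketch of that benign-error triage (the sentinel values here are stand-ins, not the miner package's own):

```go
package main

import (
	"errors"
	"log"
)

// Stand-ins for the real sentinel errors in the miner package.
var (
	errGenesis   = errors.New("genesis")
	errNotSynced = errors.New("not synced")
)

// logIfUnexpected sketches the triage: genesis layers and a still-syncing
// node are normal, so only other failures deserve a warning.
func logIfUnexpected(err error) {
	switch {
	case err == nil, errors.Is(err, errGenesis), errors.Is(err, errNotSynced):
		// expected while the node catches up; stay quiet
	default:
		log.Printf("failed to build proposal: %v", err)
	}
}

func main() {
	logIfUnexpected(errNotSynced)       // silent
	logIfUnexpected(errors.New("oops")) // warns
}
```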
2 changes: 2 additions & 0 deletions miner/proposal_builder_test.go
@@ -303,6 +303,7 @@ func TestBuilder_HandleLayer_NoBeacon(t *testing.T) {

layerID := types.LayerID(layersPerEpoch * 3)
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true)
b.mNonce.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(22), nil)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(types.EmptyBeacon, errors.New("unknown"))

require.ErrorIs(t, b.handleLayer(context.Background(), layerID), errNoBeacon)
@@ -661,6 +662,7 @@ func TestBuilder_HandleLayer_Duplicate(t *testing.T) {
ballot := types.NewExistingBallot(types.BallotID{1}, types.EmptyEdSignature, b.signer.NodeID(), layerID)
require.NoError(t, ballots.Add(b.cdb, &ballot))
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true)
b.mNonce.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).Return(types.VRFPostIndex(22), nil)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil)
require.ErrorIs(t, b.handleLayer(context.Background(), layerID), errDuplicateLayer)
}
4 changes: 4 additions & 0 deletions p2p/upgrade.go
@@ -169,6 +169,10 @@ func (fh *Host) GetPeers() []Peer {
return fh.Host.Network().Peers()
}

func (fh *Host) Connected(p Peer) bool {
return fh.Host.Network().Connectedness(p) == network.Connected
}

// ConnectedPeerInfo retrieves a peer info object for the given peer.ID, if the
// given peer is not connected then nil is returned.
func (fh *Host) ConnectedPeerInfo(id peer.ID) *PeerInfo {
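Connected is a thin wrapper over libp2p's connectedness state. Assuming the current go-libp2p core packages, the check looks roughly like:

```go
package p2putil

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
)

// connected reports whether h has at least one live connection to p.
// Connectedness can also be NotConnected, CanConnect, or CannotConnect,
// so comparing against network.Connected is the strict "open now" check.
func connected(h host.Host, p peer.ID) bool {
	return h.Network().Connectedness(p) == network.Connected
}
```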
4 changes: 2 additions & 2 deletions syncer/data_fetch.go
@@ -195,8 +195,8 @@ func (d *DataFetch) PollLayerData(ctx context.Context, lid types.LayerID, peers
}

func (d *DataFetch) receiveMaliciousIDs(ctx context.Context, req *maliciousIDRequest, peer p2p.Peer, data []byte, peerErr error) {
logger := d.logger.WithContext(ctx).WithFields(req.lid, log.Stringer("peer", peer))
logger.Debug("received layer data from peer")
logger := d.logger.WithContext(ctx).WithFields(log.Stringer("peer", peer))
logger.Debug("received malicious id from peer")
var (
result = peerResult[fetch.MaliciousIDs]{peer: peer, err: peerErr}
malIDs fetch.MaliciousIDs
10 changes: 7 additions & 3 deletions syncer/state_syncer.go
@@ -35,7 +35,8 @@ func minLayer(a, b types.LayerID) types.LayerID {

func (s *Syncer) stateSynced() bool {
current := s.ticker.CurrentLayer()
return current.Uint32() <= 1 || !s.mesh.ProcessedLayer().Before(current.Sub(1))
return current <= types.GetEffectiveGenesis() ||
(s.mesh.ProcessedLayer() >= current-1 && !s.stateErr.Load())
}

func (s *Syncer) processLayers(ctx context.Context) error {
@@ -62,7 +63,7 @@ func (s *Syncer) processLayers(ctx context.Context) error {

// used to make sure we only resync from the same peer once during each run.
resyncPeers := make(map[p2p.Peer]struct{})
for lid := start; !lid.After(s.getLastSyncedLayer()); lid = lid.Add(1) {
for lid := start; lid <= s.getLastSyncedLayer(); lid++ {
select {
case <-ctx.Done():
return ctx.Err()
@@ -90,7 +91,7 @@ func (s *Syncer) processLayers(ctx context.Context) error {
s.logger.WithContext(ctx).With().Warning("failed to adopt peer opinions", lid, log.Err(err))
}
}
if s.stateSynced() {
if s.IsSynced(ctx) {
if err = s.checkMeshAgreement(ctx, lid, opinions); err != nil && errors.Is(err, errMeshHashDiverged) {
s.logger.WithContext(ctx).With().Debug("mesh hash diverged, trying to reach agreement",
lid,
@@ -114,6 +115,9 @@ func (s *Syncer) processLayers(ctx context.Context) error {
if !errors.Is(err, mesh.ErrMissingBlock) {
s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err))
}
s.stateErr.Store(true)
} else {
s.stateErr.Store(false)
}
}
s.logger.WithContext(ctx).With().Debug("end of state sync",
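The syncer now tracks a stateErr flag: it latches when mesh processing of a layer fails, clears on the next success, and stateSynced refuses to report synced while it is set. A compilable sketch of the predicate (field names and accessors are assumptions for illustration):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// syncer sketches just enough state for the predicate; the real fields
// and accessors differ (ticker.CurrentLayer, mesh.ProcessedLayer, etc.).
type syncer struct {
	stateErr  atomic.Bool   // latched on mesh processing failure
	genesis   uint32        // effective genesis layer
	current   func() uint32 // current layer from the clock
	processed func() uint32 // last layer the mesh processed
}

// stateSynced: before genesis everything counts as synced; afterwards the
// mesh must be caught up to the previous layer and error-free.
func (s *syncer) stateSynced() bool {
	cur := s.current()
	return cur <= s.genesis || (s.processed() >= cur-1 && !s.stateErr.Load())
}

func main() {
	s := &syncer{current: func() uint32 { return 10 }, processed: func() uint32 { return 9 }}
	fmt.Println(s.stateSynced()) // true
	s.stateErr.Store(true)       // a layer failed to process
	fmt.Println(s.stateSynced()) // false until a later layer succeeds
}
```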
6 changes: 1 addition & 5 deletions syncer/state_syncer_test.go
@@ -574,6 +574,7 @@ func TestProcessLayers_OpinionsOptional(t *testing.T) {
func TestProcessLayers_MeshHashDiverged(t *testing.T) {
ts := newTestSyncerForState(t)
ts.syncer.setATXSynced()
ts.syncer.setSyncState(context.Background(), synced)
current := types.GetEffectiveGenesis().Add(131)
ts.mTicker.advanceToLayer(current)
for lid := types.GetEffectiveGenesis().Add(1); lid.Before(current); lid = lid.Add(1) {
@@ -724,10 +725,5 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) {
ts.mVm.EXPECT().GetStateRoot()
}
}
// only the last layer will trigger hash resolution
for i := range opns {
ts.mForkFinder.EXPECT().NeedResync(current.Sub(1), opns[i].PrevAggHash).Return(false)
}
ts.mForkFinder.EXPECT().Purge(true)
require.NoError(t, ts.syncer.processLayers(context.Background()))
}
