Parallelize processing of incoming gossip messages #2463

Status: Closed. Wants to merge 26 commits.
Changes from 9 commits.

Commits (26):
578e787
Parallelize processing of incoming gossip messages
lrettig Jun 17, 2021
d80de79
Remove comment
lrettig Jun 18, 2021
7e25ccd
Add errmsg
lrettig Jun 18, 2021
3412507
Fix merge conflict
lrettig Jun 18, 2021
00d5d9e
Increase timeout for test
lrettig Jun 18, 2021
f3831b4
Misc cleanup of syncer test
lrettig Jun 18, 2021
f278190
More misc cleanup
lrettig Jun 18, 2021
533a19a
Add test
lrettig Jun 18, 2021
1a2f3ef
Add another test
lrettig Jun 18, 2021
85c6c68
Merge branch 'develop' into inbound-gossip-patch
lrettig Aug 22, 2021
c03ffdc
Improve per review feedback
lrettig Aug 23, 2021
f690faa
Merge branch 'develop' into inbound-gossip-patch
lrettig Aug 23, 2021
1ed3e75
Misc cleanup, respond to review comments
lrettig Aug 23, 2021
92b91b9
Properly handle taskgroup errors
lrettig Aug 25, 2021
8ac0b4a
Fix long line
lrettig Aug 28, 2021
e213df4
Merge branch 'develop' into inbound-gossip-patch
lrettig Aug 28, 2021
546aeab
Remove timeout per PR feedback
lrettig Aug 29, 2021
352092e
Comment out one more log line
lrettig Aug 29, 2021
b83918b
Remove unused cmdline arg
lrettig Aug 29, 2021
ba05641
Reduce panic to info
lrettig Aug 30, 2021
f8cffce
Merge branch 'develop' into inbound-gossip-patch
dshulyak Sep 1, 2021
74f0667
Merge branch 'develop' into inbound-gossip-patch
lrettig Sep 2, 2021
5c879f3
Merge branch 'develop' into inbound-gossip-patch
lrettig Sep 8, 2021
313bfed
Merge branch 'develop' into inbound-gossip-patch
lrettig Sep 8, 2021
724c716
Merge branch 'develop' into inbound-gossip-patch
nkryuchkov Oct 16, 2021
b4fa583
Fix golangci-lint
nkryuchkov Oct 16, 2021
3 changes: 3 additions & 0 deletions cmd/node/multi_node.go
@@ -166,6 +166,9 @@ func getTestDefaultConfig(numOfInstances int) *config.Config {
cfg.LAYERS.RequestTimeout = 10
cfg.GoldenATXID = "0x5678"

// increase from 2s to 10s: for some reason, the multi node test requires a long timeout
cfg.SyncRequestTimeout = 10000

types.SetLayersPerEpoch(int32(cfg.LayersPerEpoch))

return cfg
2 changes: 1 addition & 1 deletion cmd/node/node.go
@@ -615,7 +615,7 @@ func (app *SpacemeshApp) initServices(ctx context.Context,
// TODO: genesisMinerWeight is set to app.Config.SpaceToCommit, because PoET ticks are currently hardcoded to 1
}

gossipListener := service.NewListener(swarm, layerFetch, syncer.ListenToGossip, app.addLogger(GossipListener, lg))
gossipListener := service.NewListener(swarm, layerFetch, syncer.ListenToGossip, app.Config.P2P, app.addLogger(GossipListener, lg))
ha := app.HareFactory(ctx, mdb, swarm, sgn, nodeID, syncer, msh, hOracle, idStore, clock, lg)

stateAndMeshProjector := pendingtxs.NewStateAndMeshProjector(processor, msh)
12 changes: 4 additions & 8 deletions layerfetcher/layers.go
@@ -103,7 +103,6 @@ func DefaultConfig() Config {

// NewLogic creates a new instance of layer fetching logic
func NewLogic(ctx context.Context, cfg Config, blocks blockHandler, atxs atxHandler, poet poetDb, atxIDs atxIDsDB, txs TxProcessor, network service.Service, fetcher fetch.Fetcher, layers layerDB, log log.Log) *Logic {

srv := fetch.NewMessageNetwork(ctx, cfg.RequestTimeout, network, layersProtocol, log)
l := &Logic{
log: log,
@@ -252,8 +251,7 @@ func (l *Logic) PollLayer(ctx context.Context, layer types.LayerID) chan LayerPr
timeoutFunc := func(err error) {
l.receiveLayerHash(ctx, layer, peer, len(peers), nil, err)
}
err := l.net.SendRequest(ctx, LayerHashMsg, layer.Bytes(), p, receiveForPeerFunc, timeoutFunc)
if err != nil {
if err := l.net.SendRequest(ctx, LayerHashMsg, layer.Bytes(), p, receiveForPeerFunc, timeoutFunc); err != nil {
l.receiveLayerHash(ctx, layer, peer, len(peers), nil, err)
}
}
@@ -292,14 +290,14 @@ func (l *Logic) receiveLayerHash(ctx context.Context, id types.LayerID, p p2ppee
}
l.log.Info("got hashes for layer, now aggregating")
l.layerHashResM.Unlock()
errors := 0
errs := 0
//aggregate hashes so that same hash will not be requested several times
hashes := make(map[types.Hash32][]p2ppeers.Peer)
l.layerHashResM.RLock()
for peer, hash := range l.layerHashResults[id] {
//count nil hashes - mark errors.
if hash == nil {
errors++
errs++
} else {
// if there is a new hash - query for it
if _, ok := hashes[*hash]; !ok {
@@ -316,7 +314,7 @@ func (l *Logic) receiveLayerHash(ctx context.Context, id types.LayerID, p p2ppee

// if more than half the peers returned an error, fail the sync of the entire layer
// todo: think whether we should panic here
if errors > peers/2 {
if errs > peers/2 {
l.log.Error("cannot sync layer %v", id)
l.notifyLayerPromiseResult(id, 0, fmt.Errorf("too many peers returned error"))
return
@@ -345,12 +343,10 @@ func (l *Logic) receiveLayerHash(ctx context.Context, id types.LayerID, p p2ppee
l.receiveBlockHashes(ctx, id, nil, len(hashes), err)
}
}

}

// notifyLayerPromiseResult notifies that a layer result has been received or wasn't received
func (l *Logic) notifyLayerPromiseResult(id types.LayerID, expectedResults int, err error) {

// count number of results - only after all results were received we can notify the caller
l.blockHashResM.Lock()
// put false if no blocks
5 changes: 0 additions & 5 deletions layerfetcher/layers_test.go
@@ -55,10 +55,6 @@ func (m *mockNet) SendRequest(ctx context.Context, msgType server.MessageType, p
return nil
}

/*if m.callSuccessCallback {
return resHandler()
}*/

return nil
}

@@ -122,7 +118,6 @@ func (m mockFetcher) Start() {
}

func (m mockFetcher) AddDB(hint fetch.Hint, db database.Store) {

}

func (m mockFetcher) GetHash(hash types.Hash32, h fetch.Hint, validateAndSubmit bool) chan fetch.HashDataPromiseResult {
6 changes: 5 additions & 1 deletion p2p/config/config.go
@@ -20,6 +20,8 @@ const (
defaultTCPPort = 7513
// defaultTCPInterface is the inet interface that P2P listens on by default
defaultTCPInterface = "0.0.0.0"
// max number of parallel gossip goroutines per channel (protocol)
defaultMaxGossipRoutines = 50
)

// Values specifies default values for node config params.
@@ -56,6 +58,7 @@ type Config struct {
SwarmConfig SwarmConfig `mapstructure:"swarm"`
BufferSize int `mapstructure:"buffer-size"`
MsgSizeLimit int `mapstructure:"msg-size-limit"` // in bytes
MaxGossipRoutines int `mapstructure:"max-gossip-routines"`
}

// SwarmConfig specifies swarm config params.
@@ -71,7 +74,6 @@ type SwarmConfig struct {

// DefaultConfig defines the default p2p configuration
func DefaultConfig() Config {

// SwarmConfigValues defines default values for swarm config params.
var SwarmConfigValues = SwarmConfig{
Gossip: true,
@@ -99,6 +101,7 @@ func DefaultConfig() Config {
SwarmConfig: SwarmConfigValues,
BufferSize: 10000,
MsgSizeLimit: UnlimitedMsgSize,
MaxGossipRoutines: defaultMaxGossipRoutines,
}
}

@@ -107,5 +110,6 @@ func DefaultTestConfig() Config {
conf := DefaultConfig()
conf.TCPPort += 10000
conf.TCPInterface = "127.0.0.1"
conf.MaxGossipRoutines = 2
return conf
}
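
For context, here is a minimal sketch (not part of the PR) of overriding the new limit in code; the mapstructure tag above suggests it can also be set from a config file through a max-gossip-routines key in the p2p section. The value 100 is purely illustrative.

package main

import (
    "fmt"

    "github.com/spacemeshos/go-spacemesh/p2p/config"
)

func main() {
    // Start from the library defaults and override the new knob.
    cfg := config.DefaultConfig()
    cfg.MaxGossipRoutines = 100 // hypothetical value; caps concurrent gossip handlers per protocol
    fmt.Println("max gossip routines:", cfg.MaxGossipRoutines)
}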
49 changes: 41 additions & 8 deletions p2p/service/gossip_listener.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/config"
"github.com/spacemeshos/go-spacemesh/priorityq"
"sync"
)
@@ -22,16 +23,18 @@ type Listener struct {
syncer Fetcher
wg sync.WaitGroup
shouldListenToGossip enableGossipFunc
config config.Config
}

// NewListener creates a new listener struct
func NewListener(net Service, syncer Fetcher, shouldListenToGossip enableGossipFunc, log log.Log) *Listener {
func NewListener(net Service, syncer Fetcher, shouldListenToGossip enableGossipFunc, config config.Config, log log.Log) *Listener {
return &Listener{
Log: &log,
net: net,
syncer: syncer,
shouldListenToGossip: shouldListenToGossip,
wg: sync.WaitGroup{},
config: config,
}
}

@@ -41,9 +44,6 @@ type Syncer interface {
FetchPoetProof(ctx context.Context, poetProofRef []byte) error
ListenToGossip() bool
GetBlock(ID types.BlockID) (*types.Block, error)
//GetTxs(IDs []types.TransactionID) error
//GetBlocks(IDs []types.BlockID) error
//GetAtxs(IDs []types.ATXID) error
IsSynced(context.Context) bool
}

@@ -67,7 +67,7 @@ func (l *Listener) AddListener(ctx context.Context, channel string, priority pri
l.channels = append(l.channels, ch)
l.stoppers = append(l.stoppers, stop)
l.wg.Add(1)
go l.listenToGossip(log.WithNewSessionID(ctx), dataHandler, ch, stop)
go l.listenToGossip(log.WithNewSessionID(ctx), dataHandler, ch, stop, channel)
}

// Stop stops listening to all gossip channels
@@ -78,8 +78,41 @@ func (l *Listener) Stop() {
l.wg.Wait()
}

func (l *Listener) listenToGossip(ctx context.Context, dataHandler GossipDataHandler, gossipChannel chan GossipMessage, stop chan struct{}) {
l.WithContext(ctx).Info("start listening")
func (l *Listener) listenToGossip(ctx context.Context, dataHandler GossipDataHandler, gossipChannel chan GossipMessage, stop chan struct{}, channel string) {
l.WithContext(ctx).With().Info("start listening to gossip", log.String("protocol", channel))

// fill the channel with tokens to limit number of concurrent routines
tokenChan := make(chan struct{}, l.config.MaxGossipRoutines)
for i := 0; i < l.config.MaxGossipRoutines; i++ {
tokenChan <- struct{}{}
}

handleMsg := func(ctx context.Context, data GossipMessage) {
// get a token to create a new channel
l.WithContext(ctx).With().Info("waiting for available slot for gossip handler",
log.Int("available_slots", len(tokenChan)),
log.Int("total_slots", cap(tokenChan)))
if len(tokenChan) == 0 {
l.WithContext(ctx).Error("no available slots for gossip handler, blocking")
}
<-tokenChan

l.WithContext(ctx).With().Info("got gossip message, forwarding to data handler",
log.String("protocol", channel),
log.Int("queue_length", len(gossipChannel)))
if !l.syncer.ListenToGossip() {
// not accepting data
l.WithContext(ctx).Info("not currently listening to gossip, dropping message")
return
}
go func() {
// TODO: these handlers should have an API that includes a cancel method. they should time out eventually.
dataHandler(ctx, data, l.syncer)
// replace token when done
tokenChan <- struct{}{}
}()
}

for {
select {
case <-stop:
@@ -90,7 +123,7 @@ func (l *Listener) listenToGossip(ctx context.Context, dataHandler GossipDataHan
// not accepting data
continue
}
dataHandler(log.WithNewRequestID(ctx), data, l.syncer)
handleMsg(log.WithNewRequestID(ctx), data)
}
}
}
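
The listener change above bounds concurrency with a buffered channel used as a token pool: the channel is pre-filled with MaxGossipRoutines tokens, each incoming message takes a token before its handler goroutine is spawned, and the goroutine returns the token when the handler finishes, so at most MaxGossipRoutines handlers run at once per protocol. The standalone sketch below is not part of the PR; maxWorkers, handle, and the message loop are illustrative stand-ins for the real config value, data handler, and gossip channel.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const maxWorkers = 2 // plays the role of config.MaxGossipRoutines

    // Pre-fill a buffered channel with tokens; each token is one slot
    // for a concurrently running handler.
    tokens := make(chan struct{}, maxWorkers)
    for i := 0; i < maxWorkers; i++ {
        tokens <- struct{}{}
    }

    // Stand-in for the gossip data handler.
    handle := func(msg int) {
        time.Sleep(100 * time.Millisecond) // simulate work
        fmt.Println("handled message", msg)
    }

    var wg sync.WaitGroup
    for msg := 0; msg < 5; msg++ {
        <-tokens // blocks once maxWorkers handlers are already in flight
        wg.Add(1)
        go func(m int) {
            defer wg.Done()
            defer func() { tokens <- struct{}{} }() // return the token when done
            handle(m)
        }(msg)
    }
    wg.Wait()
}

A buffered channel keeps the limiter dependency-free; a weighted semaphore such as golang.org/x/sync/semaphore would be a reasonable alternative if handlers ever need weighted or context-aware acquisition.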
92 changes: 72 additions & 20 deletions p2p/service/gossip_listener_test.go
@@ -4,54 +4,60 @@ import (
"context"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/config"
"github.com/spacemeshos/go-spacemesh/priorityq"
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
"time"
)

type syncMock struct {
Synced bool
Synced bool
listenToGossipFn func() bool
}

func (m *syncMock) FetchBlock(ctx context.Context, ID types.BlockID) error {
func (sm *syncMock) FetchBlock(context.Context, types.BlockID) error {
return nil
}

func (m *syncMock) FetchAtx(ctx context.Context, ID types.ATXID) error {
func (sm *syncMock) FetchAtx(context.Context, types.ATXID) error {
return nil
}

func (m *syncMock) GetPoetProof(ctx context.Context, ID types.Hash32) error {
func (sm *syncMock) GetPoetProof(context.Context, types.Hash32) error {
return nil
}

func (m *syncMock) GetBlock(ID types.BlockID) error {
func (sm *syncMock) GetBlock(types.BlockID) error {
return nil
}

func (m *syncMock) GetTxs(ctx context.Context, IDs []types.TransactionID) error {
func (sm *syncMock) GetTxs(context.Context, []types.TransactionID) error {
return nil
}

func (m *syncMock) GetBlocks(ctx context.Context, IDs []types.BlockID) error {
func (sm *syncMock) GetBlocks(context.Context, []types.BlockID) error {
return nil
}

func (m *syncMock) GetAtxs(ctx context.Context, IDs []types.ATXID) error {
func (sm *syncMock) GetAtxs(context.Context, []types.ATXID) error {
return nil
}

func (*syncMock) FetchAtxReferences(ctx context.Context, atx *types.ActivationTx) error {
func (*syncMock) FetchAtxReferences(context.Context, *types.ActivationTx) error {
return nil
}

func (*syncMock) FetchPoetProof(ctx context.Context, poetProofRef []byte) error {
func (*syncMock) FetchPoetProof(context.Context, []byte) error {
panic("implement me")
}

func (*syncMock) ListenToGossip() bool {
func (sm *syncMock) ListenToGossip() bool {
if sm.listenToGossipFn != nil {
return sm.listenToGossipFn()
}
return true
}

@@ -62,7 +68,8 @@ func (*syncMock) IsSynced(context.Context) bool {
func Test_AddListener(t *testing.T) {
net := NewSimulator()
n1 := net.NewNode()
l := NewListener(n1, &syncMock{true}, func() bool { return true }, log.NewDefault(n1.Info.ID.String()))
l := NewListener(n1, &syncMock{Synced: true}, func() bool { return true }, config.DefaultConfig(), log.NewDefault(n1.Info.ID.String()))
defer l.Stop()

var channelCount, secondChannel int32
wg := sync.WaitGroup{}
Expand All @@ -85,16 +92,15 @@ func Test_AddListener(t *testing.T) {
assert.NoError(t, n1.Broadcast(context.TODO(), "channel2", []byte{}))

wg.Wait()
assert.Equal(t, atomic.LoadInt32(&channelCount), int32(1))
assert.Equal(t, atomic.LoadInt32(&secondChannel), int32(1))

l.Stop()
assert.Equal(t, int32(1), atomic.LoadInt32(&channelCount))
assert.Equal(t, int32(1), atomic.LoadInt32(&secondChannel))
}

func Test_AddListener_notSynced(t *testing.T) {
net := NewSimulator()
n1 := net.NewNode()
l := NewListener(n1, &syncMock{false}, func() bool { return true }, log.NewDefault(n1.Info.ID.String()))
l := NewListener(n1, &syncMock{Synced: false}, func() bool { return true }, config.DefaultConfig(), log.NewDefault(n1.Info.ID.String()))
defer l.Stop()

var channelCount, secondChannel int32

@@ -112,8 +118,54 @@ func Test_AddListener_notSynced(t *testing.T) {
assert.NoError(t, n1.Broadcast(context.TODO(), "channel1", []byte{}))
assert.NoError(t, n1.Broadcast(context.TODO(), "channel2", []byte{}))

assert.Equal(t, atomic.LoadInt32(&channelCount), int32(0))
assert.Equal(t, atomic.LoadInt32(&secondChannel), int32(0))
assert.Equal(t, int32(0), atomic.LoadInt32(&channelCount))
assert.Equal(t, int32(0), atomic.LoadInt32(&secondChannel))
}

func TestListenerConcurrency(t *testing.T) {
net := NewSimulator()
n1 := net.NewNode()
var listenCount int32
listenFn := func() bool {
atomic.AddInt32(&listenCount, 1)
return true
}
conf := config.DefaultTestConfig()
l := NewListener(n1, &syncMock{Synced: true, listenToGossipFn: listenFn}, func() bool { return true }, conf, log.NewDefault(n1.Info.ID.String()))
defer l.Stop()

var channelCount int32

releaseChan := make(chan struct{})
handlerFn := func(ctx context.Context, data GossipMessage, syncer Fetcher) {
<-releaseChan
atomic.AddInt32(&channelCount, 1)
}

l.AddListener(context.TODO(), "channel1", priorityq.Mid, handlerFn)

assert.Equal(t, 2, conf.MaxGossipRoutines)

// broadcast several messages. expect the first two to call a handler, then the rest should block.
assert.NoError(t, n1.Broadcast(context.TODO(), "channel1", []byte{}))
assert.NoError(t, n1.Broadcast(context.TODO(), "channel1", []byte{}))
assert.NoError(t, n1.Broadcast(context.TODO(), "channel1", []byte{}))
assert.NoError(t, n1.Broadcast(context.TODO(), "channel1", []byte{}))

checkVal := func(expectedVal int32) func() bool {
return func() bool {
return atomic.LoadInt32(&listenCount) == expectedVal
}
}
assert.Eventually(t, checkVal(2), time.Second, 10*time.Millisecond)

// release one handler
releaseChan <- struct{}{}
assert.Eventually(t, checkVal(3), time.Second, 10*time.Millisecond)
releaseChan <- struct{}{}
assert.Eventually(t, checkVal(4), time.Second, 10*time.Millisecond)
releaseChan <- struct{}{}
assert.Eventually(t, checkVal(4), time.Second, 10*time.Millisecond)
releaseChan <- struct{}{}
assert.Eventually(t, checkVal(4), time.Second, 10*time.Millisecond)
}