Commit 15cbf62

Try #2463:
bors[bot] committed Oct 16, 2021
2 parents f0af702 + b4fa583 commit 15cbf62
Showing 19 changed files with 157 additions and 82 deletions.
3 changes: 0 additions & 3 deletions .env
@@ -67,9 +67,6 @@ SMESH_TORTOISE_BEACON_VOTES_LIMIT=100
# Number of blocks to wait for beacon sync
SMESH_TORTOISE_BEACON_SYNC_NUM_BLOCKS=1600

# ?
SMESH_SYNC_REQ_TIMEOUT=60000

# ?
SMESH_POST_LABELS=100

1 change: 0 additions & 1 deletion cmd/node/multi_node.go
@@ -162,7 +162,6 @@ func getTestDefaultConfig() *config.Config {
cfg.LayerDurationSec = 20
cfg.HareEligibility.ConfidenceParam = 4
cfg.HareEligibility.EpochOffset = 0
cfg.SyncRequestTimeout = 500
cfg.SyncInterval = 2

cfg.FETCH.RequestTimeout = 10
2 changes: 1 addition & 1 deletion cmd/node/node.go
@@ -648,7 +648,7 @@ func (app *App) initServices(ctx context.Context,
// TODO: genesisMinerWeight is set to app.Config.SpaceToCommit, because PoET ticks are currently hardcoded to 1
}

gossipListener := service.NewListener(swarm, layerFetch, newSyncer.ListenToGossip, app.addLogger(GossipListener, lg))
gossipListener := service.NewListener(swarm, layerFetch, newSyncer.ListenToGossip, app.Config.P2P, app.addLogger(GossipListener, lg))
rabbit := app.HareFactory(ctx, mdb, swarm, sgn, nodeID, patrol, newSyncer, msh, hOracle, idStore, clock, lg)

stateAndMeshProjector := pendingtxs.NewStateAndMeshProjector(processor, msh)
2 changes: 0 additions & 2 deletions cmd/root.go
@@ -57,8 +57,6 @@ func AddCommands(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&config.ProfilerName, "profiler-name",
config.ProfilerURL, "the name to use when sending profiles")

cmd.PersistentFlags().IntVar(&config.SyncRequestTimeout, "sync-request-timeout",
config.SyncRequestTimeout, "the timeout in ms for direct requests in the sync")
cmd.PersistentFlags().IntVar(&config.AtxsPerBlock, "atxs-per-block",
config.AtxsPerBlock, "the number of atxs to select per block on block creation")
cmd.PersistentFlags().IntVar(&config.TxsPerBlock, "txs-per-block",
1 change: 0 additions & 1 deletion cmd/sync/sync.go
@@ -94,7 +94,6 @@ func (app *syncApp) start(_ *cobra.Command, _ []string) {
log.String("storage_path", bucket),
log.Bool("download_from_remote_storage", remote),
log.Uint32("expected_layers", expectedLayers),
log.Int("request_timeout", app.Config.SyncRequestTimeout),
log.String("data_version", version),
log.Uint32("layers_per_epoch", app.Config.LayersPerEpoch),
log.Uint32("hdist", app.Config.Hdist),
3 changes: 0 additions & 3 deletions config/config.go
@@ -100,8 +100,6 @@ type BaseConfig struct {

GenesisActiveSet int `mapstructure:"genesis-active-size"` // the active set size for genesis

SyncRequestTimeout int `mapstructure:"sync-request-timeout"` // ms the timeout for direct request in the sync

SyncInterval int `mapstructure:"sync-interval"` // sync interval in seconds

PublishEventsURL string `mapstructure:"events-url"`
@@ -207,7 +205,6 @@ func defaultBaseConfig() BaseConfig {
LocalThreshold: big.NewRat(20, 100), // fraction
TortoiseRerunInterval: 60 * 24, // in minutes, once per day
BlockCacheSize: 20,
SyncRequestTimeout: 2000,
SyncInterval: 10,
AtxsPerBlock: 100,
TxsPerBlock: 100,
1 change: 0 additions & 1 deletion docker-compose.yml
@@ -32,7 +32,6 @@ services:
"--tortoise-beacon-theta", "${SMESH_TORTOISE_BEACON_THETA:?1/4}",
"--tortoise-beacon-votes-limit", "${SMESH_TORTOISE_BEACON_VOTES_LIMIT:-100}",
"--tortoise-beacon-sync-num-blocks", "${SMESH_TORTOISE_BEACON_SYNC_NUM_BLOCKS:-1600}",
"--sync-request-timeout", "${SMESH_SYNC_REQ_TIMEOUT:-60000}",
"--post-labels", "${SMESH_POST_LABELS:-100}",
"--max-inbound", "${SMESH_MAX_INBOUND:-12}",
"--genesis-time", "${SMESH_GENESIS_TIME:?SMESH_GENESIS_TIME Must be specified}",
7 changes: 3 additions & 4 deletions layerfetcher/layers.go
@@ -271,8 +271,7 @@ func (l *Logic) PollLayerContent(ctx context.Context, layerID types.LayerID) cha
errFunc := func(err error) {
l.receiveLayerContent(ctx, layerID, peer, numPeers, nil, err)
}
err := l.net.SendRequest(ctx, server.LayerBlocksMsg, layerID.Bytes(), peer, receiveForPeerFunc, errFunc)
if err != nil {
if err := l.net.SendRequest(ctx, server.LayerBlocksMsg, layerID.Bytes(), peer, receiveForPeerFunc, errFunc); err != nil {
l.receiveLayerContent(ctx, layerID, peer, numPeers, nil, err)
}
}
@@ -400,8 +399,8 @@ func notifyLayerBlocksResult(ctx context.Context, layerID types.LayerID, layerDB
}
}
}

logger.With().Debug("notifying layer blocks result", log.String("blocks", fmt.Sprintf("%+v", result)))
logger.With().Debug("notifying layer blocks result",
log.String("blocks", fmt.Sprintf("%+v", result)))
for _, ch := range channels {
ch <- result
}
2 changes: 2 additions & 0 deletions layerfetcher/layers_test.go
@@ -67,6 +67,7 @@ func (m *mockNet) SendRequest(_ context.Context, msgType server.MessageType, _ [
errorHandler(errors.New("peer timeout"))
return nil
}

switch msgType {
case server.LayerBlocksMsg:
if data, ok := m.layerBlocks[address]; ok {
@@ -76,6 +77,7 @@
}
return m.errors[address]
}

func (mockNet) Close() {}

type mockFetcher struct {
6 changes: 6 additions & 0 deletions p2p/config/config.go
@@ -20,6 +20,9 @@ const (
defaultTCPPort = 7513
// defaultTCPInterface is the inet interface that P2P listens on by default.
defaultTCPInterface = "0.0.0.0"
// max number of parallel gossip goroutines per channel (protocol)
// TODO: correctly parameterize this. see https://github.com/spacemeshos/go-spacemesh/issues/2708.
defaultMaxGossipRoutines = 50
)

// Values specifies default values for node config params.
@@ -56,6 +59,7 @@ type Config struct {
SwarmConfig SwarmConfig `mapstructure:"swarm"`
BufferSize int `mapstructure:"buffer-size"`
MsgSizeLimit int `mapstructure:"msg-size-limit"` // in bytes
MaxGossipRoutines int `mapstructure:"max-gossip-routines"`
}

// SwarmConfig specifies swarm config params.
@@ -98,6 +102,7 @@ func DefaultConfig() Config {
SwarmConfig: SwarmConfigValues,
BufferSize: 10000,
MsgSizeLimit: UnlimitedMsgSize,
MaxGossipRoutines: defaultMaxGossipRoutines,
}
}

@@ -106,5 +111,6 @@ func DefaultTestConfig() Config {
conf := DefaultConfig()
conf.TCPPort += 10000
conf.TCPInterface = "127.0.0.1"
conf.MaxGossipRoutines = 2
return conf
}
14 changes: 8 additions & 6 deletions p2p/net/conn.go
@@ -261,10 +261,11 @@ func (c *FormattedConnection) sendListener() {
for {
select {
case m := <-c.messages:
c.logger.With().Debug("connection: sending outgoing message",
log.String("peer_id", m.peerID),
log.String("requestId", m.reqID),
log.Int("queue_length", len(c.messages)))
// this causes issues with tests, leaving it here for debugging purposes
// c.logger.With().Debug("connection: sending outgoing message",
// log.String("peer_id", m.peerID),
// log.String("requestId", m.reqID),
// log.Int("queue_length", len(c.messages)))

// TODO: do we need to propagate this error upwards or add a callback?
// see https://github.com/spacemeshos/go-spacemesh/issues/2733
@@ -291,8 +292,9 @@ func (c *FormattedConnection) Send(ctx context.Context, m []byte) error {
reqID, _ := log.ExtractRequestID(ctx)
peerID, _ := ctx.Value(log.PeerIDKey).(string)

c.logger.WithContext(ctx).With().Debug("connection: enqueuing outgoing message",
log.Int("queue_length", len(c.messages)))
// this causes issues with tests, leaving it here for debugging purposes
// c.logger.WithContext(ctx).With().Debug("connection: enqueuing outgoing message",
// log.Int("queue_length", len(c.messages)))
if len(c.messages) > 30 {
c.logger.WithContext(ctx).With().Warning("connection: outbound send queue backlog",
log.Int("queue_length", len(c.messages)))
14 changes: 8 additions & 6 deletions p2p/net/msgcon.go
@@ -176,10 +176,11 @@ func (c *MsgConnection) sendListener() {
for {
select {
case m := <-c.messages:
c.logger.With().Debug("msgconnection: sending outgoing message",
log.String("peer_id", m.peerID),
log.String("requestId", m.reqID),
log.Int("queue_length", len(c.messages)))
// this causes issues with tests, leaving it here for debugging purposes
// c.logger.With().Debug("msgconnection: sending outgoing message",
// log.String("peer_id", m.peerID),
// log.String("requestId", m.reqID),
// log.Int("queue_length", len(c.messages)))

// TODO: do we need to propagate this error upwards or add a callback?
// see https://github.com/spacemeshos/go-spacemesh/issues/2733
@@ -209,8 +210,9 @@ func (c *MsgConnection) Send(ctx context.Context, m []byte) error {
reqID, _ := log.ExtractRequestID(ctx)
peerID, _ := ctx.Value(log.PeerIDKey).(string)

c.logger.WithContext(ctx).With().Debug("msgconnection: enqueuing outgoing message",
log.Int("queue_length", len(c.messages)))
// this causes issues with tests, leaving it here for debugging purposes
// c.logger.WithContext(ctx).With().Debug("msgconnection: enqueuing outgoing message",
// log.Int("queue_length", len(c.messages)))
if len(c.messages) > 30 {
c.logger.WithContext(ctx).With().Warning("msgconnection: outbound send queue backlog",
log.Int("queue_length", len(c.messages)))
10 changes: 6 additions & 4 deletions p2p/server/msgserver.go
@@ -155,16 +155,17 @@ func (p *MessageServer) Close() {
p.With().Info("closing message server")
p.cancel()
p.With().Info("waiting for message workers to finish")
p.eg.Wait()
_ = p.eg.Wait()
p.With().Info("message workers all done")
}

// readLoop reads incoming messages and matches them to requests or responses.
func (p *MessageServer) readLoop(ctx context.Context) error {
logger := p.WithContext(ctx)
sctx := log.WithNewSessionID(ctx)
timer := time.NewTicker(p.requestLifetime + time.Millisecond*100)
defer timer.Stop()
defer p.With().Info("shutting down protocol", log.String("protocol", p.name))
defer logger.With().Info("shutting down protocol", log.String("protocol", p.name))
for {
select {
case <-ctx.Done():
@@ -177,9 +178,10 @@
case msg, ok := <-p.ingressChannel:
// generate new reqID for message
ctx := log.WithNewRequestID(ctx)
p.WithContext(ctx).Debug("new msg received from channel")
// this causes issues with tests, leaving it here for debugging purposes
// logger.Debug("new msg received from channel")
if !ok {
p.WithContext(ctx).Error("read loop channel was closed")
logger.Error("read loop channel was closed")
return context.Canceled
}
select {
56 changes: 51 additions & 5 deletions p2p/service/gossip_listener.go
@@ -6,6 +6,7 @@ import (

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/config"
"github.com/spacemeshos/go-spacemesh/priorityq"
)

@@ -23,16 +24,18 @@
fetcher Fetcher
wg sync.WaitGroup
shouldListenToGossip enableGossipFunc
config config.Config
}

// NewListener creates a new listener struct.
func NewListener(net Service, fetcher Fetcher, shouldListenToGossip enableGossipFunc, log log.Log) *Listener {
func NewListener(net Service, fetcher Fetcher, shouldListenToGossip enableGossipFunc, config config.Config, log log.Log) *Listener {
return &Listener{
Log: &log,
net: net,
fetcher: fetcher,
shouldListenToGossip: shouldListenToGossip,
wg: sync.WaitGroup{},
config: config,
}
}

@@ -67,20 +70,63 @@ func (l *Listener) Stop() {

func (l *Listener) listenToGossip(ctx context.Context, dataHandler GossipDataHandler, gossipChannel chan GossipMessage, stop chan struct{}, channel string) {
l.WithContext(ctx).With().Info("start listening to gossip", log.String("protocol", channel))

// set channel capacity to limit number of concurrent routines
tokenChan := make(chan struct{}, l.config.MaxGossipRoutines)

waitForGossipToken := func(ctx context.Context) {
// this causes issues with tests, leaving here for debug purposes
// l.WithContext(ctx).With().Debug("waiting for available slot for gossip handler",
// log.Int("available_slots", cap(tokenChan)-len(tokenChan)),
// log.Int("total_slots", cap(tokenChan)))

// get a token to create a new channel
if len(tokenChan) == cap(tokenChan) {
l.WithContext(ctx).Error("no available slots for gossip handler, blocking")
}

tokenChan <- struct{}{}

// this causes issues with tests, leaving here for debug purposes
// l.WithContext(ctx).With().Debug("got gossip token",
// log.String("protocol", channel),
// log.Int("queue_length", len(gossipChannel)))
}

handleMsg := func(ctx context.Context, data GossipMessage) {
if !l.shouldListenToGossip() {
// not accepting data
l.WithContext(ctx).Info("not currently listening to gossip, dropping message")
<-tokenChan
return
}
go func() {
// this causes issues with tests, leaving here for debug purposes
// l.WithContext(ctx).Info("passing data to data handler")
// TODO: these handlers should have an API that includes a cancel method. they should time out eventually.
dataHandler(ctx, data, l.fetcher)
// replace token when done
<-tokenChan
}()
}

for {
select {
case <-stop:
l.wg.Done()
return
case data := <-gossipChannel:
l.WithContext(ctx).With().Debug("got gossip message, forwarding to data handler",
log.String("protocol", channel),
log.Int("queue_length", len(gossipChannel)))
// this causes issues with tests, leaving here for debug purposes
// l.WithContext(ctx).With().Debug("got gossip message, forwarding to data handler",
// log.String("protocol", channel),
// log.Int("queue_length", len(gossipChannel)))
if !l.shouldListenToGossip() {
// not accepting data
continue
}
dataHandler(log.WithNewRequestID(ctx), data, l.fetcher)
// block until there's a token available
waitForGossipToken(ctx)
handleMsg(log.WithNewRequestID(ctx), data)
}
}
}

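The heart of the gossip_listener.go change is a standard bounded-concurrency pattern: a buffered channel (tokenChan) acts as a pool of tokens, so at most MaxGossipRoutines handler goroutines run per protocol at any time; waitForGossipToken blocks on the send, and each handler returns its token when it finishes. A self-contained sketch of that pattern under illustrative names (not the repository's API):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxRoutines = 2 // stands in for config.MaxGossipRoutines
	tokens := make(chan struct{}, maxRoutines)

	var wg sync.WaitGroup
	for msg := 0; msg < 5; msg++ {
		tokens <- struct{}{} // blocks once maxRoutines handlers are in flight
		wg.Add(1)
		go func(msg int) {
			defer wg.Done()
			defer func() { <-tokens }() // return the token when the handler is done
			fmt.Println("handling gossip message", msg)
			time.Sleep(50 * time.Millisecond) // simulated handler work
		}(msg)
	}
	wg.Wait()
}

As in listenToGossip above, a dropped message (for example when the node is not currently listening to gossip) must still return its token, otherwise slots leak and the listener eventually blocks for good.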