Skip to content

Commit

Permalink
Try #5564:
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemesh-bors[bot] authored Feb 14, 2024
2 parents 737c88d + 7efbea6 commit 07fe569
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ configuration is as follows:

### Improvements

* [#5564](https://github.com/spacemeshos/go-spacemesh/pull/5564) Use decaying tags for fetch peers. This prevents
libp2p's Connection Manager from breaking sync.
* [#5418](https://github.com/spacemeshos/go-spacemesh/pull/5418) Add `grpc-post-listener` to separate post service from
`grpc-private-listener` and not require mTLS for the post service.

Expand Down
10 changes: 9 additions & 1 deletion fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ type Config struct {
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
// The maximum number of concurrent requests to get ATXs.
GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"`
GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"`
DecayingTag server.DecayingTagSpec `mapstructure:"decaying-tag"`
}

func (c Config) getServerConfig(protocol string) ServerConfig {
Expand Down Expand Up @@ -151,6 +152,12 @@ func DefaultConfig() Config {
},
PeersRateThreshold: 0.02,
GetAtxsConcurrency: 100,
DecayingTag: server.DecayingTagSpec{
Interval: time.Minute,
Inc: 1000,
Dec: 1000,
Cap: 10000,
},
}
}

Expand Down Expand Up @@ -290,6 +297,7 @@ func (f *Fetch) registerServer(
server.WithTimeout(f.cfg.RequestTimeout),
server.WithHardTimeout(f.cfg.RequestHardTimeout),
server.WithLog(f.logger),
server.WithDecayingTag(f.cfg.DecayingTag),
}
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
Expand Down
2 changes: 2 additions & 0 deletions p2p/server/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"time"

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -17,6 +18,7 @@ type Host interface {
SetStreamHandler(protocol.ID, network.StreamHandler)
NewStream(context.Context, peer.ID, ...protocol.ID) (network.Stream, error)
Network() network.Network
ConnManager() connmgr.ConnManager
}

type peerStream interface {
Expand Down
39 changes: 39 additions & 0 deletions p2p/server/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"time"

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -20,6 +21,13 @@ import (
"github.com/spacemeshos/go-spacemesh/log"
)

type DecayingTagSpec struct {
Interval time.Duration `mapstructure:"interval"`
Inc int `mapstructure:"inc"`
Dec int `mapstructure:"dec"`
Cap int `mapstructure:"cap"`
}

// ErrNotConnected is returned when peer is not connected.
var ErrNotConnected = errors.New("peer is not connected")

Expand Down Expand Up @@ -88,6 +96,12 @@ func WithRequestsPerInterval(n int, interval time.Duration) Opt {
}
}

func WithDecayingTag(tag DecayingTagSpec) Opt {
return func(s *Server) {
s.decayingTagSpec = &tag
}
}

// Handler is the handler to be defined by the application.
type Handler func(context.Context, []byte) ([]byte, error)

Expand All @@ -110,6 +124,8 @@ type Server struct {
queueSize int
requestsPerInterval int
interval time.Duration
decayingTagSpec *DecayingTagSpec
decayingTag connmgr.DecayingTag

metrics *tracker // metrics can be nil

Expand All @@ -133,6 +149,23 @@ func New(h Host, proto string, handler Handler, opts ...Opt) *Server {
for _, opt := range opts {
opt(srv)
}

if srv.decayingTagSpec != nil {
decayer, supported := connmgr.SupportsDecay(h.ConnManager())
if supported {
tag, err := decayer.RegisterDecayingTag(
"server:"+proto,
srv.decayingTagSpec.Interval,
connmgr.DecayFixed(srv.decayingTagSpec.Dec),
connmgr.BumpSumBounded(0, srv.decayingTagSpec.Cap))
if err != nil {
srv.logger.Error("error registering decaying tag", log.Err(err))
} else {
srv.decayingTag = tag
}
}
}

return srv
}

Expand Down Expand Up @@ -176,6 +209,9 @@ func (s *Server) Run(ctx context.Context) error {
return nil
}
eg.Go(func() error {
if s.decayingTag != nil {
s.decayingTag.Bump(req.stream.Conn().RemotePeer(), s.decayingTagSpec.Inc)
}
ok := s.queueHandler(ctx, req.stream)
if s.metrics != nil {
s.metrics.serverLatency.Observe(time.Since(req.received).Seconds())
Expand Down

0 comments on commit 07fe569

Please sign in to comment.