Skip to content

Commit

Permalink
Add configurable MaxRequestsPerConn cluster param
Browse files Browse the repository at this point in the history
Before this change, a connection had a hardcoded maximum number of
streams. Since this value was so large (32768 for CQL v3+) this
essentially meant an unbound concurrency.

This commit allows a user to configure this parameter by a newly
added MaxRequestsPerConn option.

The "MaxRequestsPerConn" naming was chosen to be very general
deliberately to allow us to safely introduce a more advanced behavior in
the future: where the concurrency limiting happens not at a IDGenerator
level and an additional client-side queue is introduced for requests
to wait to be sent to Scylla (in addition to inflight requests).

This commit deliberately does not change the defaults as to be
non-pessimizing for the users.

Fixes #112
  • Loading branch information
avelanarius committed Feb 16, 2023
1 parent c8cd0ba commit cc4c65b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
5 changes: 5 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ type ClusterConfig struct {
// Default: 2
NumConns int

// Maximum number of inflight requests allowed per connection.
// Default: 32768 for CQL v3 and newer
// Default: 128 for older CQL versions
MaxRequestsPerConn int

// Default consistency level.
// Default: Quorum
Consistency Consistency
Expand Down
9 changes: 8 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
errorHandler: errorHandler,
compressor: cfg.Compressor,
session: s,
streams: streams.New(cfg.ProtoVersion),
streams: s.streamIDGenerator(cfg.ProtoVersion),
host: host,
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
frameObserver: s.frameObserver,
Expand All @@ -313,6 +313,13 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
return c, nil
}

func (s *Session) streamIDGenerator(protocol int) *streams.IDGenerator {
if s.cfg.MaxRequestsPerConn > 0 {
return streams.NewLimited(s.cfg.MaxRequestsPerConn)
}
return streams.New(protocol)
}

func (c *Conn) init(ctx context.Context, dialedHost *DialedHost) error {
if c.session.cfg.AuthProvider != nil {
var err error
Expand Down
7 changes: 7 additions & 0 deletions internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ func New(protocol int) *IDGenerator {
if protocol > 2 {
maxStreams = 32768
}
return NewLimited(maxStreams)
}

func NewLimited(maxStreams int) *IDGenerator {
// Round up maxStreams to a nearest
// multiple of 64
maxStreams = ((maxStreams + 63) / 64) * 64

buckets := maxStreams / 64
// reserve stream 0
Expand Down

0 comments on commit cc4c65b

Please sign in to comment.