Skip to content

Commit

Permalink
chore(controller): Remove stream concurrency limits (#12598)
Browse files Browse the repository at this point in the history
Our gRPC servers use the default gRPC server configuration, which
limits the number of concurrent streams to 100. Since the controllers
run with proxies, this provides a hard scaling limit for the number of
watches an application can have.

This change updates our gRPC server configuration to clear the default
concurrency limit, allowing the server to handle as many streams as
possible.

Signed-off-by: Matei David <matei@buoyant.io>
Co-authored-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
mateiidavid and olix0r committed May 15, 2024
1 parent 21dd252 commit 407df01
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 9 deletions.
2 changes: 1 addition & 1 deletion controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewServer(
shutdown,
}

s := prometheus.NewGrpcServer()
s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
// linkerd2-proxy-api/destination.Destination (proxy-facing)
pb.RegisterDestinationServer(s, &srv)
return s, nil
Expand Down
3 changes: 2 additions & 1 deletion controller/cmd/identity/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/linkerd/linkerd2/pkg/tls"
"github.com/linkerd/linkerd2/pkg/trace"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -203,7 +204,7 @@ func Main(args []string) {
log.Warnf("failed to initialize tracing: %s", err)
}
}
srv := prometheus.NewGrpcServer()
srv := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
identity.Register(srv, svc)
go func() {
log.Infof("starting gRPC server on %s", *addr)
Expand Down
10 changes: 6 additions & 4 deletions pkg/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,13 @@ func init() {
}

// NewGrpcServer returns a grpc server pre-configured with prometheus interceptors and oc-grpc handler
func NewGrpcServer() *grpc.Server {
func NewGrpcServer(opt ...grpc.ServerOption) *grpc.Server {
server := grpc.NewServer(
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.StatsHandler(&ocgrpc.ServerHandler{}),
append([]grpc.ServerOption{
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.StatsHandler(&ocgrpc.ServerHandler{}),
}, opt...)...,
)

grpc_prometheus.EnableHandlingTimeHistogram()
Expand Down
3 changes: 1 addition & 2 deletions viz/metrics-api/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func NewGrpcServer(
ignoredNamespaces: ignoredNamespaces,
}

pb.RegisterApiServer(prometheus.NewGrpcServer(), server)
s := prometheus.NewGrpcServer()
s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
pb.RegisterApiServer(s, server)

return s
Expand Down
2 changes: 1 addition & 1 deletion viz/tap/api/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func newGRPCTapServer(
ignoreHeaders: ignoreHeaders,
}

s := prometheus.NewGrpcServer()
s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
tapPb.RegisterTapServer(s, srv)

return srv
Expand Down

0 comments on commit 407df01

Please sign in to comment.