Skip to content

Commit

Permalink
Propagate request ID through gRPC context
Browse files Browse the repository at this point in the history
The request ID only gets propagated through HTTP calls and is not available
in gRPC servers.

This commit adds intereceptors to grpc servers and clients to make sure request ID
propagation happens.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed May 14, 2024
1 parent 9b26db4 commit 37a8076
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -58,12 +59,14 @@ func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer op
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcserver.NewUnaryClientRequestIDInterceptor(),
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcserver.NewStreamClientRequestIDInterceptor(),
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
NewUnaryServerRequestIDInterceptor(),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
met.UnaryServerInterceptor(),
tags.UnaryServerInterceptor(tagsOpts...),
tracing.UnaryServerInterceptor(tracer),
grpc_logging.UnaryServerInterceptor(kit.InterceptorLogger(logger), logOpts...),
),
grpc_middleware.WithStreamServerChain(
NewStreamServerRequestIDInterceptor(),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
met.StreamServerInterceptor(),
tags.StreamServerInterceptor(tagsOpts...),
Expand Down
64 changes: 64 additions & 0 deletions pkg/server/grpc/request_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package grpc

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/thanos-io/thanos/pkg/server/http/middleware"
)

const requestIDKey = "request-id"

func NewUnaryClientRequestIDInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
ctx = metadata.AppendToOutgoingContext(ctx, requestIDKey, reqID)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func NewUnaryServerRequestIDInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if vals := metadata.ValueFromIncomingContext(ctx, requestIDKey); len(vals) == 1 {
ctx = middleware.NewContextWithRequestID(ctx, vals[0])
}
return handler(ctx, req)
}
}

func NewStreamClientRequestIDInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
ctx = metadata.AppendToOutgoingContext(ctx, requestIDKey, reqID)
}
return streamer(ctx, desc, cc, method, opts...)
}
}

func NewStreamServerRequestIDInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if vals := metadata.ValueFromIncomingContext(ss.Context(), requestIDKey); len(vals) == 1 {
ctx := middleware.NewContextWithRequestID(ss.Context(), vals[0])
return handler(srv, newStreamWithContext(ctx, ss))
}
return handler(srv, ss)
}
}

type streamWithContext struct {
grpc.ServerStream
ctx context.Context
}

func newStreamWithContext(ctx context.Context, serverStream grpc.ServerStream) *streamWithContext {
return &streamWithContext{ServerStream: serverStream, ctx: ctx}
}

func (s streamWithContext) Context() context.Context {
return s.ctx
}
6 changes: 3 additions & 3 deletions pkg/server/http/middleware/request_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type ctxKey int

const reqIDKey = ctxKey(0)

// newContextWithRequestID creates a context with a request id.
func newContextWithRequestID(ctx context.Context, rid string) context.Context {
// NewContextWithRequestID creates a context with a request id.
func NewContextWithRequestID(ctx context.Context, rid string) context.Context {
return context.WithValue(ctx, reqIDKey, rid)
}

Expand All @@ -36,7 +36,7 @@ func RequestID(h http.Handler) http.HandlerFunc {
reqID = ulid.MustNew(ulid.Timestamp(time.Now()), entropy).String()
r.Header.Set("X-Request-ID", reqID)
}
ctx := newContextWithRequestID(r.Context(), reqID)
ctx := NewContextWithRequestID(r.Context(), reqID)
h.ServeHTTP(w, r.WithContext(ctx))
}
}

0 comments on commit 37a8076

Please sign in to comment.