Skip to content

Commit

Permalink
Propagate request ID through gRPC context (#7356)
Browse files Browse the repository at this point in the history
* Propagate request ID through gRPC context

The request ID only gets propagated through HTTP calls and is not available
in gRPC servers.

This commit adds intereceptors to grpc servers and clients to make sure request ID
propagation happens.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add license

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed May 15, 2024
1 parent 9b26db4 commit 9707a4f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -58,12 +59,14 @@ func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer op
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcserver.NewUnaryClientRequestIDInterceptor(),
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcserver.NewStreamClientRequestIDInterceptor(),
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
NewUnaryServerRequestIDInterceptor(),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
met.UnaryServerInterceptor(),
tags.UnaryServerInterceptor(tagsOpts...),
tracing.UnaryServerInterceptor(tracer),
grpc_logging.UnaryServerInterceptor(kit.InterceptorLogger(logger), logOpts...),
),
grpc_middleware.WithStreamServerChain(
NewStreamServerRequestIDInterceptor(),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
met.StreamServerInterceptor(),
tags.StreamServerInterceptor(tagsOpts...),
Expand Down
67 changes: 67 additions & 0 deletions pkg/server/grpc/request_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package grpc

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/thanos-io/thanos/pkg/server/http/middleware"
)

const requestIDKey = "request-id"

func NewUnaryClientRequestIDInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
ctx = metadata.AppendToOutgoingContext(ctx, requestIDKey, reqID)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func NewUnaryServerRequestIDInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if vals := metadata.ValueFromIncomingContext(ctx, requestIDKey); len(vals) == 1 {
ctx = middleware.NewContextWithRequestID(ctx, vals[0])
}
return handler(ctx, req)
}
}

func NewStreamClientRequestIDInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
ctx = metadata.AppendToOutgoingContext(ctx, requestIDKey, reqID)
}
return streamer(ctx, desc, cc, method, opts...)
}
}

func NewStreamServerRequestIDInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if vals := metadata.ValueFromIncomingContext(ss.Context(), requestIDKey); len(vals) == 1 {
ctx := middleware.NewContextWithRequestID(ss.Context(), vals[0])
return handler(srv, newStreamWithContext(ctx, ss))
}
return handler(srv, ss)
}
}

type streamWithContext struct {
grpc.ServerStream
ctx context.Context
}

func newStreamWithContext(ctx context.Context, serverStream grpc.ServerStream) *streamWithContext {
return &streamWithContext{ServerStream: serverStream, ctx: ctx}
}

func (s streamWithContext) Context() context.Context {
return s.ctx
}
6 changes: 3 additions & 3 deletions pkg/server/http/middleware/request_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type ctxKey int

const reqIDKey = ctxKey(0)

// newContextWithRequestID creates a context with a request id.
func newContextWithRequestID(ctx context.Context, rid string) context.Context {
// NewContextWithRequestID creates a context with a request id.
func NewContextWithRequestID(ctx context.Context, rid string) context.Context {
return context.WithValue(ctx, reqIDKey, rid)
}

Expand All @@ -36,7 +36,7 @@ func RequestID(h http.Handler) http.HandlerFunc {
reqID = ulid.MustNew(ulid.Timestamp(time.Now()), entropy).String()
r.Header.Set("X-Request-ID", reqID)
}
ctx := newContextWithRequestID(r.Context(), reqID)
ctx := NewContextWithRequestID(r.Context(), reqID)
h.ServeHTTP(w, r.WithContext(ctx))
}
}

0 comments on commit 9707a4f

Please sign in to comment.