Skip to content

Commit

Permalink
feat: propagete trace context in queues (#3645)
Browse files Browse the repository at this point in the history
* add context as part of the Queue.Enqueue method

* propagate context to nats header

* add context to the queue listen method

* make sure propagator is not nil

* use another ctx for worker
  • Loading branch information
mathnogueira committed Feb 15, 2024
1 parent 929398a commit 95758ba
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
16 changes: 11 additions & 5 deletions server/pkg/pipeline/inmemory_driver.go
@@ -1,21 +1,27 @@
package pipeline

import (
"context"
"fmt"
)

func NewInMemoryQueueDriver[T any](name string) *InMemoryQueueDriver[T] {
return &InMemoryQueueDriver[T]{
log: newLoggerFn(fmt.Sprintf("InMemoryQueueDriver - %s", name)),
queue: make(chan T),
queue: make(chan queueMessage[T]),
exit: make(chan bool),
name: name,
}
}

type queueMessage[T any] struct {
ctx context.Context
message T
}

type InMemoryQueueDriver[T any] struct {
log loggerFn
queue chan T
queue chan queueMessage[T]
exit chan bool
listener Listener[T]
name string
Expand All @@ -25,8 +31,8 @@ func (qd *InMemoryQueueDriver[T]) SetListener(l Listener[T]) {
qd.listener = l
}

func (qd InMemoryQueueDriver[T]) Enqueue(item T) {
qd.queue <- item
func (qd InMemoryQueueDriver[T]) Enqueue(ctx context.Context, item T) {
qd.queue <- queueMessage[T]{ctx, item}
}

func (qd InMemoryQueueDriver[T]) Start() {
Expand All @@ -38,7 +44,7 @@ func (qd InMemoryQueueDriver[T]) Start() {
qd.log("exit")
return
case job := <-qd.queue:
qd.listener.Listen(job)
qd.listener.Listen(job.ctx, job.message)
}
}
}()
Expand Down
18 changes: 16 additions & 2 deletions server/pkg/pipeline/nats_driver.go
@@ -1,10 +1,13 @@
package pipeline

import (
"context"
"encoding/json"
"fmt"

"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

type NatsDriver[T any] struct {
Expand All @@ -22,14 +25,20 @@ func NewNatsDriver[T any](conn *nats.Conn, topic string) *NatsDriver[T] {
}
}

func (d *NatsDriver[T]) Enqueue(msg T) {
func (d *NatsDriver[T]) Enqueue(ctx context.Context, msg T) {
msgJson, err := json.Marshal(msg)
if err != nil {
fmt.Printf("could not marshal message: %s\n", err.Error())
}

header := make(nats.Header)
if propagator := otel.GetTextMapPropagator(); propagator != nil {
propagator.Inject(ctx, propagation.HeaderCarrier(header))
}

err = d.conn.PublishMsg(&nats.Msg{
Subject: d.topic,
Header: header,
Data: msgJson,
})

Expand All @@ -47,8 +56,13 @@ func (d *NatsDriver[T]) SetListener(listener Listener[T]) {
fmt.Printf(`could not unmarshal message got in queue "%s": %s\n`, d.topic, err.Error())
}

ctx := context.Background()
if propagator := otel.GetTextMapPropagator(); propagator != nil {
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(msg.Header))
}

// TODO: We probably should return an error for acking or nacking this message
listener.Listen(target)
listener.Listen(ctx, target)

msg.Ack()
})
Expand Down
18 changes: 12 additions & 6 deletions server/pkg/pipeline/queue.go
Expand Up @@ -4,8 +4,10 @@ import (
"context"

"github.com/alitto/pond"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
)

type Enqueuer[T any] interface {
Expand All @@ -16,11 +18,11 @@ type QueueItemProcessor[T any] interface {
ProcessItem(context.Context, T)
}
type Listener[T any] interface {
Listen(T)
Listen(context.Context, T)
}

type QueueDriver[T any] interface {
Enqueue(T)
Enqueue(context.Context, T)
SetListener(Listener[T])
}

Expand Down Expand Up @@ -78,6 +80,12 @@ func (q Queue[T]) Enqueue(ctx context.Context, item T) {
return
}

workerCtx := context.Background()
if propagator := otel.GetTextMapPropagator(); propagator != nil {
var carrier propagation.HeaderCarrier
workerCtx = propagator.Extract(workerCtx, carrier)
}

// use a worker to enqueue the job in case the driver takes a bit to actually enqueue
// this way we release the caller as soon as possible
q.workerPool.Submit(func() {
Expand All @@ -88,13 +96,11 @@ func (q Queue[T]) Enqueue(ctx context.Context, item T) {
q.enqueueHistogram.Record(ctx, 1, metric.WithAttributes(
attribute.String("queue.name", q.name),
))
q.driver.Enqueue(item)
q.driver.Enqueue(workerCtx, item)
})
}

func (q Queue[T]) Listen(item T) {
ctx := context.Background()

func (q Queue[T]) Listen(ctx context.Context, item T) {
if q.ListenPreprocessorFn != nil {
ctx, item = q.ListenPreprocessorFn(ctx, item)
}
Expand Down

0 comments on commit 95758ba

Please sign in to comment.