Skip to content

Commit

Permalink
go: add sarama observability
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Login <batazor@evrone.com>
  • Loading branch information
batazor committed Apr 8, 2023
1 parent 04094ee commit df83d27
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 4 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/uptrace/opentelemetry-go-extra/otelzap v0.1.21
go.etcd.io/etcd/client/v3 v3.5.6
go.mongodb.org/mongo-driver v1.11.4
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.40.0
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.40.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.40.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,8 @@ go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUz
go.opentelemetry.io/contrib v1.0.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM=
go.opentelemetry.io/contrib v1.14.0 h1:XHKHbkvibjNRzXAIrubQXzNLImvkZrhOB2M1OodfAZs=
go.opentelemetry.io/contrib v1.14.0/go.mod h1:O3SXx534x0bWzGJlxXiUXpV7Ao7Iweib+s/urIXELrs=
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.40.0 h1:yZ4ggdmq8F5loX3cDFuiJRJpD00FVIQ/WZ2zqbcpOwk=
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.40.0/go.mod h1:12rKQHsBJpbAfaVMWUpiujAFNDJ9a15jwP/Nf2qgPV4=
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.40.0 h1:hATJDiGtTPWglqQRlWUiT5df32bOu9AJV41djhfF4Ig=
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.40.0/go.mod h1:nkEFz9FW/KZC65rsd8yrHm4aBKa5STMpe4/Xb5+LG64=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E=
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/mq/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ type Consumer struct {
ch query.Response
}

// Setup is run at the beginning of a new session, before ConsumeClaim
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions internal/pkg/mq/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Shopify/sarama"
"github.com/heptiolabs/healthcheck"
"github.com/spf13/viper"
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"

"github.com/shortlink-org/shortlink/internal/pkg/mq/query"
)
Expand Down Expand Up @@ -92,11 +93,15 @@ func (k *Kafka) Publish(ctx context.Context, target string, message query.Messag
}

func (mq *Kafka) Subscribe(target string, message query.Response) error {
consumer := Consumer{
consumer := &Consumer{
ch: message,
}

if err := mq.consumer.Consume(context.Background(), []string{target}, &consumer); err != nil {
// OpenTelemetry
handler := otelsarama.WrapConsumerGroupHandler(consumer)

err := mq.consumer.Consume(context.Background(), []string{target}, handler)
if err != nil {
return err
}

Expand Down

0 comments on commit df83d27

Please sign in to comment.