Skip to content

[otel] Redis not instrumenting in kafka consumer's processing #2812

@nifrasinnovent

Description

@nifrasinnovent

I'm facing an issue of instrumenting the redis after i received a message from kafka consumer. But mongo & kafka producer are instrumenting in the same consumer's processing

Expected Behavior

redis should push the spans along with consumer's traceid

Current Behavior

except redis, kafka producer & mongo are instrumenting

Possible Solution

Seems like an issue of the context, but not sure

Steps to Reproduce

`
func redisInit(ctx context.Context) {
    RedisDB = redis.NewClient(&redis.Options{
	    Addr:     viper.GetString("redis.host") + ":" + viper.GetString("redis.port"),
	    Password: viper.GetString("redis.password"),
	    DB:       0,
    })
    if err := redisotel.InstrumentTracing(RedisDB); err != nil {
	    panic(err)
    }

    result, err := RedisDB.Ping(ctx).Result()
    if err != nil {
	    fmt.Println("Error pinging Redis server:", err)
	    return
    }

    fmt.Println("Redis Ping result:", result)
}

`

`
func (idc *KafkaController) ProcessMessage(topic string, msg *sarama.ConsumerMessage) <-chan bool {

done := make(chan bool)
propagators := propagation.TraceContext{}
ctx := propagators.Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
propagators.Inject(ctx, otelsarama.NewConsumerMessageCarrier(msg))
message := string(msg.Value)
databases.RedisDB.Get(ctx, "1245DGF").Result()
go func() {
	defer close(done)

	switch topics.KafkaConsumer(topic) {
	case topics.KAFKA_IOT_LIVE_VEHICLE_TOPIC:
		data := topics.KafkaIotLiveVehicleTopic{}
		if err := json.Unmarshal([]byte(message), &data); err != nil {
			zap.Error(ctx, "vehicle controller ProcessMessage", err)
			return
		}
		helper.PrintConsumerTime(ctx, "kafka_cg START: MERGED_DEVICE_VEHICLE_TOPIC: "+data.Data.ImeiNumber)
		start := time.Now()
		if data.Data.Iot {
			err := <-idc.VehicleService.AddVehicleIdToVehicle(ctx, &data.Data)
			if err != nil {
				zap.Error(ctx, "AddVehicleIdToVehicle", err)
				return
			}
		}
		elapsed := time.Since(start).String()
		helper.PrintConsumerTime(ctx, "TRACING_TIME UTCTIME: AddVehicleIdToVehicle took: "+elapsed)
		idc.IotDeviceServices.ProcessIotDeviceData(ctx, data.Data)
		helper.PrintConsumerTime(ctx, "TRACING_TIME UTCTIME: ProcessIotDeviceData took: "+elapsed)
		helper.PrintConsumerTime(ctx, "kafka_cg END: MERGED_DEVICE_VEHICLE_TOPIC"+data.Data.ImeiNumber)

	case topics.KAFKA_EVENTS:

	default:
		break
	}

	done <- true
}()
return done

}
`

Context (Environment)

Running on local Ubuntu machine

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions