[kafka] bytecode instrumentation - sync #3055

Merged
merged 28 commits into from
Nov 22, 2023

Contributor

@lachmatt lachmatt commented Oct 30, 2023

Why

Towards #2854

What

Bytecode instrumentation for Confluent.Kafka client.

Instrumentation is based on Datadog's instrumentation. @zacharycmontoya please take a look and approve if you accept it from Datadog's side.

Implementation follows the appropriate semantic conventions for messaging, and for Kafka specifically.

publish spans are started at the beginning of the Produce method and finished at the end of it (or when the delivery handler completes, if one is set).
process spans are started at the end of the Consume method and finished when the next Consume call starts, or when the consumer is closed, disposed, or unsubscribes from the topic.
receive spans are started and stopped at the end of the Consume method (an explanatory comment was added in the code).

Note:
peer.service (recommended) and server.address (conditionally required, if available) attributes are not added for now.
As per the spec, their value should be the name of the broker a given message was sent to or received from.
I haven't found a good way to extract that information.

TODO:
Instrumentation of the async API (i.e. ProduceAsync), preferably in a separate PR.

Tests

Included in PR.

Checklist

  • CHANGELOG.md is updated.
  • Documentation is updated.
  • New features are covered by tests.

Contributor

@Kielek Kielek left a comment

In general LGTM.
I have some doubts about whether we should use "attributes" or "tags" in our codebase.
The .NET API uses different names than OTel.

@lachmatt lachmatt marked this pull request as ready for review October 31, 2023 11:20
@lachmatt lachmatt requested a review from a team as a code owner October 31, 2023 11:20
@lachmatt
Contributor Author

I will adjust duck types to follow best practices from the doc.

@zacharycmontoya
Contributor

Thanks for posting the PR! I can take a look later this week


internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exception? exception, in CallTargetState state)
{
ConsumerCache.Remove(instance!);
Contributor

Can we really guarantee that instance is not null here?

Contributor

This is an instance method so it's safe to assume that the instance is not null

return CallTargetState.GetDefault();
}

internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
Contributor

Since TTarget is always Confluent.Kafka.Consumer`2, you can duck type right away (no chance of failure unless their API changes) and avoid doing it later. The only thing that changes is your call to the ConsumerCache, which becomes: ConsumerCache.TryGet(instance.Instance!, out var groupId)

Suggested change
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
where TTarget : IClientName, IDuckType

Contributor Author

It seems additional investigation is needed, as this approach doesn't work for me. I created #3136 for tracking purposes.

{
internal static CallTargetState OnMethodBegin<TTarget, TTopicPartition, TMessage, TDeliveryHandler>(
TTarget instance, TTopicPartition topicPartition, TMessage message, TDeliveryHandler deliveryHandler)
where TMessage : IKafkaMessage, IDuckType
Contributor

Similar to my other suggestion, you can duck type immediately here instead of later on, since we know the type will always have the right structure.

Suggested change
where TMessage : IKafkaMessage, IDuckType
where TTarget : IClientName, IDuckType
where TMessage : IKafkaMessage, IDuckType

Contributor Author

@lachmatt lachmatt Nov 20, 2023

#3055 (comment)
Tracked under #3136

}

// Store as state information if delivery handler was set
return new CallTargetState(activity, deliveryHandler is null);
Contributor

By passing a bool as the object state argument, you're boxing it unnecessarily. I suggest you pass the deliveryHandler object as-is and do the null check later.
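
To illustrate the boxing point, here is a minimal sketch. `SketchState` is a hypothetical stand-in for a state holder with a single `object` slot; it is not the actual `CallTargetState` type, and the method names are made up for the example.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for a state holder with one object-typed slot.
public readonly struct SketchState
{
    public SketchState(object? state) => State = state;

    public object? State { get; }
}

public static class BoxingSketch
{
    // The bool is a value type, so storing it in an object slot boxes it.
    public static SketchState WithFlag(object? deliveryHandler)
        => new SketchState(deliveryHandler is null);

    // The handler is already a reference (or null), so nothing is boxed.
    public static SketchState WithHandler(object? deliveryHandler)
        => new SketchState(deliveryHandler);

    // The null check is deferred to the point where the state is read.
    public static bool HandlerWasSet(in SketchState state)
        => state.State is not null;
}
```

With the second variant, the method-end callback can still decide whether to stop the activity immediately by checking `HandlerWasSet`.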

Contributor Author

fixed in fb2e35b

Contributor

@zacharycmontoya zacharycmontoya left a comment

LGTM with a couple of small suggestions

@pyohannes pyohannes left a comment

I gave this a review in the light of the very recent changes to messaging semantic conventions.

Comment on lines 70 to 73
spanName = $"{consumeResult.Topic} {MessagingAttributes.Values.ProcessOperationName}";
}

spanName ??= MessagingAttributes.Values.ProcessOperationName;
@pyohannes pyohannes Nov 13, 2023

The "process" operation was removed in favor of a "deliver" operation for push-based scenarios, and a "receive" operation for pull-based scenarios.

As this instruments a polling method, the operation name should be "receive".

https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#operation-names

Contributor Author

@lachmatt lachmatt Nov 16, 2023

@pyohannes
Apart from adjusting the operation name, I think the point where the activity is stopped should be adjusted as well.
The receive operation is defined as a consumer requesting one or more messages.
Taking that into account, I think the activity created here should be stopped here as well.
Let me know if that makes sense.

That's correct, good catch.

Your current logic will result in correct "process" span durations only if the following holds true:

// Attempting to consume another message marks the end
// of a previous message processing

However, there are many consumption scenarios where this is not the case. For example, think about fetching messages and then distributing them to different threads for processing.

You should create "Receive" spans here measuring the duration of the Consume operation and linking to the context attached to the message.

Contributor Author

changed in fb2e35b

}

spanName ??= MessagingAttributes.Values.ProcessOperationName;
var activity = KafkaCommon.Source.StartActivity(spanName, ActivityKind.Consumer, propagatedContext.ActivityContext);
@pyohannes pyohannes Nov 13, 2023

It is now required to link to the propagated context (instead of parenting).

I'd recommend linking to the extracted context and parenting with the currently active context, if there is one.

https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#consumer-spans
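
A minimal sketch of that recommendation using the `System.Diagnostics` API is shown below. The source name, class name, and method name are hypothetical and chosen only for illustration (the PR itself uses `KafkaCommon.Source`); this is not the code that was merged.

```csharp
#nullable enable
using System.Diagnostics;

public static class ReceiveLinkSketch
{
    // Hypothetical source name, used only for this sketch.
    private static readonly ActivitySource Source = new ActivitySource("Sketch.Kafka.Receive");

    // Returns null when no listener is interested in this source.
    public static Activity? StartReceive(string spanName, ActivityContext extracted)
    {
        // Link to the context carried by the message, if one was extracted.
        ActivityLink[]? links = extracted == default
            ? null
            : new[] { new ActivityLink(extracted) };

        // Parent on the currently active activity, if there is one.
        ActivityContext parent = Activity.Current?.Context ?? default(ActivityContext);

        return Source.StartActivity(spanName, ActivityKind.Consumer, parent, tags: null, links: links);
    }
}
```

With this shape the extracted context never becomes the parent, so a receive span started without an active activity begins a new trace and stays connected to the producer only through the link.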

Contributor Author

changed in fb2e35b

Contributor Author

@lachmatt lachmatt Nov 20, 2023

There is an additional section in the OTEP that I think applies here:

For single-message scenarios, and if the "Deliver" or "Receive" spans would be
root spans of a new trace, the creation context may also be used as a parent on
those operations in addition to being added as a link. Keeping single-messages
operations in the same trace can greatly improve the user experience.

The situation described above will often be the case here.

I noticed this was discussed in a PR updating the semantic conventions for messaging, but was eventually removed.

Would your recommendation be to not do it at all, or maybe to add a configuration option to opt in to this behavior?

Yes, we didn't want to give a clear recommendation for this in the specification.

However, if you think it's beneficial for the user experience in your case, it's perfectly fine to set the remote context as parent (in addition to setting it as a link). I would only do this when there's no current activity.
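
One way to express that rule is a small parent-selection helper, sketched below. The class, method, and `parentOnRemoteWhenRoot` flag are hypothetical; the flag stands in for the opt-in configuration option discussed above, which does not exist in this PR.

```csharp
#nullable enable
using System.Diagnostics;

public static class ParentSelectionSketch
{
    // Picks the parent context for a receive span. The extracted context is
    // expected to be attached as a link by the caller in every case.
    public static ActivityContext ChooseParent(ActivityContext extracted, bool parentOnRemoteWhenRoot)
    {
        Activity? current = Activity.Current;
        if (current is not null)
        {
            // An active activity always wins as the parent.
            return current.Context;
        }

        // No active activity: optionally parent on the remote context,
        // otherwise return default so the span becomes a root.
        return parentOnRemoteWhenRoot ? extracted : default(ActivityContext);
    }
}
```

The returned context can be passed as the `parentContext` argument of `ActivitySource.StartActivity`.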

}
}

public static void SetCommonAttributes(

I strongly recommend also setting server.address, as this will be crucial for differentiating between Kafka instances.

The semantic conventions mandate it as "required if available".

Contributor Author

Unfortunately, I haven't found a good way to extract that information, as mentioned in the PR description. I created #3137 for tracking.

@lachmatt lachmatt marked this pull request as draft November 17, 2023 15:02
@lachmatt lachmatt marked this pull request as ready for review November 20, 2023 08:02
…ntegrations/ConsumerConsumeSyncIntegration.cs
@Kielek Kielek enabled auto-merge (squash) November 22, 2023 09:29
7 participants