Skip to content

Commit

Permalink
Mitigate against another kafka leak
Browse files Browse the repository at this point in the history
  • Loading branch information
trask committed May 13, 2022
1 parent d6cf074 commit d08e17a
Showing 1 changed file with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,32 @@ public static void onExit(

Context parentContext = currentContext();
ReceivedRecords receivedRecords = ReceivedRecords.create(records, timer);
if (consumerReceiveInstrumenter().shouldStart(parentContext, receivedRecords)) {
Context context = consumerReceiveInstrumenter().start(parentContext, receivedRecords);
consumerReceiveInstrumenter().end(context, receivedRecords, null, error);

// we're storing the context of the receive span so that process spans can use it as parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);
// it's important not to suppress consumer span creation here because this instrumentation can
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
Context context = consumerReceiveInstrumenter().start(parentContext, receivedRecords);
consumerReceiveInstrumenter().end(context, receivedRecords, null, error);

// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {
consumerRecordContext.set(record, context);
}
} finally {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
// we're storing the context of the receive span so that process spans can use it as parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);

// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {
consumerRecordContext.set(record, context);
}
} finally {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
Expand Down

0 comments on commit d08e17a

Please sign in to comment.