
Messaging instrumentation abstraction

Jorge Esteban Quilcate Otoya edited this page Mar 26, 2020 · 8 revisions

Background and Motivation

As noticed especially in the Brave project, sampling and data policy concerns apply to messaging in the same way as they do to HTTP. Users do not want to individually configure N libraries with similar features; they want one object that each of the relevant libraries can use. This can also reduce copy/paste and modeling errors. Notably, messaging tracing is complex due to aspects such as producer/consumer relationships. An abstraction can help centralize tooling and documentation in the same way that we do for HTTP. Perhaps more importantly, it allows for compatibility tests, like http-tests, letting instrumentation authors find bugs faster and more easily.

Status

| language | status | tools supported |
|----------|--------|-----------------|
| java     | draft  | kafka, jms      |

Commonly parsed data

The tag names used in spans or metrics will vary, but the data needed will vary less. For example, even if we use a standard name in zipkin for a message channel (topic or queue), such as "messaging.channel", when exporting to other systems such as Amazon, this tag will be renamed. For this reason, we should consider the same parsing style as used in http, which focuses on getting the data independently from how it is named.

When the producer side sends data to a messaging broker:

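  public Span startSend(Chan channel, Msg message) {
    C carrier = adapter.carrier(message);
    // Prefer the context of the span active in the calling code, if any.
    TraceContext maybeParent = tracing.currentTraceContext().get();
    Span span;
    if (maybeParent == null) {
      // Otherwise continue any context already on the carrier (for example,
      // one injected by an upstream consumer), or start a new trace.
      span = tracing.tracer().nextSpan(extractor.extract(carrier));
    } else {
      span = tracing.tracer().newChild(maybeParent);
    }

    // Skip parsing work when the span is a no-op (not sampled).
    if (!span.isNoop()) {
      span.kind(Span.Kind.PRODUCER);
      parser.start("send", adapter, channel, message, span.context(), span.customizer());
      String remoteServiceName = adapter.brokerName(channel);
      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
      span.start();
    }

    // Always propagate the context downstream, sampled or not.
    injector.inject(span.context(), carrier);

    return span;
  }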

The following are functions that parse data, noted for whether they are default data, must be supported, or should be supported.

| function | required | default? | description | supported implementations (required implies all must support) |
|----------|----------|----------|-------------|------------------------|
| channel_name | true | span.tags.messaging.channel_name | | kafka, jms |
| channel_kind | true | span.tags.messaging.channel_kind | | kafka, jms |
| message_id | false | span.tags.messaging.message_id | Message identifier. We choose to name it message_id as most implementations (JMS, AMQP, RocketMQ) | kafka, jms |
| broker_name | true | span.remote_service_name | | kafka, jms |
| protocol | true | span.tags.messaging.protocol | Messaging protocol and version e.g. kafka, jms, stomp, amqp | |
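
To make the parsing style concrete, here is a minimal sketch of an adapter that only reads data and a default parser that decides naming. The names MessagingAdapter, ParsedSpanData and DefaultMessagingParser are hypothetical and are not Brave types; the tag names are the defaults listed in the table, and an exporter for another system could rename them without touching the adapter.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical adapter: one method per function in the table above. */
interface MessagingAdapter<Chan, Msg> {
  String channelName(Chan channel);
  String channelKind(Chan channel);
  String messageId(Msg message); // not required, so it may return null
  String brokerName(Chan channel);
  String protocol(Chan channel);
}

/** Hypothetical holder for what a default parser would put on a span. */
final class ParsedSpanData {
  final Map<String, String> tags = new LinkedHashMap<>();
  String remoteServiceName;
}

/** Hypothetical default parser: the only place that knows the tag names. */
final class DefaultMessagingParser {
  static <Chan, Msg> ParsedSpanData parse(
      MessagingAdapter<Chan, Msg> adapter, Chan channel, Msg message) {
    ParsedSpanData data = new ParsedSpanData();
    tag(data, "messaging.channel_name", adapter.channelName(channel));
    tag(data, "messaging.channel_kind", adapter.channelKind(channel));
    tag(data, "messaging.message_id", adapter.messageId(message));
    tag(data, "messaging.protocol", adapter.protocol(channel));
    // broker_name maps to the remote service name, not to a tag.
    data.remoteServiceName = adapter.brokerName(channel);
    return data;
  }

  /** Adds the tag only when the adapter could supply a value. */
  private static void tag(ParsedSpanData data, String key, String value) {
    if (value != null) data.tags.put(key, value);
  }
}
```

An implementation for a specific library would only need to supply the adapter; the compatibility tests mentioned in the motivation could then be written once against the parser.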

Producer

+------------------------+   +------+    +---------+
|                        |   |      |    |         |
| prepare/business-logic +-->+ send +--->+ channel |
|                        |   |      |    |         |
+------------------------+   +------+    +---------+

Scenario: Sending a message when trace context is available

send(message) is called while the business logic has a parent context. This is the most common case, where the send(message) call happens inside a context with an active span.

Scenario: Sending a message when trace context is injected on the carrier

send(message) is called with a message that already has context injected on the carrier, i.e. there is a consumer before the producer. This should allow chaining producers and consumers.

Consumer

+---------+      +--------------+      +------------------------+
|         |      |              |      |                        |
| channel +----->+ poll/receive +----->+ process/business-logic |
|         |      |              |      |                        |
+---------+      +--------------+      +------------------------+

Scenario: Receiving a message with context injected

receive(message) is called and the message/carrier has trace context injected, i.e. there is a producer before the consumer.

Scenario: Receiving a message with no context injected

receive(message) is called and the message/carrier does not have context available.
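
The two receive scenarios can be sketched as follows. This is an illustration under assumptions, not Brave's implementation: the carrier is a plain Map of headers, the context travels in a single "b3" header of the form traceId-spanId-..., sampling flags are ignored, and ReceiveSketch and its Context class are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

final class ReceiveSketch {
  /** Minimal stand-in for a trace context; not Brave's TraceContext. */
  static final class Context {
    final String traceId;
    final String spanId;
    final String parentId; // null for a root span

    Context(String traceId, String spanId, String parentId) {
      this.traceId = traceId;
      this.spanId = spanId;
      this.parentId = parentId;
    }
  }

  /** Generates a random 64-bit ID as 16 lower-hex characters. */
  static String newId() {
    return String.format("%016x", ThreadLocalRandom.current().nextLong());
  }

  /** Returns the context found in the "b3" header, or null when absent or malformed. */
  static Context extract(Map<String, String> headers) {
    String b3 = headers.get("b3");
    if (b3 == null) return null;
    String[] parts = b3.split("-");
    if (parts.length < 2) return null;
    return new Context(parts[0], parts[1], null);
  }

  /** Continues the producer's trace when context was injected, else starts a new trace. */
  static Context startReceive(Map<String, String> headers) {
    Context extracted = extract(headers);
    if (extracted == null) {
      return new Context(newId(), newId(), null);
    }
    return new Context(extracted.traceId, newId(), extracted.spanId);
  }
}
```

In the first scenario the consumer span becomes a child of the producer span; in the second it is the root of a new trace.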

Scenario: Consuming vs. Processing

There is a gap between receiving messages on the consumer side and applying processing logic to them.

Also, not all consumed messages are necessarily processed.

@adriancole mentioned on gitter: for example, if there's a bulk pull of 2000 and only 20 are processed.

@jeqo: a flag could be added to avoid creating consume spans when users want to create/continue traces from a subset of the incoming messages.

producer:send -> ] [ -> consumer:poll -> processor:process when flag=true with context

consumer:poll -> processor:process when flag=true and msg without context

producer:send -> ] [ -> ... processor:process when flag=false and msg with context

processor:process when flag=false when msg without context
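
The four combinations above can be written down as a small function. This is only a sketch of the proposal: the flag is hypothetical (no such setting is confirmed here), and the span labels are the ones used in the lines above.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumeSpanFlagSketch {
  /**
   * Lists the spans in the trace, root first, for one processed message.
   *
   * @param createConsumeSpans the hypothetical flag discussed above
   * @param hasContext whether a producer injected trace context into the message
   */
  static List<String> spanChain(boolean createConsumeSpans, boolean hasContext) {
    List<String> chain = new ArrayList<>();
    // The producer span exists only when context arrived with the message.
    if (hasContext) chain.add("producer:send");
    // The consume span is created only when the flag is on.
    if (createConsumeSpans) chain.add("consumer:poll");
    // Processing is always traced for the messages that are actually processed.
    chain.add("processor:process");
    return chain;
  }
}
```

With the flag off, the process span attaches directly to the producer span (or becomes a root), so messages that are polled but never processed produce no spans.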

Scenario: Receiving a batch of messages

A batch of messages is received when polling (e.g. Kafka Consumer poll).

This scenario is no different from the previous ones, but a single batch can combine two of them (e.g. some messages with trace context injected and others without).

@jeqo: currently we create a shared root span per partition batch, which could make sense for some edge cases, but I don't think it should be the default. It could be supported on the processor side if a flag to not create consume spans is in place.
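
One reading of the shared-root behavior described above, as a sketch: records that carry context continue their own trace, while records without context share one root per partition within the batch. The note does not spell out which records share the root, so that split is an assumption, and BatchPollSketch, Record and the "root-p" labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchPollSketch {
  /** Minimal stand-in for a polled record. */
  static final class Record {
    final int partition;
    final String extractedTraceId; // null when no context was injected

    Record(int partition, String extractedTraceId) {
      this.partition = partition;
      this.extractedTraceId = extractedTraceId;
    }
  }

  /** Returns, for each record in order, the trace its consume span would join. */
  static List<String> tracesFor(List<Record> batch) {
    Map<Integer, String> sharedRootByPartition = new HashMap<>();
    List<String> result = new ArrayList<>();
    for (Record record : batch) {
      if (record.extractedTraceId != null) {
        // Context was injected upstream: continue that trace.
        result.add(record.extractedTraceId);
        continue;
      }
      // No context: reuse, or lazily create, the root shared by this partition.
      String shared = sharedRootByPartition.get(record.partition);
      if (shared == null) {
        shared = "root-p" + record.partition; // placeholder label, not a real trace ID
        sharedRootByPartition.put(record.partition, shared);
      }
      result.add(shared);
    }
    return result;
  }
}
```

For a batch with two context-free records on partition 0, one on partition 1, and one record carrying trace "abc", the result would be root-p0, root-p0, root-p1 and abc, in record order.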

Scenario: Control Continuation of Traces

@jeqo: ref: https://github.com/openzipkin/brave/issues/1039 I believe this feature could be key when instrumenting dataflows (...-consume-producer-consume-producer-...), as these usually involve flat maps that cause very big, hard-to-grasp traces. I envision a capability to break traces at some point while keeping "links" between them.

Design Concerns

Thread disconnect between message consumption and processing

There can be a lag between message consumption and processing. This implies a possible requirement to store the parent span context in the message even for local processing. Another option is an identity-based lookup table.

Here's code that shows this problem:

MessageDispatch md = unconsumedMessages.dequeueNoWait();

// there’s often no stack relationship between message
// consumption and processing!

if (md != null) {
    dispatch(md);
    return true;
}

Here's the concern put visually.

[Image: Screen Shot 2019-05-09 at 8 15 16 AM]
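
The identity-based lookup table mentioned as an option above could look roughly like this. It is a sketch, not an existing Brave class: PendingContexts is a hypothetical name, and the context type is left generic.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

/** Hypothetical table that carries a context from consumption to processing. */
final class PendingContexts<C> {
  // Keys are compared by reference (==), so two equal-looking messages stay distinct.
  private final Map<Object, C> byMessage =
      Collections.synchronizedMap(new IdentityHashMap<Object, C>());

  /** Called on the consuming thread, where the context is known. */
  void onConsume(Object message, C context) {
    byMessage.put(message, context);
  }

  /** Called on the processing thread; returns and forgets the context, or null if none. */
  C onProcess(Object message) {
    return byMessage.remove(message);
  }

  /** Number of consumed messages that have not been processed yet. */
  int size() {
    return byMessage.size();
  }
}
```

One design caveat: entries for messages that are consumed but never processed stay in the table, so a real implementation would need eviction or weak references. Storing the context in the message itself avoids that bookkeeping.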

Broker-side tracing

In the case of message brokers, like RabbitMQ or Kafka, a server-side trace could expose broker-specific actions that are useful to represent. In RabbitMQ, for example, the server side could be traced to show the execution of routing rules.

In Kafka, the client side has interceptors, which Brave uses, but the broker side needs something similar to map its behavior. The idea of a "broker interceptor" has been discussed in the Kafka community, and there is a proposal under discussion as well, but it hasn't been implemented as part of the project yet, though others have been experimenting with this (e.g. LinkedIn's fork has the concept of an "observer": https://github.com/linkedin/kafka/commit/a378c8980af16e3c6d3f6550868ac0fd5a58682e).

Without this abstraction it's currently difficult to hook into how the Kafka cluster works.

In general, it's still an open question how to represent spans from the broker side as these are not consumer spans per se.

Clean context from Message Carrier

ref: https://github.com/openzipkin/brave/pull/1047

Draft to explore refactoring the approach used in messaging instrumentation: from approach (1), where context is first extracted and cleared from headers and later injected if needed, to approach (2), where context is extracted, then cleared from headers only if needed, and then injected.

Messaging instrumentation has two main features on the consumer side: onConsume, which under (1) extracts and clears headers, starts and finishes an on-consume span, and injects context; and nextSpan, which under (2) extracts and clears headers, and creates a span from the extracted context.

Current effects of approach (1):

  • By clearing context after extracting, we make sure context is not "leaked" and propagated downstream.
    • In the onConsume feature we eventually inject context again, so this is not as critical.
    • In the nextSpan feature this restricts users to only one child of the onConsume span, as we remove the headers. This could (potentially) restrict how users continue a trace. There could be scenarios where users do a batch of operations after a message is consumed, requiring multiple spans to be created from the onConsume span.

Following approach (2) we can allow users to continue onConsume span multiple times.

A side effect of approach (2) is a reduced impact on the current JMS consumer implementation:

  • JMS consumed message properties (i.e. headers) need to be reset before injecting context. This currently requires caching the non-context properties, resetting all properties, and writing the non-context properties again, all in order to inject context later. Under approach (1) we do the reset after extracting context and inject the new context at the end of instrumentation, which impacts both features, onConsume and nextSpan.
  • With approach (2) we can move the reset logic to right before injecting. By doing this only in onConsume, we remove this overhead from the nextSpan feature.
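
A minimal sketch of approach (2), assuming a plain Map as the header carrier, a single "b3" context header, and contexts represented as opaque strings. ExtractThenClearSketch and its methods are hypothetical and only mirror the feature names used above.

```java
import java.util.Map;

final class ExtractThenClearSketch {
  static final String CONTEXT_HEADER = "b3";

  /** Approach (2): extraction only reads, so the header stays on the message. */
  static String extract(Map<String, String> headers) {
    return headers.get(CONTEXT_HEADER);
  }

  /** Can be called several times per message because nothing is cleared here. */
  static String nextSpan(Map<String, String> headers, String childName) {
    String parent = extract(headers);
    return (parent == null ? "root" : parent) + "/" + childName;
  }

  /** Clears the old context right before injecting the consume span's context. */
  static void onConsume(Map<String, String> headers, String consumeSpanContext) {
    // With a plain Map this remove is redundant; it stands in for the JMS
    // property reset, which is now paid only on this path.
    headers.remove(CONTEXT_HEADER);
    headers.put(CONTEXT_HEADER, consumeSpanContext);
  }
}
```

After onConsume, repeated nextSpan calls all resolve to the consume span as parent, which is the multiple-children case that approach (1) rules out. Non-context headers are never touched.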