Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-1444: Listener Observability Initial Commit #1500

Merged
merged 31 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6f35711
GH-1444: Listener Observability Initial Commit
garyrussell Sep 6, 2022
3c767f1
Rename Sender/Receiver contexts and other PR review comments.
garyrussell Sep 7, 2022
0886783
Rename contexts to Rabbit...; supply default KeyValues via the conven…
garyrussell Sep 7, 2022
355f5db
Javadoc polishing.
garyrussell Sep 7, 2022
b39a5f9
Don't add default KV to high-card KVs.
garyrussell Sep 7, 2022
393ed51
Fix previous commit.
garyrussell Sep 7, 2022
c282d46
Fix contextual name (receiver side).
garyrussell Sep 7, 2022
c5b2975
Fix checkstyle.
garyrussell Sep 7, 2022
9de2da2
Polish previous commit.
garyrussell Sep 7, 2022
55f0437
Fix contextual name (sender side)
garyrussell Sep 7, 2022
3079268
Remove contextual names from observations.
garyrussell Sep 7, 2022
0cf3e55
Fix checkstyle.
garyrussell Sep 7, 2022
d929b0f
Remove customization of KeyValues from conventions.
garyrussell Sep 7, 2022
b1bd023
Add `getDefaultConvention()` to observations.
garyrussell Sep 13, 2022
be4be28
Fix since 3.0.
garyrussell Sep 13, 2022
d422412
Support wider convention customization.
garyrussell Sep 14, 2022
d762752
Convention type safety.
garyrussell Sep 14, 2022
f1ac117
Fix Test - not sure why PR build succeeded.
garyrussell Sep 14, 2022
03ccb33
Add Meters to ObservationTests.
garyrussell Sep 14, 2022
2557794
Fix checkstyle.
garyrussell Sep 14, 2022
970ba6c
Make INSTANCE final.
garyrussell Sep 14, 2022
a528d09
Add integration test.
garyrussell Sep 14, 2022
1927f3b
Test all available integrations.
garyrussell Sep 15, 2022
cb6ba4c
Remove redundant test code.
garyrussell Sep 15, 2022
d0b758e
Move getContextualName to conventions.
garyrussell Sep 15, 2022
0043280
Add docs.
garyrussell Sep 15, 2022
599d5b7
Fix doc link.
garyrussell Sep 15, 2022
38589a0
Remove unnecessary method overrides; make tag names more meaningful.
garyrussell Sep 15, 2022
d5464ab
Move getName() from contexts to conventions.
garyrussell Sep 17, 2022
4ec81a1
Fix Race in Test
garyrussell Sep 19, 2022
2651f1c
Fix Race in Test.
garyrussell Sep 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ project('spring-rabbit') {
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
optionalApi 'io.micrometer:micrometer-core'
api 'io.micrometer:micrometer-observation'
optionalApi 'io.micrometer:micrometer-tracing'
// Spring Data projection message binding support
optionalApi ("org.springframework.data:spring-data-commons") {
Expand All @@ -398,6 +399,7 @@ project('spring-rabbit') {
testApi project(':spring-rabbit-junit')
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,13 @@

import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import com.rabbitmq.client.Channel;
import io.micrometer.observation.ObservationRegistry;

/**
* @author Mark Fisher
Expand All @@ -40,6 +43,8 @@ public abstract class RabbitAccessor implements InitializingBean {

private volatile boolean transactional;

private ObservationRegistry observationRegistry;

public boolean isChannelTransacted() {
return this.transactional;
}
Expand Down Expand Up @@ -113,4 +118,17 @@ protected RuntimeException convertRabbitAccessException(Exception ex) {
return RabbitExceptionTranslator.convertRabbitAccessException(ex);
}

protected void obtainObservationRegistry(@Nullable ApplicationContext appContext) {
if (this.observationRegistry == null && appContext != null) {
ObjectProvider<ObservationRegistry> registry =
appContext.getBeanProvider(ObservationRegistry.class);
this.observationRegistry = registry.getIfUnique();
}
}

@Nullable
protected ObservationRegistry getObservationRegistry() {
return this.observationRegistry;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.rabbit.support.micrometer.RabbitMessageSenderContext;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservation;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservationConvention;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
Expand All @@ -83,6 +86,8 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.ParameterizedTypeReference;
Expand All @@ -108,6 +113,8 @@
import com.rabbitmq.client.Return;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* <p>
Expand Down Expand Up @@ -152,7 +159,7 @@
* @since 1.0
*/
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener, ApplicationContextAware,
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {

private static final String UNCHECKED = "unchecked";
Expand Down Expand Up @@ -198,6 +205,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private final AtomicInteger containerInstance = new AtomicInteger();

private ApplicationContext applicationContext;

private String exchange = DEFAULT_EXCHANGE;

private String routingKey = DEFAULT_ROUTING_KEY;
Expand Down Expand Up @@ -258,13 +267,19 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private ErrorHandler replyErrorHandler;

private boolean useChannelForCorrelation;

private boolean observationEnabled;

private RabbitTemplateObservationConvention observationConvention = new RabbitTemplateObservationConvention();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marcingrzejszczak ,

is this what you have expected?

I thought the idea was just about a general ObservationConvention<RabbitMessageSenderContext> and that's it...

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private volatile boolean usingFastReplyTo;

private volatile boolean evaluatedFastReplyTo;

private volatile boolean isListener;

private boolean useChannelForCorrelation;
private volatile boolean observationRegistryObtained;

/**
* Convenient constructor for use with setter injection. Don't forget to set the connection factory.
Expand Down Expand Up @@ -297,6 +312,30 @@ public final void setConnectionFactory(ConnectionFactory connectionFactory) {
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

/**
* Enable observation via micrometer.
* @param observationEnabled true to enable.
* @since 3.0
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set an observation convention; used to add additional key/values to observations.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(RabbitTemplateObservationConvention observationConvention) {
Assert.notNull(observationConvention, "'observationConvention' cannot be null");
this.observationConvention = observationConvention;
}

/**
* The name of the default exchange to use for send operations when none is specified. Defaults to <code>""</code>
* which is the default exchange in the broker (per the AMQP specification).
Expand Down Expand Up @@ -2348,7 +2387,7 @@ private boolean isPublisherConfirmsOrReturns(ConnectionFactory connectionFactory
* @throws IOException If thrown by RabbitMQ API methods.
*/
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
boolean mandatory, @Nullable CorrelationData correlationData) {

String exch = nullSafeExchange(exchangeArg);
String rKey = nullSafeRoutingKey(routingKeyArg);
Expand Down Expand Up @@ -2378,14 +2417,33 @@ public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Me
logger.debug("Publishing message [" + messageToUse
+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
}
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
observeTheSend(channel, messageToUse, mandatory, exch, rKey);
// Check if commit needed
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
}

protected void observeTheSend(Channel channel, Message message, boolean mandatory, String exch, String rKey) {

if (!this.observationRegistryObtained) {
obtainObservationRegistry(this.applicationContext);
this.observationRegistryObtained = true;
}
Observation observation;
ObservationRegistry registry = getObservationRegistry();
if (!this.observationEnabled || registry == null) {
observation = Observation.NOOP;
}
else {
observation = RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(registry,
new RabbitMessageSenderContext(message, this.beanName, exch + "/" + rKey))
.observationConvention(this.observationConvention);
}
observation.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, message));
}

/**
* Return the exchange or the default exchange if null.
* @param exchange the exchange.
Expand All @@ -2407,10 +2465,16 @@ public String nullSafeRoutingKey(String rk) {
}

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
Message message) throws IOException {
Message message) {

BasicProperties convertedMessageProperties = this.messagePropertiesConverter
.fromMessageProperties(message.getMessageProperties(), this.encoding);
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
try {
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}
catch (IOException ex) {
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
}
}

private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservation;
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservationConvention;
import org.springframework.amqp.rabbit.support.micrometer.RabbitMessageReceiverContext;
import org.springframework.amqp.support.ConditionalExceptionLogger;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
Expand Down Expand Up @@ -91,6 +94,8 @@

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* @author Mark Pollack
Expand Down Expand Up @@ -240,6 +245,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private boolean micrometerEnabled = true;

private boolean observationEnabled = false;

private boolean isBatchListener;

private long consumeDelay;
Expand All @@ -254,6 +261,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private MessageAckListener messageAckListener = (success, deliveryTag, cause) -> { };

private RabbitListenerObservationConvention observationConvention = new RabbitListenerObservationConvention();

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -1151,14 +1160,37 @@ public void setMicrometerTags(Map<String, String> tags) {
}

/**
* Set to false to disable micrometer listener timers.
* Set to false to disable micrometer listener timers. When true, ignored
* if {@link #setObservationEnabled(boolean)} is set to true.
* @param micrometerEnabled false to disable.
* @since 2.2
* @see #setObservationEnabled(boolean)
*/
public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}

/**
* Enable observation via micrometer; disables basic Micrometer timers enabled
* by {@link #setMicrometerEnabled(boolean)}.
* @param observationEnabled true to enable.
* @since 3.0
* @see #setMicrometerEnabled(boolean)
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set an observation convention; used to add additional key/values to observations.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(RabbitListenerObservationConvention observationConvention) {
Assert.notNull(observationConvention, "'observationConvention' cannot be null");
this.observationConvention = observationConvention;
}

/**
* Get the consumeDelay - a time to wait before consuming in ms.
* @return the consume delay.
Expand Down Expand Up @@ -1230,7 +1262,7 @@ public void afterPropertiesSet() {
validateConfiguration();
initialize();
try {
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled && !this.observationEnabled
&& this.applicationContext != null) {
String id = getListenerId();
if (id == null) {
Expand Down Expand Up @@ -1402,6 +1434,7 @@ public void start() {
}
}
}
obtainObservationRegistry(this.applicationContext);
try {
logger.debug("Starting Rabbit listener container.");
configureAdminIfNeeded();
Expand Down Expand Up @@ -1499,8 +1532,23 @@ protected void invokeErrorHandler(Throwable ex) {
* @see #invokeListener
* @see #handleListenerException
*/
@SuppressWarnings(UNCHECKED)
protected void executeListener(Channel channel, Object data) {
Observation observation;
ObservationRegistry registry = getObservationRegistry();
if (!this.observationEnabled || data instanceof List || registry == null) {
observation = Observation.NOOP;
}
else {
Message message = (Message) data;
observation = RabbitListenerObservation.LISTENER_OBSERVATION.observation(registry,
new RabbitMessageReceiverContext(message, getListenerId()))
.observationConvention(this.observationConvention);
}
observation.observe(() -> executeListenerAndHandleException(channel, data));
}

@SuppressWarnings(UNCHECKED)
protected void executeListenerAndHandleException(Channel channel, Object data) {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn(
Expand Down