Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-1444: Listener Observability Initial Commit #1500

Merged
merged 31 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6f35711
GH-1444: Listener Observability Initial Commit
garyrussell Sep 6, 2022
3c767f1
Rename Sender/Receiver contexts and other PR review comments.
garyrussell Sep 7, 2022
0886783
Rename contexts to Rabbit...; supply default KeyValues via the conven…
garyrussell Sep 7, 2022
355f5db
Javadoc polishing.
garyrussell Sep 7, 2022
b39a5f9
Don't add default KV to high-card KVs.
garyrussell Sep 7, 2022
393ed51
Fix previous commit.
garyrussell Sep 7, 2022
c282d46
Fix contextual name (receiver side).
garyrussell Sep 7, 2022
c5b2975
Fix checkstyle.
garyrussell Sep 7, 2022
9de2da2
Polish previous commit.
garyrussell Sep 7, 2022
55f0437
Fix contextual name (sender side)
garyrussell Sep 7, 2022
3079268
Remove contextual names from observations.
garyrussell Sep 7, 2022
0cf3e55
Fix checkstyle.
garyrussell Sep 7, 2022
d929b0f
Remove customization of KeyValues from conventions.
garyrussell Sep 7, 2022
b1bd023
Add `getDefaultConvention()` to observations.
garyrussell Sep 13, 2022
be4be28
Fix since 3.0.
garyrussell Sep 13, 2022
d422412
Support wider convention customization.
garyrussell Sep 14, 2022
d762752
Convention type safety.
garyrussell Sep 14, 2022
f1ac117
Fix Test - not sure why PR build succeeded.
garyrussell Sep 14, 2022
03ccb33
Add Meters to ObservationTests.
garyrussell Sep 14, 2022
2557794
Fix checkstyle.
garyrussell Sep 14, 2022
970ba6c
Make INSTANCE final.
garyrussell Sep 14, 2022
a528d09
Add integration test.
garyrussell Sep 14, 2022
1927f3b
Test all available integrations.
garyrussell Sep 15, 2022
cb6ba4c
Remove redundant test code.
garyrussell Sep 15, 2022
d0b758e
Move getContextualName to conventions.
garyrussell Sep 15, 2022
0043280
Add docs.
garyrussell Sep 15, 2022
599d5b7
Fix doc link.
garyrussell Sep 15, 2022
38589a0
Remove unnecessary method overrides; make tag names more meaningful.
garyrussell Sep 15, 2022
d5464ab
Move getName() from contexts to conventions.
garyrussell Sep 17, 2022
4ec81a1
Fix Race in Test
garyrussell Sep 19, 2022
2651f1c
Fix Race in Test.
garyrussell Sep 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ project('spring-rabbit') {
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
optionalApi 'io.micrometer:micrometer-core'
api 'io.micrometer:micrometer-observation'
optionalApi 'io.micrometer:micrometer-tracing'
// Spring Data projection message binding support
optionalApi ("org.springframework.data:spring-data-commons") {
Expand All @@ -398,6 +399,7 @@ project('spring-rabbit') {
testApi project(':spring-rabbit-junit')
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,13 @@

import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import com.rabbitmq.client.Channel;
import io.micrometer.observation.ObservationRegistry;

/**
* @author Mark Fisher
Expand All @@ -40,6 +43,8 @@ public abstract class RabbitAccessor implements InitializingBean {

private volatile boolean transactional;

private ObservationRegistry observationRegistry;

public boolean isChannelTransacted() {
return this.transactional;
}
Expand Down Expand Up @@ -113,4 +118,17 @@ protected RuntimeException convertRabbitAccessException(Exception ex) {
return RabbitExceptionTranslator.convertRabbitAccessException(ex);
}

protected void obtainObservationRegistry(@Nullable ApplicationContext appContext) {
if (this.observationRegistry == null && appContext != null) {
ObjectProvider<ObservationRegistry> registry =
appContext.getBeanProvider(ObservationRegistry.class);
this.observationRegistry = registry.getIfUnique();
}
}

@Nullable
protected ObservationRegistry getObservationRegistry() {
return this.observationRegistry;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.rabbit.support.micrometer.DefaultRabbitTemplateObservationConvention;
import org.springframework.amqp.rabbit.support.micrometer.RabbitMessageSenderContext;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservation;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservationConvention;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
Expand All @@ -83,6 +87,8 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.ParameterizedTypeReference;
Expand All @@ -108,6 +114,8 @@
import com.rabbitmq.client.Return;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* <p>
Expand Down Expand Up @@ -152,7 +160,7 @@
* @since 1.0
*/
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener, ApplicationContextAware,
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {

private static final String UNCHECKED = "unchecked";
Expand Down Expand Up @@ -198,6 +206,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private final AtomicInteger containerInstance = new AtomicInteger();

private ApplicationContext applicationContext;

private String exchange = DEFAULT_EXCHANGE;

private String routingKey = DEFAULT_ROUTING_KEY;
Expand Down Expand Up @@ -258,13 +268,20 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private ErrorHandler replyErrorHandler;

private boolean useChannelForCorrelation;

private boolean observationEnabled;

@Nullable
private RabbitTemplateObservationConvention observationConvention;

private volatile boolean usingFastReplyTo;

private volatile boolean evaluatedFastReplyTo;

private volatile boolean isListener;

private boolean useChannelForCorrelation;
private volatile boolean observationRegistryObtained;

/**
* Convenient constructor for use with setter injection. Don't forget to set the connection factory.
Expand Down Expand Up @@ -297,6 +314,29 @@ public final void setConnectionFactory(ConnectionFactory connectionFactory) {
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

/**
* Enable observation via micrometer.
* @param observationEnabled true to enable.
* @since 3.0
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set an observation convention; used to add additional key/values to observations.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(RabbitTemplateObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

/**
* The name of the default exchange to use for send operations when none is specified. Defaults to <code>""</code>
* which is the default exchange in the broker (per the AMQP specification).
Expand Down Expand Up @@ -2348,7 +2388,7 @@ private boolean isPublisherConfirmsOrReturns(ConnectionFactory connectionFactory
* @throws IOException If thrown by RabbitMQ API methods.
*/
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
boolean mandatory, @Nullable CorrelationData correlationData) {

String exch = nullSafeExchange(exchangeArg);
String rKey = nullSafeRoutingKey(routingKeyArg);
Expand Down Expand Up @@ -2378,14 +2418,34 @@ public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Me
logger.debug("Publishing message [" + messageToUse
+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
}
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
observeTheSend(channel, messageToUse, mandatory, exch, rKey);
// Check if commit needed
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
}

protected void observeTheSend(Channel channel, Message message, boolean mandatory, String exch, String rKey) {

if (!this.observationRegistryObtained) {
obtainObservationRegistry(this.applicationContext);
this.observationRegistryObtained = true;
}
Observation observation;
ObservationRegistry registry = getObservationRegistry();
if (!this.observationEnabled || registry == null) {
observation = Observation.NOOP;
}
else {
observation = RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
DefaultRabbitTemplateObservationConvention.INSTANCE,
new RabbitMessageSenderContext(message, this.beanName, exch + "/" + rKey), registry);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we took a Supplier<Context> here instead of Context where the supplier was only called if ObservationRegistry wasn't a no-op, and therefore the Context object wouldn't be allocated in the no-op case, would that simplify things for you? I think then you wouldn't need the if-else this is contained in, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 from me as well. And I probably agree with @marcingrzejszczak as well.
Only the problem that @garyrussell is off today and next Monday is release day.
I can fix this quickly myself, but we have to be sure that this one is good to go for merging.

Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, we could just use a singleton for the No-op case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forget that, I shouldn’t comment from my iPad while not working. I see your point about them if/else.

we can go with this as is for Monday’s milestone and Polish later if you add the supplier variant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one, Gary, in the DocumentedObservation:

    default <T extends Observation.Context> Observation createNotStarted(
            @Nullable ObservationConvention<T> customConvention, @NonNull ObservationConvention<T> defaultConvention,
            @NonNull Supplier<T> contextSupplier, @NonNull ObservationRegistry registry) {
        if (registry.isNoop()) {
            return Observation.NOOP;
        }
        return observation(customConvention, defaultConvention, contextSupplier.get(), registry);
    }

I'm not sure why name is like that, but looks like it does exactly what is expected from us here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're working on unifying the names. Sorry about the commotion cc @ttddyy


}
observation.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, message));
}

/**
* Return the exchange or the default exchange if null.
* @param exchange the exchange.
Expand All @@ -2407,10 +2467,16 @@ public String nullSafeRoutingKey(String rk) {
}

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
Message message) throws IOException {
Message message) {

BasicProperties convertedMessageProperties = this.messagePropertiesConverter
.fromMessageProperties(message.getMessageProperties(), this.encoding);
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
try {
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}
catch (IOException ex) {
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
}
}

private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.micrometer.DefaultRabbitListenerObservationConvention;
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservation;
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservationConvention;
import org.springframework.amqp.rabbit.support.micrometer.RabbitMessageReceiverContext;
import org.springframework.amqp.support.ConditionalExceptionLogger;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
Expand Down Expand Up @@ -91,6 +95,8 @@

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* @author Mark Pollack
Expand Down Expand Up @@ -240,6 +246,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private boolean micrometerEnabled = true;

private boolean observationEnabled = false;

private boolean isBatchListener;

private long consumeDelay;
Expand All @@ -254,6 +262,9 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private MessageAckListener messageAckListener = (success, deliveryTag, cause) -> { };

@Nullable
private RabbitListenerObservationConvention observationConvention;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -1151,14 +1162,36 @@ public void setMicrometerTags(Map<String, String> tags) {
}

/**
* Set to false to disable micrometer listener timers.
* Set to false to disable micrometer listener timers. When true, ignored
* if {@link #setObservationEnabled(boolean)} is set to true.
* @param micrometerEnabled false to disable.
* @since 2.2
* @see #setObservationEnabled(boolean)
*/
public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}

/**
* Enable observation via micrometer; disables basic Micrometer timers enabled
* by {@link #setMicrometerEnabled(boolean)}.
* @param observationEnabled true to enable.
* @since 3.0
* @see #setMicrometerEnabled(boolean)
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set an observation convention; used to add additional key/values to observations.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(RabbitListenerObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

/**
* Get the consumeDelay - a time to wait before consuming in ms.
* @return the consume delay.
Expand Down Expand Up @@ -1230,7 +1263,7 @@ public void afterPropertiesSet() {
validateConfiguration();
initialize();
try {
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled && !this.observationEnabled
&& this.applicationContext != null) {
String id = getListenerId();
if (id == null) {
Expand Down Expand Up @@ -1402,6 +1435,7 @@ public void start() {
}
}
}
obtainObservationRegistry(this.applicationContext);
try {
logger.debug("Starting Rabbit listener container.");
configureAdminIfNeeded();
Expand Down Expand Up @@ -1499,8 +1533,23 @@ protected void invokeErrorHandler(Throwable ex) {
* @see #invokeListener
* @see #handleListenerException
*/
@SuppressWarnings(UNCHECKED)
protected void executeListener(Channel channel, Object data) {
Observation observation;
ObservationRegistry registry = getObservationRegistry();
if (!this.observationEnabled || data instanceof List || registry == null) {
observation = Observation.NOOP;
}
else {
Message message = (Message) data;
observation = RabbitListenerObservation.LISTENER_OBSERVATION.observation(this.observationConvention,
DefaultRabbitListenerObservationConvention.INSTANCE,
new RabbitMessageReceiverContext(message, getListenerId()), registry);
}
observation.observe(() -> executeListenerAndHandleException(channel, data));
}

@SuppressWarnings(UNCHECKED)
protected void executeListenerAndHandleException(Channel channel, Object data) {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.support.micrometer;

import io.micrometer.common.KeyValues;

/**
* Default {@link RabbitListenerObservationConvention} for Rabbit listener key values.
*
* @author Gary Russell
* @since 3.0
*
*/
public class DefaultRabbitListenerObservationConvention implements RabbitListenerObservationConvention {

/**
* A singleton instance of the convention.
*/
public static final DefaultRabbitListenerObservationConvention INSTANCE =
new DefaultRabbitListenerObservationConvention();

@Override
public KeyValues getLowCardinalityKeyValues(RabbitMessageReceiverContext context) {
return KeyValues.of(RabbitListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(),
context.getListenerId());
}

@Override
public KeyValues getHighCardinalityKeyValues(RabbitMessageReceiverContext context) {
return KeyValues.empty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about default methods in the super interface.

}

@Override
public String getContextualName(RabbitMessageReceiverContext context) {
return context.getSource() + " receive";
}

}