Open
Description
Bug description
Hello,
We are migrating a Camel 2.23 application to Camel Quarkus 4. The concrete versions are:
Quarkus 3.8.6.redhat-00004
Apache Camel 4.4.0.redhat-00041
(We also tested with version 3.15.2.redhat-00003 / Camel 4.8.0, and the behaviour is the same.)
Our application creates a custom MDCUnitOfWork so that some additional data gets logged - see the source below.
Each route is triggered by a message received from RabbitMQ via the Spring RabbitMQ component.
The problem is that MDC values are not cleared/replaced properly,
and the logs show values from a different route - see lines 1 and 3 of the screenshot.
The root cause may be an issue with JBoss LogManager and VertxMDC.
Dependencies for logging:
<dependency>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.loggingjson</groupId>
<artifactId>quarkus-logging-json</artifactId>
<version>3.1.0</version>
</dependency>
/**
 * {@link MDCUnitOfWork} that additionally publishes the service, tracing, company and route
 * identifiers of the current exchange to the logging MDC.
 *
 * <p>The values found in the MDC at construction time are remembered and put back (or removed,
 * if there were none) by {@link #clear()}.
 */
public class CustomMDCUnitOfWork extends MDCUnitOfWork {

    static final String MDC_KEY_SERVICE_ID = "zoo.service";
    static final String MDC_KEY_TRACING_ID = "zoo.tracing_id";
    static final String MDC_KEY_COMPANY_ID = "zoo.company";
    static final String MDC_KEY_ROUTE_ID = "zoo.route";

    // MDC content as it was before this unit of work wrote its own values; null means "key absent".
    private final String originalServiceId;
    private final String originalTracingId;
    private final String originalCompanyId;
    private final String originalRouteId;

    /**
     * Creates the unit of work, snapshots the current MDC values of the custom keys and then
     * publishes the values derived from {@code exchange}.
     *
     * @param exchange the exchange this unit of work belongs to
     */
    public CustomMDCUnitOfWork(final Exchange exchange) {
        super(exchange,
                exchange.getContext().getInflightRepository(),
                exchange.getContext().getMDCLoggingKeysPattern(),
                exchange.getContext().isAllowUseOriginalMessage(),
                exchange.getContext().isUseBreadcrumb());
        this.originalServiceId = MDC.get(MDC_KEY_SERVICE_ID);
        this.originalTracingId = MDC.get(MDC_KEY_TRACING_ID);
        this.originalCompanyId = MDC.get(MDC_KEY_COMPANY_ID);
        this.originalRouteId = MDC.get(MDC_KEY_ROUTE_ID);
        putMDCValues(exchange);
    }

    /**
     * Writes the identifiers looked up from {@code exchange} into the MDC. A key whose lookup
     * yields {@code null} is removed, so a value left behind by earlier work on the same thread
     * cannot survive.
     *
     * @param exchange the exchange to read the identifiers from
     */
    //did not override prepareMDC of base class to prevent fragile base class issues
    protected void putMDCValues(final Exchange exchange) {
        setOrRemove(MDC_KEY_SERVICE_ID, LoggingUtils.findServiceId(exchange));
        setOrRemove(MDC_KEY_TRACING_ID, LoggingUtils.findTracingId(exchange));
        setOrRemove(MDC_KEY_COMPANY_ID, LoggingUtils.findCompanyId(exchange));
        setOrRemove(MDC_KEY_ROUTE_ID, LoggingUtils.findRouteId(exchange));
    }

    @Override
    public UnitOfWork newInstance(final Exchange exchange) {
        return new CustomMDCUnitOfWork(exchange);
    }

    /**
     * Refreshes the custom MDC values before the processor runs and wraps the callback so the
     * same values are re-applied if processing completes asynchronously.
     */
    @Override
    public AsyncCallback beforeProcess(final Processor processor, final Exchange exchange, final AsyncCallback callback) {
        putMDCValues(exchange);
        return super.beforeProcess(processor, exchange, new CustomMDCCallback(callback));
    }

    /**
     * Clears the base-class MDC state and restores the custom keys to the values captured in the
     * constructor, removing keys that were absent back then.
     */
    @Override
    public void clear() {
        super.clear();
        setOrRemove(MDC_KEY_SERVICE_ID, this.originalServiceId);
        setOrRemove(MDC_KEY_TRACING_ID, this.originalTracingId);
        setOrRemove(MDC_KEY_COMPANY_ID, this.originalCompanyId);
        setOrRemove(MDC_KEY_ROUTE_ID, this.originalRouteId);
    }

    /**
     * Stores {@code value} under {@code key}, or removes {@code key} when {@code value} is
     * {@code null}.
     *
     * <p>NOTE(review): assuming {@code MDC} is {@code org.slf4j.MDC}, a null value is only allowed
     * if the underlying implementation supports it, hence the explicit remove.
     */
    private static void setOrRemove(final String key, final String value) {
        if (value != null) {
            MDC.put(key, value);
        } else {
            MDC.remove(key);
        }
    }

    /**
     * Callback decorator that captures the custom MDC values at creation time and re-applies
     * them before delegating when the callback is invoked with {@code doneSync == false}.
     */
    private static final class CustomMDCCallback implements AsyncCallback {

        private final AsyncCallback delegate;
        private final String serviceId;
        private final String tracingId;
        private final String companyId;
        private final String routeId;

        private CustomMDCCallback(AsyncCallback delegate) {
            this.delegate = delegate;
            this.serviceId = MDC.get(MDC_KEY_SERVICE_ID);
            this.tracingId = MDC.get(MDC_KEY_TRACING_ID);
            this.companyId = MDC.get(MDC_KEY_COMPANY_ID);
            this.routeId = MDC.get(MDC_KEY_ROUTE_ID);
        }

        @Override
        public void done(boolean doneSync) {
            try {
                if (!doneSync) {
                    // Re-apply the captured state completely: keys that were absent at capture
                    // time are removed, otherwise a value already present in the MDC at this
                    // point would stay visible in subsequent log statements.
                    setOrRemove(MDC_KEY_SERVICE_ID, serviceId);
                    setOrRemove(MDC_KEY_TRACING_ID, tracingId);
                    setOrRemove(MDC_KEY_COMPANY_ID, companyId);
                    setOrRemove(MDC_KEY_ROUTE_ID, routeId);
                }
            } finally {
                // The delegate must always be notified, even if touching the MDC failed.
                delegate.done(doneSync);
            }
        }

        @Override
        public String toString() {
            return delegate.toString();
        }
    }
}
/**
 * Application-scoped {@link UnitOfWorkFactory} that hands out a fresh
 * {@link CustomMDCUnitOfWork} for every exchange.
 */
@ApplicationScoped
public class CustomMDCUnitOfWorkFactory implements UnitOfWorkFactory {

    /**
     * Intentionally empty: this factory reads no settings from the context.
     *
     * @param camelContext the context whose properties have been configured (unused)
     */
    @Override
    public void afterPropertiesConfigured(final CamelContext camelContext) {
        // no configuration required
    }

    /**
     * Builds the unit of work for the given exchange.
     *
     * @param exchange the exchange to create the unit of work for
     * @return a new {@link CustomMDCUnitOfWork} bound to {@code exchange}
     */
    @Override
    public UnitOfWork createUnitOfWork(final Exchange exchange) {
        final UnitOfWork unitOfWork = new CustomMDCUnitOfWork(exchange);
        return unitOfWork;
    }
}
/**
 * Observes exchange sending/sent events and logs the target endpoint, the message headers
 * (with sensitive ones masked) and the time taken.
 */
@ApplicationScoped
public class LoggingObserver {

    /** Header names whose values must never appear in the log. */
    private static final List<String> FILTERED_KEYS = List.of(AUTHORIZATION_HEADER, OLINGO_ENDPOINT_HTTP_HEADERS);

    /** Replacement written to the log in place of a filtered header value. */
    private static final String REMOVED_PLACEHOLDER = "<removed>";

    private final Logger logger;

    /**
     * @param logger the logger all observed events are written to
     */
    @Inject
    public LoggingObserver(final Logger logger) {
        this.logger = logger;
    }

    /**
     * Logs the endpoint and the filtered headers of an exchange that is about to be sent.
     *
     * @param exchangeSendingEvent the observed event
     */
    public void onExchangeSendingEvent(@Observes ExchangeSendingEvent exchangeSendingEvent) {
        logger.info("start request to endpoint {} with headers {}", getEndpointString(exchangeSendingEvent.getEndpoint()),
                getFilteredHeaders(exchangeSendingEvent.getExchange()));
    }

    /**
     * Logs the endpoint and the elapsed time of an exchange that has been sent.
     *
     * @param exchangeSentEvent the observed event
     */
    public void onExchangeSentEvent(@Observes ExchangeSentEvent exchangeSentEvent) {
        logger.info("request to endpoint {} finished in {}ms", getEndpointString(exchangeSentEvent.getEndpoint()), exchangeSentEvent.getTimeTaken());
    }

    /** Returns the textual form of {@code endpoint} used in log messages. */
    private String getEndpointString(final Endpoint endpoint) {
        return endpoint.toString();
    }

    /**
     * Copies the headers of the exchange's in-message into a new map obtained from the context's
     * headers-map factory, replacing the value of every filtered header with a placeholder.
     *
     * @param exchange the exchange whose headers are copied
     * @return a new map that is safe to log
     */
    private Map<String, Object> getFilteredHeaders(final Exchange exchange) {
        final ExtendedCamelContext context = exchange.getContext().getCamelContextExtension();
        final Map<String, Object> result = context.getHeadersMapFactory().newMap();
        for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) {
            final String key = entry.getKey();
            result.put(key, isFiltered(key) ? REMOVED_PLACEHOLDER : entry.getValue());
        }
        return result;
    }

    /**
     * Tells whether the header named {@code key} must be masked. The comparison ignores case so
     * that a differently-cased spelling of a filtered header (for example {@code authorization}
     * instead of {@code Authorization}) cannot slip through and end up in the log.
     */
    private static boolean isFiltered(final String key) {
        return FILTERED_KEYS.stream().anyMatch(filtered -> filtered.equalsIgnoreCase(key));
    }
}