Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,10 @@
<configuration>
<java>
<includes>
<include>src/main/java/com/rabbitmq/AmqpClientTestExtension.java</include>
<include>src/main/java/com/rabbitmq/client/ConnectionFactory.java</include>
<include>src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java</include>
<include>src/main/java/com/rabbitmq/client/impl/Environment.java</include>
<include>src/main/java/com/rabbitmq/client/observation/**/*.java</include>
<include>src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java</include>
<include>src/test/java/com/rabbitmq/client/test/NettyTest.java</include>
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,10 @@ public ConnectionFactory setCredentialsRefreshService(
protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException {
if (netty) {
if (this.frameHandlerFactory == null) {
Predicate<ShutdownSignalException> recoveryCondition =
this.connectionRecoveryTriggeringCondition == null
? AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION
: this.connectionRecoveryTriggeringCondition;
this.frameHandlerFactory =
new NettyFrameHandlerFactory(
this.nettyConf.eventLoopGroup,
Expand All @@ -1072,7 +1076,9 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
this.nettyConf.enqueuingTimeout,
connectionTimeout,
socketConf,
maxInboundMessageBodySize);
maxInboundMessageBodySize,
this.automaticRecovery,
recoveryCondition);
}
return this.frameHandlerFactory;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ private ChannelState(Channel channel) {
*
* @deprecated Use {@link #markRejectedMessage(boolean)} instead
*/
@Deprecated
protected abstract void markRejectedMessage();

/**
Expand Down
60 changes: 33 additions & 27 deletions src/main/java/com/rabbitmq/client/impl/Environment.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -12,40 +13,45 @@
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.impl;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* Infers information about the execution environment, e.g.
* security permissions.
* Package-protected API.
* Infers information about the execution environment, e.g. security permissions. Package-protected
* API.
*/
public class Environment {

/**
* This method is deprecated and subject to removal in the next major release.
*
* There is no replacement for this method, as it used to use the
* {@link SecurityManager}, which is itself deprecated and subject to removal.
* @deprecated
* @return always returns true
*/
@Deprecated
public static boolean isAllowedToModifyThreads() {
return true;
}
/**
* This method is deprecated and subject to removal in the next major release.
*
* <p>There is no replacement for this method, as it used to use the {@link SecurityManager},
* which is itself deprecated and subject to removal.
*
* @deprecated
* @return always returns true
*/
@Deprecated
public static boolean isAllowedToModifyThreads() {
return true;
}

static Thread newThread(Runnable runnable, String name) {
return newThread(Executors.defaultThreadFactory(), runnable, name);
}

public static Thread newThread(ThreadFactory factory, Runnable runnable, String name) {
Thread t = factory.newThread(runnable);
t.setName(name);
return t;
}
public static Thread newThread(ThreadFactory factory, Runnable runnable, String name) {
Thread t = factory.newThread(runnable);
t.setName(name);
return t;
}

public static Thread newThread(ThreadFactory factory, Runnable runnable, String name, boolean isDaemon) {
Thread t = newThread(factory, runnable, name);
t.setDaemon(isDaemon);
return t;
}
public static Thread newThread(
ThreadFactory factory, Runnable runnable, String name, boolean isDaemon) {
Thread t = newThread(factory, runnable, name);
t.setDaemon(isDaemon);
return t;
}
}
132 changes: 87 additions & 45 deletions src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.rabbitmq.client.Address;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MalformedFrameException;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.SocketConfigurator;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -51,16 +52,16 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.net.ssl.SSLHandshakeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -73,6 +74,7 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
private final Consumer<Channel> channelCustomizer;
private final Consumer<Bootstrap> bootstrapCustomizer;
private final Duration enqueuingTimeout;
private final Predicate<ShutdownSignalException> willRecover;

public NettyFrameHandlerFactory(
EventLoopGroup eventLoopGroup,
Expand All @@ -82,14 +84,30 @@ public NettyFrameHandlerFactory(
Duration enqueuingTimeout,
int connectionTimeout,
SocketConfigurator configurator,
int maxInboundMessageBodySize) {
int maxInboundMessageBodySize,
boolean automaticRecovery,
Predicate<ShutdownSignalException> recoveryCondition) {
super(connectionTimeout, configurator, sslContextFactory != null, maxInboundMessageBodySize);
this.eventLoopGroup = eventLoopGroup;
this.sslContextFactory = sslContextFactory == null ? connName -> null : sslContextFactory;
this.channelCustomizer = channelCustomizer == null ? Utils.noOpConsumer() : channelCustomizer;
this.bootstrapCustomizer =
bootstrapCustomizer == null ? Utils.noOpConsumer() : bootstrapCustomizer;
this.enqueuingTimeout = enqueuingTimeout;
this.willRecover =
sse -> {
if (!automaticRecovery) {
return false;
} else {
try {
return recoveryCondition.test(sse);
} catch (Exception e) {
// we assume it will recover, so we take the safe path to dispatch the closing
// it avoids the risk of deadlock
return true;
}
}
};
}

private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGroup) {
Expand Down Expand Up @@ -133,6 +151,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
sslContext,
this.eventLoopGroup,
this.enqueuingTimeout,
this.willRecover,
this.channelCustomizer,
this.bootstrapCustomizer);
}
Expand Down Expand Up @@ -163,6 +182,7 @@ private NettyFrameHandler(
SslContext sslContext,
EventLoopGroup elg,
Duration enqueuingTimeout,
Predicate<ShutdownSignalException> willRecover,
Consumer<Channel> channelCustomizer,
Consumer<Bootstrap> bootstrapCustomizer)
throws IOException {
Expand All @@ -180,6 +200,14 @@ private NettyFrameHandler(
} else {
this.eventLoopGroup = null;
}

if (b.config().group() == null) {
throw new IllegalStateException("The event loop group is not set");
} else if (b.config().group().isShuttingDown()) {
LOGGER.warn("The Netty loop group was shut down, it is not possible to connect or recover");
throw new IllegalStateException("The event loop group was shut down");
}

if (b.config().channelFactory() == null) {
b.channel(NioSocketChannel.class);
}
Expand All @@ -195,7 +223,8 @@ private NettyFrameHandler(
int lengthFieldOffset = 3;
int lengthFieldLength = 4;
int lengthAdjustement = 1;
AmqpHandler amqpHandler = new AmqpHandler(maxInboundMessageBodySize, this::close);
AmqpHandler amqpHandler =
new AmqpHandler(maxInboundMessageBodySize, this::close, willRecover);
int port = ConnectionFactory.portOrDefault(addr.getPort(), sslContext != null);
b.handler(
new ChannelInitializer<SocketChannel>() {
Expand Down Expand Up @@ -296,6 +325,10 @@ public void sendHeader() {

@Override
public void initialize(AMQConnection connection) {
LOGGER.debug(
"Setting connection {} to AMQP handler {}",
connection.getClientProvidedName(),
this.handler.id);
this.handler.connection = connection;
}

Expand Down Expand Up @@ -333,7 +366,6 @@ public void writeFrame(Frame frame) throws IOException {
if (canWriteNow) {
this.doWriteFrame(frame);
} else {
this.handler.logEvents();
throw new IOException("Frame enqueuing failed");
}
} catch (InterruptedException e) {
Expand Down Expand Up @@ -404,14 +436,30 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter {

private final int maxPayloadSize;
private final Runnable closeSequence;
private final Predicate<ShutdownSignalException> willRecover;
private volatile AMQConnection connection;
private volatile Channel ch;
private final AtomicBoolean writable = new AtomicBoolean(true);
private final AtomicReference<CountDownLatch> writableLatch =
new AtomicReference<>(new CountDownLatch(1));

private AmqpHandler(int maxPayloadSize, Runnable closeSequence) {
private final AtomicBoolean shutdownDispatched = new AtomicBoolean(false);
private static final AtomicInteger SEQUENCE = new AtomicInteger(0);
private final String id;

private AmqpHandler(
int maxPayloadSize,
Runnable closeSequence,
Predicate<ShutdownSignalException> willRecover) {
this.maxPayloadSize = maxPayloadSize;
this.closeSequence = closeSequence;
this.willRecover = willRecover;
this.id = "amqp-handler-" + SEQUENCE.getAndIncrement();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ch = ctx.channel();
super.channelActive(ctx);
}

@Override
Expand Down Expand Up @@ -444,49 +492,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (noProblem
&& (!this.connection.isRunning() || this.connection.hasBrokerInitiatedShutdown())) {
// looks like the frame was Close-Ok or Close
ctx.executor().submit(() -> this.connection.doFinalShutdown());
this.dispatchShutdownToConnection(() -> this.connection.doFinalShutdown());
}
} finally {
m.release();
}
}

private static class Event {
private final long time;
private final String label;

public Event(long time, String label) {
this.time = time;
this.label = label;
}

@Override
public String toString() {
return this.label + " " + this.time;
}
}

private static final int MAX_EVENTS = 100;
private final Queue<Event> events = new ConcurrentLinkedQueue<>();

private void logEvents() {
if (this.events.size() > 0) {
long start = this.events.peek().time;
LOGGER.info("channel writability history:");
events.forEach(e -> LOGGER.info("{}: {}", (e.time - start) / 1_000_000, e.label));
}
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
boolean canWrite = ctx.channel().isWritable();
Event event = new Event(System.nanoTime(), Boolean.toString(canWrite));
if (this.events.size() >= MAX_EVENTS) {
this.events.poll();
this.events.offer(event);
}
this.events.add(event);

if (this.writable.compareAndSet(!canWrite, canWrite)) {
if (canWrite) {
CountDownLatch latch = writableLatch.getAndSet(new CountDownLatch(1));
Expand All @@ -502,12 +517,13 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
public void channelInactive(ChannelHandlerContext ctx) {
if (needToDispatchIoError()) {
AMQConnection c = this.connection;
LOGGER.debug("Dispatching shutdown when channel became inactive ({})", this.id);
if (c.isOpen()) {
// it is likely to be an IO exception
c.handleIoError(null);
this.dispatchShutdownToConnection(() -> c.handleIoError(null));
} else {
// just in case, the call is idempotent anyway
c.doFinalShutdown();
this.dispatchShutdownToConnection(c::doFinalShutdown);
}
}
}
Expand All @@ -533,7 +549,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
this.connection.getAddress().getHostName(),
this.connection.getPort());
if (needToDispatchIoError()) {
this.connection.handleHeartbeatFailure();
this.dispatchShutdownToConnection(() -> this.connection.handleHeartbeatFailure());
}
} else if (e.state() == IdleState.WRITER_IDLE) {
this.connection.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
Expand All @@ -545,7 +561,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

private void handleIoError(Throwable cause) {
if (needToDispatchIoError()) {
this.connection.handleIoError(cause);
this.dispatchShutdownToConnection(() -> this.connection.handleIoError(cause));
} else {
this.closeSequence.run();
}
Expand All @@ -563,6 +579,32 @@ private boolean isWritable() {
private CountDownLatch writableLatch() {
return this.writableLatch.get();
}

protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable) {
if (this.shutdownDispatched.compareAndSet(false, true)) {
String name = "rabbitmq-connection-shutdown-" + this.id;
AMQConnection c = this.connection;
if (c == null || ch == null) {
// not enough information, we dispatch in separate thread
Environment.newThread(connectionShutdownRunnable, name).start();
} else {
if (ch.eventLoop().inEventLoop()) {
if (this.willRecover.test(c.getCloseReason()) || ch.eventLoop().isShuttingDown()) {
// the connection will recover, we don't want this to happen in the event loop,
// it could cause a deadlock, so using a separate thread
// name = name + "-" + c;
Environment.newThread(connectionShutdownRunnable, name).start();
} else {
// no recovery, it is safe to dispatch in the event loop
ch.eventLoop().submit(connectionShutdownRunnable);
}
} else {
// not in the event loop, we can run it in the same thread
connectionShutdownRunnable.run();
}
}
}
}
}

private static final class ProtocolVersionMismatchHandler extends ChannelInboundHandlerAdapter {
Expand Down
Loading