Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel mono #42

Merged
merged 5 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/main/java/reactor/rabbitmq/ChannelCloseHandlers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package reactor.rabbitmq;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.SignalType;

import java.util.function.BiConsumer;

public class ChannelCloseHandlers {

public static class SenderChannelCloseHandler implements BiConsumer<SignalType, Channel> {

private static final Logger LOGGER = LoggerFactory.getLogger(SenderChannelCloseHandler.class);

@Override
public void accept(SignalType signalType, Channel channel) {
int channelNumber = channel.getChannelNumber();
LOGGER.debug("closing channel {} by signal {}", channelNumber, signalType);
try {
if (channel.isOpen() && channel.getConnection().isOpen()) {
channel.close();
}
} catch (Exception e) {
LOGGER.warn("Channel {} didn't close normally: {}", channelNumber, e.getMessage());
}
}
}

}
1 change: 1 addition & 0 deletions src/main/java/reactor/rabbitmq/SendOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ public SendOptions exceptionHandler(BiConsumer<Sender.SendContext, Exception> ex
this.exceptionHandler = exceptionHandler;
return this;
}

}
64 changes: 33 additions & 31 deletions src/main/java/reactor/rabbitmq/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,23 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
* Reactive abstraction to create resources and send messages.
Expand All @@ -58,6 +61,10 @@ public class Sender implements AutoCloseable {

private final Mono<? extends Connection> connectionMono;

private final Mono<? extends Channel> channelMono;

private final BiConsumer<SignalType, Channel> channelCloseHandler;

private final AtomicBoolean hasConnection = new AtomicBoolean(false);

private final Mono<? extends Channel> resourceManagementChannelMono;
Expand All @@ -83,6 +90,10 @@ public Sender(SenderOptions options) {
.doOnSubscribe(c -> hasConnection.set(true))
.subscribeOn(this.connectionSubscriptionScheduler)
.cache();
this.channelMono = options.getChannelMono();
this.channelCloseHandler = options.getChannelCloseHandler() == null ?
new ChannelCloseHandlers.SenderChannelCloseHandler() :
options.getChannelCloseHandler();
this.privateResourceManagementScheduler = options.getResourceManagementScheduler() == null;
this.resourceManagementScheduler = options.getResourceManagementScheduler() == null ?
createScheduler("rabbitmq-sender-resource-creation") : options.getResourceManagementScheduler();
Expand All @@ -102,8 +113,9 @@ public Mono<Void> send(Publisher<OutboundMessage> messages, SendOptions options)
// TODO using a pool of channels?
// would be much more efficient if send is called very often
// less useful if seldom called, only for long or infinite message flux
final Mono<Channel> currentChannelMono = connectionMono.map(CHANNEL_CREATION_FUNCTION).cache();
final Mono<? extends Channel> currentChannelMono = getChannelMono();
final BiConsumer<SendContext, Exception> exceptionHandler = options.getExceptionHandler();

return currentChannelMono.flatMapMany(channel ->
Flux.from(messages)
.doOnNext(message -> {
Expand All @@ -119,17 +131,7 @@ public Mono<Void> send(Publisher<OutboundMessage> messages, SendOptions options)
}
})
.doOnError(e -> LOGGER.warn("Send failed with exception {}", e))
.doFinally(st -> {
int channelNumber = channel.getChannelNumber();
LOGGER.info("closing channel {} by signal {}", channelNumber, st);
try {
if (channel.isOpen() && channel.getConnection().isOpen()) {
channel.close();
}
} catch (Exception e) {
LOGGER.warn("Channel {} didn't close normally: {}", channelNumber, e.getMessage());
}
})
.doFinally(st -> channelCloseHandler.accept(st, channel))
).then();
}

Expand All @@ -141,17 +143,21 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
// TODO using a pool of channels?
// would be much more efficient if send is called very often
// less useful if seldom called, only for long or infinite message flux
final Mono<Channel> channelMono = connectionMono.map(CHANNEL_CREATION_FUNCTION)
.map(channel -> {
final Mono<? extends Channel> currentChannelMono = getChannelMono();

return currentChannelMono.map(channel -> {
try {
channel.confirmSelect();
} catch (IOException e) {
throw new RabbitFluxException("Error while setting publisher confirms on channel", e);
}
return channel;
});
})
.flatMapMany(channel -> new PublishConfirmOperator(messages, channel, channelCloseHandler, options));
}

return channelMono.flatMapMany(channel -> new PublishConfirmOperator(messages, channel, options));
private Mono<? extends Channel> getChannelMono() {
return channelMono != null ? channelMono : connectionMono.map(CHANNEL_CREATION_FUNCTION);
}

public RpcClient rpcClient(String exchange, String routingKey) {
Expand Down Expand Up @@ -452,15 +458,18 @@ private static class PublishConfirmOperator

private final SendOptions options;

public PublishConfirmOperator(Publisher<OutboundMessage> source, Channel channel, SendOptions options) {
private final BiConsumer<SignalType, Channel> channelCloseHandler;

public PublishConfirmOperator(Publisher<OutboundMessage> source, Channel channel, BiConsumer<SignalType, Channel> channelCloseHandler, SendOptions options) {
super(Flux.from(source));
this.channel = channel;
this.channelCloseHandler = channelCloseHandler;
this.options = options;
}

@Override
public void subscribe(CoreSubscriber<? super OutboundMessageResult> actual) {
source.subscribe(new PublishConfirmSubscriber(channel, actual, options));
source.subscribe(new PublishConfirmSubscriber(channel, channelCloseHandler, actual, options));
}
}

Expand All @@ -479,8 +488,11 @@ private static class PublishConfirmSubscriber implements

private final BiConsumer<SendContext, Exception> exceptionHandler;

private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMessageResult> subscriber, SendOptions options) {
private final BiConsumer<SignalType, Channel> channelCloseHandler;

private PublishConfirmSubscriber(Channel channel, BiConsumer<SignalType, Channel> channelCloseHandler, Subscriber<? super OutboundMessageResult> subscriber, SendOptions options) {
this.channel = channel;
this.channelCloseHandler = channelCloseHandler;
this.subscriber = subscriber;
this.exceptionHandler = options.getExceptionHandler();
}
Expand Down Expand Up @@ -563,7 +575,7 @@ public void onNext(OutboundMessage message) {
public void onError(Throwable throwable) {
if (state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) ||
state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
closeResources();
channelCloseHandler.accept(SignalType.ON_ERROR, channel);
// complete the flux state
subscriber.onError(throwable);
} else if (firstException.compareAndSet(null, throwable) && state.get() == SubscriberState.COMPLETE) {
Expand Down Expand Up @@ -594,21 +606,11 @@ private void handleError(Exception e, OutboundMessageResult result) {
private void maybeComplete() {
boolean done = state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE);
if (done) {
closeResources();
channelCloseHandler.accept(SignalType.ON_COMPLETE, channel);
subscriber.onComplete();
}
}

private void closeResources() {
try {
if (channel.isOpen()) {
channel.close();
}
} catch (Exception e) {
// not much we can do here
}
}

public <T> boolean checkComplete(T t) {
boolean complete = state.get() == SubscriberState.COMPLETE;
if (complete && firstException.get() == null) {
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/reactor/rabbitmq/SenderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;

import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
Expand All @@ -37,6 +39,10 @@ public class SenderOptions {

private Mono<? extends Connection> connectionMono;

private Mono<? extends Channel> channelMono;

private BiConsumer<SignalType, Channel> channelCloseHandler;

private Scheduler resourceManagementScheduler;

private Scheduler connectionSubscriptionScheduler;
Expand Down Expand Up @@ -100,6 +106,24 @@ public Mono<? extends Connection> getConnectionMono() {
return connectionMono;
}

public SenderOptions channelMono(Mono<? extends Channel> channelMono) {
this.channelMono = channelMono;
return this;
}

public Mono<? extends Channel> getChannelMono() {
return channelMono;
}

public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
return channelCloseHandler;
}

public SenderOptions channelCloseHandler(BiConsumer<SignalType, Channel> channelCloseHandler) {
this.channelCloseHandler = channelCloseHandler;
return this;
}

public SenderOptions resourceManagementChannelMono(Mono<? extends Channel> resourceManagementChannelMono) {
this.resourceManagementChannelMono = resourceManagementChannelMono;
return this;
Expand Down
118 changes: 118 additions & 0 deletions src/test/java/reactor/rabbitmq/RabbitFluxTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ChannelCloseHandlers.SenderChannelCloseHandler;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

Expand Down Expand Up @@ -465,6 +468,87 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
assertEquals(nbMessages, counter.get());
}

@Test
public void senderRetryCreateChannel() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
when(mockConnectionFactory.newConnection()).thenReturn(mockConnection);
when(mockConnection.createChannel())
.thenThrow(new IOException("already closed exception"))
.thenThrow(new IOException("already closed exception"))
.thenReturn(connection.createChannel());

int nbMessages = 10;

Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));

sender = createSender(new SenderOptions().connectionFactory(mockConnectionFactory));

StepVerifier.create(sender.send(msgFlux).retry(2))
.verifyComplete();
verify(mockConnection, times(3)).createChannel();

StepVerifier.create(consume(queue, nbMessages))
.expectNextCount(nbMessages)
.verifyComplete();
}

@Test
public void senderRetryNotWorkingWhenCreateChannelIsCached() throws Exception {
int nbMessages = 10;

Connection mockConnection = mock(Connection.class);
Channel mockChannel = mock(Channel.class);
when(mockConnection.createChannel())
.thenThrow(new RuntimeException("already closed exception"))
.thenThrow(new RuntimeException("already closed exception"))
.thenReturn(mockChannel);

Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));

SenderOptions senderOptions = new SenderOptions()
.channelMono(Mono.just(mockConnection).map(this::createChannel).cache());

sender = createSender(senderOptions);

StepVerifier.create(sender.send(msgFlux).retry(2))
.expectError(RabbitFluxException.class)
.verify();

verify(mockChannel, never()).basicPublish(anyString(), anyString(), any(AMQP.BasicProperties.class), any(byte[].class));
verify(mockChannel, never()).close();
}

@Test
public void senderWithCustomChannelCloseHandler() throws Exception {
int nbMessages = 10;
Flux<OutboundMessage> msgFlux = Flux.range(0, nbMessages).map(i -> new OutboundMessage("", queue, "".getBytes()));

SenderChannelCloseHandler channelCloseHandler = mock(SenderChannelCloseHandler.class);
doNothing().when(channelCloseHandler).accept(any(SignalType.class), any(Channel.class));
Mono<Channel> monoChannel = Mono.fromCallable(() -> connection.createChannel()).cache();
SenderOptions senderOptions = new SenderOptions().channelCloseHandler(channelCloseHandler).channelMono(monoChannel);

sender = createSender(senderOptions);

Mono<Void> sendTwice = Mono.when(sender.send(msgFlux), sender.send(msgFlux))
.doFinally(signalType -> {
try {
monoChannel.block().close();
} catch (Exception e) {
throw new RabbitFluxException(e);
}
});

StepVerifier.create(sendTwice)
.verifyComplete();
verify(channelCloseHandler, times(2)).accept(SignalType.ON_COMPLETE, monoChannel.block());

StepVerifier.create(consume(queue, nbMessages * 2))
.expectNextCount(nbMessages * 2)
.verifyComplete();
}

@Test
public void publishConfirms() throws Exception {
int nbMessages = 10;
Expand Down Expand Up @@ -499,6 +583,8 @@ public void publishConfirmsErrorWhilePublishing() throws Exception {
Channel mockChannel = mock(Channel.class);
when(mockConnectionFactory.newConnection()).thenReturn(mockConnection);
when(mockConnection.createChannel()).thenReturn(mockChannel);
when(mockConnection.isOpen()).thenReturn(true);
when(mockChannel.getConnection()).thenReturn(mockConnection);

AtomicLong publishSequence = new AtomicLong();
when(mockChannel.getNextPublishSeqNo()).thenAnswer(invocation -> publishSequence.incrementAndGet());
Expand Down Expand Up @@ -824,4 +910,36 @@ private void sendAndReceiveMessages(String queue) throws Exception {
assertTrue(latch.await(5, TimeUnit.SECONDS));
subscriber.dispose();
}

private Flux<Delivery> consume(final String queue, int nbMessages) throws Exception {
return consume(queue, nbMessages, Duration.ofSeconds(1L));
}

private Flux<Delivery> consume(final String queue, int nbMessages, Duration timeout) throws Exception {
Channel channel = connection.createChannel();
Flux<Delivery> consumeFlux = Flux.create(emitter -> Mono.just(nbMessages).map(AtomicInteger::new).subscribe(countdown -> {
DeliverCallback deliverCallback = (consumerTag, message) -> {
emitter.next(message);
if (countdown.decrementAndGet() <= 0) {
emitter.complete();
}
};
CancelCallback cancelCallback = consumerTag -> {
};
try {
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
}
}));
return consumeFlux.timeout(timeout);
}

private Channel createChannel(Connection connection) {
try {
return connection.createChannel();
} catch (Exception e) {
throw new RabbitFluxException(e);
}
}
}