Skip to content

Commit

Permalink
Merge pull request #15986 from Ladicek/reactive-messaging-live-reload
Browse files Browse the repository at this point in the history
Support live reload for SmallRye Reactive Messaging
  • Loading branch information
cescoffier committed Apr 5, 2021
2 parents c7b5494 + 7e5be03 commit 03df125
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 1 deletion.
Expand Up @@ -73,6 +73,11 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
@@ -0,0 +1,104 @@
package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.LogRecord;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.smallrye.reactivemessaging.amqp.AnonymousAmqpBroker;
import io.quarkus.test.QuarkusDevModeTest;

public class AmqpDevModeNoHttpTest {
@RegisterExtension
static QuarkusDevModeTest TEST = new QuarkusDevModeTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Producer.class, Consumer.class,
AnonymousAmqpBroker.class, ProtonProtocolManagerFactory.class)
.addAsResource("broker.xml")
.addAsResource("application.properties"))
.setLogRecordPredicate(r -> r.getLoggerName().equals(Consumer.class.getName()));

@BeforeAll
public static void startBroker() {
AnonymousAmqpBroker.start();
}

@AfterAll
public static void stopBroker() {
AnonymousAmqpBroker.stop();
}

// For all tests below: we don't know exactly when the AMQP connector receives credits from the broker
// and then requests items from the producer, so we don't know what number will be sent to the queue first.
// What we do know, and hence test, is the relationship between consecutive numbers in the queue.
// See also https://github.com/smallrye/smallrye-reactive-messaging/issues/1125.

@Test
public void testProducerUpdate() {
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 4, last - 3, last - 2, last - 1, last);
});

TEST.modifySourceFile(Producer.class, s -> s.replace("* 1", "* 2"));

await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 8, last - 6, last - 4, last - 2, last);
});
}

@Test
public void testConsumerUpdate() {
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 4, last - 3, last - 2, last - 1, last);
});

TEST.modifySourceFile(Consumer.class, s -> s.replace("* 1", "* 3"));

await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 12, last - 9, last - 6, last - 3, last);
});
}
}
@@ -0,0 +1,16 @@
package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

@ApplicationScoped
public class Consumer {
private static final Logger log = Logger.getLogger(Consumer.class);

@Incoming("in")
public void consume(long content) {
log.info(content * 1);
}
}
@@ -0,0 +1,20 @@
package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp;

import java.time.Duration;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class Producer {
@Outgoing("source")
public Publisher<Long> generate() {
return Multi.createFrom().ticks().every(Duration.ofMillis(200))
.onOverflow().drop()
.map(i -> i * 1);
}
}
Expand Up @@ -3,6 +3,8 @@
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;

import io.smallrye.reactive.messaging.MutinyEmitter;
Expand All @@ -17,6 +19,7 @@
public final class ReactiveMessagingDotNames {

static final DotName VOID = DotName.createSimple(void.class.getName());
static final DotName OBJECT = DotName.createSimple(Object.class.getName());
static final DotName INCOMING = DotName.createSimple(Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(Outgoing.class.getName());
Expand All @@ -35,6 +38,9 @@ public final class ReactiveMessagingDotNames {
static final DotName MERGE = DotName.createSimple(Merge.class.getName());
static final DotName BROADCAST = DotName.createSimple(Broadcast.class.getName());

static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName());
static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName());

static final DotName SMALLRYE_BLOCKING = DotName.createSimple(io.smallrye.common.annotation.Blocking.class.getName());

// Do not directly reference the MetricDecorator (due to its direct references to MP Metrics, which may not be present)
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.jboss.jandex.AnnotationValue;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.Type;
import org.jboss.logging.Logger;
Expand All @@ -43,9 +44,11 @@
import io.quarkus.arc.processor.InjectionPointInfo;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.GeneratedClassGizmoAdaptor;
import io.quarkus.deployment.IsDevelopment;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
Expand All @@ -67,6 +70,8 @@
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder.SmallRyeReactiveMessagingContext;
import io.quarkus.smallrye.reactivemessaging.runtime.WorkerConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.extension.ChannelConfiguration;
Expand Down Expand Up @@ -483,4 +488,39 @@ private String generateInvoker(BeanInfo bean, MethodInfo method, ClassOutput cla
return generatedName.replace('/', '.');
}

@BuildStep(onlyIf = IsDevelopment.class)
void devmodeSupport(CombinedIndexBuildItem index, BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<AnnotationsTransformerBuildItem> transformations) {
beans.produce(new AdditionalBeanBuildItem(DevModeSupportConnectorFactory.class,
DevModeSupportConnectorFactoryInterceptor.class));

transformations.produce(new AnnotationsTransformerBuildItem(new AnnotationsTransformer() {
@Override
public boolean appliesTo(AnnotationTarget.Kind kind) {
return kind == AnnotationTarget.Kind.CLASS;
}

@Override
public void transform(TransformationContext ctx) {
ClassInfo clazz = ctx.getTarget().asClass();
if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex())
|| doesImplement(clazz, ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY, index.getIndex())) {
ctx.transform().add(DevModeSupportConnectorFactory.class).done();
}
}

private boolean doesImplement(ClassInfo clazz, DotName iface, IndexView index) {
while (clazz != null && !clazz.name().equals(ReactiveMessagingDotNames.OBJECT)) {
if (clazz.interfaceNames().contains(iface)) {
return true;
}

clazz = index.getClassByName(clazz.superName());
}

return false;
}
}));
}

}
@@ -0,0 +1,14 @@
package io.quarkus.smallrye.reactivemessaging.runtime.devmode;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.interceptor.InterceptorBinding;

@InterceptorBinding
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DevModeSupportConnectorFactory {
}
@@ -0,0 +1,82 @@
package io.quarkus.smallrye.reactivemessaging.runtime.devmode;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Interceptor
@DevModeSupportConnectorFactory
@Priority(Interceptor.Priority.PLATFORM_BEFORE + 10)
public class DevModeSupportConnectorFactoryInterceptor {
private static Supplier<CompletableFuture<Boolean>> onMessage;

static void register(Supplier<CompletableFuture<Boolean>> onMessage) {
DevModeSupportConnectorFactoryInterceptor.onMessage = onMessage;
}

@AroundInvoke
public Object intercept(InvocationContext ctx) throws Exception {
if (onMessage == null) {
return ctx.proceed();
}

if (ctx.getMethod().getName().equals("getPublisherBuilder")) {
PublisherBuilder<Message<?>> result = (PublisherBuilder<Message<?>>) ctx.proceed();
return result.flatMapCompletionStage(msg -> {
CompletableFuture<Message<?>> future = new CompletableFuture<>();
onMessage.get().whenComplete((restarted, error) -> {
if (!restarted) {
// if restarted, a new stream is already running,
// no point in emitting an event to the old stream
future.complete(msg);
}
});
return future;
});
}

if (ctx.getMethod().getName().equals("getSubscriberBuilder")) {
SubscriberBuilder<Message<?>, Void> result = (SubscriberBuilder<Message<?>, Void>) ctx.proceed();
return ReactiveStreams.fromSubscriber(new Subscriber<Message<?>>() {
private Subscriber<Message<?>> subscriber;

@Override
public void onSubscribe(Subscription s) {
subscriber = result.build();
subscriber.onSubscribe(s);
}

@Override
public void onNext(Message<?> o) {
subscriber.onNext(o);
onMessage.get().join();
}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
onMessage.get().join();
}

@Override
public void onComplete() {
subscriber.onComplete();
onMessage.get().join();
}
});
}

return ctx.proceed();
}
}

0 comments on commit 03df125

Please sign in to comment.