Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support live reload for SmallRye Reactive Messaging #15986

Merged
merged 1 commit into from Apr 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -73,6 +73,11 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
@@ -0,0 +1,104 @@
package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.LogRecord;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.smallrye.reactivemessaging.amqp.AnonymousAmqpBroker;
import io.quarkus.test.QuarkusDevModeTest;

public class AmqpDevModeNoHttpTest {
@RegisterExtension
static QuarkusDevModeTest TEST = new QuarkusDevModeTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Producer.class, Consumer.class,
AnonymousAmqpBroker.class, ProtonProtocolManagerFactory.class)
.addAsResource("broker.xml")
.addAsResource("application.properties"))
.setLogRecordPredicate(r -> r.getLoggerName().equals(Consumer.class.getName()));

@BeforeAll
public static void startBroker() {
AnonymousAmqpBroker.start();
}

@AfterAll
public static void stopBroker() {
AnonymousAmqpBroker.stop();
}

// For all tests below: we don't know exactly when the AMQP connector receives credits from the broker
// and then requests items from the producer, so we don't know what number will be sent to the queue first.
// What we do know, and hence test, is the relationship between consecutive numbers in the queue.
// See also https://github.com/smallrye/smallrye-reactive-messaging/issues/1125.

@Test
public void testProducerUpdate() {
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 4, last - 3, last - 2, last - 1, last);
});

TEST.modifySourceFile(Producer.class, s -> s.replace("* 1", "* 2"));

await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 8, last - 6, last - 4, last - 2, last);
});
}

@Test
public void testConsumerUpdate() {
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 4, last - 3, last - 2, last - 1, last);
});

TEST.modifySourceFile(Consumer.class, s -> s.replace("* 1", "* 3"));

await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
List<LogRecord> log = TEST.getLogRecords();
assertThat(log).hasSizeGreaterThanOrEqualTo(5);

List<Long> nums = log.stream()
.map(it -> Long.parseLong(it.getMessage()))
.collect(Collectors.toList());

long last = nums.get(nums.size() - 1);
assertThat(nums).containsSequence(last - 12, last - 9, last - 6, last - 3, last);
});
}
}
@@ -0,0 +1,16 @@
package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

@ApplicationScoped
public class Consumer {
private static final Logger log = Logger.getLogger(Consumer.class);

@Incoming("in")
public void consume(long content) {
log.info(content * 1);
}
}
@@ -0,0 +1,20 @@
package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp;

import java.time.Duration;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class Producer {
@Outgoing("source")
public Publisher<Long> generate() {
return Multi.createFrom().ticks().every(Duration.ofMillis(200))
.onOverflow().drop()
.map(i -> i * 1);
}
}
Expand Up @@ -3,6 +3,8 @@
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;

import io.smallrye.reactive.messaging.MutinyEmitter;
Expand All @@ -17,6 +19,7 @@
public final class ReactiveMessagingDotNames {

static final DotName VOID = DotName.createSimple(void.class.getName());
static final DotName OBJECT = DotName.createSimple(Object.class.getName());
static final DotName INCOMING = DotName.createSimple(Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(Outgoing.class.getName());
Expand All @@ -35,6 +38,9 @@ public final class ReactiveMessagingDotNames {
static final DotName MERGE = DotName.createSimple(Merge.class.getName());
static final DotName BROADCAST = DotName.createSimple(Broadcast.class.getName());

static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName());
static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName());

static final DotName SMALLRYE_BLOCKING = DotName.createSimple(io.smallrye.common.annotation.Blocking.class.getName());

// Do not directly reference the MetricDecorator (due to its direct references to MP Metrics, which may not be present)
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.jboss.jandex.AnnotationValue;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.Type;
import org.jboss.logging.Logger;
Expand All @@ -43,9 +44,11 @@
import io.quarkus.arc.processor.InjectionPointInfo;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.GeneratedClassGizmoAdaptor;
import io.quarkus.deployment.IsDevelopment;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
Expand All @@ -67,6 +70,8 @@
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder.SmallRyeReactiveMessagingContext;
import io.quarkus.smallrye.reactivemessaging.runtime.WorkerConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.extension.ChannelConfiguration;
Expand Down Expand Up @@ -483,4 +488,39 @@ private String generateInvoker(BeanInfo bean, MethodInfo method, ClassOutput cla
return generatedName.replace('/', '.');
}

@BuildStep(onlyIf = IsDevelopment.class)
void devmodeSupport(CombinedIndexBuildItem index, BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<AnnotationsTransformerBuildItem> transformations) {
beans.produce(new AdditionalBeanBuildItem(DevModeSupportConnectorFactory.class,
DevModeSupportConnectorFactoryInterceptor.class));

transformations.produce(new AnnotationsTransformerBuildItem(new AnnotationsTransformer() {
@Override
public boolean appliesTo(AnnotationTarget.Kind kind) {
return kind == AnnotationTarget.Kind.CLASS;
}

@Override
public void transform(TransformationContext ctx) {
ClassInfo clazz = ctx.getTarget().asClass();
if (doesImplement(clazz, ReactiveMessagingDotNames.INCOMING_CONNECTOR_FACTORY, index.getIndex())
|| doesImplement(clazz, ReactiveMessagingDotNames.OUTGOING_CONNECTOR_FACTORY, index.getIndex())) {
ctx.transform().add(DevModeSupportConnectorFactory.class).done();
}
}

private boolean doesImplement(ClassInfo clazz, DotName iface, IndexView index) {
while (clazz != null && !clazz.name().equals(ReactiveMessagingDotNames.OBJECT)) {
if (clazz.interfaceNames().contains(iface)) {
return true;
}

clazz = index.getClassByName(clazz.superName());
}

return false;
}
}));
}

}
@@ -0,0 +1,14 @@
package io.quarkus.smallrye.reactivemessaging.runtime.devmode;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.interceptor.InterceptorBinding;

@InterceptorBinding
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DevModeSupportConnectorFactory {
}
@@ -0,0 +1,82 @@
package io.quarkus.smallrye.reactivemessaging.runtime.devmode;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Interceptor
@DevModeSupportConnectorFactory
@Priority(Interceptor.Priority.PLATFORM_BEFORE + 10)
public class DevModeSupportConnectorFactoryInterceptor {
private static Supplier<CompletableFuture<Boolean>> onMessage;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have been volatile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that is true. I copied this pattern from other dev mode handlers, without too much thinking. I'll submit a PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #16263


static void register(Supplier<CompletableFuture<Boolean>> onMessage) {
DevModeSupportConnectorFactoryInterceptor.onMessage = onMessage;
}

@AroundInvoke
public Object intercept(InvocationContext ctx) throws Exception {
if (onMessage == null) {
return ctx.proceed();
}

if (ctx.getMethod().getName().equals("getPublisherBuilder")) {
PublisherBuilder<Message<?>> result = (PublisherBuilder<Message<?>>) ctx.proceed();
return result.flatMapCompletionStage(msg -> {
CompletableFuture<Message<?>> future = new CompletableFuture<>();
onMessage.get().whenComplete((restarted, error) -> {
if (!restarted) {
// if restarted, a new stream is already running,
// no point in emitting an event to the old stream
future.complete(msg);
}
});
return future;
});
}

if (ctx.getMethod().getName().equals("getSubscriberBuilder")) {
SubscriberBuilder<Message<?>, Void> result = (SubscriberBuilder<Message<?>, Void>) ctx.proceed();
return ReactiveStreams.fromSubscriber(new Subscriber<Message<?>>() {
private Subscriber<Message<?>> subscriber;

@Override
public void onSubscribe(Subscription s) {
subscriber = result.build();
subscriber.onSubscribe(s);
}

@Override
public void onNext(Message<?> o) {
subscriber.onNext(o);
onMessage.get().join();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this block the IO thread? Also won't the restart happen after the message has been delivered, rather than before, so the message is handled by the old code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, right - this method is going to be called on this I/O thread. Why is it require to join actually?

In theory, the connectors are passing the reactive streams TCK meaning that the onNext / onError / onComplete methods cannot be called concurrently (but could be called by different threads).

Also, we may want to reverse the onNext and onMessage call to handle the message with the new code.

Copy link
Contributor Author

@Ladicek Ladicek Apr 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually did try something like onMessage.get().whenComplete((value, error) -> subscriber.onNext(o)), but that only works well with instrumentation-based reload. With full restart, you've got a new stream running, and no way to push the message there.

Note that this is @Outgoing -- so there's no user code handling the message (unless we had some kind of "message interceptors" per smallrye/smallrye-reactive-messaging#1141 :-) ). It's only the connector code at this point, submitting the message to a broker (or whatever is on the other side).

I trigger restart after submitting the message, so that the message isn't lost. Note that even if I triggered restart before submitting the message, it would still be processed by old code with full restart, because full restart takes a few seconds during which old code still runs. (At least that's what I observed.) And I wait for restart to finish so that no other message may come in and be lost. I realize blocking is a crude way to do it, but I figured that wouldn't be such a big deal in dev mode.

I'm open to other ideas of course.

I toyed with the idea of using the request protocol to handle this, but I really don't feel like I could do it correctly :-)

}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
onMessage.get().join();
}

@Override
public void onComplete() {
subscriber.onComplete();
onMessage.get().join();
}
});
}

return ctx.proceed();
}
}