Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive messaging fails when using PublisherBuilder or ProcessorBuilder #4587

Closed
misl opened this issue Oct 15, 2019 · 12 comments
Closed

Reactive messaging fails when using PublisherBuilder or ProcessorBuilder #4587

misl opened this issue Oct 15, 2019 · 12 comments
Assignees
Milestone

Comments

@misl
Copy link
Contributor

misl commented Oct 15, 2019

When switching from Quarkus 0.23.2 to 0.24.0 my application resulted in the following error:

2019-10-15 14:58:43,386 ERROR [it.tra.dis.mes.dat.DataPointProcessor#process] (vert.x-eventloop-thread-1) The method it.traeck.dispatch.messaging.data.DataPointProcessor#process has thrown an exception: java.lang.ClassCastException: class [B cannot be cast to class io.smallrye.reactive.messaging.mqtt.MqttMessage ([B is in module java.base of loader 'bootstrap'; io.smallrye.reactive.messaging.mqtt.MqttMessage is in unnamed module of loader 'app')
	at it.traeck.dispatch.messaging.data.DataPointProcessor_SmallryeMessagingInvoker_process_ea6c045679c66cce80caed88218a4aec51b70518.invoke(DataPointProcessor_SmallryeMessagingInvoker_process_ea6c045679c66cce80caed88218a4aec51b70518.zig:46)
	at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:61)
	at io.smallrye.reactive.messaging.ProcessorMediator.lambda$processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads$6(ProcessorMediator.java:201)
	at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
	at io.smallrye.reactive.streams.stages.FlatMapStageFactory$FlatMapStage.lambda$apply$1(FlatMapStageFactory.java:43)
	at io.reactivex.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.drain(FlowableConcatMap.java:284)
	at io.reactivex.internal.operators.flowable.FlowableConcatMap$BaseConcatMapSubscriber.onNext(FlowableConcatMap.java:159)
	at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
	at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
	at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
	at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
	at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
	...

While looking around with the debugger I noticed ProcessorMediator.processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads() is being called. This message tried to invoke the handler with just the payload instead of the surrounding Message object. The reason for this Production.STREAM_OF_PAYLOAD is being configured instead of STREAM_OF_MESSAGE.

My handler looks like this:

@Incoming("device-data")
@Outgoing("entity-data")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public PublisherBuilder<MqttMessage<String>> process( final MqttMessage<byte[]> message ) {
    return ReactiveStreams.of( message )
        .filter( Objects::nonNull )
        .peek( messageLogger::log )
        .map( wrapper( this::consume ) )
        .map( wrapper( this::transform ) )
        .onErrorResumeWith( e -> {
          if ( e instanceof FailedMessageException ) {
            exceptionLogger.log( (FailedMessageException) e );
          } else {
            LOGGER.error( "Message failed:", e );
          }
          return ReactiveStreams.empty();
        } )
        ;
}

Initially I thought this was an issue somewhere in SmallRye messaging. That's why I created an issue there. However further investigation lead to the Quarkus reactive messaging extension.

It appears that QuarkusMediatorConfigurationUtil.JandexGenericTypeAssignable.check() results in NotAssignable where Assignable is expected. Especially the following part: argumentClass.isAssignableFrom(target). This basicaly results in AmqpMessage.isAssignalbleFrom(Message) or MqttMessage.isAssignalbleFrom(Message)

Should it not be the other way around? target.isAssignableFrom(argumentClass)

I ran into it using Mqtt but I think it applies to Amqp as well.

@misl
Copy link
Contributor Author

misl commented Oct 15, 2019

Fixed this locally and indeed I get the expected behavior now.

@gsmet
Copy link
Member

gsmet commented Oct 15, 2019

@misl looks like you're ready to prepare a PR (and add a test!) :).

@geoand
Copy link
Contributor

geoand commented Oct 15, 2019

Exactly what @gsmet said 😁

@misl
Copy link
Contributor Author

misl commented Oct 16, 2019

PR should be easy, except for the test :-(

I think the current tests are very limited. This makes me wonder what the test coverage is. Is there any test coverage information available?

@geoand
Copy link
Contributor

geoand commented Oct 16, 2019

The library itself has plenty of tests. The Quarkus integrations has fewer but that's the point, to improve that :)

@geoand
Copy link
Contributor

geoand commented Oct 16, 2019

@misl you could add something existing to the integration test, no?

@hguerrero
Copy link
Contributor

This issue applies to Kafka too

@misl
Copy link
Contributor Author

misl commented Oct 17, 2019

I created the following pull request: #4628

Unfortunately no additional unittest. I tried various ways of triggering the error. This indeed resulted in NotAssignable (leading to STREAM_OF_PAYLOAD instead of STREAM_OF_MESSAGE) but the unittests still ran successfully.

@geoand
Copy link
Contributor

geoand commented Oct 17, 2019

@misl Which unit test are you talking about?

@geoand
Copy link
Contributor

geoand commented Oct 17, 2019

@misl
Copy link
Contributor Author

misl commented Oct 17, 2019

@geoand I had a look at those. Initially I thought the coverage was not too well, but this turned out to be wrong. Coverage is quite good it is mostly error conditions that lack coverage.

I also looked at smalrye message provider to see if I could steal/copy a unittest. But it appears unittests here and at smallrye do not use any subclasses of (MicroProfile) Message. So in that case both argumentClass.isAssignableFrom(target) have the same outcome target.isAssignableFrom(argumentClass).

I tried to introduce a subclass of Message just for testing purposes. This allowed me to add the following to the test bean.

    @Incoming("processed-wrapped-a")
    @Outgoing("processed-wrapped-b")
    public PublisherBuilder<SubClassedMessage<String>> filterWrapped(PublisherBuilder<Message<byte[]>> input) {
        return input
            .map( item -> SubClassedMessage.of( new String(item.getPayload()) ) )
            .filter(item -> item.getPayload().length() > 4);
    }

But this also allowed me to only use the subclass on the outgoing part. This however did not result in any errors (failing testcases). Like this I noticed the NotAssignable remark I made earlier. I probably need the subclass in the incoming side. Unfortunately my knowledge is the limiting factor as I do not know how to accomplish that.

The alternative I see would be adding testcases the smallrye messaging Kafka, Amqp and Mqtt extensions. Since those modules do use a subclass of Message.

@geoand
Copy link
Contributor

geoand commented Oct 17, 2019

@misl thanks for the great update!

Lets see what @cescoffier wants to do about tests.

gsmet added a commit that referenced this issue Oct 21, 2019
#4587: Reversed isAssignableFrom arguments
@gsmet gsmet closed this as completed Oct 21, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants