Reactive Consumer Function not deserializing data from rabbitmq correctly #2056

Closed
dmsilver2 opened this issue Nov 24, 2020 · 30 comments

Comments

@dmsilver2

dmsilver2 commented Nov 24, 2020

I am using the 3.0.9 release and have come across a bug.

I am trying to receive a message using RabbitMQ as the binder.

I have a reactive function:

@Bean
public Function<Flux<MyMessage>, Mono<Void>> consumer() {
    return myMessages -> myMessages.flatMap(m -> Mono.fromCallable(() -> {
        // In my actual application, I am doing a synchronous blocking task here. I am not just logging
        log.info("My val -> {}", m.getVal()); // val is null
        return null;
    }).subscribeOn(Schedulers.elastic())).then();
}

The problem is that the val of MyMessage is null.

When I use a plain Consumer function instead, I do see the value:

@Bean
public Consumer<MyMessage> consumer() {
    return m -> {
        log.info("My val -> {}", m.getVal()); // val is as expected
    };
}

It seems that the reactive consumer function is not deserializing the message correctly.

@dmsilver2
Author

Update

When I use Spring Cloud version Hoxton.SR8, the reactive consumer works as expected. There must be a conflict in the latest Spring Cloud version that needs to be resolved.

@lgraf

lgraf commented Nov 25, 2020

I see the same behaviour after upgrading from Hoxton.SR8 to Hoxton.SR9. It seems that the message payload is not correctly deserialized. Although I have a different setup, the underlying problem may be related.

Setup
Spring Boot 2.4
Spring Cloud Stream (org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR9)
Azure Service Bus Topic Stream Binder (spring-cloud-azure-servicebus-topic-stream-binder:1.2.8)

I don't use reactive functions but simple functional composition, not sure if this makes any difference at all.

cloud:
    stream:
      function:
        definition: convert|doSomething
        bindings:
          convert|doSomething-in-0: example-binding
      bindings:
        example-binding:
          destination: example-topic
          group: example-subscription

@Bean
public Function<Message<MyMessageClass>, MyDomainClass> convert() {
    return ( Message<MyMessageClass> message ) -> {
        MyMessageClass payload = message.getPayload(); // <-- all payload values are null
        return new MyDomainClass( payload.getVal() ); 
    };
}

@Bean
public Consumer<MyDomainClass> doSomething( MyService service ) {
    return service::doSomething;
}

When downgrading back to Hoxton.SR8 the message payload values are set as expected.
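
For anyone reading along, here is a rough sketch of what the `convert|doSomething` definition means conceptually: the output of `convert` is fed into `doSomething`. This is plain `java.util.function` code and not how Spring Cloud Function wires things internally; `CompositionSketch` and `pipe` are made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class CompositionSketch {

    // Conceptual equivalent of "convert|doSomething":
    // the result of convert becomes the input of doSomething.
    static <A, B> Consumer<A> pipe(Function<A, B> convert, Consumer<B> doSomething) {
        return input -> doSomething.accept(convert.apply(input));
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        Function<String, Integer> convert = String::length; // stand-in for the convert bean
        Consumer<Integer> doSomething = seen::add;          // stand-in for the doSomething bean
        Consumer<String> composed = pipe(convert, doSomething);
        composed.accept("example");
        System.out.println(seen);
    }
}
```

If the first stage receives a payload whose fields are all null, the second stage only ever sees the consequences, which matches the symptom described above.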

@olegz
Contributor

olegz commented Nov 25, 2020

Can you please try with SR10? I can't seem to reproduce it, and I also pushed a missing test this morning to both master and the 3.0.x branch to validate it: f80f85e

@dmsilver2
Author

@olegz
The latest version of Hoxton is SR9. It was released a few weeks ago on 2020-11-09.
https://github.com/spring-cloud/spring-cloud-release/wiki/Spring-Cloud-Hoxton-Release-Notes

@olegz
Contributor

olegz commented Nov 25, 2020

@dmsilver2 Yes, the latest Hoxton is SR9. However, Spring Cloud Stream has its own release cycle, which generally matches up with Spring Cloud, but not always. So the latest is Horsham.SR10, which is 3.0.10.
This is also reflected on the project page.

@ziodave

ziodave commented Nov 25, 2020

I am experiencing this issue too with SR9 and Azure Service Bus. I was able to see that the conversionHint is added to the message converter because the type is believed to be Flux (will try to share more details later and will soon test SR10).

@ziodave

ziodave commented Nov 25, 2020

I think this is related: #2054

@dmsilver2
Author

dmsilver2 commented Nov 26, 2020

@olegz
I have tried adding the dependency to my pom but it is still not working. Perhaps I am adding it incorrectly or need to do something more. Please advise.

I have added this to my dependencies section:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-dependencies</artifactId>
    <version>Horsham.SR10</version>
    <type>pom</type>
</dependency>

@ziodave

ziodave commented Nov 26, 2020

@dmsilver2 take a look here: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-security-samples/kafka-ssl-demo/pom.xml

In my case, since I am also using spring-cloud-dependencies I had to put spring-cloud-stream-dependencies first in dependencyManagement (the exclusion didn't work):


    <dependencyManagement>
        <dependencies>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>${spring-cloud-stream.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-stream-dependencies</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>

        </dependencies>
    </dependencyManagement>
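
A note on why the order matters here: when two imported BOMs manage the same artifact, Maven keeps the version from the import that is declared first. The sketch below models that rule with plain maps. `BomOrderSketch`, `resolve`, and the map contents are illustrative assumptions based on the versions mentioned in this thread, not a dump of the real BOMs, and the sketch ignores versions declared directly in the POM (those take precedence over any import).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BomOrderSketch {

    // Merges managed versions from imported BOMs in declaration order.
    // The first BOM that manages an artifact decides its version.
    static Map<String, String> resolve(List<Map<String, String>> bomsInDeclarationOrder) {
        Map<String, String> managed = new LinkedHashMap<>();
        for (Map<String, String> bom : bomsInDeclarationOrder) {
            bom.forEach(managed::putIfAbsent);
        }
        return managed;
    }

    public static void main(String[] args) {
        // Simplified, hypothetical views of the two BOMs (assumed versions).
        Map<String, String> streamBom = Map.of("spring-cloud-stream", "3.0.10.RELEASE");
        Map<String, String> cloudBom = Map.of("spring-cloud-stream", "3.0.9.RELEASE");

        // Stream BOM declared first: its version is kept.
        System.out.println(resolve(List.of(streamBom, cloudBom)));
        // Cloud BOM declared first: the stream BOM's version is ignored.
        System.out.println(resolve(List.of(cloudBom, streamBom)));
    }
}
```

That would explain why putting spring-cloud-stream-dependencies first had an effect while the exclusion did not.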

@ziodave

ziodave commented Nov 26, 2020

The issue is still there for me with Horsham.SR10. The issue is due to the conversionHint set by SimpleFunctionRegistry from spring-cloud-function-context-3.0.11.RELEASE which at line 838 is setting Flux with this FunctionTypeUtils.getGenericType(type):

? this.fromMessage((Message<?>) value, (Class<?>) rawType, FunctionTypeUtils.getGenericType(type))

This in turn makes the JSON converter look for a Jackson View.

My function signature is Function<Flux<PojoIn>, Flux<PojoOut>>.
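
To make the distinction concrete, here is a minimal sketch of the difference between the declared input type of such a function (the reactive wrapper) and its element type (the POJO a payload converter would need as its target). It uses only JDK reflection, with `java.util.concurrent.Flow.Publisher` as a stand-in for `Flux` so that it runs without dependencies. `ConversionTargetSketch`, `declaredInputType`, and `elementType` are hypothetical names, and this is not the code in `SimpleFunctionRegistry`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.Flow;
import java.util.function.Function;

public class ConversionTargetSketch {

    // Hypothetical POJOs standing in for PojoIn / PojoOut.
    static class PojoIn { public String value; }
    static class PojoOut { public String value; }

    // Flow.Publisher stands in for Flux; only the signature is inspected.
    static Function<Flow.Publisher<PojoIn>, Flow.Publisher<PojoOut>> handler() {
        return in -> null;
    }

    // Returns the declared input type of handler(): Flow.Publisher<PojoIn>.
    static Type declaredInputType() {
        try {
            ParameterizedType fn = (ParameterizedType) ConversionTargetSketch.class
                    .getDeclaredMethod("handler").getGenericReturnType();
            return fn.getActualTypeArguments()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Unwraps one level of generics: Publisher<PojoIn> -> PojoIn.
    static Type elementType(Type type) {
        if (type instanceof ParameterizedType) {
            return ((ParameterizedType) type).getActualTypeArguments()[0];
        }
        return type;
    }

    public static void main(String[] args) {
        Type input = declaredInputType();
        System.out.println("declared input: " + input.getTypeName());
        System.out.println("element type:   " + elementType(input).getTypeName());
    }
}
```

If the wrapper type rather than the element type reaches the converter, the converter has no way of knowing it should build a PojoIn.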

@ziodave

ziodave commented Nov 26, 2020

Downgrading to Hoxton.SR8, Spring Cloud Stream 3.0.8, fixes the issue for me, i.e. the conversionHint is not set anymore.

@olegz
Contributor

olegz commented Nov 26, 2020

I am still struggling to understand, since there is a test that validates it. Can someone please provide a way to reproduce the issue (a test in the form of a PR, or just copy/paste the code here)?

@ziodave

ziodave commented Nov 26, 2020

I'll try to share a sample project later today.

@ziodave

ziodave commented Nov 26, 2020

In the meantime I had to upgrade again to Hoxton.SR9, since Hoxton.SR8 has issues with Postgres (at least for me). I was able to pinpoint the artifact causing the issue: spring-cloud-function 3.0.11.RELEASE. spring-cloud-function 3.0.10.RELEASE works fine.

My pom.xml dependencyManagement section now looks like this:

    <dependencyManagement>
        <dependencies>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-function-dependencies</artifactId>
                <version>3.0.10.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR9</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>
    </dependencyManagement>

@ziodave

ziodave commented Nov 26, 2020

@olegz find the repo here: https://github.com/ziodave/spring-cloud-stream-issue-2056

I am using Azure Service Bus, if you'd like I can give you a temporary end-point to use for testing. POJOs are really simple (just one single value property).

  1. Configure the Azure Service Bus connection strings in src/main/resources/config/application.yml and send-pojo.php
  2. Send a message to the queue, you can use send-pojo.php.
  3. An exception will be raised in logs, since the value property is null.

This doesn't happen with spring-cloud-function 3.0.10.RELEASE. Uncomment the following in pom.xml to quickly switch to that release:

<!--            <dependency>-->
<!--                <groupId>org.springframework.cloud</groupId>-->
<!--                <artifactId>spring-cloud-function-dependencies</artifactId>-->
<!--                <version>3.0.10.RELEASE</version>-->
<!--                <type>pom</type>-->
<!--                <scope>import</scope>-->
<!--            </dependency>-->

@dmsilver2
Author

I have confirmed what @ziodave has said. When I use spring-cloud-function 3.0.10.RELEASE the deserialization issue goes away, and when I use 3.0.11.RELEASE the issue happens. Thank you, ziodave!

olegz added a commit to olegz/spring-cloud-stream that referenced this issue Dec 3, 2020
@olegz
Contributor

olegz commented Dec 3, 2020

Guys, I am still struggling to reproduce it. I just added another test with the last commit on the 3.0.x branch and all is good. As for the project that was shared by @ziodave, I am not even sure where to begin, as I was expecting some bare minimum but it appears to be a full blown project.

@ziodave

ziodave commented Dec 3, 2020

> I was expecting some bare minimum but it appears to be a full blown project.

@olegz it's only one file: https://github.com/ziodave/spring-cloud-stream-issue-2056/blob/main/src/main/java/com/example/demo/pojo/PojoConfiguration.java with the following configuration:

spring:
  main:
    banner-mode: off
  cloud:
    azure:
      msi-enabled: false
      servicebus.connection-string: ...
    stream:
      bindings:
        handler-in-0:
          destination: pojo-in
        handler-out-0:
          destination: pojo-out
    function:
      definition: handler

@olegz
Contributor

olegz commented Dec 3, 2020

@ziodave then my last commit does exactly that. What am I missing?

@ziodave

ziodave commented Dec 3, 2020

@olegz we experienced the issue with 3.0.11.RELEASE. Are you testing with 3.0.13.BUILD-SNAPSHOT? Maybe it has already been fixed in 3.0.13.BUILD-SNAPSHOT?

@olegz
Contributor

olegz commented Dec 3, 2020

Yes, I am sorry. The tests that I added are based on the current snapshots. But I forgot to mention that last week we had a release of both functions and stream (due to a customer support issue). The issue was addressed before the release, so it is available to you.
The latest s-c-stream release is Horsham.SR10 (3.0.10.RELEASE) which brings s-c-function 3.0.12.RELEASE.

Let me know

@ziodave

ziodave commented Dec 7, 2020

Horsham.SR10 (3.0.10.RELEASE) for me brings s-c-function 3.0.11.RELEASE.
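
One way to double-check which s-c-function jar actually ends up on the classpath is to ask the JVM where a class was loaded from. A minimal sketch, assuming `SimpleFunctionRegistry` lives in `org.springframework.cloud.function.context.catalog` (adjust the name if that is wrong for your version); `ResolvedVersionSketch` and `describe` are hypothetical names. The manifest version may be null, in which case the jar file name in the location is the part to look at.

```java
import java.security.CodeSource;

public class ResolvedVersionSketch {

    // Reports where a class was loaded from and its manifest version, if any.
    static String describe(String className) {
        try {
            // Load without initializing, so static initializers do not run.
            Class<?> type = Class.forName(className, false,
                    ResolvedVersionSketch.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            String location = (source == null || source.getLocation() == null)
                    ? "unknown location"
                    : source.getLocation().toString();
            String version = type.getPackage() == null
                    ? null
                    : type.getPackage().getImplementationVersion();
            return className + ": version=" + version + ", loaded from " + location;
        } catch (ClassNotFoundException | LinkageError e) {
            return className + ": not on classpath";
        }
    }

    public static void main(String[] args) {
        // Assumed fully qualified name; pass another one as the first argument.
        String name = args.length > 0
                ? args[0]
                : "org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry";
        System.out.println(describe(name));
    }
}
```

Calling `describe` from inside the application (for example at startup) reflects the classpath the application really runs with, which may differ from what the BOM versions suggest.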

@ziodave

ziodave commented Dec 7, 2020

I think this is because Spring Cloud Function is a dependency of spring-cloud-dependencies-Hoxton.SR9.pom, not of spring-cloud-stream-dependencies-Horsham.SR10.pom.

      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-dependencies</artifactId>
        <version>${spring-cloud-function.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>

To bring s-c-function 3.0.12.RELEASE this is what I did, please let me know if it's correct:

   <dependencyManagement>
        <dependencies>
            <!-- This can be removed when this issue is fixed:
            https://github.com/spring-cloud/spring-cloud-stream/issues/2056 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-function-dependencies</artifactId>
                <version>3.0.12.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Horsham.SR10</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR9</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>
    </dependencyManagement>

@olegz
Contributor

olegz commented Dec 7, 2020

Yes, this is correct since this was an independent release outside of Hoxton. In other words there is no corresponding release of spring-cloud, which is really not a requirement since we have our own BOM (as reflected on the project site), but we try as much as we can to match the releases for user convenience.

Anyway, should I assume the issue is resolved?

@ziodave

ziodave commented Dec 7, 2020

I still get null, but I'll have to run a more thorough check.

@olegz
Contributor

olegz commented Dec 7, 2020

Sure, please let me know so we have enough time to address it for the next release

@lenagainulina

We are experiencing the same problem. Is there any update on that?

@olegz
Contributor

olegz commented Feb 3, 2021

@lenagainulina which s-c-function version are you using? You may need to explicitly specify 3.0.12.RELEASE.
In any event, please provide more details.

@lenagainulina

lenagainulina commented Feb 8, 2021

Hi @olegz, and sorry for the delayed response. Originally I was using 3.0.11.RELEASE. I tried switching to 3.0.12.RELEASE, but it didn't solve the issue for me: the message payload was still deserialized with null values. I've described my case here with a minimal example: https://stackoverflow.com/questions/65297525/spring-cloud-stream-functional-approach-message-conversion-produces-an-object-w . The difference from the case described above is that I'm using the Kafka binder.
Yesterday I tried switching to Spring Cloud 2020.0.1 and Spring Boot 2.4.2, and the issue is gone.
pom.xml:

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.2</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<java.version>11</java.version>
		<spring-cloud.version>2020.0.1</spring-cloud.version>
	</properties>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

@olegz
Contributor

olegz commented Feb 8, 2021

I guess it was a combination of work in both s-c-function and s-c-stream that fixed the issue.
In any event, I am going to close it, as it appears that the issue is addressed, albeit indirectly. Feel free to re-open or comment if you believe there is still some work that needs to be done.

olegz closed this as completed Feb 8, 2021