Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxJava handling multiple Observable results #775

Closed
wants to merge 1 commit into from

Conversation

@jmnarloch
Copy link
Contributor

jmnarloch commented Jan 18, 2016

PR that handles multiple values returned by the Observable.

I've seen lately remark that in order to use the current implementation you need to use Observable<List<T>>, which is quite counter intuitive since the Observable by itself can produce multiple results, so this PR tries to address this issue, by aggregating the results and postponing the actual moment when they are going to "applied" on the DeferredResult.

The only thing I was thinking of is whether arbitrary wrapping the values into a plain Object array is a good way to approach this.

@spencergibb
Copy link
Member

spencergibb commented Jan 18, 2016

@rstoyanchev
Copy link

rstoyanchev commented Jan 18, 2016

Indeed this code could use some improvement. Observable produces 0..N items while DeferredResult accepts a single result. That's a better fit for rx.Single. For an Observable, you can accumulate the items before rendering the response as in this PR, possibly even using Observable.toList() in the implementation.

The response of course would not be rendered until all items are accumulated. Alternatively for a more streaming scenario where each item is rendered with a message converter, something similar to the ResponseBodyEmitterReturnValueHandler could be done.

@spencergibb
Copy link
Member

spencergibb commented Jan 18, 2016

@rstoyanchev Interesting, so support rx.Single similar to the current Observable support and move Observable support to something like ResponseBodyEmitterReturnValueHandler.

@rstoyanchev
Copy link

rstoyanchev commented Jan 18, 2016

That would be more idiomatic I think. Otherwise with an Observable return type there is no way of telling whether the intent is to accumulate + render or stream (i.e. render each item).

@sdeleuze
Copy link

sdeleuze commented Jan 19, 2016

👍

@jmnarloch
Copy link
Contributor Author

jmnarloch commented Jan 19, 2016

Interesting, so support rx.Single similar to the current Observable support
and move Observable support to something like
ResponseBodyEmitterReturnValueHandler.

I can try to prepare a pull request later for that :)

@spencergibb
Copy link
Member

spencergibb commented Jan 19, 2016

@jmnarloch that would be great! I think we should close this one in lieu of the new one.

@jmnarloch jmnarloch closed this Jan 19, 2016
@jmnarloch
Copy link
Contributor Author

jmnarloch commented Jan 19, 2016

I've made the PR, it's look promising: #778
Though maybe some extra tests would be useful.

@rstoyanchev
Copy link

rstoyanchev commented Mar 14, 2016

@jmnarloch, @spencergibb, after giving this full consideration for including in Spring Framework 4.3 we've reached a conclusion that hopefully will be helpful even if it doesn't go all the way.

We've decided not to include built-in support in 4.3 and leave that to 5.0. We could have easily and cleanly added support for Single and Observable for SSE scenarios but there are still questions surrounding the use of Observable more generally that can only be addressed fundamentally in spring-reactive and Spring Framework 5.

I did however make some changes that should make addressing your internal needs easier. See this pull request. You could register a DeferredResultHandler with an Single adapter and a ResponseBodyEmitterResultHandler with an Observable adapter which creates an SseEmitter when "Content-Type:text/event-stream".

I should clarify I didn't think it necessary to expose timeout customizations because Observable and Single have built-in timeout related capabilities that are probably sufficient. For example in your adapter code you can create DeferredResult and SseEmitter with some expanded timeout (say from 10 to 30 seconds) and combine that with the rx timeout operator to supply default values with some fluctuation per use case.

For indicating SSE, it would be easy enough to return ResponseEntity.ok().contentType(new MediaType("text/event-stream").body(observable) or some shortcut resulting in the same.

As to how to treat an Observable return value generally, this is where it might be easier for you to decide based on your use cases. You can make more assumptions than we can but it's also easy enough for a controller to do observable.toList().toSingle(). So in the end support for Single and Observable with SSE is probably the sweet spot.

Hope this helps.

@spencergibb
Copy link
Member

spencergibb commented Mar 14, 2016

@rstoyanchev any reason you reopened this one instead of #778?

@rstoyanchev
Copy link

rstoyanchev commented Mar 14, 2016

No, no reason. I think this was linked from the PR somewhere.

@spencergibb
Copy link
Member

spencergibb commented Mar 14, 2016

Thanks for your work! Will integrate the new changes with boot 1.4 and spring 4.3 and look forward to removing this with spring 5!

@rstoyanchev
Copy link

rstoyanchev commented Mar 14, 2016

No worries, this was overdue to consider with the 5.0 work progressing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

4 participants
You can’t perform that action at this time.