Skip to content

Conversation

@ronsigal
Copy link
Collaborator

  1. Implemented RxInvoker for rxjava and rxjava2 classes.
  2. Extended Resteasy proxies to support reactive classes.

@ronsigal
Copy link
Collaborator Author

Replaces pull request #1493 .

@ronsigal ronsigal mentioned this pull request Apr 27, 2018
@ronsigal
Copy link
Collaborator Author

This pull request replaces #1493. I've responded to the comments on #1493 over there.

@ronsigal ronsigal requested review from asoldano and jimma April 27, 2018 02:13
@FroMage
Copy link
Contributor

FroMage commented Apr 27, 2018

Hi,

Did you see all my questions about the @Stream use-case? I haven't seen an answer to them from you yet, is it normal?

@ronsigal ronsigal force-pushed the 1798g branch 2 times, most recently from 79c6f18 to 4a5d3ed Compare April 30, 2018 22:33

import java.util.concurrent.CompletionStage;

public interface AsyncClientResponseProvider<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc

Copy link
Member

@asoldano asoldano left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is massive! :-)
Overall I like this quite a lot. Do you believe anything is still missing to properly address RESTEASY-1798, besides additional documentation?

* This class is not currently used, having been replaced by AsyncStreamSseResponseConsumer even for
* non-SSE connections.
*/
private static class AsyncStreamingResponseConsumer extends AsyncStreamResponseConsumer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still really the case? It looks needed above

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asoldano, ah, yes, it's back in use.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thanks

}

@Test
@Ignore // @TODO Fix
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asoldano, there's a problem, maybe in undertow, such that after making a HEAD call, the connection hangs. I set it aside in RESTEASY-1885 for later attention.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, no problem, then just add a reference to RESTEASY-1885 in the TODO comment please :-)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asoldano done. Haven't committed it yet.

@asoldano asoldano added the main label May 8, 2018
@ronsigal
Copy link
Collaborator Author

ronsigal commented May 8, 2018

Hi @asoldano,

Yes, it's grown over time. ;-)

I would say that it's just about in a state that it makes a coherent package. The recent exchanges with @FroMage have led me to think more about the compatibility of different client and server modes. For example, if a resource method is annotated @stream(mode=Stream.MODE.SSE) and it returns a Flowable, can the result be read by a pure SSE client. It turns out the answer is "yes". Can it be read by a FlowableRxInvoker? Right at the moment, it's not working, which I'm trying to figure out. Can a pure SSE resource method be read by a FlowableRxInvoker? I don't know yet. Eventually, it would be nice if all of these scenarios worked. For now, I'm trying to get a coherent picture so I can say something sensible in the documentation. I'll see what I can figure out tonight.

-Ron

Following up: The answers are "yes", "yes", and "yes", if "@stream(mode=Stream.MODE.SSE)" is replaced by "@produces("text/event-stream"), as demonstrated in

  1. org.jboss.resteasy.test.rx.rxjava.RxObservableSSECompatibilityTest,
  2. org.jboss.resteasy.test.rx.rxjava2.RxObservableSSECompatibilityTest,
  3. org.jboss.resteasy.test.rx.rxjava2.RxFlowableSSECompatibilityTest,

* @tpSince RESTEasy 4.0
*/
@Test
@Ignore // Doesn't currently work.
Copy link
Contributor

@marekkopecky marekkopecky May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronsigal Is this a regression? Can you please provide same details about this ignore?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, this is a new version of an existing test, which is now PublisherResponseNoStreamTest.testTextErrorDeferred(). The original still works, so it's not a regression. I've added a comment.

@asoldano
Copy link
Member

asoldano commented May 9, 2018

@ronsigal OK, excellent, thanks for the status update. I'm starting looking at the doc you just pushed in the mean time.

</programlisting>

<para>
The way to think about <methodname>logRunningOpAsync()</methodname> is that the method is asynchronously
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo, s/logRunningOpAsync/longRunningOpAsync/g

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, got it. Thanks.

</programlisting>

<para>
Here, the byte arrays are written to the ...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to be completed, right? :-)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asoldano, yes. :))

</para>

<programlisting>
&lt;List&lt;Thing&gt;&gt; Flowable&lt;List&lt;Thing&gt;&gt; Flowable.create(FlowableOnSubscribe&lt;List&lt;Thing&gt;&gt; source, BackpressureStrategy mode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The beginning of this line should not be here, right?
Basically, use this:
Flowable&lt;List&lt;Thing&gt;&gt; Flowable.create(FlowableOnSubscribe&lt;List&lt;Thing&gt;&gt; source, BackpressureStrategy mode);
instead of this:
&lt;List&lt;Thing&gt;&gt; Flowable&lt;List&lt;Thing&gt;&gt; Flowable.create(FlowableOnSubscribe&lt;List&lt;Thing&gt;&gt; source, BackpressureStrategy mode);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, well, Flowable.create() is defined

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)

so <T> in this case <List<Thing>>. But I'll delete <List<Thing>>.

@Override
public void subscribe(FlowableEmitter&lt;List&lt;Thing&gt;&gt; emitter) throws Exception {
for (int i = 0; i &lt; listSize; i++) {
List&lt;Thing&gt; list = new ArrayList&lt;Thing&gt;()&lt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

;char should be at the end of this line instead of &lt; ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaarrgh. Yes, thanks.

System.out.println("result: " + stage.toCompletableFuture().get());
}

private CompletionStage&lt;String*gt; getCompletionStage() {
Copy link
Contributor

@marekkopecky marekkopecky May 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/*gt;/&gt;/g

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, OK, got it. Thanks.

@marekkopecky
Copy link
Contributor

@ronsigal I started Rx2FlowableTest#testGet test. Test passes, but server prints many warnings, like this:

16:26:38,038 WARN  [org.jboss.modules] (ServerService Thread Pool -- 23) Failed to define class org.jboss.resteasy.rxjava2.ObservableRxInvokerProvider in Module "org.jboss.resteasy.resteasy-rxjava2" from local module loader @123f1134 (finder: local module finder @7d68ef40 (roots: /home/mkopecky/playground/resteasy/git/testsuite/integration-tests/target/test-server/wildfly-12.0.0.Final/modules,/home/mkopecky/playground/resteasy/git/testsuite/integration-tests/target/test-server/wildfly-12.0.0.Final/modules/system/layers/base)): java.lang.NoClassDefFoundError: Failed to link org/jboss/resteasy/rxjava2/ObservableRxInvokerProvider (Module "org.jboss.resteasy.resteasy-rxjava2" from local module loader @123f1134 (finder: local module finder @7d68ef40 (roots: /home/mkopecky/playground/resteasy/git/testsuite/integration-tests/target/test-server/wildfly-12.0.0.Final/modules,/home/mkopecky/playground/resteasy/git/testsuite/integration-tests/target/test-server/wildfly-12.0.0.Final/modules/system/layers/base))): javax/ws/rs/client/RxInvokerProvider

I use this command: mvn surefire:test@default-test -Dmaven.test.redirectTestOutputToFile=false -Dtest=Rx2FlowableTest#testGet

Can you please check it? I can provide you more logs, if you want ...

it holds at most one potential value. <classname>Flowable</classname> implements
<classname>io.reactivex.Publisher</classname>, and <classname>Observable</classname> is very
similar to <classname>Flowable</classname> except that it doesn't support backpressure.
So, if you import <code>resteasy-rxjava2</code>, you can just start returning these reactive types from your
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronsigal, @asoldano: "import resteasy-rxjava2" ... Are there some future plans to add resteasy-rxjava2 module to wf? (I know, that this can't be in scope of this PR and in scope of RESTEASY-1798)

Currently, if I want to import resteasy-rxjava2 to WF, I need to copy ${RESTEASY_GIT}/jboss-modules/target/modules/org/jboss/resteasy/resteasy-rxjava2 directory to WF and add resteasy-rxjava2 dependency to jboss-deployment-structure.xml file in deployment ... and it is quite complicated in my PoV ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion here is that we'll have to add the resteasy-rxjava and resteasy-rxjava2 modules into wildfly, possibly as tech-preview

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add resteasy-rxjava2 only? (without resteasy-rxjava(1))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @marekkopecky,

  1. Re the Rx2FlowableTest error messages: That sounds familiar. I added

<module name="javax.ws.rs.api"/>

to the resteasy-rxjava2 module.xml file in jboss-modules a while ago (at home). I think that's what fixed it. I should have a new commit soon.

  1. Re jboss-deployment-structure.xml: Is that necessary? I just unzip resteasy-jboss-modules-4.0.0-SNAPSHOT.zip

  2. Re wildfly: I defer to Alessio on that question.

  3. Re Just resteasy-rxjava2: I guess the question is: How far along is the rx community in upgrading from rxjava to rxjava2. @marekkopecky, @FroMage, what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ronsigal

  1. You are right, resteasy-jboss-modules-4.0.0-SNAPSHOT.zip is much more easier, thank you!

  2. In my PoV, we shouldn't add to WF something, that is at the end of life (= rxjava(1)). If there occurs some bug in rxjava, I assume that this bug won't be fixed (cc: @asoldano )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, yes, Marek is probably right on rxjava becoming obsolete. Still, at least the rxjava2 module should be added, to simplify wildfly user acess to the new features. To be re-evaluated in the next future.

</para>
</sect1>

<sect1>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last two sections need to be finished too ...

I add comment here just for tracking purpose ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, coming soon!!

@ronsigal ronsigal force-pushed the 1798g branch 4 times, most recently from b2d661f to 43a7d7e Compare May 23, 2018 23:48
1) Implemented RxInvoker for rxjava and rxjava2 classes.
2) Extended Resteasy proxies to support reactive classes.

[RESTEASY-1798]

Internationalized resteasy-rxjava, resteasy-rxjava2.

[RESTEASY-1798]

Added tests for transforming to and from CompletionStage and reactive
classes.

[RESTEASY-1798]

Add mode to @stream.

[RESTEASY-1798]

First draft of reactive documentation.

RESTEASY-1798

1) Various corrections. 2) New and updated tests.

[RESTEASY-1798]

Fixed some texts, added some javadoc.

[RESTEASY-1798]

Fixed syntactic errors.

[RESTEASY-1798]

Fixed missing declarations in tests.

[RESTEASY-1798]

1. Improved stream element media type handling; 2) minor test updates;
and 3) additions to reactive chapter in User Guide.

[RESTEASY-1798]

Modified handling of media type in OutboundSseEventImpl to fix failing
unit test.
@asoldano asoldano merged commit 9082515 into resteasy:master May 28, 2018
</para>

<programlisting>
applicaton/x-stream-raw;"element-type=&lt;element-type&gt;"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry I only noticed this now @ronsigal but I think it's wrong: in all use-cases I can think of, I need the content type to be set by @Produces and not set to the useless x-stream-raw that no existing client can handle. Can we change this behaviour?

}
}
}
else if (o instanceof String)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronsigal This condition doesn't make sense. if (o instanceof String) statement is reachable only if o == null (see line 227). And if o == null, if (o instanceof String) is evaluated to false. So if (o instanceof String) is always evaluated as false, so lines 239-247 are useless now. Can you please check&&fix it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, d'oh! It looks like I copy and pasted to the wrong place. Thanks.

public Byte[] readFrom(Class<Byte[]> type, Type genericType, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders, InputStream entityStream) throws IOException, WebApplicationException {
ArrayList<Byte> list = new ArrayList<Byte>();
int b = entityStream.read();
while (b != -1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't be better to add b = entityStream.read(); to while loop? This still (forever) adds to list first int from stream ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, d'oh^2.

jaxrsResponse = (BuiltResponse) Response.ok().type(MediaType.SERVER_SENT_EVENTS).build();
ResourceMethodInvoker method =(ResourceMethodInvoker) request.getAttribute(ResourceMethodInvoker.class.getName());
Produces produces = method.getMethod().getAnnotation(Produces.class);
if (produces != null & contains(produces.value(), MediaType.SERVER_SENT_EVENTS))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo? '&' -> '&&'?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, you're right again. Oddly, I think the result is the same even though the semantics are different.


if (name == null)
{
name = String.format("sse-event-source(%s)", target.getUri());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name is never used, can you please check it? I know, that this line is not changed in this PR ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, I'll ask Jim about this one. It looks like he might have intended to use it.


private MediaType[] getAccept()
{
if (syncInvoker instanceof ClientInvocationBuilder)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? syncInvoker is always ClientInvocationBuilder ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, you're right, but javax.ws.rs.client.SyncInvoker is an interface, so, in principle, someone could use a different implementation, right?


private MediaType[] getAccept()
{
if (syncInvoker instanceof ClientInvocationBuilder)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marekkopecky, same here ...

ronsigal added a commit to ronsigal/Resteasy that referenced this pull request May 31, 2018
Minor corrections pointed out on
resteasy#1503.
@ronsigal ronsigal mentioned this pull request May 31, 2018
@ronsigal
Copy link
Collaborator Author

@marekkopecky, thank you for taking the time to look at this stuff. I've created #1531 for follow up changes.

ronsigal added a commit to ronsigal/Resteasy that referenced this pull request Jun 9, 2018
Minor corrections pointed out on
resteasy#1503.

[RESTEASY-1798]

Changed treatment of RAW streaming.
asoldano pushed a commit that referenced this pull request Jun 10, 2018
Minor corrections pointed out on
#1503.

[RESTEASY-1798]

Changed treatment of RAW streaming.
NicoNes pushed a commit to NicoNes/Resteasy that referenced this pull request Jun 17, 2018
Minor corrections pointed out on
resteasy#1503.

[RESTEASY-1798]

Changed treatment of RAW streaming.
@SuppressWarnings("deprecation")
public class ObservableRxInvokerImpl implements ObservableRxInvoker
{
private static Object monitor = new Object();
Copy link
Contributor

@NicoNes NicoNes Jul 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ronsigal,

I was having a look at this class, in relation to what I've done in #1599, and just for understanding I was wondering what is this monitor for?
According to the code I suppose that it prevents both multiple and concurrent invocations of sseEventSource.open(...) for a given eventSource. I'm OK with it.
But does this monitor needs to be global and shared beetwen all Observable instance ?

I mean why didn't you use a local sync mech as follow. It will improve concurrency between multiple invocations on the same ObservableRxInvokerImpl instance:

	private <T> Observable<T> eventSourceToObservable(SseEventSourceImpl sseEventSource, Class<T> clazz, String verb, Entity<?> entity, MediaType[] mediaTypes)
	{
		Observable<T> observable = Observable.create(
				new Observable.OnSubscribe<T>() {
				   
				   private final AtomicBoolean open= new AtomicBoolean();
				   
					@Override
					public void call(Subscriber<? super T> sub) {
						sseEventSource.register(
								(InboundSseEvent e) -> {T t = e.readData(clazz, ((InboundSseEventImpl) e).getMediaType()); sub.onNext(t);},
								(Throwable t) -> sub.onError(t),
								() -> sub.onCompleted());
						if (open.compareAndSet(false, true))
						{
							sseEventSource.open(null, verb, entity, mediaTypes);
						}
					}
				});
		return observable;
	}

I suppose there must be an good reason bu I do not get it yet.

Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants