Skip to content

Commit

Permalink
Pulling Jim Ma latest changes to support latest jaxrs api
Browse files Browse the repository at this point in the history
  • Loading branch information
asoldano committed Apr 21, 2017
1 parent bd9c3cf commit f79a70a
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 188 deletions.
2 changes: 1 addition & 1 deletion jboss-modules/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
</module-def>

<module-def name="org.jboss.resteasy.resteasy-jaxrs">
<maven-resource group="org.jboss.resteasy" artifact="resteasy-jaxrs"/>
<maven-resource group="org.jboss.resteasy" artifact="resteasy-jaxrs"/>
<maven-resource group="org.jboss.resteasy" artifact="resteasy-client"/>
</module-def>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;

import java.util.concurrent.Future;

/**
Expand Down Expand Up @@ -241,4 +242,28 @@ public <T> Future<T> method(String name, Entity<?> entity, InvocationCallback<T>
invocation.setEntity(entity);
return invocation.submit(callback);
}

@Override
public Future<Response> patch() {
// TODO Auto-generated method stub
return null;
}

@Override
public <T> Future<T> patch(Class<T> responseType) {
// TODO Auto-generated method stub
return null;
}

@Override
public <T> Future<T> patch(GenericType<T> responseType) {
// TODO Auto-generated method stub
return null;
}

@Override
public <T> Future<T> patch(InvocationCallback<T> callback) {
// TODO Auto-generated method stub
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import javax.ws.rs.client.CompletionStageRxInvoker;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.NioInvoker;
import javax.ws.rs.client.RxInvoker;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Cookie;
Expand Down Expand Up @@ -316,11 +315,6 @@ public Invocation.Builder property(String name, Object value)
return this;
}

@Override
public NioInvoker nio()
{
throw new NotImplementedYetException();
}

@Override
public CompletionStageRxInvoker rx()
Expand Down Expand Up @@ -361,4 +355,22 @@ public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService executorServic
throw new RuntimeException(Messages.MESSAGES.unableToInstantiate(clazz), e);
}
}

@Override
public Response patch() {
// TODO Auto-generated method stub
return null;
}

@Override
public <T> T patch(Class<T> responseType) {
// TODO Auto-generated method stub
return null;
}

@Override
public <T> T patch(GenericType<T> responseType) {
// TODO Auto-generated method stub
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,22 @@ public CompletionStageRxInvoker executor(ExecutorService executor)
this.executor = executor;
return this;
}

@Override
public CompletionStage<Response> patch() {
// TODO Auto-generated method stub
return null;
}

@Override
public <T> CompletionStage<T> patch(Class<T> responseType) {
// TODO Auto-generated method stub
return null;
}

@Override
public <T> CompletionStage<T> patch(GenericType<T> responseType) {
// TODO Auto-generated method stub
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,96 +1,66 @@
package org.jboss.resteasy.plugins.providers.sse;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import javax.ws.rs.Flow.Subscriber;
import javax.ws.rs.Flow.Subscription;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;

public class SseBroadcasterImpl implements SseBroadcaster
{
private final Map<Subscriber<? super OutboundSseEvent>, Subscription> subscribers = new ConcurrentHashMap<>();
private final Set<Consumer<Subscriber<? super OutboundSseEvent>>> onCloseConsumers = new CopyOnWriteArraySet<>();
private final Set<BiConsumer<Subscriber<? super OutboundSseEvent>, Exception>> onExceptionConsumers = new CopyOnWriteArraySet<>();
private ConcurrentLinkedQueue<SseEventSink> outputQueue = new ConcurrentLinkedQueue<SseEventSink>();
private final List<BiConsumer<SseEventSink, Throwable>> onErrorConsumers = new CopyOnWriteArrayList<>();
private final List<Consumer<SseEventSink>> closeConsumers = new CopyOnWriteArrayList<>();

@Override
public void broadcast(OutboundSseEvent event)
{
for (final Subscriber<? super OutboundSseEvent> subscriber : subscribers.keySet())
{
try
{
subscriber.onNext(event);
}
catch (final Exception ex)
{
subscriber.onError(ex); //TODO is this required?
onExceptionConsumers.forEach(exceptioner -> exceptioner.accept(subscriber, ex));
}
}
public SseBroadcasterImpl () {

}

@Override
public void close()
{
for (final Subscriber<? super OutboundSseEvent> output : subscribers.keySet())
{
try
{
output.onComplete();
for (Consumer<Subscriber<? super OutboundSseEvent>> consumer : onCloseConsumers)
{
consumer.accept(output);
}
}
catch (final Exception ex)
{
output.onError(ex); //TODO is this required?
onCloseConsumers.forEach(occ -> occ.accept(output));
}
}
outputQueue.forEach(evenSink -> {closeConsumers.forEach(consumer-> {consumer.accept(evenSink);});});
outputQueue.clear();
}

@Override
public void onException(BiConsumer<Subscriber<? super OutboundSseEvent>, Exception> onException)
public void onError(BiConsumer<SseEventSink, Throwable> onError)
{
onExceptionConsumers.add(onException);
onErrorConsumers.add(onError);
}

@Override
public void onClose(Consumer<Subscriber<? super OutboundSseEvent>> onClose)
{
onCloseConsumers.add(onClose);
public void onClose(Consumer<SseEventSink> onClose)
{
closeConsumers.add(onClose);
}

@Override
public void subscribe(Subscriber<? super OutboundSseEvent> subscriber)
public void register(SseEventSink sseEventSink)
{
final Subscription subscription = new Subscription()
{
public void request(long n)
{
}
outputQueue.add(sseEventSink);

}

@Override
public void cancel()
{
}
@Override
public CompletionStage<?> broadcast(OutboundSseEvent event)
{
outputQueue.forEach(eventSink -> {SseEventOutputImpl outputImpl=(SseEventOutputImpl)eventSink; outputImpl.send(event, callAllErrConsumers());});
//return event immediately and doesn't block anything
return CompletableFuture.supplyAsync(() -> { return event;});
}

BiConsumer<SseEventSink, Throwable> callAllErrConsumers() {
return (eventSink, err) -> {
onErrorConsumers.forEach(consumer -> {consumer.accept(eventSink, err);});
};

try
{
subscriber.onSubscribe(subscription);
subscribers.put(subscriber, subscription);
}
catch (final Exception ex)
{
subscriber.onError(ex);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Flow.Subscription;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.ServerErrorException;
import javax.ws.rs.core.GenericType;
Expand Down Expand Up @@ -62,7 +64,7 @@ public SseEventOutputImpl(final MessageBodyWriter<OutboundSseEvent> writer)
}

@Override
public void close() throws IOException
public void close()
{
if (request.getAsyncContext().isSuspended() && request.getAsyncContext().getAsyncResponse() != null) {
if (request.getAsyncContext().isSuspended()) {
Expand All @@ -80,15 +82,25 @@ public boolean isClosed()
{
return closed;
}

@Override
public void onSubscribe(Subscription subscription)
public CompletionStage<?> send(OutboundSseEvent event)
{
subscription.request(Long.MAX_VALUE);
return send(event, (a, b) -> {});
}

@Override
public void onNext(OutboundSseEvent event)
//We need this to make it async enough
public CompletionStage<?> send(OutboundSseEvent event, BiConsumer<SseEventSink, Throwable> errorConsumer)
{
CompletableFuture<Object> future = CompletableFuture
.supplyAsync(() -> {writeEvent(event); return event;});
//TODO: log this
future.exceptionally((Throwable ex) -> { errorConsumer.accept(this, ex); return ex;});
return future;
}


protected void writeEvent(OutboundSseEvent event)
{
ResteasyProviderFactory.pushContextDataMap(contextDataMap);
try {
Expand All @@ -106,21 +118,4 @@ public void onNext(OutboundSseEvent event)
ResteasyProviderFactory.removeContextDataLevel();
}
}

@Override
public void onError(Throwable throwable)
{
// TODO Auto-generated method stub
}

@Override
public void onComplete()
{
//TODO is this OK??
try {
close();
} catch (IOException e) {
throw new ProcessingException(e);
}
}
}

0 comments on commit f79a70a

Please sign in to comment.