Skip to content

Commit

Permalink
Update to new JSR-370 APIs (2017-02-02)
Browse files Browse the repository at this point in the history
  • Loading branch information
asoldano committed Apr 21, 2017
1 parent fb601ce commit e372882
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 138 deletions.
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
package org.jboss.resteasy.plugins.providers.sse;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import javax.ws.rs.Flow.Subscriber;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventOutput;

public class SseBroadcasterImpl implements SseBroadcaster
{
private final Set<SseEventOutput> outputs = Collections.newSetFromMap(new ConcurrentHashMap<SseEventOutput, Boolean>());
private final Set<Listener> listeners = Collections.newSetFromMap(new ConcurrentHashMap<Listener, Boolean>());

@Override
public boolean register(Listener listener)
{
return listeners.add(listener);
}

@Override
public boolean register(SseEventOutput output)
{
return outputs.add(output);
}
private final Set<Subscriber<? super OutboundSseEvent>> outputs = Collections.newSetFromMap(new ConcurrentHashMap<Subscriber<? super OutboundSseEvent>, Boolean>());
private final Set<Consumer<SseEventOutput>> onCloseConsumers = Collections.newSetFromMap(new ConcurrentHashMap<Consumer<SseEventOutput>, Boolean>());
private final Set<BiConsumer<SseEventOutput, Exception>> onExceptionConsumers = Collections.newSetFromMap(new ConcurrentHashMap<BiConsumer<SseEventOutput, Exception>, Boolean>());

@Override
public void broadcast(OutboundSseEvent event)
{
for (final SseEventOutput output : outputs)
for (final Subscriber<? super OutboundSseEvent> output : outputs)
{
try
{
output.write(event);
output.onNext(event);
}
catch (final IOException ex)
catch (final Exception ex)
{
for (Listener listener : listeners)
output.onError(ex);
for (BiConsumer<SseEventOutput, Exception> oec : onExceptionConsumers)
{
listener.onException(output, ex);
oec.accept((SseEventOutput)output, ex);
}
}
}
Expand All @@ -48,23 +40,42 @@ public void broadcast(OutboundSseEvent event)
@Override
public void close()
{
for (final SseEventOutput output : outputs)
for (final Subscriber<? super OutboundSseEvent> output : outputs)
{
try
{
output.close();
for (Listener listener : listeners)
output.onComplete();
for (Consumer<SseEventOutput> consumer : onCloseConsumers)
{
listener.onClose(output);
consumer.accept((SseEventOutput)output);
}
}
catch (final IOException ex)
catch (final Exception ex)
{
for (Listener listener : listeners)
output.onError(ex);
for (BiConsumer<SseEventOutput, Exception> oec : onExceptionConsumers)
{
listener.onException(output, ex);
oec.accept((SseEventOutput)output, ex);
}
}
}
}

@Override
public void onException(BiConsumer<SseEventOutput, Exception> onException)
{
onExceptionConsumers.add(onException);
}

@Override
public void onClose(Consumer<SseEventOutput> onClose)
{
onCloseConsumers.add(onClose);
}

@Override
public void subscribe(Subscriber<? super OutboundSseEvent> subscriber)
{
outputs.add(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Map;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Flow.Subscription;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.ServerErrorException;
import javax.ws.rs.core.GenericType;
Expand Down Expand Up @@ -74,23 +75,50 @@ public void close() throws IOException
}

@Override
public void write(OutboundSseEvent event) throws IOException
public boolean isClosed()
{
return closed;
}

@Override
public void onSubscribe(Subscription subscription)
{
// TODO Auto-generated method stub

}

@Override
public void onNext(OutboundSseEvent event)
{
ResteasyProviderFactory.pushContextDataMap(contextDataMap);
if (event != null)
{
ByteArrayOutputStream bout = new ByteArrayOutputStream();
writer.writeTo(event, event.getClass(), null, new Annotation[]
{}, event.getMediaType(), null, bout);
response.getOutputStream().write(bout.toByteArray());
try {
if (event != null)
{
ByteArrayOutputStream bout = new ByteArrayOutputStream();
writer.writeTo(event, event.getClass(), null, new Annotation[]{}, event.getMediaType(), null, bout);
response.getOutputStream().write(bout.toByteArray());
}
response.getOutputStream().write(END);
response.flushBuffer();
} catch (Exception e) {
throw new ProcessingException(e);
}
response.getOutputStream().write(END);
response.flushBuffer();
}

@Override
public boolean isClosed()
public void onError(Throwable throwable)
{
return closed;
// TODO Auto-generated method stub
}

@Override
public void onComplete()
{
//TODO is this OK??
try {
close();
} catch (IOException e) {
throw new ProcessingException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.ws.rs.Flow.Subscriber;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
Expand All @@ -38,8 +39,8 @@ private enum State {
private final boolean disableKeepAlive;
private final ScheduledExecutorService executor;
private final AtomicReference<State> state = new AtomicReference<>(State.READY);
private final List<Listener> unboundListeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<String, List<Listener>> boundListeners = new ConcurrentHashMap<>();
private final List<Subscriber<InboundSseEvent>> unboundListeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<String, List<Subscriber<InboundSseEvent>>> boundListeners = new ConcurrentHashMap<>();

public static class SourceBuilder extends Builder
{
Expand Down Expand Up @@ -78,20 +79,6 @@ public Builder target(WebTarget endpoint)
return new SourceBuilder(endpoint);
}

@Override
public Builder register(Listener listener)
{
//TODO: this api should be revised
return this;
}

@Override
public Builder register(Listener listener, String eventName, String... eventNames)
{
//TODO: this api should be revised
return this;
}

@Override
public Builder reconnectingEvery(long delay, TimeUnit unit)
{
Expand Down Expand Up @@ -178,38 +165,41 @@ public void close()
this.close(5, TimeUnit.SECONDS);
}

public void register(final Listener listener)
@Override
public void subscribe(Subscriber<? super InboundSseEvent> subscriber)
{
register(listener, null);
subscribe((Subscriber<InboundSseEvent>)subscriber, null); //TODO is this OK?
}

public void register(final Listener listener, final String eventName, final String... eventNames)
@Override
public void subscribe(Subscriber<InboundSseEvent> subscriber, String eventName, String... eventNames)
{
if (eventName == null)
{
unboundListeners.add(listener);
unboundListeners.add(subscriber);
}
else
{
addBoundListener(eventName, listener);
addBoundListener(eventName, subscriber);

if (eventNames != null)
{
for (String name : eventNames)
{
addBoundListener(name, listener);
addBoundListener(name, subscriber);
}
}
}

}

private void addBoundListener(final String name, final Listener listener)
private void addBoundListener(final String name, final Subscriber<InboundSseEvent> subscriber)
{
List<Listener> listeners = boundListeners.putIfAbsent(name,
new CopyOnWriteArrayList<>(Collections.singleton(listener)));
if (listeners != null)
List<Subscriber<InboundSseEvent>> subscribers = boundListeners.putIfAbsent(name,
new CopyOnWriteArrayList<>(Collections.singleton(subscriber)));
if (subscribers != null)
{
listeners.add(listener);
subscribers.add(subscriber);
}
}

Expand All @@ -235,7 +225,7 @@ public boolean close(final long timeout, final TimeUnit unit)
return true;
}

private class EventHandler implements Runnable, Listener
private class EventHandler implements Runnable
{

private final CountDownLatch connectedLatch;
Expand Down Expand Up @@ -301,8 +291,7 @@ public void awaitConnected()

}

@Override
public void onEvent(final InboundSseEvent event)
private void onEvent(final InboundSseEvent event)
{
if (event == null)
{
Expand All @@ -319,10 +308,10 @@ public void onEvent(final InboundSseEvent event)
final String eventName = event.getName();
if (eventName != null)
{
final List<Listener> eventListeners = boundListeners.get(eventName);
if (eventListeners != null)
final List<Subscriber<InboundSseEvent>> eventSubscribers = boundListeners.get(eventName);
if (eventSubscribers != null)
{
notify(eventListeners, event);
notify(eventSubscribers, event);
}
}
notify(unboundListeners, event);
Expand All @@ -342,13 +331,13 @@ private Invocation.Builder buildRequest()
return request;
}

private void notify(final Collection<Listener> listeners, final InboundSseEvent event)
private void notify(final Collection<Subscriber<InboundSseEvent>> subscribers, final InboundSseEvent event)
{
if (listeners != null)
if (subscribers != null)
{
for (Listener listener : listeners)
for (Subscriber<InboundSseEvent> subscriber : subscribers)
{
listener.onEvent(event);
subscriber.onNext(event);
}
}
}
Expand Down

0 comments on commit e372882

Please sign in to comment.