Skip to content

Commit

Permalink
[RESTEASY-1775] SseImpl should return a new SseBroadcaster instance i…
Browse files Browse the repository at this point in the history
…nstead of a singleton (#1375)

* newBroadcaster() should return a new SseBroadcaster instance

Signed-off-by: NicoNes <nicolas.nesmon@gmail.com>

* Fix SseBroadcaster test

Signed-off-by: NicoNes <nicolas.nesmon@gmail.com>
  • Loading branch information
NicoNes authored and asoldano committed Jan 9, 2018
1 parent 2f16b0f commit 41d10b5
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 22 deletions.
Expand Up @@ -6,11 +6,8 @@

public class SseImpl implements Sse
{
//spec leader said there will be a request scope broadcaster and a static broadcaster
//implement a static boradcaster first
public static SseBroadcaster broadCaster = new SseBroadcasterImpl();

@Override
@Override
public OutboundSseEvent.Builder newEventBuilder()
{
return new OutboundSseEventImpl.BuilderImpl();
Expand All @@ -19,6 +16,6 @@ public OutboundSseEvent.Builder newEventBuilder()
@Override
public SseBroadcaster newBroadcaster()
{
return broadCaster;
return new SseBroadcasterImpl();
}
}
@@ -1,11 +1,14 @@
package org.jboss.resteasy.test.providers.sse;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.ResourceContext;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
Expand All @@ -17,13 +20,20 @@ public class AnotherSseResource
{

private final Object outputLock = new Object();
private final Object sseBroadcasterLock = new Object();

@Context
private Sse sse;

private volatile SseEventSink eventSink;

private SseBroadcaster sseBroadcaster;
private volatile SseBroadcaster sseBroadcaster;

private final SseResource sseResource;

public AnotherSseResource(SseResource sseResource) {
this.sseResource = sseResource;
}

@GET
@Path("/subscribe")
Expand All @@ -36,13 +46,27 @@ public void subscribe(@Context SseEventSink sink) throws IOException
throw new IllegalStateException("No client connected.");
}
eventSink = sink;
//subscribe
if (sseBroadcaster == null)
{
sseBroadcaster = sse.newBroadcaster();
synchronized (this.sseBroadcasterLock) {
//subscribe
if (sseBroadcaster == null)
{
sseBroadcaster = sse.newBroadcaster();
}
}
this.sseResource.subscribe(sink);
sseBroadcaster.register(sink);
}

@POST
@Path("/broadcast")
public void broadcast(String message) throws IOException
{
if (this.sseBroadcaster == null)
{
throw new IllegalStateException("No Sse broadcaster created.");
}
this.sseBroadcaster.broadcast(sse.newEvent(message));
}

@DELETE
@Produces(MediaType.TEXT_PLAIN)
Expand Down
Expand Up @@ -15,8 +15,9 @@ public Set<Object> getSingletons()
{
if (singletons.isEmpty())
{
singletons.add(new SseResource());
singletons.add(new AnotherSseResource());
SseResource sseResource = new SseResource();
singletons.add(sseResource);
singletons.add(new AnotherSseResource(sseResource));
singletons.add(new EscapingSseResource());
}
return singletons;
Expand Down
Expand Up @@ -35,6 +35,7 @@ public class SseResource
{

private final Object outputLock = new Object();
private final Object sseBroadcasterLock = new Object();

@Context
private Sse sse;
Expand All @@ -44,7 +45,7 @@ public class SseResource

private volatile SseEventSink eventSink;

private SseBroadcaster sseBroadcaster;
private volatile SseBroadcaster sseBroadcaster;

private Object openLock = new Object();

Expand Down Expand Up @@ -138,21 +139,23 @@ public void subscribe(@Context SseEventSink sink) throws IOException
{
throw new IllegalStateException("No client connected.");
}
//subscribe
if (sseBroadcaster == null)
{
sseBroadcaster = sse.newBroadcaster();
synchronized (this.sseBroadcasterLock) {
//subscribe
if (sseBroadcaster == null)
{
sseBroadcaster = sse.newBroadcaster();
}
}
sseBroadcaster.register(sink);
}

@POST
@Path("/broadcast")
public void broadcast(String message) throws IOException
public void broadcast(String message)
{
if (sseBroadcaster == null)
if (this.sseBroadcaster == null)
{
sseBroadcaster = sse.newBroadcaster();
throw new IllegalStateException("No Sse broadcaster created.");
}
ExecutorService service = (ExecutorService) servletContext
.getAttribute(ExecutorServletContextListener.TEST_EXECUTOR);
Expand Down
Expand Up @@ -172,7 +172,7 @@ public void testSseEvent() throws Exception
@InSequence(4)
public void testBroadcast() throws Exception
{
final CountDownLatch latch = new CountDownLatch(2);
final CountDownLatch latch = new CountDownLatch(3);
Client client = new ResteasyClientBuilder().build();
WebTarget target = client.target(generateURL("/service/server-sent-events/subscribe"));
final String textMessage = "This is broadcast message";
Expand All @@ -198,6 +198,8 @@ public void testBroadcast() throws Exception

client.target(generateURL("/service/server-sent-events/broadcast")).request()
.post(Entity.entity(textMessage, MediaType.SERVER_SENT_EVENTS));
client2.target(generateURL("/service/sse/broadcast")).request()
.post(Entity.entity(textMessage, MediaType.SERVER_SENT_EVENTS));
Assert.assertTrue("Waiting for broadcast event to be delivered has timed out.", latch.await(20, TimeUnit.SECONDS));

//one subscriber is closed and test if another subscriber works
Expand Down

0 comments on commit 41d10b5

Please sign in to comment.