Skip to content

Commit

Permalink
[RESTEASY-2083] Fix issue when SseBroadcasterImpl.notifyOnCloseListen…
Browse files Browse the repository at this point in the history
…ers removes wrong sink from the outputQueue (#1796) (#1863)

* Fix issue when SseBroadcasterImpl.notifyOnCloseListeners removes wrong sink from the outputQueue

* Progress: add test case

* Progress: respect code-style check

* Progress: respect code-style check

Fix
  • Loading branch information
asoldano committed Feb 13, 2019
1 parent b5ab15c commit 79dc7a1
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 9 deletions.
Expand Up @@ -314,4 +314,14 @@ private boolean contains(String[] ss, String t)
}
return false;
}

@Override
public boolean equals(Object o) {
return this == o;
}

@Override
public int hashCode() {
return ((Object)this).hashCode();
}
}
13 changes: 13 additions & 0 deletions testsuite/integration-tests/pom.xml
Expand Up @@ -99,6 +99,19 @@
</profiles>

<dependencies>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>

<!-- TODO Workaround dependency for arquillian to work with container using Remoting 5. Remove when updated version of
wildfly-arquillian-container-managed is available -->
<dependency>
Expand Down
@@ -1,19 +1,29 @@
package org.jboss.resteasy.test.providers.sse;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;

import org.jboss.resteasy.plugins.providers.sse.OutboundSseEventImpl;
import org.jboss.resteasy.plugins.providers.sse.SseBroadcasterImpl;
import org.jboss.resteasy.plugins.providers.sse.SseEventOutputImpl;
import org.jboss.resteasy.spi.HttpRequest;
import org.jboss.resteasy.spi.ResteasyAsynchronousContext;
import org.jboss.resteasy.spi.ResteasyProviderFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

/***
*
* @author Nicolas NESMON
Expand Down Expand Up @@ -186,6 +196,66 @@ public void testErrorListeners() throws Exception
}
}

@Before
public void before(){
HttpRequest request = mock(HttpRequest.class);
ResteasyAsynchronousContext resteasyAsynchronousContext = mock(ResteasyAsynchronousContext.class);
doReturn(resteasyAsynchronousContext).when(request).getAsyncContext();

//prevent NPE in SseEventOutputImpl ctr
ResteasyProviderFactory.pushContext(org.jboss.resteasy.spi.HttpRequest.class, request);
}

@Test
public void testRemoveDisconnectedEventSink() throws Exception {
SseBroadcasterImpl sseBroadcasterImpl = new SseBroadcasterImpl();

final ConcurrentLinkedQueue<SseEventSink> outputQueue = getOutputQueue(sseBroadcasterImpl);
CountDownLatch countDownLatch = new CountDownLatch(2);

//we want to test against actual SseEventOutputImpl
final SseEventSink sseEventSink1 = new SseEventOutputImpl(null);
final SseEventSink sseEventSink2 = new SseEventOutputImpl(null);


sseBroadcasterImpl.register(sseEventSink1);
sseBroadcasterImpl.register(sseEventSink2);


sseBroadcasterImpl.onClose(ses -> {
countDownLatch.countDown();
});

sseBroadcasterImpl.onError((ses, error) -> {
//error is an NPE thrown by SseEventOutputImpl#send
countDownLatch.countDown();
});

sseEventSink2.close();

sseBroadcasterImpl.broadcast(new OutboundSseEventImpl.BuilderImpl().data("Test").build());

if (!countDownLatch.await(5, TimeUnit.SECONDS))
{
fail("All close listeners should have been notified");
} else {
Assert.assertTrue(outputQueue.size() == 1);
Assert.assertSame(outputQueue.peek(), sseEventSink1);
}
}

private ConcurrentLinkedQueue<SseEventSink> getOutputQueue(SseBroadcasterImpl sseBroadcasterImpl) throws NoSuchFieldException, IllegalAccessException {
Field fld = SseBroadcasterImpl.class.getDeclaredField("outputQueue");
fld.setAccessible(true);
return (ConcurrentLinkedQueue<SseEventSink>) fld.get(sseBroadcasterImpl);
}

@org.junit.After
public void after(){
//revert contextual data
ResteasyProviderFactory.pushContext(org.jboss.resteasy.spi.HttpRequest.class, null);
}

private SseEventSink newSseEventSink()
{
return newSseEventSink(null);
Expand Down

0 comments on commit 79dc7a1

Please sign in to comment.