Skip to content

Commit

Permalink
UNDERTOW-605 Improve SSE connection failure handling
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Jan 19, 2016
1 parent 8f95887 commit c2a9cfc
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 13 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/io/undertow/UndertowLogger.java
Expand Up @@ -21,6 +21,7 @@
import io.undertow.client.ClientConnection;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.ServerConnection;
import io.undertow.server.handlers.sse.ServerSentEventConnection;
import io.undertow.util.HeaderMap;
import io.undertow.util.HttpString;
import org.jboss.logging.BasicLogger;
Expand Down Expand Up @@ -348,4 +349,7 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String
@Message(id = 5073, value = "Thread %s (id=%s) was previously reported to be stuck but has completed. It was active for approximately %s milliseconds. There is/are still %s thread(s) that are monitored by this Valve and may be stuck.")
void stuckThreadCompleted(String threadName, long threadId, long active, int stuckCount);

@LogMessage(level = ERROR)
@Message(id = 5074, value = "Failed to invoke error callback %s for SSE task")
void failedToInvokeFailedCallback(ServerSentEventConnection.EventCallback callback, @Cause Exception e);
}
Expand Up @@ -18,6 +18,8 @@

package io.undertow.server.handlers.sse;

import io.undertow.UndertowLogger;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.security.api.SecurityContext;
import io.undertow.security.idm.Account;
import io.undertow.server.HttpServerExchange;
Expand All @@ -29,7 +31,6 @@
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import io.undertow.connector.PooledByteBuffer;
import org.xnio.XnioExecutor;
import org.xnio.channels.StreamSinkChannel;

Expand All @@ -39,8 +40,8 @@
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.security.Principal;
import java.util.HashMap;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -89,6 +90,7 @@ public void handleEvent(StreamSinkChannel channel) {
for (ChannelListener<ServerSentEventConnection> listener : closeTasks) {
ChannelListeners.invokeChannelListener(ServerSentEventConnection.this, listener);
}
IoUtils.safeClose(ServerSentEventConnection.this);
}
});
this.sink.getWriteSetter().set(writeListener);
Expand Down Expand Up @@ -369,11 +371,34 @@ public boolean isOpen() {

@Override
public void close() throws IOException {
close(new ClosedChannelException());
}

private void close(IOException e) throws IOException {
if (openUpdater.compareAndSet(this, 1, 0)) {
if (pooled != null) {
pooled.close();
pooled = null;
}

for (SSEData i : buffered) {
if (i.callback != null) {
try {
i.callback.failed(this, i.data, i.event, i.id, e);
} catch (Exception ex) {
UndertowLogger.REQUEST_LOGGER.failedToInvokeFailedCallback(i.callback, ex);
}
}
}
for (SSEData i : queue) {
if (i.callback != null) {
try {
i.callback.failed(this, i.data, i.event, i.id, e);
} catch (Exception ex) {
UndertowLogger.REQUEST_LOGGER.failedToInvokeFailedCallback(i.callback, ex);
}
}
}
queue.clear();
buffered.clear();
sink.shutdownWrites();
Expand Down Expand Up @@ -488,16 +513,6 @@ public void handleEvent(StreamSinkChannel channel) {
}

private void handleException(IOException e) {
for (SSEData i : buffered) {
if(i.callback != null) {
i.callback.failed(this, i.data, i.event, i.id, e);
}
}
for(SSEData i : queue) {
if(i.callback != null) {
i.callback.failed(this, i.data, i.event, i.id, e);
}
}
IoUtils.safeClose(this);
IoUtils.safeClose(this, sink, exchange.getConnection());
}
}
Expand Up @@ -30,6 +30,11 @@
import org.xnio.IoUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* @author Stuart Douglas
Expand Down Expand Up @@ -121,4 +126,41 @@ public void failed(ServerSentEventConnection connection, String data, String eve
}
}

@Test
public void testConnectionFail() throws IOException, InterruptedException {

final Socket socket = new Socket(DefaultServer.getHostAddress("default"), DefaultServer.getHostPort("default"));
final CountDownLatch latch = new CountDownLatch(1);
DefaultServer.setRootHandler(new ServerSentEventHandler(new ServerSentEventConnectionCallback() {
@Override
public void connected(ServerSentEventConnection connection, String lastEventId) {
while (connection.isOpen()) {
connection.send("hello", new ServerSentEventConnection.EventCallback() {
@Override
public void done(ServerSentEventConnection connection, String data, String event, String id) {
}

@Override
public void failed(ServerSentEventConnection connection, String data, String event, String id, IOException e) {
latch.countDown();
}
});
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}));
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
out.write(("GET / HTTP/1.1\r\n\r\n").getBytes());
out.flush();
out.close();
in.close();
if(!latch.await(10, TimeUnit.SECONDS)) {
Assert.fail();
}
}
}

0 comments on commit c2a9cfc

Please sign in to comment.