Skip to content
Browse files

Refactoring TyrusWebSocketEngine (removing a lot of anonymous classes)

  • Loading branch information...
1 parent 3f4145a commit 2e0ef560fa8bfbfe2a73d45eb5a788309a3d6f66 Jitendra Kotamraju committed Oct 25, 2013
Showing with 192 additions and 172 deletions.
  1. +192 −172 core/src/main/java/org/glassfish/tyrus/core/TyrusWebSocketEngine.java
View
364 core/src/main/java/org/glassfish/tyrus/core/TyrusWebSocketEngine.java
@@ -180,151 +180,128 @@ public UpgradeInfo upgrade(final UpgradeRequest request, final UpgradeResponse r
final ProtocolHandler protocolHandler = loadHandler(request);
if (protocolHandler == null) {
handleUnsupportedVersion(request, response);
- return new UpgradeInfo() {
- @Override
- public UpgradeStatus getStatus() {
- return UpgradeStatus.HANDSHAKE_FAILED;
- }
-
- @Override
- public Connection createConnection(Writer writer, Connection.CloseListener cl) {
- return null;
- }
- };
+ return HANDSHAKE_FAILED_UPGRADE_INFO;
}
-
protocolHandler.handshake(app, request, response);
-
- return new UpgradeInfo() {
- @Override
- public UpgradeStatus getStatus() {
- return UpgradeStatus.SUCCESS;
- }
-
- @Override
- public Connection createConnection(final Writer writer, final Connection.CloseListener closeListener) {
-
- protocolHandler.setWriter(writer);
- final WebSocket socket = app.createSocket(protocolHandler, app);
- final WebSocketHolder holder = new WebSocketHolder(protocolHandler, socket, null, app);
-
- socket.onConnect();
-
- return new Connection() {
-
- private final ReadHandler readHandler = TyrusWebSocketEngine.this.getReadHandler(holder);
-
- @Override
- public ReadHandler getReadHandler() {
- return readHandler;
- }
-
- @Override
- public Writer getWriter() {
- return writer;
- }
-
- @Override
- public CloseListener getCloseListener() {
- return closeListener;
- }
-
- @Override
- public void close(CloseReason reason) {
- socket.close(reason.getCloseCode().getCode(), reason.getReasonPhrase());
- }
- };
- }
- };
+ return new SuccessfulUpgradeInfo(app, protocolHandler, incomingBufferSize);
}
} catch (HandshakeException e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
-
response.setStatus(e.getCode());
-
- return new UpgradeInfo() {
- @Override
- public UpgradeStatus getStatus() {
- return UpgradeStatus.HANDSHAKE_FAILED;
- }
-
- @Override
- public Connection createConnection(Writer writer, Connection.CloseListener cl) {
- return null;
- }
- };
+ return HANDSHAKE_FAILED_UPGRADE_INFO;
}
response.setStatus(500);
-
- return new UpgradeInfo() {
- @Override
- public UpgradeStatus getStatus() {
- return UpgradeStatus.NOT_APPLICABLE;
- }
-
- @Override
- public Connection createConnection(Writer writer, Connection.CloseListener cl) {
- return null;
- }
- };
+ return NOT_APPLICABLE_UPGRADE_INFO;
}
- public ReadHandler getReadHandler(final WebSocketHolder webSocketHolder) {
-
- return new ReadHandler() {
- @Override
- public void handle(ByteBuffer data) {
- TyrusWebSocketEngine.this.processData(webSocketHolder, data);
- }
- };
+ public ReadHandler getReadHandler(WebSocketHolder webSocketHolder) {
+ return new TyrusReadHandler(webSocketHolder, incomingBufferSize);
}
- private void processData(WebSocketHolder holder, ByteBuffer data) {
- if (holder == null) {
- // TODO?
- return;
+ private static class TyrusReadHandler implements ReadHandler {
+ private final WebSocketHolder webSocketHolder;
+ private final int incomingBufferSize;
+
+ TyrusReadHandler(final WebSocketHolder webSocketHolder, int incomingBufferSize) {
+ this.webSocketHolder = webSocketHolder;
+ this.incomingBufferSize = incomingBufferSize;
}
- try {
- if (data != null && data.hasRemaining()) {
+ public void handle(ByteBuffer data) {
+ if (webSocketHolder == null) {
+ // TODO?
+ return;
+ }
- if (holder.buffer != null) {
- data = appendBuffers(holder.buffer, data);
- } else {
- int newSize = data.remaining();
- if (newSize > incomingBufferSize) {
- throw new IllegalArgumentException("Buffer overflow.");
- } else {
- final int roundedSize = (newSize % BUFFER_STEP_SIZE) > 0 ? ((newSize / BUFFER_STEP_SIZE) + 1) * BUFFER_STEP_SIZE : newSize;
- final ByteBuffer result = ByteBuffer.allocate(roundedSize > incomingBufferSize ? newSize : roundedSize);
- result.flip();
- data = appendBuffers(result, data);
- }
- }
+ try {
+ if (data != null && data.hasRemaining()) {
- do {
- final DataFrame result = holder.handler.unframe(data);
- if (result == null) {
- holder.buffer = data;
- break;
+ if (webSocketHolder.buffer != null) {
+ data = appendBuffers(webSocketHolder.buffer, data);
} else {
- result.respond(holder.webSocket);
+ int newSize = data.remaining();
+ if (newSize > incomingBufferSize) {
+ throw new IllegalArgumentException("Buffer overflow.");
+ } else {
+ final int roundedSize = (newSize % BUFFER_STEP_SIZE) > 0 ? ((newSize / BUFFER_STEP_SIZE) + 1) * BUFFER_STEP_SIZE : newSize;
+ final ByteBuffer result = ByteBuffer.allocate(roundedSize > incomingBufferSize ? newSize : roundedSize);
+ result.flip();
+ data = appendBuffers(result, data);
+ }
}
- } while (true);
+
+ do {
+ final DataFrame result = webSocketHolder.handler.unframe(data);
+ if (result == null) {
+ webSocketHolder.buffer = data;
+ break;
+ } else {
+ result.respond(webSocketHolder.webSocket);
+ }
+ } while (true);
+ }
+ } catch (FramingException e) {
+ e.printStackTrace();
+ webSocketHolder.webSocket.onClose(new CloseReason(CloseReason.CloseCodes.getCloseCode(e.getClosingCode()), e.getMessage()));
+ } catch (Exception wse) {
+ wse.printStackTrace();
+
+ // client-side only
+ if (webSocketHolder.application == null) {
+ webSocketHolder.webSocket.onClose(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, wse.getMessage()));
+ // server
+ } else if (webSocketHolder.application.onError(webSocketHolder.webSocket, wse)) {
+ webSocketHolder.webSocket.onClose(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, wse.getMessage()));
+ }
}
- } catch (FramingException e) {
- e.printStackTrace();
- holder.webSocket.onClose(new CloseReason(CloseReason.CloseCodes.getCloseCode(e.getClosingCode()), e.getMessage()));
- } catch (Exception wse) {
- wse.printStackTrace();
-
- // client-side only
- if (holder.application == null) {
- holder.webSocket.onClose(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, wse.getMessage()));
- // server
- } else if (holder.application.onError(holder.webSocket, wse)) {
- holder.webSocket.onClose(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, wse.getMessage()));
+ }
+
+ /**
+ * Concatenates two buffers into one. If buffer given as first argument has enough space for putting
+ * the other one, it will be done and the original buffer will be returned. Otherwise new buffer will
+ * be created.
+ *
+ * @param buffer first buffer.
+ * @param buffer1 second buffer.
+ * @return concatenation.
+ */
+ private ByteBuffer appendBuffers(ByteBuffer buffer, ByteBuffer buffer1) {
+
+ final int limit = buffer.limit();
+ final int capacity = buffer.capacity();
+ final int remaining = buffer.remaining();
+ final int len = buffer1.remaining();
+
+ // buffer1 will be appended to buffer
+ if (len < (capacity - limit)) {
+
+ buffer.mark();
+ buffer.position(limit);
+ buffer.limit(capacity);
+ buffer.put(buffer1);
+ buffer.limit(limit + len);
+ buffer.reset();
+ return buffer;
+ // Remaining data is moved to left. Then new data is appended
+ } else if (remaining + len < capacity) {
+ buffer.compact();
+ buffer.put(buffer1);
+ buffer.flip();
+ return buffer;
+ // create new buffer
+ } else {
+ int newSize = remaining + len;
+ if (newSize > incomingBufferSize) {
+ throw new IllegalArgumentException("Buffer overflow.");
+ } else {
+ final int roundedSize = (newSize % BUFFER_STEP_SIZE) > 0 ? ((newSize / BUFFER_STEP_SIZE) + 1) * BUFFER_STEP_SIZE : newSize;
+ final ByteBuffer result = ByteBuffer.allocate(roundedSize > incomingBufferSize ? newSize : roundedSize);
+ result.put(buffer);
+ result.put(buffer1);
+ result.flip();
+ return result;
+ }
}
}
}
@@ -333,53 +310,7 @@ public void setIncomingBufferSize(int incomingBufferSize) {
this.incomingBufferSize = incomingBufferSize;
}
- /**
- * Concatenates two buffers into one. If buffer given as first argument has enough space for putting
- * the other one, it will be done and the original buffer will be returned. Otherwise new buffer will
- * be created.
- *
- * @param buffer first buffer.
- * @param buffer1 second buffer.
- * @return concatenation.
- */
- private ByteBuffer appendBuffers(ByteBuffer buffer, ByteBuffer buffer1) {
-
- final int limit = buffer.limit();
- final int capacity = buffer.capacity();
- final int remaining = buffer.remaining();
- final int len = buffer1.remaining();
-
- // buffer1 will be appended to buffer
- if (len < (capacity - limit)) {
-
- buffer.mark();
- buffer.position(limit);
- buffer.limit(capacity);
- buffer.put(buffer1);
- buffer.limit(limit + len);
- buffer.reset();
- return buffer;
- // Remaining data is moved to left. Then new data is appended
- } else if (remaining + len < capacity) {
- buffer.compact();
- buffer.put(buffer1);
- buffer.flip();
- return buffer;
- // create new buffer
- } else {
- int newSize = remaining + len;
- if (newSize > incomingBufferSize) {
- throw new IllegalArgumentException("Buffer overflow.");
- } else {
- final int roundedSize = (newSize % BUFFER_STEP_SIZE) > 0 ? ((newSize / BUFFER_STEP_SIZE) + 1) * BUFFER_STEP_SIZE : newSize;
- final ByteBuffer result = ByteBuffer.allocate(roundedSize > incomingBufferSize ? newSize : roundedSize);
- result.put(buffer);
- result.put(buffer1);
- result.flip();
- return result;
- }
- }
- }
+
/**
* Registers the specified {@link WebSocketApplication} with the
@@ -484,4 +415,93 @@ public WebSocketHolder(final ProtocolHandler handler, final WebSocket socket, fi
this.application = application;
}
}
+
+ private static final UpgradeInfo NOT_APPLICABLE_UPGRADE_INFO =
+ new NoConnectionUpgradeInfo(UpgradeStatus.NOT_APPLICABLE);
+
+ private static final UpgradeInfo HANDSHAKE_FAILED_UPGRADE_INFO =
+ new NoConnectionUpgradeInfo(UpgradeStatus.HANDSHAKE_FAILED);
+
+
+ private static class NoConnectionUpgradeInfo implements UpgradeInfo {
+ private final UpgradeStatus status;
+
+ NoConnectionUpgradeInfo(UpgradeStatus status) {
+ this.status = status;
+ }
+
+ @Override
+ public UpgradeStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public Connection createConnection(Writer writer, Connection.CloseListener closeListener) {
+ return null;
+ }
+ }
+
+ private static class SuccessfulUpgradeInfo implements UpgradeInfo {
+
+ private final WebSocketApplication app;
+ private final ProtocolHandler protocolHandler;
+ private final int incomingBufferSize;
+
+ SuccessfulUpgradeInfo(WebSocketApplication app, ProtocolHandler protocolHandler, int incomingBufferSize) {
+ this.app = app;
+ this.protocolHandler = protocolHandler;
+ this.incomingBufferSize = incomingBufferSize;
+ }
+
+ @Override
+ public UpgradeStatus getStatus() {
+ return UpgradeStatus.SUCCESS;
+ }
+
+ @Override
+ public Connection createConnection(Writer writer, Connection.CloseListener closeListener) {
+ return new TyrusConnection(app, protocolHandler, incomingBufferSize, writer, closeListener);
+ }
+ }
+
+ static class TyrusConnection implements Connection {
+
+ private final ReadHandler readHandler;
+ private final Writer writer;
+ private final CloseListener closeListener;
+ private final WebSocket socket;
+
+ TyrusConnection(WebSocketApplication app, ProtocolHandler protocolHandler, int incomingBufferSize, Writer writer, Connection.CloseListener closeListener) {
+ protocolHandler.setWriter(writer);
+ final WebSocket socket = app.createSocket(protocolHandler, app);
+ final WebSocketHolder holder = new WebSocketHolder(protocolHandler, socket, null, app);
+
+ socket.onConnect();
+ this.socket = socket;
+ this.readHandler = new TyrusReadHandler(holder, incomingBufferSize);
+ this.writer = writer;
+ this.closeListener = closeListener;
+ }
+
+ @Override
+ public ReadHandler getReadHandler() {
+ return readHandler;
+ }
+
+ @Override
+ public Writer getWriter() {
+ return writer;
+ }
+
+ @Override
+ public CloseListener getCloseListener() {
+ return closeListener;
+ }
+
+ @Override
+ public void close(CloseReason reason) {
+ socket.close(reason.getCloseCode().getCode(), reason.getReasonPhrase());
+ }
+ }
+
}

0 comments on commit 2e0ef56

Please sign in to comment.
Something went wrong with that request. Please try again.