Skip to content
This repository has been archived by the owner on May 3, 2018. It is now read-only.

Commit

Permalink
Websocket chat demo.
Browse files Browse the repository at this point in the history
Apparently Ant is smart enough to grab everything.
Fixed a minor bug with server-initiated closing handshakes. (Eats
pings.)
  • Loading branch information
JonathanD committed Feb 21, 2012
1 parent 1311121 commit 0bfcb57
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 19 deletions.
43 changes: 26 additions & 17 deletions java/org/apache/catalina/websocket/WebSocketConnection.java
Expand Up @@ -68,12 +68,14 @@ public static enum WebSocketState {
*
* @param WebSocketFrame
* the frame to send
* @returns true if the frame was sent (false if connection was not open)
* @returns true if the frame was sent (false if the connection
* was not open, in which case the frame is swallowed)
* @throws IOException
*/
public boolean send(WebSocketFrame frame) throws IOException {
// Don't send unless the connection is open
if(readyState != WebSocketState.OPEN) {
swallowFrame(frame);
return false;
}

Expand Down Expand Up @@ -129,8 +131,8 @@ protected void onOpen() {
}

/**
* Called when the connection is closed normally (subclasses may override
* this method)
* Called when the connection is closed
* (subclasses may override this method)
*/
protected void onClose() {
// Subclasses may override this method
Expand Down Expand Up @@ -158,7 +160,8 @@ public SocketState onData() throws IOException {
if (frame.isData()) {
send(WebSocketFrame
.makeCloseFrame(StatusCode.ProtocolError));
close(false);
onError();
closeImmediately();
}
} else {
// This frame is the first frame of a new message
Expand All @@ -167,7 +170,8 @@ public SocketState onData() throws IOException {
if (frame.getOpcode() == OpCode.Continuation) {
send(WebSocketFrame
.makeCloseFrame(StatusCode.ProtocolError));
close(false);
onError();
closeImmediately();
}
}

Expand All @@ -193,6 +197,7 @@ public SocketState onData() throws IOException {
readyState = WebSocketState.CLOSED;
onError();
return SocketState.CLOSED;

} catch (WebSocketClosedException c) {
// Tell the protocol above to drop the TCP
return SocketState.CLOSED;
Expand All @@ -218,13 +223,15 @@ private void handleControl(WebSocketFrame frame) throws IOException {
// Control frames must not be fragmented
if (frame.isFin() == false) {
send(WebSocketFrame.makeCloseFrame(StatusCode.ProtocolError));
close(false);
onError();
closeImmediately();
}

// Control frames must not have extended length
if (frame.getPayloadLength() > 125) {
send(WebSocketFrame.makeCloseFrame(StatusCode.ProtocolError));
close(false);
onError();
closeImmediately();
}

switch (frame.getOpcode()) {
Expand All @@ -243,12 +250,12 @@ private void handleControl(WebSocketFrame frame) throws IOException {
// Are we expecting a closing reply?
if(readyState == WebSocketState.CLOSING) {
// Handshake complete
close(true);
closeImmediately();
}

// Reply with a close
send(WebSocketFrame.closeFrame());
close(true);
closeImmediately();
break;
}
}
Expand All @@ -273,25 +280,27 @@ private void analyzeIncomingClose(WebSocketFrame close) throws IOException {
// information which is at least two bytes long
if (close.getPayloadLength() == 1) {
send(WebSocketFrame.makeCloseFrame(StatusCode.ProtocolError));
close(false);
onError();
closeImmediately();
}

Long statusCode = close.decodeStatusCode();
if (statusCode != null) {
if (!WebSocketFrame.StatusCode.isValid(statusCode)) {
//System.out.println("Close code invalid " + statusCode);
close(false);
onError();
closeImmediately();
}
}
}

private void close(boolean normalClose) throws IOException {
/**
* Drops the underlying TCP connection
* @throws IOException
*/
private void closeImmediately() throws IOException {
readyState = WebSocketState.CLOSED;
if(normalClose) {
onClose();
} else {
onError();
}
onClose();
throw new WebSocketClosedException();
}

Expand Down
8 changes: 8 additions & 0 deletions java/org/apache/catalina/websocket/WebSocketFrame.java
Expand Up @@ -541,6 +541,14 @@ public Reader getPayloadReader() {
return new InputStreamReader(payload, charsetDecoder);
}

/**
* @returns the payload as a byte array (must check length first)
* @throws IOException
*/
public byte[] getPayloadArray() throws IOException {
return readAll(payload, (int) (payloadLength));
}

public void setPayload(FiniteStream newPayload) {
payloadLength = newPayload.remaining();
payload = newPayload;
Expand Down
82 changes: 82 additions & 0 deletions webapps/examples/WEB-INF/classes/websocket/Chat.java
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package websocket;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.catalina.websocket.WebSocketConnection;
import org.apache.catalina.websocket.WebSocketFrame;
import org.apache.catalina.websocket.WebSocketServlet;

public class Chat extends WebSocketServlet {

private static final long serialVersionUID = 1L;

private final List<WebSocketConnection> connections =
new LinkedList<WebSocketConnection>();

@Override
protected WebSocketConnection createWebSocketConnection() {
return new ChatConnection();
}

private final class ChatConnection extends WebSocketConnection {

private static final long maxMessageSize = 65536; // 64KB

@Override
protected void onOpen() {
connections.add(this);
}

@Override
protected void onMessage(WebSocketFrame frame) throws IOException {

// There may be some clever way to fork input streams in
// a 1) single-threaded and 2) blocking environment, but I
// haven't discovered it yet. For now, we'll assume that
// people only send reasonable-sized chat messages.
if(frame.getPayloadLength() > maxMessageSize) {
System.out.println("Payload too big for a chat message");
close();
}

// Pull the payload into a byte array
byte[] payload = frame.getPayloadArray();

//System.out.println(Arrays.toString(payload));

for(WebSocketConnection connection : connections) {
frame.setPayload(payload);
connection.send(frame);
}
}

@Override
protected void onClose() {
connections.remove(this);
}

@Override
protected void onError() {
System.err.println("Chat WebSocket error");
}
}
}
11 changes: 9 additions & 2 deletions webapps/examples/WEB-INF/web.xml
Expand Up @@ -347,8 +347,15 @@
<url-pattern>/async/stockticker</url-pattern>
</servlet-mapping>

<!-- WebSocket Examples -->
<servlet>
<!-- WebSocket Examples --><servlet>
<servlet-name>wsChat</servlet-name>
<servlet-class>websocket.Chat</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>wsChat</servlet-name>
<url-pattern>/websocket/chat</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>wsEchoStream</servlet-name>
<servlet-class>websocket.EchoStream</servlet-class>
</servlet>
Expand Down

0 comments on commit 0bfcb57

Please sign in to comment.