Skip to content

Commit

Permalink
接入层增加websocket协议支持
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 18, 2016
1 parent 4438e3a commit 3b5b4be
Showing 1 changed file with 20 additions and 15 deletions.
Expand Up @@ -3,8 +3,12 @@
import com.mpush.api.PacketReceiver;
import com.mpush.api.connection.Connection;
import com.mpush.api.connection.ConnectionManager;
import com.mpush.api.event.ConnectionCloseEvent;
import com.mpush.api.protocol.Packet;
import com.mpush.netty.codec.PacketDecoder;
import com.mpush.netty.connection.NettyConnection;
import com.mpush.tools.event.EventBus;
import com.mpush.tools.log.Logs;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
Expand All @@ -19,8 +23,7 @@
*/
@ChannelHandler.Sharable
public class WebSocketChannelHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private final Logger logger = LoggerFactory.getLogger(WebSocketChannelHandler.class.getSimpleName());

private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketChannelHandler.class);
private final ConnectionManager connectionManager;
private final PacketReceiver receiver;

Expand All @@ -33,34 +36,36 @@ public WebSocketChannelHandler(ConnectionManager connectionManager, PacketReceiv
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
String text = ((TextWebSocketFrame) frame).text();
receiver.onReceive(PacketDecoder.decodeFrame(text), connectionManager.get(ctx.channel()));
Connection connection = connectionManager.get(ctx.channel());
Packet packet = PacketDecoder.decodeFrame(text);
LOGGER.debug("channelRead conn={}, packet={}", ctx.channel(), connection.getSessionContext(), packet);
receiver.onReceive(packet, connection);
} else {
String message = "unsupported frame type: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Connection connection = connectionManager.get(ctx.channel());
Logs.CONN.error("client caught ex, conn={}", connection);
LOGGER.error("caught an ex, channel={}, conn={}", ctx.channel(), connection, cause);
ctx.close();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Logs.CONN.info("client connected conn={}", ctx.channel());
Connection connection = new NettyConnection();
connection.init(ctx.channel(), false);
connectionManager.add(connection);
logger.info("connect active, channel={}", ctx.channel());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
connectionManager.removeAndClose(ctx.channel());
super.channelInactive(ctx);
logger.info("connect inactive, channel={}", ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
connectionManager.removeAndClose(ctx.channel());
super.exceptionCaught(ctx, cause);
logger.info("connect ex, channel={}", ctx.channel(), cause);
Connection connection = connectionManager.removeAndClose(ctx.channel());
EventBus.I.post(new ConnectionCloseEvent(connection));
Logs.CONN.info("client disconnected conn={}", connection);
}
}

0 comments on commit 3b5b4be

Please sign in to comment.