Skip to content

Commit

Permalink
add hand shake
Browse files Browse the repository at this point in the history
  • Loading branch information
ohun committed Dec 25, 2015
1 parent 3c96abf commit 8bbae9c
Show file tree
Hide file tree
Showing 28 changed files with 935 additions and 89 deletions.
19 changes: 11 additions & 8 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Connection.java
Expand Up @@ -9,24 +9,27 @@
*/
public interface Connection {

void setConnectionInfo(ConnectionInfo info);

String getId();

void send(Packet packet);

boolean isClosed();

boolean isOpen();

int getHbTimes();

void close();

boolean isConnected();
boolean isConnected();

boolean isEnable();

void init(Channel channel);

String remoteIp();

boolean isEnable();

void init(Channel channel);

String remoteIp();

}
Expand Up @@ -4,14 +4,16 @@
* Created by ohun on 2015/12/22.
*/
public class ConnectionInfo {
public final String os;
public final String clientVer;
public final String osName;
public final String osVersion;
public final String clientVersion;
public final String deviceId;
public final String desKey;

public ConnectionInfo(String os, String clientVer, String deviceId, String desKey) {
this.os = os;
this.clientVer = clientVer;
public ConnectionInfo(String osName, String osVersion, String clientVersion, String deviceId, String desKey) {
this.osName = osName;
this.osVersion = osVersion;
this.clientVersion = clientVersion;
this.deviceId = deviceId;
this.desKey = desKey;
}
Expand Down
10 changes: 6 additions & 4 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Constants.java
Expand Up @@ -7,21 +7,23 @@
*/
public interface Constants {
Charset UTF_8 = Charset.forName("UTF-8");
byte[] EMPTY_BYTES = new byte[0];
int MAX_PACKET_SIZE = 1024;
int HEADER_LEN = 13;
byte MAGIC_NUM1 = (byte) 33;
byte MAGIC_NUM2 = (byte) 99;
long TIME_DELAY = 58L;

String JVM_LOG_PATH = "/opt/";

int THREAD_QUEUE_SIZE = 10000;
int MIN_POOL_SIZE = 50;
int MAX_POOL_SIZE = 500;

int MIN_BOSS_POOL_SIZE = 10;
int MAX_BOSS_POLL_SIZE = 50;

int MIN_WORK_POOL_SIZE = 10;
int MAX_WORK_POOL_SIZE = 250;
int HEARTBEAT_TIME = 60 * 2 * 1000;//2min
}
24 changes: 12 additions & 12 deletions mpush-api/src/main/java/com/shinemo/mpush/api/RouterInfo.java
Expand Up @@ -4,20 +4,20 @@
* Created by ohun on 2015/12/23.
*/
public class RouterInfo {
private String ip;
private String os;
private String serverIp;
private String osName;
private String clientVer;

public RouterInfo(String ip) {
this.ip = ip;
public RouterInfo(String serverIp) {
this.serverIp = serverIp;
}

public String getOs() {
return os;
public String getOsName() {
return osName;
}

public void setOs(String os) {
this.os = os;
public void setOsName(String osName) {
this.osName = osName;
}

public String getClientVer() {
Expand All @@ -28,11 +28,11 @@ public void setClientVer(String clientVer) {
this.clientVer = clientVer;
}

public String getIp() {
return ip;
public String getServerIp() {
return serverIp;
}

public void setIp(String ip) {
this.ip = ip;
public void setServerIp(String serverIp) {
this.serverIp = serverIp;
}
}
14 changes: 14 additions & 0 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Server.java
@@ -0,0 +1,14 @@
package com.shinemo.mpush.api;

/**
* Created by ohun on 2015/12/24.
*/
public interface Server {
void init();

void start();

void stop();

boolean isRunning();
}
Expand Up @@ -6,6 +6,7 @@
import com.shinemo.mpush.api.Request;
import com.shinemo.mpush.core.ConnectionManager;
import com.shinemo.mpush.core.MessageReceiver;
import com.shinemo.mpush.core.NettyConnection;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -19,52 +20,40 @@
*/
@ChannelHandler.Sharable
public class ConnectionHandler extends ChannelHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
private MessageReceiver receiver;

private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);

private final MessageReceiver receiver;

public ConnectionHandler(MessageReceiver receiver) {
this.receiver = receiver;
}
this.receiver = receiver;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.warn(ctx.channel().remoteAddress()+", channelRead");
Connection connection = ConnectionManager.INSTANCE.get(ctx.channel());
log.warn(ctx.channel().remoteAddress() + ", channelRead");
Connection connection = ConnectionManager.INSTANCE.get(ctx.channel());

receiver.onMessage(new Request((Packet) msg, connection));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ConnectionManager.INSTANCE.remove(ctx.channel());
log.warn("",ctx.channel().remoteAddress()+", exceptionCaught");
log.warn(ctx.channel().remoteAddress() + ", exceptionCaught");
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.warn(ctx.channel().remoteAddress()+", channelActive");
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn(ctx.channel().remoteAddress()+", channelInactive");
super.channelInactive(ctx);
}

@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log.warn(ctx.channel().remoteAddress()+", disconnect");
super.disconnect(ctx, promise);
ConnectionManager.INSTANCE.remove(ctx.channel());
log.warn(ctx.channel().remoteAddress() + ", channelActive");
Connection connection = new NettyConnection();
connection.init(ctx.channel());
ConnectionManager.INSTANCE.add(connection);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log.warn(ctx.channel().remoteAddress()+", close");
super.close(ctx, promise);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn(ctx.channel().remoteAddress() + ", channelInactive");
ConnectionManager.INSTANCE.remove(ctx.channel());
}

}
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shinemo.mpush.api.Server;
import com.shinemo.mpush.connection.netty.NettySharedHolder;
import com.shinemo.mpush.connection.netty.encoder.PacketDecoder;
import com.shinemo.mpush.connection.netty.encoder.PacketEncoder;
Expand All @@ -20,18 +21,19 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created by ohun on 2015/12/22.
*/
public class ConnectionServer {
private static final Logger log = LoggerFactory.getLogger(ConnectionServer.class);
public class ConnectionServer implements Server {

private static final Logger log = LoggerFactory.getLogger(ConnectionServer.class);

private final AtomicBoolean startFlag = new AtomicBoolean(false);



private final int port;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
Expand All @@ -40,19 +42,30 @@ public ConnectionServer(int port) {
this.port = port;
}

@Override
public void init() {

}

@Override
public boolean isRunning() {
return startFlag.get();
}

@Override
public void stop() {
log.info("netty server stop now");
log.info("netty server stop now");
this.startFlag.set(false);
if (workerGroup != null) workerGroup.shutdownGracefully();
if (bossGroup != null) bossGroup.shutdownGracefully();
}

@Override
public void start() {

if (!startFlag.compareAndSet(false, true)) {
return;
}

/***
* NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,
* Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。
Expand All @@ -64,8 +77,8 @@ public void start() {
* 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现,
* 并且可以通过构造函数来配置他们的关系。
*/
this.bossGroup = new NioEventLoopGroup(0,ThreadPoolUtil.getBossExecutor());
this.workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),ThreadPoolUtil.getWorkExecutor());
this.bossGroup = new NioEventLoopGroup(0, ThreadPoolUtil.getBossExecutor());
this.workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), ThreadPoolUtil.getWorkExecutor());

try {

Expand All @@ -86,18 +99,16 @@ public void start() {
* 这里告诉Channel如何获取新的连接.
*/
b.channel(NioServerSocketChannel.class);

MessageReceiver receiver = new MessageReceiver();

final ConnectionHandler connectionHandler = new ConnectionHandler(receiver);

final ConnectionHandler connectionHandler = new ConnectionHandler(new MessageReceiver());

/***
* 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。
* ChannelInitializer是一个特殊的处理类,
* 他的目的是帮助使用者配置一个新的Channel。
* 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel
* 或者其对应的ChannelPipeline来实现你的网络程序。
* 当你的程序变的复杂时,可能你会增加更多的处理类到pipline上
* 当你的程序变的复杂时,可能你会增加更多的处理类到pipeline上
* 然后提取这些匿名类到最顶层的类上。
*/
b.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
Expand All @@ -116,33 +127,35 @@ public void initChannel(SocketChannel ch) throws Exception {
* 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。
*/
b.option(ChannelOption.SO_BACKLOG, 1024);

/***
* option()是提供给NioServerSocketChannel用来接收进来的连接。
* childOption()是提供给由父管道ServerChannel接收到的连接,
* 在这个例子中也是NioServerSocketChannel。
*/
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);
b.childOption(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);

b.option(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);
b.childOption(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);


/***
* 绑定端口并启动去接收进来的连接
*/
ChannelFuture f = b.bind(port).sync();

log.info("server start ok on:"+port);
log.info("server start ok on:" + port);


/**
* 这里会一直等待,直到socket被关闭
*/
f.channel().closeFuture().sync();


log.info("server start ok on:" + port);

} catch (Exception e) {
log.error("server start exception",e);
log.error("server start exception", e);
/***
* 优雅关闭
*/
Expand Down
Expand Up @@ -32,6 +32,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
logger.info("client,"+ctx.channel().remoteAddress().toString(),"channelInactive");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
Expand Down
5 changes: 4 additions & 1 deletion mpush-core/pom.xml
Expand Up @@ -67,7 +67,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.shinemo.mpush</groupId>
<artifactId>mpush-tools</artifactId>
</dependency>
</dependencies>

</project>

0 comments on commit 8bbae9c

Please sign in to comment.