Skip to content

Commit

Permalink
add hand shake
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Dec 25, 2015
1 parent 0e76c92 commit 08dcb3d
Show file tree
Hide file tree
Showing 28 changed files with 935 additions and 89 deletions.
19 changes: 11 additions & 8 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Connection.java
Expand Up @@ -9,24 +9,27 @@
*/ */
public interface Connection { public interface Connection {


void setConnectionInfo(ConnectionInfo info);

String getId(); String getId();


void send(Packet packet); void send(Packet packet);


boolean isClosed(); boolean isClosed();


boolean isOpen(); boolean isOpen();

int getHbTimes(); int getHbTimes();

void close(); void close();


boolean isConnected(); boolean isConnected();

boolean isEnable();

void init(Channel channel);

String remoteIp();


boolean isEnable();


void init(Channel channel);

String remoteIp();

} }
Expand Up @@ -4,14 +4,16 @@
* Created by ohun on 2015/12/22. * Created by ohun on 2015/12/22.
*/ */
public class ConnectionInfo { public class ConnectionInfo {
public final String os; public final String osName;
public final String clientVer; public final String osVersion;
public final String clientVersion;
public final String deviceId; public final String deviceId;
public final String desKey; public final String desKey;


public ConnectionInfo(String os, String clientVer, String deviceId, String desKey) { public ConnectionInfo(String osName, String osVersion, String clientVersion, String deviceId, String desKey) {
this.os = os; this.osName = osName;
this.clientVer = clientVer; this.osVersion = osVersion;
this.clientVersion = clientVersion;
this.deviceId = deviceId; this.deviceId = deviceId;
this.desKey = desKey; this.desKey = desKey;
} }
Expand Down
10 changes: 6 additions & 4 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Constants.java
Expand Up @@ -7,21 +7,23 @@
*/ */
public interface Constants { public interface Constants {
Charset UTF_8 = Charset.forName("UTF-8"); Charset UTF_8 = Charset.forName("UTF-8");
byte[] EMPTY_BYTES = new byte[0];
int MAX_PACKET_SIZE = 1024; int MAX_PACKET_SIZE = 1024;
int HEADER_LEN = 13; int HEADER_LEN = 13;
byte MAGIC_NUM1 = (byte) 33; byte MAGIC_NUM1 = (byte) 33;
byte MAGIC_NUM2 = (byte) 99; byte MAGIC_NUM2 = (byte) 99;
long TIME_DELAY = 58L; long TIME_DELAY = 58L;

String JVM_LOG_PATH = "/opt/"; String JVM_LOG_PATH = "/opt/";

int THREAD_QUEUE_SIZE = 10000; int THREAD_QUEUE_SIZE = 10000;
int MIN_POOL_SIZE = 50; int MIN_POOL_SIZE = 50;
int MAX_POOL_SIZE = 500; int MAX_POOL_SIZE = 500;

int MIN_BOSS_POOL_SIZE = 10; int MIN_BOSS_POOL_SIZE = 10;
int MAX_BOSS_POLL_SIZE = 50; int MAX_BOSS_POLL_SIZE = 50;

int MIN_WORK_POOL_SIZE = 10; int MIN_WORK_POOL_SIZE = 10;
int MAX_WORK_POOL_SIZE = 250; int MAX_WORK_POOL_SIZE = 250;
int HEARTBEAT_TIME = 60 * 2 * 1000;//2min
} }
24 changes: 12 additions & 12 deletions mpush-api/src/main/java/com/shinemo/mpush/api/RouterInfo.java
Expand Up @@ -4,20 +4,20 @@
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
*/ */
public class RouterInfo { public class RouterInfo {
private String ip; private String serverIp;
private String os; private String osName;
private String clientVer; private String clientVer;


public RouterInfo(String ip) { public RouterInfo(String serverIp) {
this.ip = ip; this.serverIp = serverIp;
} }


public String getOs() { public String getOsName() {
return os; return osName;
} }


public void setOs(String os) { public void setOsName(String osName) {
this.os = os; this.osName = osName;
} }


public String getClientVer() { public String getClientVer() {
Expand All @@ -28,11 +28,11 @@ public void setClientVer(String clientVer) {
this.clientVer = clientVer; this.clientVer = clientVer;
} }


public String getIp() { public String getServerIp() {
return ip; return serverIp;
} }


public void setIp(String ip) { public void setServerIp(String serverIp) {
this.ip = ip; this.serverIp = serverIp;
} }
} }
14 changes: 14 additions & 0 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Server.java
@@ -0,0 +1,14 @@
package com.shinemo.mpush.api;

/**
* Created by ohun on 2015/12/24.
*/
public interface Server {
void init();

void start();

void stop();

boolean isRunning();
}
Expand Up @@ -6,6 +6,7 @@
import com.shinemo.mpush.api.Request; import com.shinemo.mpush.api.Request;
import com.shinemo.mpush.core.ConnectionManager; import com.shinemo.mpush.core.ConnectionManager;
import com.shinemo.mpush.core.MessageReceiver; import com.shinemo.mpush.core.MessageReceiver;
import com.shinemo.mpush.core.NettyConnection;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
Expand All @@ -19,52 +20,40 @@
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class ConnectionHandler extends ChannelHandlerAdapter { public class ConnectionHandler extends ChannelHandlerAdapter {

private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class); private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);

private MessageReceiver receiver; private final MessageReceiver receiver;

public ConnectionHandler(MessageReceiver receiver) { public ConnectionHandler(MessageReceiver receiver) {
this.receiver = receiver; this.receiver = receiver;
} }


@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.warn(ctx.channel().remoteAddress()+", channelRead"); log.warn(ctx.channel().remoteAddress() + ", channelRead");
Connection connection = ConnectionManager.INSTANCE.get(ctx.channel()); Connection connection = ConnectionManager.INSTANCE.get(ctx.channel());

receiver.onMessage(new Request((Packet) msg, connection)); receiver.onMessage(new Request((Packet) msg, connection));
} }


@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ConnectionManager.INSTANCE.remove(ctx.channel()); ConnectionManager.INSTANCE.remove(ctx.channel());
log.warn("",ctx.channel().remoteAddress()+", exceptionCaught"); log.warn(ctx.channel().remoteAddress() + ", exceptionCaught");
} }


@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.warn(ctx.channel().remoteAddress()+", channelActive"); log.warn(ctx.channel().remoteAddress() + ", channelActive");
super.channelActive(ctx); Connection connection = new NettyConnection();
} connection.init(ctx.channel());

ConnectionManager.INSTANCE.add(connection);
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn(ctx.channel().remoteAddress()+", channelInactive");
super.channelInactive(ctx);
}

@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
log.warn(ctx.channel().remoteAddress()+", disconnect");
super.disconnect(ctx, promise);
ConnectionManager.INSTANCE.remove(ctx.channel());
} }


@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn(ctx.channel().remoteAddress()+", close"); log.warn(ctx.channel().remoteAddress() + ", channelInactive");
super.close(ctx, promise);
ConnectionManager.INSTANCE.remove(ctx.channel()); ConnectionManager.INSTANCE.remove(ctx.channel());
} }

} }
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.shinemo.mpush.api.Server;
import com.shinemo.mpush.connection.netty.NettySharedHolder; import com.shinemo.mpush.connection.netty.NettySharedHolder;
import com.shinemo.mpush.connection.netty.encoder.PacketDecoder; import com.shinemo.mpush.connection.netty.encoder.PacketDecoder;
import com.shinemo.mpush.connection.netty.encoder.PacketEncoder; import com.shinemo.mpush.connection.netty.encoder.PacketEncoder;
Expand All @@ -20,18 +21,19 @@
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;


/** /**
* Created by ohun on 2015/12/22. * Created by ohun on 2015/12/22.
*/ */
public class ConnectionServer { public class ConnectionServer implements Server {

private static final Logger log = LoggerFactory.getLogger(ConnectionServer.class); private static final Logger log = LoggerFactory.getLogger(ConnectionServer.class);

private final AtomicBoolean startFlag = new AtomicBoolean(false); private final AtomicBoolean startFlag = new AtomicBoolean(false);



private final int port; private final int port;
private EventLoopGroup bossGroup; private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
Expand All @@ -40,19 +42,30 @@ public ConnectionServer(int port) {
this.port = port; this.port = port;
} }


@Override
public void init() {

}

@Override
public boolean isRunning() {
return startFlag.get();
}

@Override
public void stop() { public void stop() {
log.info("netty server stop now"); log.info("netty server stop now");
this.startFlag.set(false); this.startFlag.set(false);
if (workerGroup != null) workerGroup.shutdownGracefully(); if (workerGroup != null) workerGroup.shutdownGracefully();
if (bossGroup != null) bossGroup.shutdownGracefully(); if (bossGroup != null) bossGroup.shutdownGracefully();
} }


@Override
public void start() { public void start() {

if (!startFlag.compareAndSet(false, true)) { if (!startFlag.compareAndSet(false, true)) {
return; return;
} }

/*** /***
* NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器, * NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,
* Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。 * Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。
Expand All @@ -64,8 +77,8 @@ public void start() {
* 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现, * 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现,
* 并且可以通过构造函数来配置他们的关系。 * 并且可以通过构造函数来配置他们的关系。
*/ */
this.bossGroup = new NioEventLoopGroup(0,ThreadPoolUtil.getBossExecutor()); this.bossGroup = new NioEventLoopGroup(0, ThreadPoolUtil.getBossExecutor());
this.workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),ThreadPoolUtil.getWorkExecutor()); this.workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), ThreadPoolUtil.getWorkExecutor());


try { try {


Expand All @@ -86,18 +99,16 @@ public void start() {
* 这里告诉Channel如何获取新的连接. * 这里告诉Channel如何获取新的连接.
*/ */
b.channel(NioServerSocketChannel.class); b.channel(NioServerSocketChannel.class);


MessageReceiver receiver = new MessageReceiver(); final ConnectionHandler connectionHandler = new ConnectionHandler(new MessageReceiver());

final ConnectionHandler connectionHandler = new ConnectionHandler(receiver);


/*** /***
* 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。 * 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。
* ChannelInitializer是一个特殊的处理类, * ChannelInitializer是一个特殊的处理类,
* 他的目的是帮助使用者配置一个新的Channel。 * 他的目的是帮助使用者配置一个新的Channel。
* 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel * 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel
* 或者其对应的ChannelPipeline来实现你的网络程序。 * 或者其对应的ChannelPipeline来实现你的网络程序。
* 当你的程序变的复杂时,可能你会增加更多的处理类到pipline上 * 当你的程序变的复杂时,可能你会增加更多的处理类到pipeline上
* 然后提取这些匿名类到最顶层的类上。 * 然后提取这些匿名类到最顶层的类上。
*/ */
b.childHandler(new ChannelInitializer<SocketChannel>() { // (4) b.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
Expand All @@ -116,33 +127,35 @@ public void initChannel(SocketChannel ch) throws Exception {
* 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。 * 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。
*/ */
b.option(ChannelOption.SO_BACKLOG, 1024); b.option(ChannelOption.SO_BACKLOG, 1024);

/*** /***
* option()是提供给NioServerSocketChannel用来接收进来的连接。 * option()是提供给NioServerSocketChannel用来接收进来的连接。
* childOption()是提供给由父管道ServerChannel接收到的连接, * childOption()是提供给由父管道ServerChannel接收到的连接,
* 在这个例子中也是NioServerSocketChannel。 * 在这个例子中也是NioServerSocketChannel。
*/ */
b.childOption(ChannelOption.SO_KEEPALIVE, true); b.childOption(ChannelOption.SO_KEEPALIVE, true);

b.option(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator); b.option(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);
b.childOption(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator); b.childOption(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);



/*** /***
* 绑定端口并启动去接收进来的连接 * 绑定端口并启动去接收进来的连接
*/ */
ChannelFuture f = b.bind(port).sync(); ChannelFuture f = b.bind(port).sync();


log.info("server start ok on:"+port); log.info("server start ok on:" + port);


/** /**
* 这里会一直等待,直到socket被关闭 * 这里会一直等待,直到socket被关闭
*/ */
f.channel().closeFuture().sync(); f.channel().closeFuture().sync();


log.info("server start ok on:" + port);

} catch (Exception e) { } catch (Exception e) {
log.error("server start exception",e); log.error("server start exception", e);
/*** /***
* 优雅关闭 * 优雅关闭
*/ */
Expand Down
Expand Up @@ -32,6 +32,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx); super.channelInactive(ctx);
logger.info("client,"+ctx.channel().remoteAddress().toString(),"channelInactive"); logger.info("client,"+ctx.channel().remoteAddress().toString(),"channelInactive");
} }

@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
Expand Down
5 changes: 4 additions & 1 deletion mpush-core/pom.xml
Expand Up @@ -67,7 +67,10 @@
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
</dependency> </dependency>

<dependency>
<groupId>com.shinemo.mpush</groupId>
<artifactId>mpush-tools</artifactId>
</dependency>
</dependencies> </dependencies>


</project> </project>

0 comments on commit 08dcb3d

Please sign in to comment.