Skip to content

Commit

Permalink
代码重构
Browse files Browse the repository at this point in the history
  • Loading branch information
ohun committed Dec 29, 2015
1 parent 0435b60 commit 06760dc
Show file tree
Hide file tree
Showing 25 changed files with 375 additions and 235 deletions.
43 changes: 20 additions & 23 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Client.java
Expand Up @@ -2,27 +2,24 @@


public interface Client { public interface Client {


public void close(final String cause); void init();


public String toString(); void start();


public boolean isEnabled(); void close(final String cause);


public boolean isConnected(); boolean isEnabled();


public void resetHbTimes(); boolean isConnected();


public int inceaseAndGetHbTimes(); void resetHbTimes();


public void startHeartBeat() throws Exception; int inceaseAndGetHbTimes();


/** String getHost();
* host:port
*/ int getPort();
public String getUrl();

String getUri();
public String getRemoteHost();

public int getRemotePort();

} }
2 changes: 2 additions & 0 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Connection.java
Expand Up @@ -17,6 +17,8 @@ public interface Connection {


void send(Packet packet); void send(Packet packet);


Channel channel();

String getId(); String getId();


boolean isClosed(); boolean isClosed();
Expand Down
7 changes: 3 additions & 4 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Constants.java
Expand Up @@ -10,12 +10,11 @@ public interface Constants {
byte[] EMPTY_BYTES = new byte[0]; byte[] EMPTY_BYTES = new byte[0];
int MAX_PACKET_SIZE = 1024; int MAX_PACKET_SIZE = 1024;
int HEADER_LEN = 13; int HEADER_LEN = 13;
short MAGIC_NUM = 1122;
byte HB = '\n';
int COMPRESS_LIMIT = 1024 * 10; int COMPRESS_LIMIT = 1024 * 10;
byte CRYPTO_FLAG = 0x01; byte CRYPTO_FLAG = 0x01;
byte COMPRESS_FLAG = 0x02; byte COMPRESS_FLAG = 0x02;
long TIME_DELAY = 120L; long TIME_DELAY = 1L;


String JVM_LOG_PATH = "/opt/"; String JVM_LOG_PATH = "/opt/";


Expand All @@ -28,5 +27,5 @@ public interface Constants {


int MIN_WORK_POOL_SIZE = 10; int MIN_WORK_POOL_SIZE = 10;
int MAX_WORK_POOL_SIZE = 250; int MAX_WORK_POOL_SIZE = 250;
int HEARTBEAT_TIME = 60 * 2 * 1000;//2min int HEARTBEAT_TIME = 1000;//2min
} }
@@ -0,0 +1,7 @@
package com.shinemo.mpush.api.event;

/**
* Created by ohun on 2015/12/29.
*/
public interface Event {
}
@@ -0,0 +1,16 @@
package com.shinemo.mpush.api.event;

import com.shinemo.mpush.api.Connection;

/**
* Created by ohun on 2015/12/29.
*/
public class HandshakeEvent implements Event {
public final Connection connection;
public final int heartbeat;

public HandshakeEvent(Connection connection, int heartbeat) {
this.connection = connection;
this.heartbeat = heartbeat;
}
}
@@ -1,6 +1,8 @@
package com.shinemo.mpush.api.protocol; package com.shinemo.mpush.api.protocol;


import com.shinemo.mpush.api.Constants; import com.shinemo.mpush.api.Constants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;


import java.io.Serializable; import java.io.Serializable;


Expand All @@ -11,6 +13,7 @@
* length(4)+cmd(1)+cc(2)+flags(1)+sessionId(4)+lrc(1)+body(n) * length(4)+cmd(1)+cc(2)+flags(1)+sessionId(4)+lrc(1)+body(n)
*/ */
public final class Packet implements Serializable { public final class Packet implements Serializable {
public static final byte HB_PACKET = '\n';
private static final long serialVersionUID = -2725825199998223372L; private static final long serialVersionUID = -2725825199998223372L;
public byte cmd; //命令 public byte cmd; //命令
public short cc; //校验码 暂时没有用到 public short cc; //校验码 暂时没有用到
Expand Down Expand Up @@ -55,4 +58,8 @@ public String toString() {
", body=" + Arrays.toString(body) + ", body=" + Arrays.toString(body) +
'}'; '}';
} }

public static ByteBuf getHBPacket() {
return Unpooled.buffer(1).writeByte(HB_PACKET);
}
} }
@@ -1,22 +1,34 @@
package com.shinemo.mpush.core; package com.shinemo.mpush.core;




import com.google.common.eventbus.Subscribe;
import com.shinemo.mpush.api.Connection; import com.shinemo.mpush.api.Connection;


import com.shinemo.mpush.api.event.HandshakeEvent;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.chmv8.ConcurrentHashMapV8; import io.netty.util.internal.chmv8.ConcurrentHashMapV8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


/** /**
* Created by ohun on 2015/12/22. * Created by ohun on 2015/12/22.
*/ */
public class ConnectionManager { public class ConnectionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
private static final String IDLE_HANDLER_NAME = "heartbeatHandler";
public static final ConnectionManager INSTANCE = new ConnectionManager(); public static final ConnectionManager INSTANCE = new ConnectionManager();


public ConnectionManager() {
EventBus.INSTANCE.register(this);
}

//可能会有20w的链接数 //可能会有20w的链接数
private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMapV8<String, Connection>(); private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMapV8<String, Connection>();


Expand Down Expand Up @@ -54,4 +66,13 @@ public List<Connection> getConnections() {
return new ArrayList<Connection>(connections.values()); return new ArrayList<Connection>(connections.values());
} }



@Subscribe
public void onHandshakeSuccess(HandshakeEvent event) {
int r = event.heartbeat + 3000;
int w = event.heartbeat + 3000;
Channel channel = event.connection.channel();
channel.pipeline().addFirst(new IdleStateHandler(r, w, 0, TimeUnit.MILLISECONDS));
LOGGER.warn("NettyChannel setHeartbeat readTimeout={}, writeTimeout={}", r, w);
}
} }
45 changes: 45 additions & 0 deletions mpush-core/src/main/java/com/shinemo/mpush/core/EventBus.java
@@ -0,0 +1,45 @@

package com.shinemo.mpush.core;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.SubscriberExceptionContext;
import com.google.common.eventbus.SubscriberExceptionHandler;
import com.shinemo.mpush.api.event.Event;
import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executor;

/**
* Created by ohun on 2015/12/29.
*/
public class EventBus {
private static final Logger LOGGER = LoggerFactory.getLogger(EventBus.class);
public static final EventBus INSTANCE = new EventBus();
private final com.google.common.eventbus.EventBus eventBus;

public EventBus() {
Executor executor = ThreadPoolUtil.getThreadPoolManager().getThreadExecutor("event-bus-pool", 4, 4);
eventBus = new AsyncEventBus(executor, new SubscriberExceptionHandler() {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
LOGGER.error("event bus subscriber ex", exception);
}
});
}

public void post(Event event) {
eventBus.post(event);
}


public void register(Object bean) {
eventBus.register(bean);
}

public void unregister(Object bean) {
eventBus.unregister(bean);
}

}
Expand Up @@ -73,6 +73,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
} }
} }


@Override
public Channel channel() {
return channel;
}

@Override @Override
public boolean isClosed() { public boolean isClosed() {
return false; return false;
Expand Down
Expand Up @@ -4,6 +4,8 @@
import com.shinemo.mpush.api.Constants; import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.api.MessageHandler; import com.shinemo.mpush.api.MessageHandler;
import com.shinemo.mpush.api.SessionContext; import com.shinemo.mpush.api.SessionContext;
import com.shinemo.mpush.api.event.HandshakeEvent;
import com.shinemo.mpush.core.EventBus;
import com.shinemo.mpush.core.message.ErrorMessage; import com.shinemo.mpush.core.message.ErrorMessage;
import com.shinemo.mpush.core.message.HandShakeMessage; import com.shinemo.mpush.core.message.HandShakeMessage;
import com.shinemo.mpush.core.message.HandshakeSuccessMessage; import com.shinemo.mpush.core.message.HandshakeSuccessMessage;
Expand Down Expand Up @@ -70,6 +72,8 @@ public void handle(HandShakeMessage message) {
.setClientVersion(message.clientVersion) .setClientVersion(message.clientVersion)
.setDeviceId(message.deviceId); .setDeviceId(message.deviceId);


//8.触发握手成功事件
EventBus.INSTANCE.post(new HandshakeEvent(message.getConnection(), Constants.HEARTBEAT_TIME));
LOGGER.info("会话密钥:{},clientKey={}, serverKey={}", sessionKey, clientKey, serverKey); LOGGER.info("会话密钥:{},clientKey={}, serverKey={}", sessionKey, clientKey, serverKey);
} }
} }
@@ -0,0 +1,81 @@
package com.shinemo.mpush.core.handler;


import com.shinemo.mpush.api.Constants;
import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.api.Connection;
import com.shinemo.mpush.api.Receiver;
import com.shinemo.mpush.core.ConnectionManager;
import com.shinemo.mpush.core.NettyConnection;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by ohun on 2015/12/19.
*/
@ChannelHandler.Sharable
public class ServerChannelHandler extends ChannelHandlerAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannelHandler.class);

private final Receiver receiver;

public ServerChannelHandler(Receiver receiver) {
this.receiver = receiver;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Connection connection = ConnectionManager.INSTANCE.get(ctx.channel());
receiver.onReceive((Packet) msg, connection);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ConnectionManager.INSTANCE.remove(ctx.channel());
LOGGER.error(ctx.channel().remoteAddress() + ", exceptionCaught", cause);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.warn(ctx.channel().remoteAddress() + ", channelActive");
Connection connection = new NettyConnection();
connection.init(ctx.channel());
ConnectionManager.INSTANCE.add(connection);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.warn(ctx.channel().remoteAddress() + ", channelInactive");
ConnectionManager.INSTANCE.remove(ctx.channel());
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent stateEvent = (IdleStateEvent) evt;
switch (stateEvent.state()) {
case READER_IDLE:
ConnectionManager.INSTANCE.remove(ctx.channel());
ctx.close();
LOGGER.warn("heartbeat read timeout, chanel closed!");
break;
case WRITER_IDLE:
ctx.writeAndFlush(Packet.getHBPacket());
LOGGER.warn("heartbeat write timeout, do write an EOL.");
break;
case ALL_IDLE:
}
} else {
LOGGER.warn("One user event Triggered. evt=" + evt);
super.userEventTriggered(ctx, evt);
}
}
}

This file was deleted.

0 comments on commit 06760dc

Please sign in to comment.