Skip to content

Commit

Permalink
add push callback
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Dec 30, 2015
1 parent 3af301e commit e6e36f9
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 32 deletions.
Expand Up @@ -10,13 +10,20 @@ public enum Command {
LOGOUT(4), LOGOUT(4),
BIND(5), BIND(5),
UNBIND(6), UNBIND(6),
KICK(7), FAST_CONNECT(7),
FAST_CONNECT(8), ERROR(8),
ERROR(9), OK(9),
OK(10), API(10),
PUSH(11), KICK(11),
API(12), GATEWAY_KICK(12),
GATEWAY_PUSH(13), PUSH(13),
GATEWAY_PUSH(14),
NOTIFICATION(15),
GATEWAY_NOTIFICATION(16),
CHAT(17),
GATEWAY_CHAT(18),
GROUP(19),
GATEWAY_GROUP(20),
UNKNOWN(-1); UNKNOWN(-1);


Command(int cmd) { Command(int cmd) {
Expand Down
Expand Up @@ -4,12 +4,12 @@
import com.shinemo.mpush.api.PacketReceiver; import com.shinemo.mpush.api.PacketReceiver;
import com.shinemo.mpush.api.PushSender; import com.shinemo.mpush.api.PushSender;
import com.shinemo.mpush.api.connection.Connection; import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.common.message.gateway.GatewayPushMessage;
import com.shinemo.mpush.api.protocol.Command; import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.api.router.ClientLocation; import com.shinemo.mpush.api.router.ClientLocation;
import com.shinemo.mpush.common.MessageDispatcher; import com.shinemo.mpush.common.MessageDispatcher;
import com.shinemo.mpush.common.handler.ErrorMessageHandler; import com.shinemo.mpush.common.handler.ErrorMessageHandler;
import com.shinemo.mpush.common.handler.OkMessageHandler; import com.shinemo.mpush.common.handler.OkMessageHandler;
import com.shinemo.mpush.common.message.gateway.GatewayPushMessage;
import com.shinemo.mpush.common.router.RemoteRouter; import com.shinemo.mpush.common.router.RemoteRouter;
import com.shinemo.mpush.common.router.RemoteRouterManager; import com.shinemo.mpush.common.router.RemoteRouterManager;
import com.shinemo.mpush.common.router.RouterCenter; import com.shinemo.mpush.common.router.RouterCenter;
Expand Down
17 changes: 17 additions & 0 deletions mpush-common/src/main/java/com/shinemo/mpush/common/ErrorCode.java
@@ -0,0 +1,17 @@
package com.shinemo.mpush.common;

/**
* Created by ohun on 2015/12/30.
*/
public enum ErrorCode {
OFFLINE(1, "user offline"),
PUSH_CLIENT_FAILURE(2, "push to client failure"),;

ErrorCode(int code, String errorMsg) {
this.errorMsg = errorMsg;
this.errorCode = (byte) code;
}

public final byte errorCode;
public final String errorMsg;
}
Expand Up @@ -3,6 +3,7 @@
import com.shinemo.mpush.api.connection.Connection; import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.protocol.Command; import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.api.protocol.Packet; import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.common.ErrorCode;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;


Expand Down Expand Up @@ -47,8 +48,9 @@ public ErrorMessage setReason(String reason) {
return this; return this;
} }


public ErrorMessage setCode(byte code) { public ErrorMessage setErrorCode(ErrorCode code) {
this.code = code; this.code = code.errorCode;
this.reason = code.errorMsg;
return this; return this;
} }


Expand Down
@@ -1,25 +1,49 @@
package com.shinemo.mpush.common.router; package com.shinemo.mpush.common.router;


import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.shinemo.mpush.api.router.RouterManager; import com.shinemo.mpush.api.router.RouterManager;


import java.util.concurrent.TimeUnit;

/** /**
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
*/ */
public class RemoteRouterManager implements RouterManager<RemoteRouter> { public class RemoteRouterManager implements RouterManager<RemoteRouter> {
// TODO: 2015/12/30 add local cache // TODO: 2015/12/30 可以增加一层本地缓存,防止疯狂查询redis, 但是要注意失效问题及数据不一致问题
private final Cache<String, RemoteRouter> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterAccess(5, TimeUnit.MINUTES)
.build();



@Override @Override
public RemoteRouter register(String userId, RemoteRouter route) { public RemoteRouter register(String userId, RemoteRouter route) {
return null; RemoteRouter old = cache.getIfPresent(userId);
cache.put(userId, route);
return old;
} }


@Override @Override
public boolean unRegister(String userId) { public boolean unRegister(String userId) {
cache.invalidate(userId);
return true; return true;
} }


@Override @Override
public RemoteRouter lookup(String userId) { public RemoteRouter lookup(String userId) {
return null; return cache.getIfPresent(userId);
}

/**
* 如果推送失败,可能是缓存不一致了,可以让本地缓存失效
* <p>
* 失效对应的本地缓存
*
* @param userId
*/
public void invalidateLocalCache(String userId) {
cache.invalidate(userId);
} }
} }
@@ -0,0 +1,8 @@
package com.shinemo.mpush.core.client;

/**
* Created by ohun on 2015/12/30.
*/
public class GatewayClient {

}
@@ -1,6 +1,7 @@
package com.shinemo.mpush.core.handler; package com.shinemo.mpush.core.handler;


import com.shinemo.mpush.api.connection.Connection; import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.common.ErrorCode;
import com.shinemo.mpush.common.message.ErrorMessage; import com.shinemo.mpush.common.message.ErrorMessage;
import com.shinemo.mpush.common.message.OkMessage; import com.shinemo.mpush.common.message.OkMessage;
import com.shinemo.mpush.common.message.PushMessage; import com.shinemo.mpush.common.message.PushMessage;
Expand All @@ -24,24 +25,36 @@ public GatewayPushMessage decode(Packet packet, Connection connection) {
@Override @Override
public void handle(final GatewayPushMessage message) { public void handle(final GatewayPushMessage message) {
Router<?> router = RouterCenter.INSTANCE.lookup(message.userId); Router<?> router = RouterCenter.INSTANCE.lookup(message.userId);
if (router.getRouteType() == Router.RouterType.LOCAL) { if (router == null) {
//1.路由信息不存在说明用户此时不在线
ErrorMessage
.from(message)
.setErrorCode(ErrorCode.OFFLINE)
.send();
} else if (router.getRouteType() == Router.RouterType.LOCAL) {
//2.如果是本地路由信息,说明用户链接在当前机器,直接把消息下发到客户端
Connection connection = (Connection) router.getRouteValue(); Connection connection = (Connection) router.getRouteValue();
PushMessage pushMessage = new PushMessage(message.content, connection); PushMessage pushMessage = new PushMessage(message.content, connection);
pushMessage.send(new ChannelFutureListener() { pushMessage.send(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {//推送成功
OkMessage.from(message).send(); OkMessage
} else { .from(message)
.setData(message.userId)
.send();
} else {//推送失败
ErrorMessage ErrorMessage
.from(message) .from(message)
.setCode((byte) 1) .setErrorCode(ErrorCode.PUSH_CLIENT_FAILURE)
.setReason("push to client error")
.send(); .send();
} }
} }
}); });

} else { } else {
//3.如果是远程路由,说明此时用户已经跑到另一台机器上了
// 需要通过GatewayClient或ZK把消息推送到另外一台机器上
// TODO: 2015/12/30 send message to other server // TODO: 2015/12/30 send message to other server
} }
} }
Expand Down
Expand Up @@ -7,7 +7,7 @@
/** /**
* Created by ohun on 2015/12/22. * Created by ohun on 2015/12/22.
*/ */
public final class HeartBeatHandler implements MessageHandler { public final class HeartbeatHandler implements MessageHandler {
@Override @Override
public void handle(Packet packet, Connection connection) { public void handle(Packet packet, Connection connection) {
System.err.println("receive client heartbeat, time=" System.err.println("receive client heartbeat, time="
Expand Down
Expand Up @@ -8,7 +8,7 @@
/** /**
* Created by ohun on 2015/12/30. * Created by ohun on 2015/12/30.
*/ */
public class ConnectionServer extends NettyServer { public final class ConnectionServer extends NettyServer {


public ConnectionServer(int port, ChannelHandler channelHandler) { public ConnectionServer(int port, ChannelHandler channelHandler) {
super(port, channelHandler); super(port, channelHandler);
Expand Down
Expand Up @@ -6,7 +6,7 @@
/** /**
* Created by ohun on 2015/12/30. * Created by ohun on 2015/12/30.
*/ */
public class GatewayServer extends NettyServer { public final class GatewayServer extends NettyServer {


public GatewayServer(int port, ChannelHandler channelHandler) { public GatewayServer(int port, ChannelHandler channelHandler) {
super(port, channelHandler); super(port, channelHandler);
Expand Down
@@ -1,4 +1,4 @@
package com.shinemo.mpush.core; package com.shinemo.mpush.core.server;




import com.shinemo.mpush.api.connection.ConnectionManager; import com.shinemo.mpush.api.connection.ConnectionManager;
Expand Down
Expand Up @@ -2,9 +2,10 @@


import com.shinemo.mpush.api.protocol.Command; import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.common.MessageDispatcher; import com.shinemo.mpush.common.MessageDispatcher;
import com.shinemo.mpush.core.ServerChannelHandler; import com.shinemo.mpush.core.server.ServerChannelHandler;
import com.shinemo.mpush.core.handler.BindUserHandler; import com.shinemo.mpush.core.handler.BindUserHandler;
import com.shinemo.mpush.core.handler.HandshakeHandler; import com.shinemo.mpush.core.handler.HandshakeHandler;
import com.shinemo.mpush.core.handler.HeartbeatHandler;
import com.shinemo.mpush.netty.connection.NettyConnectionManager; import com.shinemo.mpush.netty.connection.NettyConnectionManager;
import com.shinemo.mpush.core.server.ConnectionServer; import com.shinemo.mpush.core.server.ConnectionServer;
import com.shinemo.mpush.netty.server.NettyServer; import com.shinemo.mpush.netty.server.NettyServer;
Expand All @@ -26,8 +27,8 @@ public void testStart() throws Exception {


MessageDispatcher receiver = new MessageDispatcher(); MessageDispatcher receiver = new MessageDispatcher();
receiver.register(Command.HANDSHAKE, new HandshakeHandler()); receiver.register(Command.HANDSHAKE, new HandshakeHandler());
receiver.register(Command.HEARTBEAT, new BindUserHandler());
receiver.register(Command.BIND, new BindUserHandler()); receiver.register(Command.BIND, new BindUserHandler());
receiver.register(Command.HEARTBEAT, new HeartbeatHandler());
NettyConnectionManager connectionManager = new NettyConnectionManager(); NettyConnectionManager connectionManager = new NettyConnectionManager();
connectionManager.registerEventBus(); connectionManager.registerEventBus();
ChannelHandler handler = new ServerChannelHandler(connectionManager, receiver); ChannelHandler handler = new ServerChannelHandler(connectionManager, receiver);
Expand All @@ -37,10 +38,7 @@ public void testStart() throws Exception {


Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() { public void run() {
try { server.stop();
server.stop();
} catch (Exception e) {
}
} }
}); });
} }
Expand Down
Expand Up @@ -12,7 +12,6 @@ public class CipherBoxTest {


@Test @Test
public void testGetPrivateKey() throws Exception { public void testGetPrivateKey() throws Exception {
CipherBox.INSTANCE.getPrivateKey();
} }


@Test @Test
Expand Down
Expand Up @@ -21,7 +21,7 @@
/** /**
* Created by ohun on 2015/12/22. * Created by ohun on 2015/12/22.
*/ */
public class NettyConnectionManager implements ConnectionManager { public final class NettyConnectionManager implements ConnectionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectionManager.class); private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectionManager.class);


public void registerEventBus() { public void registerEventBus() {
Expand Down
Expand Up @@ -3,7 +3,7 @@


public enum PathEnum { public enum PathEnum {


CONNECTION_SERVER_ALL_HOST("/cs/allhost/machine","连接服务器应用注册的路径"){ CONNECTION_SERVER_ALL_HOST("/cs/hosts/machine","连接服务器应用注册的路径"){
@Override @Override
public String getPathByIp(String ip) { public String getPathByIp(String ip) {
return getPath(); return getPath();
Expand Down

0 comments on commit e6e36f9

Please sign in to comment.