Skip to content

Commit

Permalink
add push callback
Browse files Browse the repository at this point in the history
  • Loading branch information
ohun committed Dec 30, 2015
1 parent 60f77be commit d24c6a2
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 32 deletions.
Expand Up @@ -10,13 +10,20 @@ public enum Command {
LOGOUT(4),
BIND(5),
UNBIND(6),
KICK(7),
FAST_CONNECT(8),
ERROR(9),
OK(10),
PUSH(11),
API(12),
GATEWAY_PUSH(13),
FAST_CONNECT(7),
ERROR(8),
OK(9),
API(10),
KICK(11),
GATEWAY_KICK(12),
PUSH(13),
GATEWAY_PUSH(14),
NOTIFICATION(15),
GATEWAY_NOTIFICATION(16),
CHAT(17),
GATEWAY_CHAT(18),
GROUP(19),
GATEWAY_GROUP(20),
UNKNOWN(-1);

Command(int cmd) {
Expand Down
Expand Up @@ -4,12 +4,12 @@
import com.shinemo.mpush.api.PacketReceiver;
import com.shinemo.mpush.api.PushSender;
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.common.message.gateway.GatewayPushMessage;
import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.api.router.ClientLocation;
import com.shinemo.mpush.common.MessageDispatcher;
import com.shinemo.mpush.common.handler.ErrorMessageHandler;
import com.shinemo.mpush.common.handler.OkMessageHandler;
import com.shinemo.mpush.common.message.gateway.GatewayPushMessage;
import com.shinemo.mpush.common.router.RemoteRouter;
import com.shinemo.mpush.common.router.RemoteRouterManager;
import com.shinemo.mpush.common.router.RouterCenter;
Expand Down
17 changes: 17 additions & 0 deletions mpush-common/src/main/java/com/shinemo/mpush/common/ErrorCode.java
@@ -0,0 +1,17 @@
package com.shinemo.mpush.common;

/**
* Created by ohun on 2015/12/30.
*/
public enum ErrorCode {
OFFLINE(1, "user offline"),
PUSH_CLIENT_FAILURE(2, "push to client failure"),;

ErrorCode(int code, String errorMsg) {
this.errorMsg = errorMsg;
this.errorCode = (byte) code;
}

public final byte errorCode;
public final String errorMsg;
}
Expand Up @@ -3,6 +3,7 @@
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.common.ErrorCode;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;

Expand Down Expand Up @@ -47,8 +48,9 @@ public ErrorMessage setReason(String reason) {
return this;
}

public ErrorMessage setCode(byte code) {
this.code = code;
public ErrorMessage setErrorCode(ErrorCode code) {
this.code = code.errorCode;
this.reason = code.errorMsg;
return this;
}

Expand Down
@@ -1,25 +1,49 @@
package com.shinemo.mpush.common.router;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.shinemo.mpush.api.router.RouterManager;

import java.util.concurrent.TimeUnit;

/**
* Created by ohun on 2015/12/23.
*/
public class RemoteRouterManager implements RouterManager<RemoteRouter> {
// TODO: 2015/12/30 add local cache
// TODO: 2015/12/30 可以增加一层本地缓存,防止疯狂查询redis, 但是要注意失效问题及数据不一致问题
private final Cache<String, RemoteRouter> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterAccess(5, TimeUnit.MINUTES)
.build();


@Override
public RemoteRouter register(String userId, RemoteRouter route) {
return null;
RemoteRouter old = cache.getIfPresent(userId);
cache.put(userId, route);
return old;
}

@Override
public boolean unRegister(String userId) {
cache.invalidate(userId);
return true;
}

@Override
public RemoteRouter lookup(String userId) {
return null;
return cache.getIfPresent(userId);
}

/**
* 如果推送失败,可能是缓存不一致了,可以让本地缓存失效
* <p>
* 失效对应的本地缓存
*
* @param userId
*/
public void invalidateLocalCache(String userId) {
cache.invalidate(userId);
}
}
@@ -0,0 +1,8 @@
package com.shinemo.mpush.core.client;

/**
* Created by ohun on 2015/12/30.
*/
public class GatewayClient {

}
@@ -1,6 +1,7 @@
package com.shinemo.mpush.core.handler;

import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.common.ErrorCode;
import com.shinemo.mpush.common.message.ErrorMessage;
import com.shinemo.mpush.common.message.OkMessage;
import com.shinemo.mpush.common.message.PushMessage;
Expand All @@ -24,24 +25,36 @@ public GatewayPushMessage decode(Packet packet, Connection connection) {
@Override
public void handle(final GatewayPushMessage message) {
Router<?> router = RouterCenter.INSTANCE.lookup(message.userId);
if (router.getRouteType() == Router.RouterType.LOCAL) {
if (router == null) {
//1.路由信息不存在说明用户此时不在线
ErrorMessage
.from(message)
.setErrorCode(ErrorCode.OFFLINE)
.send();
} else if (router.getRouteType() == Router.RouterType.LOCAL) {
//2.如果是本地路由信息,说明用户链接在当前机器,直接把消息下发到客户端
Connection connection = (Connection) router.getRouteValue();
PushMessage pushMessage = new PushMessage(message.content, connection);
pushMessage.send(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
OkMessage.from(message).send();
} else {
if (future.isSuccess()) {//推送成功
OkMessage
.from(message)
.setData(message.userId)
.send();
} else {//推送失败
ErrorMessage
.from(message)
.setCode((byte) 1)
.setReason("push to client error")
.setErrorCode(ErrorCode.PUSH_CLIENT_FAILURE)
.send();
}
}
});

} else {
//3.如果是远程路由,说明此时用户已经跑到另一台机器上了
// 需要通过GatewayClient或ZK把消息推送到另外一台机器上
// TODO: 2015/12/30 send message to other server
}
}
Expand Down
Expand Up @@ -7,7 +7,7 @@
/**
* Created by ohun on 2015/12/22.
*/
public final class HeartBeatHandler implements MessageHandler {
public final class HeartbeatHandler implements MessageHandler {
@Override
public void handle(Packet packet, Connection connection) {
System.err.println("receive client heartbeat, time="
Expand Down
Expand Up @@ -8,7 +8,7 @@
/**
* Created by ohun on 2015/12/30.
*/
public class ConnectionServer extends NettyServer {
public final class ConnectionServer extends NettyServer {

public ConnectionServer(int port, ChannelHandler channelHandler) {
super(port, channelHandler);
Expand Down
Expand Up @@ -6,7 +6,7 @@
/**
* Created by ohun on 2015/12/30.
*/
public class GatewayServer extends NettyServer {
public final class GatewayServer extends NettyServer {

public GatewayServer(int port, ChannelHandler channelHandler) {
super(port, channelHandler);
Expand Down
@@ -1,4 +1,4 @@
package com.shinemo.mpush.core;
package com.shinemo.mpush.core.server;


import com.shinemo.mpush.api.connection.ConnectionManager;
Expand Down
Expand Up @@ -2,9 +2,10 @@

import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.common.MessageDispatcher;
import com.shinemo.mpush.core.ServerChannelHandler;
import com.shinemo.mpush.core.server.ServerChannelHandler;
import com.shinemo.mpush.core.handler.BindUserHandler;
import com.shinemo.mpush.core.handler.HandshakeHandler;
import com.shinemo.mpush.core.handler.HeartbeatHandler;
import com.shinemo.mpush.netty.connection.NettyConnectionManager;
import com.shinemo.mpush.core.server.ConnectionServer;
import com.shinemo.mpush.netty.server.NettyServer;
Expand All @@ -26,8 +27,8 @@ public void testStart() throws Exception {

MessageDispatcher receiver = new MessageDispatcher();
receiver.register(Command.HANDSHAKE, new HandshakeHandler());
receiver.register(Command.HEARTBEAT, new BindUserHandler());
receiver.register(Command.BIND, new BindUserHandler());
receiver.register(Command.HEARTBEAT, new HeartbeatHandler());
NettyConnectionManager connectionManager = new NettyConnectionManager();
connectionManager.registerEventBus();
ChannelHandler handler = new ServerChannelHandler(connectionManager, receiver);
Expand All @@ -37,10 +38,7 @@ public void testStart() throws Exception {

Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
server.stop();
} catch (Exception e) {
}
server.stop();
}
});
}
Expand Down
Expand Up @@ -12,7 +12,6 @@ public class CipherBoxTest {

@Test
public void testGetPrivateKey() throws Exception {
CipherBox.INSTANCE.getPrivateKey();
}

@Test
Expand Down
Expand Up @@ -21,7 +21,7 @@
/**
* Created by ohun on 2015/12/22.
*/
public class NettyConnectionManager implements ConnectionManager {
public final class NettyConnectionManager implements ConnectionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectionManager.class);

public void registerEventBus() {
Expand Down
Expand Up @@ -3,7 +3,7 @@

public enum PathEnum {

CONNECTION_SERVER_ALL_HOST("/cs/allhost/machine","连接服务器应用注册的路径"){
CONNECTION_SERVER_ALL_HOST("/cs/hosts/machine","连接服务器应用注册的路径"){
@Override
public String getPathByIp(String ip) {
return getPath();
Expand Down

0 comments on commit d24c6a2

Please sign in to comment.