Skip to content

Commit

Permalink
增加支持单台机器启动多个实例
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 7, 2016
1 parent cf5f9d9 commit 79f9658
Show file tree
Hide file tree
Showing 18 changed files with 147 additions and 55 deletions.
22 changes: 22 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/router/ClientLocation.java
Expand Up @@ -34,6 +34,11 @@ public final class ClientLocation {
*/
private String host;

/**
* 长链接所在的机器端口
*/
private int port;

/**
* 客户端系统类型
*/
Expand Down Expand Up @@ -68,6 +73,15 @@ public ClientLocation setHost(String host) {
return this;
}

public int getPort() {
return port;
}

public ClientLocation setPort(int port) {
this.port = port;
return this;
}

public String getOsName() {
return osName;
}
Expand Down Expand Up @@ -120,6 +134,14 @@ public ClientLocation offline() {
return this;
}

public boolean isThisPC(String host, int port) {
return this.port == port && this.host.equals(host);
}

public String getHostAndPort() {
return host + ":" + port;
}

public static ClientLocation from(Connection connection) {
SessionContext context = connection.getSessionContext();
ClientLocation location = new ClientLocation();
Expand Down
Expand Up @@ -27,8 +27,7 @@
import com.mpush.core.server.GatewayUDPConnector;

import static com.mpush.tools.config.CC.mp.net.udpGateway;
import static com.mpush.zk.node.ZKServerNode.csNode;
import static com.mpush.zk.node.ZKServerNode.gsNode;
import static com.mpush.zk.node.ZKServerNode.*;

/**
* Created by yxx on 2016/5/14.
Expand All @@ -43,8 +42,8 @@ public ServerLauncher() {
chain.boot()
.setNext(new ZKBoot())//1.启动ZK节点数据变化监听
.setNext(new RedisBoot())//2.注册redis sever 到ZK
.setNext(new ServerBoot(ConnectionServer.I(), csNode()))//3.启动长连接服务
.setNext(new ServerBoot(udpGateway() ? GatewayUDPConnector.I() : GatewayServer.I(), gsNode()))//4.启动网关服务
.setNext(new ServerBoot(ConnectionServer.I(), CS_NODE))//3.启动长连接服务
.setNext(new ServerBoot(udpGateway() ? GatewayUDPConnector.I() : GatewayServer.I(), GS_NODE))//4.启动网关服务
.setNext(new ServerBoot(AdminServer.I(), null))//5.启动控制台服务
.setNext(new PushCenterBoot())//6.启动http代理服务,解析dns
.setNext(new HttpProxyBoot())//6.启动http代理服务,解析dns
Expand Down
Expand Up @@ -50,9 +50,9 @@ public void init(Listener listener) {
listener.onSuccess();
}

abstract public Connection getConnection(String ip);
abstract public Connection getConnection(String hostAndPort);

abstract public <M extends BaseMessage> Function<String, Void> send(Function<Connection, M> creator, Function<M, Void> sender);
abstract public <M extends BaseMessage> void send(String hostAndPort, Function<Connection, M> creator, Function<M, Void> sender);

abstract public <M extends BaseMessage> void broadcast(Function<Connection, M> creator, Consumer<M> sender);

Expand Down
Expand Up @@ -62,8 +62,8 @@ public void clear() {
}

@Override
public Connection getConnection(String ip) {
GatewayClient client = ip_client.get(ip);
public Connection getConnection(String hostAndPort) {
GatewayClient client = ip_client.get(hostAndPort);
if (client == null) {
return null;//TODO create client
}
Expand All @@ -77,8 +77,10 @@ public Connection getConnection(String ip) {


@Override
public <M extends BaseMessage> Function<String, Void> send(Function<Connection, M> creator, Function<M, Void> sender) {
return creator.compose(this::getConnection).andThen(sender);
public <M extends BaseMessage> void send(String hostAndPort, Function<Connection, M> creator, Function<M, Void> sender) {
creator.compose(this::getConnection)
.andThen(sender)
.apply(hostAndPort);
}

@Override
Expand All @@ -89,7 +91,7 @@ public <M extends BaseMessage> void broadcast(Function<Connection, M> creator, C
}

private void restartClient(final GatewayClient client) {
ip_client.remove(client.getHost());
ip_client.remove(client.getHostAndPort());
client.stop(new Listener() {
@Override
public void onSuccess(Object... args) {
Expand All @@ -105,7 +107,7 @@ public void onFailure(Throwable cause) {

private void removeClient(ZKServerNode node) {
if (node != null) {
Client client = ip_client.remove(node.getIp());
Client client = ip_client.remove(node.getHostAndPort());
if (client != null) {
client.stop(null);
}
Expand All @@ -117,7 +119,7 @@ private void addClient(final String host, final int port) {
client.start(new Listener() {
@Override
public void onSuccess(Object... args) {
ip_client.put(host, client);
ip_client.put(client.getHostAndPort(), client);
}

@Override
Expand Down
Expand Up @@ -61,7 +61,7 @@ public void init(Listener listener) {
@Override
public void put(String fullPath, ZKServerNode node) {
super.put(fullPath, node);
ip_address.put(node.getIp(), new InetSocketAddress(node.getIp(), node.getPort()));
ip_address.put(node.getHostAndPort(), new InetSocketAddress(node.getIp(), node.getPort()));
}

@Override
Expand All @@ -79,30 +79,23 @@ public void clear() {
}

@Override
public Connection getConnection(String ip) {
public Connection getConnection(String hostAndPort) {
return connector.getConnection();
}

@SuppressWarnings("unchecked")
@Override
public <T extends BaseMessage> Function<String, Void> send(Function<Connection, T> creator, Function<T, Void> sender) {

Holder<InetSocketAddress> holder = new Holder<>();

Function<String, Connection> getConn = host -> {
InetSocketAddress recipient = ip_address.get(host);
if (recipient == null) return null;
holder.set(recipient);
return connector.getConnection();
};

Function<T, T> setRecipientFun = message -> {
if (message != null) {
message.setRecipient(holder.get());
}
return message;
};

return creator.compose(getConn).andThen(setRecipientFun).andThen(sender);
public <T extends BaseMessage> void send(String hostAndPort, Function<Connection, T> creator, Function<T, Void> sender) {
InetSocketAddress recipient = ip_address.get(hostAndPort);
if (recipient == null) {// gateway server 找不到,直接返回推送失败
creator.apply(null);
return;
}

creator.compose(this::getConnection)
.andThen(message -> (T) message.setRecipient(recipient))
.andThen(sender)
.apply(hostAndPort);
}

@Override
Expand Down
Expand Up @@ -60,16 +60,16 @@ private FutureTask<Boolean> send0(PushContext ctx) {

@Override
public FutureTask<Boolean> send(PushContext ctx) {
if (ctx.getUserId() != null) {
if (ctx.isBroadcast()) {
return send0(ctx.setUserId(null));
} else if (ctx.getUserId() != null) {
return send0(ctx);
} else if (ctx.getUserIds() != null) {
FutureTask<Boolean> task = null;
for (String userId : ctx.getUserIds()) {
task = send0(ctx.setUserId(userId));
}
return task;
} else if (ctx.isBroadcast()) {
return send0(ctx.setUserId(null));
} else {
throw new PushException("param error.");
}
Expand Down
Expand Up @@ -80,6 +80,7 @@ private void sendToConnServer(RemoteRouter remoteRouter) {

//2.通过网关连接,把消息发送到所在机器
connectionFactory.send(
location.getHostAndPort(),
connection -> {
timeLine.addTimePoint("check-gateway-conn");
if (connection == null) {
Expand Down Expand Up @@ -108,7 +109,7 @@ private void sendToConnServer(RemoteRouter remoteRouter) {
}
return null;
}
).apply(location.getHost());
);
}

private void submit(Status status) {
Expand Down
Expand Up @@ -40,6 +40,8 @@ public final class GatewayKickUserMessage extends ByteBufMessage implements Kick
public String connId;
public int clientType;
public String targetServer;
public int targetPort;


public GatewayKickUserMessage(Packet message, Connection connection) {
super(message, connection);
Expand All @@ -58,6 +60,7 @@ public void decode(ByteBuf body) {
connId = decodeString(body);
clientType = decodeInt(body);
targetServer = decodeString(body);
targetPort = decodeInt(body);
}

@Override
Expand All @@ -67,6 +70,7 @@ public void encode(ByteBuf body) {
encodeString(body, connId);
encodeInt(body, clientType);
encodeString(body, targetServer);
encodeInt(body, targetPort);
}

public GatewayKickUserMessage setUserId(String userId) {
Expand Down Expand Up @@ -94,6 +98,11 @@ public GatewayKickUserMessage setTargetServer(String targetServer) {
return this;
}

public GatewayKickUserMessage setTargetPort(int targetPort) {
this.targetPort = targetPort;
return this;
}

@Override
public String getUserId() {
return userId;
Expand All @@ -119,6 +128,11 @@ public String getTargetServer() {
return targetServer;
}

@Override
public int getTargetPort() {
return targetPort;
}

@Override
public String toString() {
return "GatewayKickUserMessage{" +
Expand All @@ -127,6 +141,7 @@ public String toString() {
", connId='" + connId + '\'' +
", clientType=" + clientType +
", targetServer='" + targetServer + '\'' +
", targetPort=" + targetPort +
'}';
}
}
12 changes: 12 additions & 0 deletions mpush-common/src/main/java/com/mpush/common/net/KickRemoteMsg.java
Expand Up @@ -19,6 +19,11 @@

package com.mpush.common.net;

import com.mpush.tools.Utils;
import com.mpush.tools.config.CC;

import static com.mpush.zk.node.ZKServerNode.GS_NODE;

/**
* Created by ohun on 16/10/23.
*
Expand All @@ -34,4 +39,11 @@ public interface KickRemoteMsg {
int getClientType();

String getTargetServer();

int getTargetPort();

default boolean isTargetPC() {
return this.getTargetPort() == GS_NODE.getPort()
&& this.getTargetServer().equals(Utils.getLocalIp());
}
}
Expand Up @@ -32,11 +32,13 @@
import com.mpush.core.router.LocalRouter;
import com.mpush.core.router.RouterCenter;
import com.mpush.tools.Utils;
import com.mpush.tools.config.CC;
import com.mpush.tools.log.Logs;

import static com.mpush.common.ErrorCode.OFFLINE;
import static com.mpush.common.ErrorCode.PUSH_CLIENT_FAILURE;
import static com.mpush.common.ErrorCode.ROUTER_CHANGE;
import static com.mpush.zk.node.ZKServerNode.GS_NODE;

/**
* Created by ohun on 16/10/24.
Expand Down Expand Up @@ -149,7 +151,7 @@ private void checkRemote(GatewayPushMessage message) {
}

//2.如果查出的远程机器是当前机器,说明路由已经失效,此时用户已下线,需要删除失效的缓存
if (Utils.getLocalIp().equals(remoteRouter.getRouteValue().getHost())) {
if (remoteRouter.getRouteValue().isThisPC(GS_NODE.getIp(), GS_NODE.getPort())) {

ErrorMessage.from(message).setErrorCode(OFFLINE).setData(userId + ',' + clientType).sendRaw();

Expand Down
Expand Up @@ -32,6 +32,7 @@ class RedisKickRemoteMessage implements KickRemoteMsg {
private String connId;
private int clientType;
private String targetServer;
private int targetPort;

public RedisKickRemoteMessage setUserId(String userId) {
this.userId = userId;
Expand All @@ -58,6 +59,11 @@ public RedisKickRemoteMessage setTargetServer(String targetServer) {
return this;
}

public RedisKickRemoteMessage setTargetPort(int targetPort) {
this.targetPort = targetPort;
return this;
}

@Override
public String getUserId() {
return userId;
Expand All @@ -83,6 +89,11 @@ public String getTargetServer() {
return targetServer;
}

@Override
public int getTargetPort() {
return targetPort;
}

@Override
public String toString() {
return "KickRemoteMsg{"
Expand All @@ -91,6 +102,7 @@ public String toString() {
+ ", connId='" + connId + '\''
+ ", clientType='" + clientType + '\''
+ ", targetServer='" + targetServer + '\''
+ ", targetPort=" + targetPort
+ '}';
}
}
Expand Up @@ -26,10 +26,13 @@
import com.mpush.common.router.RemoteRouter;
import com.mpush.common.router.RemoteRouterManager;
import com.mpush.tools.Utils;
import com.mpush.tools.config.CC;
import com.mpush.tools.event.EventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.mpush.zk.node.ZKServerNode.GS_NODE;

/**
* Created by ohun on 2015/12/23.
*
Expand All @@ -55,7 +58,8 @@ public final class RouterCenter {
public boolean register(String userId, Connection connection) {
ClientLocation location = ClientLocation
.from(connection)
.setHost(Utils.getLocalIp());
.setHost(Utils.getLocalIp())
.setPort(GS_NODE.getPort());

LocalRouter localRouter = new LocalRouter(connection);
RemoteRouter remoteRouter = new RemoteRouter(location);
Expand Down

0 comments on commit 79f9658

Please sign in to comment.