Skip to content

Commit

Permalink
网关服务增加UDP及组播支持
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Oct 21, 2016
1 parent c57865e commit cc40ff9
Show file tree
Hide file tree
Showing 41 changed files with 1,274 additions and 213 deletions.
6 changes: 5 additions & 1 deletion conf/reference.conf
Expand Up @@ -37,8 +37,12 @@ mp {
#网络配置 #网络配置
net { net {
connect-server-port=3000 //长链接服务对外端口, 公网端口 connect-server-port=3000 //长链接服务对外端口, 公网端口
gateway-server-port=3001 //网关服务端口, 内部端口
admin-server-port=3002 //控制台服务端口, 内部端口 admin-server-port=3002 //控制台服务端口, 内部端口
gateway-server-port=3001 //网关服务端口, 内部端口
gateway-client-port=4000 //UDP 客户端端口
gateway-server-net=tcp //网关服务使用的网络类型tcp/udp
gateway-server-multicast="239.239.239.88"
gateway-client-multicast="239.239.239.99"
public-host-mapping { //本机局域网IP和公网IP的映射关系 public-host-mapping { //本机局域网IP和公网IP的映射关系
"10.0.10.156":"111.1.32.137" "10.0.10.156":"111.1.32.137"
"10.0.10.166":"111.1.33.138" "10.0.10.166":"111.1.33.138"
Expand Down
12 changes: 11 additions & 1 deletion mpush-api/src/main/java/com/mpush/api/protocol/Packet.java
Expand Up @@ -22,13 +22,15 @@
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;


import java.net.InetSocketAddress;

/** /**
* Created by ohun on 2015/12/19. * Created by ohun on 2015/12/19.
* length(4)+cmd(1)+cc(2)+flags(1)+sessionId(4)+lrc(1)+body(n) * length(4)+cmd(1)+cc(2)+flags(1)+sessionId(4)+lrc(1)+body(n)
* *
* @author ohun@live.cn * @author ohun@live.cn
*/ */
public final class Packet { public class Packet {
public static final int HEADER_LEN = 13; public static final int HEADER_LEN = 13;


public static final byte FLAG_CRYPTO = 0x01; public static final byte FLAG_CRYPTO = 0x01;
Expand Down Expand Up @@ -110,6 +112,14 @@ public boolean validLrc() {
return (lrc ^ calcLrc()) == 0; return (lrc ^ calcLrc()) == 0;
} }


public InetSocketAddress sender() {
return null;
}

public Packet response(Command command) {
return new Packet(command, sessionId);
}

@Override @Override
public String toString() { public String toString() {
return "Packet{" + return "Packet{" +
Expand Down
61 changes: 61 additions & 0 deletions mpush-api/src/main/java/com/mpush/api/protocol/UDPPacket.java
@@ -0,0 +1,61 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/

package com.mpush.api.protocol;

import java.net.InetSocketAddress;

/**
* Created by ohun on 16/10/21.
*
* @author ohun@live.cn (夜色)
*/
public final class UDPPacket extends Packet {
private final InetSocketAddress sender;

public UDPPacket(byte cmd, InetSocketAddress sender) {
super(cmd);
this.sender = sender;
}

public UDPPacket(byte cmd, int sessionId, InetSocketAddress sender) {
super(cmd, sessionId);
this.sender = sender;
}

public UDPPacket(Command cmd, InetSocketAddress sender) {
super(cmd);
this.sender = sender;
}

public UDPPacket(Command cmd, int sessionId, InetSocketAddress sender) {
super(cmd, sessionId);
this.sender = sender;
}

@Override
public InetSocketAddress sender() {
return sender;
}

@Override
public Packet response(Command command) {
return new UDPPacket(command, sessionId, sender);
}
}
Expand Up @@ -20,13 +20,17 @@
package com.mpush.bootstrap; package com.mpush.bootstrap;




import com.mpush.api.service.Server;
import com.mpush.api.service.Service;
import com.mpush.bootstrap.job.*; import com.mpush.bootstrap.job.*;
import com.mpush.core.server.AdminServer; import com.mpush.core.server.AdminServer;
import com.mpush.core.server.ConnectionServer; import com.mpush.core.server.ConnectionServer;
import com.mpush.core.server.GatewayServer; import com.mpush.core.server.GatewayServer;
import com.mpush.core.server.GatewayUDPConnector;
import com.mpush.zk.node.ZKServerNode; import com.mpush.zk.node.ZKServerNode;


import static com.mpush.tools.config.CC.mp.net.admin_server_port; import static com.mpush.tools.config.CC.mp.net.admin_server_port;
import static com.mpush.tools.config.CC.mp.net.udpGateway;


/** /**
* Created by yxx on 2016/5/14. * Created by yxx on 2016/5/14.
Expand All @@ -41,8 +45,8 @@ public ServerLauncher() {
ZKServerNode csNode = ZKServerNode.csNode(); ZKServerNode csNode = ZKServerNode.csNode();
ZKServerNode gsNode = ZKServerNode.gsNode(); ZKServerNode gsNode = ZKServerNode.gsNode();
ConnectionServer connectServer = new ConnectionServer(csNode.getPort()); ConnectionServer connectServer = new ConnectionServer(csNode.getPort());
GatewayServer gatewayServer = new GatewayServer(gsNode.getPort()); Server gatewayServer = udpGateway() ? new GatewayUDPConnector(gsNode.getPort()) : new GatewayServer(gsNode.getPort());
AdminServer adminServer = new AdminServer(admin_server_port, connectServer, gatewayServer); AdminServer adminServer = new AdminServer(admin_server_port, connectServer);


chain.boot() chain.boot()
.setNext(new ZKBoot())//1.启动ZK节点数据变化监听 .setNext(new ZKBoot())//1.启动ZK节点数据变化监听
Expand Down
Expand Up @@ -21,10 +21,8 @@


import com.mpush.api.service.Listener; import com.mpush.api.service.Listener;
import com.mpush.api.service.Server; import com.mpush.api.service.Server;
import com.mpush.tools.Jsons;
import com.mpush.tools.log.Logs; import com.mpush.tools.log.Logs;
import com.mpush.tools.thread.pool.ThreadPoolManager; import com.mpush.tools.thread.pool.ThreadPoolManager;
import com.mpush.zk.ZKClient;
import com.mpush.zk.ZKRegister; import com.mpush.zk.ZKRegister;
import com.mpush.zk.node.ZKServerNode; import com.mpush.zk.node.ZKServerNode;


Expand Down
Expand Up @@ -21,11 +21,11 @@


import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.mpush.api.event.ConnectionCloseEvent; import com.mpush.api.event.ConnectionCloseEvent;
import com.mpush.netty.client.NettyClient; import com.mpush.netty.client.NettyTCPClient;
import com.mpush.tools.event.EventBus; import com.mpush.tools.event.EventBus;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;


public class ConnectClient extends NettyClient { public class ConnectClient extends NettyTCPClient {
private final ConnClientChannelHandler handler; private final ConnClientChannelHandler handler;


public ConnectClient(String host, int port, ClientConfig config) { public ConnectClient(String host, int port, ClientConfig config) {
Expand Down
Expand Up @@ -21,7 +21,7 @@


import com.mpush.api.connection.Connection; import com.mpush.api.connection.Connection;
import com.mpush.api.service.Listener; import com.mpush.api.service.Listener;
import com.mpush.netty.client.NettyClient; import com.mpush.netty.client.NettyTCPClient;
import com.mpush.tools.thread.NamedPoolThreadFactory; import com.mpush.tools.thread.NamedPoolThreadFactory;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
Expand All @@ -38,7 +38,7 @@
* *
* @author ohun@live.cn * @author ohun@live.cn
*/ */
public class GatewayClient extends NettyClient { public class GatewayClient extends NettyTCPClient {
private final GatewayClientChannelHandler handler = new GatewayClientChannelHandler(); private final GatewayClientChannelHandler handler = new GatewayClientChannelHandler();
private GlobalChannelTrafficShapingHandler trafficShapingHandler; private GlobalChannelTrafficShapingHandler trafficShapingHandler;
private ScheduledExecutorService trafficShapingExecutor; private ScheduledExecutorService trafficShapingExecutor;
Expand Down
Expand Up @@ -63,19 +63,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} }
} }


private void handleOK(OkMessage message) { public static void handleOK(OkMessage message) {
if (message.cmd == Command.GATEWAY_PUSH.cmd) { if (message.cmd == Command.GATEWAY_PUSH.cmd) {
handPush(message, null, message.getPacket()); handPush(message, null, message.getPacket());
} }
} }


private void handleError(ErrorMessage message) { public static void handleError(ErrorMessage message) {
if (message.cmd == Command.GATEWAY_PUSH.cmd) { if (message.cmd == Command.GATEWAY_PUSH.cmd) {
handPush(null, message, message.getPacket()); handPush(null, message, message.getPacket());
} }
} }


private void handPush(OkMessage ok, ErrorMessage error, Packet packet) { private static void handPush(OkMessage ok, ErrorMessage error, Packet packet) {
PushRequest request = PushRequestBus.I.getAndRemove(packet.sessionId); PushRequest request = PushRequestBus.I.getAndRemove(packet.sessionId);
if (request == null) { if (request == null) {
LOGGER.warn("receive a gateway response, but request has timeout. ok={}, error={}", ok, error); LOGGER.warn("receive a gateway response, but request has timeout. ok={}, error={}", ok, error);
Expand Down
@@ -0,0 +1,54 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/

package com.mpush.client.gateway;

import com.mpush.api.connection.Connection;
import com.mpush.common.message.gateway.GatewayPushMessage;
import com.mpush.zk.cache.ZKServerNodeCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.function.Consumer;

/**
* Created by yxx on 2016/5/17.
*
* @author ohun@live.cn
*/
public abstract class GatewayConnectionFactory<T> extends ZKServerNodeCache {

final Logger logger = LoggerFactory.getLogger(this.getClass());

public void init() {

}

abstract public Connection getConnection(String ip);

abstract public T getNode(String ip);

abstract public Collection<T> getAllNode();

abstract public boolean send(String host, Consumer<GatewayPushMessage> consumer);

abstract public void broadcast(Consumer<GatewayPushMessage> consumer);

}
Expand Up @@ -23,24 +23,20 @@
import com.mpush.api.connection.Connection; import com.mpush.api.connection.Connection;
import com.mpush.api.service.Client; import com.mpush.api.service.Client;
import com.mpush.api.service.Listener; import com.mpush.api.service.Listener;
import com.mpush.zk.cache.ZKServerNodeCache; import com.mpush.common.message.gateway.GatewayPushMessage;
import com.mpush.zk.node.ZKServerNode; import com.mpush.zk.node.ZKServerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;


/** /**
* Created by yxx on 2016/5/17. * Created by yxx on 2016/5/17.
* *
* @author ohun@live.cn * @author ohun@live.cn
*/ */
public class GatewayClientFactory extends ZKServerNodeCache { public class GatewayTCPConnectionFactory extends GatewayConnectionFactory<Connection> {
public static final GatewayClientFactory I = new GatewayClientFactory();

private final Logger logger = LoggerFactory.getLogger(GatewayClientFactory.class);


private final Map<String, GatewayClient> ip_client = Maps.newConcurrentMap(); private final Map<String, GatewayClient> ip_client = Maps.newConcurrentMap();


Expand All @@ -66,14 +62,7 @@ public void clear() {
} }
} }


public GatewayClient getClient(String ip) { @Override
GatewayClient client = ip_client.get(ip);
if (client == null) {
return null;//TODO create client
}
return client;
}

public Connection getConnection(String ip) { public Connection getConnection(String ip) {
GatewayClient client = ip_client.get(ip); GatewayClient client = ip_client.get(ip);
if (client == null) { if (client == null) {
Expand All @@ -87,6 +76,31 @@ public Connection getConnection(String ip) {
return null; return null;
} }


@Override
public Connection getNode(String ip) {
return getConnection(ip);
}

@Override
public Collection<Connection> getAllNode() {
return ip_client.values().stream().map(GatewayClient::getConnection).collect(Collectors.toList());
}

@Override
public boolean send(String ip, Consumer<GatewayPushMessage> consumer) {
Connection connection = getConnection(ip);
if (connection == null) return false;
consumer.accept(GatewayPushMessage.build(connection));
return true;
}

@Override
public void broadcast(Consumer<GatewayPushMessage> consumer) {
ip_client.forEach((s, client) -> {
consumer.accept(GatewayPushMessage.build(client.getConnection()));
});
}

private void restartClient(final GatewayClient client) { private void restartClient(final GatewayClient client) {
ip_client.remove(client.getHost()); ip_client.remove(client.getHost());
client.stop(new Listener() { client.stop(new Listener() {
Expand Down Expand Up @@ -125,8 +139,4 @@ public void onFailure(Throwable cause) {
} }
}); });
} }

public Collection<Connection> getAllConnections() {
return ip_client.values().stream().map(GatewayClient::getConnection).collect(Collectors.toList());
}
} }

0 comments on commit cc40ff9

Please sign in to comment.