Skip to content

Commit

Permalink
网关新增sctp/udt协议支持
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 26, 2016
1 parent 808d724 commit 216bcae
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 36 deletions.
2 changes: 1 addition & 1 deletion conf/reference.conf
Expand Up @@ -44,7 +44,7 @@ mp {
admin-server-port=3002 //控制台服务端口, 内部端口
gateway-server-port=3001 //网关服务端口, 内部端口
gateway-client-port=4000 //UDP 客户端端口
gateway-server-net=udp //网关服务使用的网络类型tcp/udp
gateway-server-net=udp //网关服务使用的网络类型tcp/udp/sctp/udt
gateway-server-multicast="239.239.239.88" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效
gateway-client-multicast="239.239.239.99" //239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效

Expand Down
Expand Up @@ -32,11 +32,13 @@
import com.mpush.tools.config.CC.mp.net.snd_buf;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.*;
import io.netty.channel.sctp.nio.NioSctpChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;

import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -101,4 +103,20 @@ protected void initOptions(Bootstrap b) {
if (snd_buf.gateway_client > 0) b.option(ChannelOption.SO_SNDBUF, snd_buf.gateway_client);
if (rcv_buf.gateway_client > 0) b.option(ChannelOption.SO_RCVBUF, rcv_buf.gateway_client);
}

@Override
public ChannelFactory<? extends Channel> getChannelFactory() {
if (CC.mp.net.tcpGateway()) return super.getChannelFactory();
if (CC.mp.net.udtGateway()) return NioUdtProvider.BYTE_CONNECTOR;
if (CC.mp.net.sctpGateway()) return NioSctpChannel::new;
return super.getChannelFactory();
}

@Override
public SelectorProvider getSelectorProvider() {
if (CC.mp.net.tcpGateway()) return super.getSelectorProvider();
if (CC.mp.net.udtGateway()) return NioUdtProvider.BYTE_PROVIDER;
if (CC.mp.net.sctpGateway()) return super.getSelectorProvider();
return super.getSelectorProvider();
}
}
25 changes: 21 additions & 4 deletions mpush-core/src/main/java/com/mpush/core/server/GatewayServer.java
Expand Up @@ -30,12 +30,13 @@
import com.mpush.tools.thread.NamedPoolThreadFactory;
import com.mpush.tools.thread.ThreadNames;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.*;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.channel.sctp.nio.NioSctpServerChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;

import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -153,4 +154,20 @@ protected void initOptions(ServerBootstrap b) {
public ChannelHandler getChannelHandler() {
return channelHandler;
}

@Override
public ChannelFactory<? extends ServerChannel> getChannelFactory() {
if (CC.mp.net.tcpGateway()) return super.getChannelFactory();
if (CC.mp.net.udtGateway()) return NioUdtProvider.BYTE_ACCEPTOR;
if (CC.mp.net.sctpGateway()) return NioSctpServerChannel::new;
return super.getChannelFactory();
}

@Override
public SelectorProvider getSelectorProvider() {
if (CC.mp.net.tcpGateway()) return super.getSelectorProvider();
if (CC.mp.net.udtGateway()) return NioUdtProvider.BYTE_PROVIDER;
if (CC.mp.net.sctpGateway()) return super.getSelectorProvider();
return super.getSelectorProvider();
}
}
8 changes: 8 additions & 0 deletions mpush-netty/pom.xml
Expand Up @@ -44,5 +44,13 @@
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-udt</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-sctp</artifactId>
</dependency>
</dependencies>
</project>
Expand Up @@ -22,30 +22,24 @@
import com.mpush.api.service.BaseService;
import com.mpush.api.service.Client;
import com.mpush.api.service.Listener;
import com.mpush.api.service.ServiceException;
import com.mpush.netty.codec.PacketDecoder;
import com.mpush.netty.codec.PacketEncoder;
import com.mpush.tools.config.CC;
import com.mpush.tools.thread.ThreadNames;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.epoll.Native;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.nio.channels.spi.SelectorProvider;

public abstract class NettyTCPClient extends BaseService implements Client {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyTCPClient.class);
Expand All @@ -59,17 +53,16 @@ public NettyTCPClient(String host, int port) {
this.port = port;
}

private void createClient(Listener listener, EventLoopGroup workerGroup, Class<? extends SocketChannel> clazz) {
private void createClient(Listener listener, EventLoopGroup workerGroup, ChannelFactory<? extends Channel> channelFactory) {
this.workerGroup = workerGroup;
Bootstrap b = new Bootstrap();
b.group(workerGroup)//
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_REUSEADDR, true)//
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//
.channel(clazz);
b.handler(new ChannelInitializer<SocketChannel>() { // (4)
.channelFactory(channelFactory);
b.handler(new ChannelInitializer<Channel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(Channel ch) throws Exception {
initPipeline(ch.pipeline());
}
});
Expand All @@ -88,18 +81,18 @@ public void initChannel(SocketChannel ch) throws Exception {

private void createNioClient(Listener listener) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup(
1, new DefaultThreadFactory(ThreadNames.T_TCP_CLIENT)
1, new DefaultThreadFactory(ThreadNames.T_TCP_CLIENT), getSelectorProvider()
);
workerGroup.setIoRatio(getIoRate());
createClient(listener, workerGroup, NioSocketChannel.class);
createClient(listener, workerGroup, getChannelFactory());
}

private void createEpollClient(Listener listener) {
EpollEventLoopGroup workerGroup = new EpollEventLoopGroup(
1, new DefaultThreadFactory(ThreadNames.T_TCP_CLIENT)
);
workerGroup.setIoRatio(getIoRate());
createClient(listener, workerGroup, EpollSocketChannel.class);
createClient(listener, workerGroup, EpollSocketChannel::new);
}

protected void initPipeline(ChannelPipeline pipeline) {
Expand Down Expand Up @@ -154,6 +147,15 @@ protected void doStop(Listener listener) throws Throwable {

protected void initOptions(Bootstrap b) {
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
b.option(ChannelOption.TCP_NODELAY, true);
}

public ChannelFactory<? extends Channel> getChannelFactory() {
return NioSocketChannel::new;
}

public SelectorProvider getSelectorProvider() {
return SelectorProvider.provider();
}

public String getHost() {
Expand Down
Expand Up @@ -36,10 +36,12 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -103,7 +105,7 @@ public void start(final Listener listener) {
}
}

private void createServer(Listener listener, EventLoopGroup boss, EventLoopGroup work, Class<? extends ServerChannel> clazz) {
private void createServer(Listener listener, EventLoopGroup boss, EventLoopGroup work, ChannelFactory<? extends ServerChannel> channelFactory) {
/***
* NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,
* Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。
Expand Down Expand Up @@ -133,7 +135,7 @@ private void createServer(Listener listener, EventLoopGroup boss, EventLoopGroup
* ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的连接
* 这里告诉Channel如何获取新的连接.
*/
b.channel(clazz);
b.channelFactory(channelFactory);


/***
Expand All @@ -145,9 +147,9 @@ private void createServer(Listener listener, EventLoopGroup boss, EventLoopGroup
* 当你的程序变的复杂时,可能你会增加更多的处理类到pipeline上,
* 然后提取这些匿名类到最顶层的类上。
*/
b.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
b.childHandler(new ChannelInitializer<Channel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {//每连上一个链接调用一次
public void initChannel(Channel ch) throws Exception {//每连上一个链接调用一次
initPipeline(ch.pipeline());
}
});
Expand Down Expand Up @@ -175,18 +177,41 @@ public void initChannel(SocketChannel ch) throws Exception {//每连上一个链
}

private void createNioServer(Listener listener) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(getBossThreadNum(), getBossThreadFactory());
NioEventLoopGroup workerGroup = new NioEventLoopGroup(getWorkThreadNum(), getWorkThreadFactory());
bossGroup.setIoRatio(100);
workerGroup.setIoRatio(getIoRate());
createServer(listener, bossGroup, workerGroup, NioServerSocketChannel.class);
EventLoopGroup bossGroup = getBossGroup();
EventLoopGroup workerGroup = getWorkerGroup();

if (bossGroup == null) {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(getBossThreadNum(), getBossThreadFactory(), getSelectorProvider());
nioEventLoopGroup.setIoRatio(100);
bossGroup = nioEventLoopGroup;
}

if (workerGroup == null) {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(getWorkThreadNum(), getWorkThreadFactory(), getSelectorProvider());
nioEventLoopGroup.setIoRatio(getIoRate());
workerGroup = nioEventLoopGroup;
}

createServer(listener, bossGroup, workerGroup, getChannelFactory());
}

private void createEpollServer(Listener listener) {
EpollEventLoopGroup bossGroup = new EpollEventLoopGroup(getBossThreadNum(), getBossThreadFactory());
EpollEventLoopGroup workerGroup = new EpollEventLoopGroup(getWorkThreadNum(), getWorkThreadFactory());
workerGroup.setIoRatio(getIoRate());
createServer(listener, bossGroup, workerGroup, EpollServerSocketChannel.class);
EventLoopGroup bossGroup = getBossGroup();
EventLoopGroup workerGroup = getWorkerGroup();

if (bossGroup == null) {
EpollEventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(getBossThreadNum(), getBossThreadFactory());
epollEventLoopGroup.setIoRatio(100);
bossGroup = epollEventLoopGroup;
}

if (workerGroup == null) {
EpollEventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(getWorkThreadNum(), getWorkThreadFactory());
epollEventLoopGroup.setIoRatio(getIoRate());
workerGroup = epollEventLoopGroup;
}

createServer(listener, bossGroup, workerGroup, EpollServerSocketChannel::new);
}

/***
Expand Down Expand Up @@ -281,7 +306,19 @@ protected boolean useNettyEpoll() {
return false;
}

public EventLoopGroup getBossGroup() {
return bossGroup;
}

public EventLoopGroup getWorkerGroup() {
return workerGroup;
}

public ChannelFactory<? extends ServerChannel> getChannelFactory() {
return NioServerSocketChannel::new;
}

public SelectorProvider getSelectorProvider() {
return SelectorProvider.provider();
}
}
12 changes: 12 additions & 0 deletions mpush-tools/src/main/java/com/mpush/tools/config/CC.java
Expand Up @@ -95,10 +95,22 @@ interface net {
String gateway_server_multicast = cfg.getString("gateway-server-multicast");
String gateway_client_multicast = cfg.getString("gateway-client-multicast");

static boolean tcpGateway() {
return "tcp".equals(gateway_server_net);
}

static boolean udpGateway() {
return "udp".equals(gateway_server_net);
}

static boolean udtGateway() {
return "udt".equals(gateway_server_net);
}

static boolean sctpGateway() {
return "sctp".equals(gateway_server_net);
}


interface public_ip_mapping {

Expand Down
10 changes: 10 additions & 0 deletions pom.xml
Expand Up @@ -109,6 +109,16 @@
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-udt</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-sctp</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- ======================================== -->
<!-- 子项目依赖 -->
<!-- ======================================== -->
Expand Down

0 comments on commit 216bcae

Please sign in to comment.