/
NettyClientFactory.java
77 lines (61 loc) · 2.75 KB
/
NettyClientFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.shinemo.mpush.netty.client;
import java.net.InetSocketAddress;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import com.google.common.collect.Maps;
import com.shinemo.mpush.api.Client;
import com.shinemo.mpush.netty.codec.PacketDecoder;
import com.shinemo.mpush.netty.codec.PacketEncoder;
public class NettyClientFactory {
private static final Logger log = LoggerFactory.getLogger(NettyClientFactory.class);
public static final NettyClientFactory INSTANCE = new NettyClientFactory();
private final Map<Channel, Client> channel2Client = Maps.newConcurrentMap();
public void createClient(String host, int port, final ChannelHandler handler,boolean security) {
final Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(workerGroup)//
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_REUSEADDR, true)//
.option(ChannelOption.SO_KEEPALIVE, true)//
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//
.channel(NioSocketChannel.class)
.handler(handler)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
bootstrap.handler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(PacketEncoder.INSTANCE);
ch.pipeline().addLast(handler);
}
});
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
if (future.awaitUninterruptibly(4000) && future.isSuccess() && future.channel().isActive()) {
Channel channel = future.channel();
Client client = null;
if(security){
client = new SecurityNettyClient(host,port, channel);
}else{
client = new NettyClient(host,port, channel);
}
channel2Client.put(channel, client);
} else {
future.cancel(true);
future.channel().close();
log.warn("[remoting] failure to connect:" + host+","+port);
}
}
public Client getCientByChannel(final Channel channel) {
return channel2Client.get(channel);
}
public void remove(final Channel channel) {
channel2Client.remove(channel);
}
}