Skip to content

Commit

Permalink
use new netty client factory
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Jan 22, 2016
1 parent c24f38a commit 38a8b80
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 141 deletions.
Expand Up @@ -37,7 +37,7 @@ public final class ClientChannelHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Client client = NettyClientFactory.INSTANCE.get(ctx.channel());
Client client = NettyClientFactory.INSTANCE.getCientByChannel(ctx.channel());
client.getConnection().updateLastReadTime();
if(client instanceof SecurityNettyClient){

Expand Down Expand Up @@ -91,18 +91,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Client client = NettyClientFactory.INSTANCE.get(ctx.channel());
NettyClientFactory.INSTANCE.remove(client);
NettyClientFactory.INSTANCE.remove(ctx.channel());
LOGGER.error("caught an ex, channel={}", ctx.channel(), cause);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("client connect channel={}", ctx.channel());
Connection connection = new NettyConnection();
Client client = NettyClientFactory.INSTANCE.get(ctx.channel());

Client client = NettyClientFactory.INSTANCE.getCientByChannel(ctx.channel());
if(client instanceof SecurityNettyClient){

tryFastConnect((SecurityNettyClient)client);
}else{
connection.init(ctx.channel(), false);
Expand All @@ -118,8 +117,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("client disconnect channel={}", ctx.channel());
Client client = NettyClientFactory.INSTANCE.get(ctx.channel());
NettyClientFactory.INSTANCE.remove(client);
NettyClientFactory.INSTANCE.remove(ctx.channel());;
}

private void tryFastConnect(SecurityNettyClient securityNettyClient) {
Expand Down

This file was deleted.

@@ -1,6 +1,7 @@
package com.shinemo.mpush.netty.client;

import java.net.InetSocketAddress;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,29 +13,31 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import com.google.common.collect.Maps;
import com.shinemo.mpush.api.Client;
import com.shinemo.mpush.netty.codec.PacketDecoder;
import com.shinemo.mpush.netty.codec.PacketEncoder;

public class NettyClientFactory extends AbstractNettyClientFactory {
public class NettyClientFactory {

private static final Logger log = LoggerFactory.getLogger(NettyClientFactory.class);

public static final NettyClientFactory INSTANCE = new NettyClientFactory();

private final Map<Channel, Client> channel2Client = Maps.newConcurrentMap();

public Client createClient(String host, int port, final ChannelHandler handler,boolean security) {

EventLoopGroup workerGroup = new NioEventLoopGroup();
public void createClient(String host, int port, final ChannelHandler handler,boolean security) {
final Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(workerGroup)//
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_REUSEADDR, true)//
.option(ChannelOption.SO_KEEPALIVE, true)//
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//
.channel(NioSocketChannel.class)
.handler(handler)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);

bootstrap.handler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
Expand All @@ -44,35 +47,31 @@ public void initChannel(SocketChannel ch) throws Exception {
}
});

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
if (future.awaitUninterruptibly(4000) && future.isSuccess() && future.channel().isActive()) {
Channel channel = future.channel();
if(security){
Client client = new SecurityNettyClient(host,port, channel);
return client;

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
if (future.awaitUninterruptibly(4000) && future.isSuccess() && future.channel().isActive()) {
Channel channel = future.channel();

Client client = null;
if(security){
client = new SecurityNettyClient(host,port, channel);
}else{
Client client = new NettyClient(host,port, channel);
return client;
client = new NettyClient(host,port, channel);
}

} else {
future.cancel(true);
future.channel().close();
log.warn("[remoting] failure to connect:" + host+","+port);
}
return null;
}

public Client getClient(final Client client) throws Exception {
return get(client.getHost(),client.getPort());
}

public Client getClient(final Channel channel){
return getClient(channel);
channel2Client.put(channel, client);
} else {
future.cancel(true);
future.channel().close();
log.warn("[remoting] failure to connect:" + host+","+port);
}
}

public void remove(final Client client) {
super.remove(client);
}
public Client getCientByChannel(final Channel channel) {
return channel2Client.get(channel);
}

public void remove(final Channel channel) {
channel2Client.remove(channel);
}

}

0 comments on commit 38a8b80

Please sign in to comment.