Skip to content

Commit

Permalink
use new netty client factory
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Jan 22, 2016
1 parent bdeea45 commit ff57ca3
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 141 deletions.
Expand Up @@ -37,7 +37,7 @@ public final class ClientChannelHandler extends ChannelHandlerAdapter {


@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Client client = NettyClientFactory.INSTANCE.get(ctx.channel()); Client client = NettyClientFactory.INSTANCE.getCientByChannel(ctx.channel());
client.getConnection().updateLastReadTime(); client.getConnection().updateLastReadTime();
if(client instanceof SecurityNettyClient){ if(client instanceof SecurityNettyClient){


Expand Down Expand Up @@ -91,18 +91,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception


@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Client client = NettyClientFactory.INSTANCE.get(ctx.channel()); NettyClientFactory.INSTANCE.remove(ctx.channel());
NettyClientFactory.INSTANCE.remove(client);
LOGGER.error("caught an ex, channel={}", ctx.channel(), cause); LOGGER.error("caught an ex, channel={}", ctx.channel(), cause);
} }


@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("client connect channel={}", ctx.channel()); LOGGER.info("client connect channel={}", ctx.channel());
Connection connection = new NettyConnection(); Connection connection = new NettyConnection();
Client client = NettyClientFactory.INSTANCE.get(ctx.channel());
Client client = NettyClientFactory.INSTANCE.getCientByChannel(ctx.channel());
if(client instanceof SecurityNettyClient){ if(client instanceof SecurityNettyClient){

tryFastConnect((SecurityNettyClient)client); tryFastConnect((SecurityNettyClient)client);
}else{ }else{
connection.init(ctx.channel(), false); connection.init(ctx.channel(), false);
Expand All @@ -118,8 +117,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("client disconnect channel={}", ctx.channel()); LOGGER.info("client disconnect channel={}", ctx.channel());
Client client = NettyClientFactory.INSTANCE.get(ctx.channel()); NettyClientFactory.INSTANCE.remove(ctx.channel());;
NettyClientFactory.INSTANCE.remove(client);
} }


private void tryFastConnect(SecurityNettyClient securityNettyClient) { private void tryFastConnect(SecurityNettyClient securityNettyClient) {
Expand Down

This file was deleted.

@@ -1,6 +1,7 @@
package com.shinemo.mpush.netty.client; package com.shinemo.mpush.netty.client;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -12,29 +13,31 @@
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;


import com.google.common.collect.Maps;
import com.shinemo.mpush.api.Client; import com.shinemo.mpush.api.Client;
import com.shinemo.mpush.netty.codec.PacketDecoder; import com.shinemo.mpush.netty.codec.PacketDecoder;
import com.shinemo.mpush.netty.codec.PacketEncoder; import com.shinemo.mpush.netty.codec.PacketEncoder;


public class NettyClientFactory extends AbstractNettyClientFactory { public class NettyClientFactory {


private static final Logger log = LoggerFactory.getLogger(NettyClientFactory.class); private static final Logger log = LoggerFactory.getLogger(NettyClientFactory.class);


public static final NettyClientFactory INSTANCE = new NettyClientFactory(); public static final NettyClientFactory INSTANCE = new NettyClientFactory();

private final Map<Channel, Client> channel2Client = Maps.newConcurrentMap();


public Client createClient(String host, int port, final ChannelHandler handler,boolean security) { public void createClient(String host, int port, final ChannelHandler handler,boolean security) {

EventLoopGroup workerGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap(); final Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(workerGroup)// bootstrap.group(workerGroup)//
.option(ChannelOption.TCP_NODELAY, true)// .option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_REUSEADDR, true)// .option(ChannelOption.SO_REUSEADDR, true)//
.option(ChannelOption.SO_KEEPALIVE, true)// .option(ChannelOption.SO_KEEPALIVE, true)//
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)// .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.handler(handler) .handler(handler)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000); .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);

bootstrap.handler(new ChannelInitializer<SocketChannel>() { // (4) bootstrap.handler(new ChannelInitializer<SocketChannel>() { // (4)
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
Expand All @@ -44,35 +47,31 @@ public void initChannel(SocketChannel ch) throws Exception {
} }
}); });


ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
if (future.awaitUninterruptibly(4000) && future.isSuccess() && future.channel().isActive()) { ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
Channel channel = future.channel(); if (future.awaitUninterruptibly(4000) && future.isSuccess() && future.channel().isActive()) {
if(security){ Channel channel = future.channel();
Client client = new SecurityNettyClient(host,port, channel);
return client; Client client = null;
if(security){
client = new SecurityNettyClient(host,port, channel);
}else{ }else{
Client client = new NettyClient(host,port, channel); client = new NettyClient(host,port, channel);
return client;
} }

channel2Client.put(channel, client);
} else { } else {
future.cancel(true); future.cancel(true);
future.channel().close(); future.channel().close();
log.warn("[remoting] failure to connect:" + host+","+port); log.warn("[remoting] failure to connect:" + host+","+port);
} }
return null;
}

public Client getClient(final Client client) throws Exception {
return get(client.getHost(),client.getPort());
}

public Client getClient(final Channel channel){
return getClient(channel);
} }


public void remove(final Client client) { public Client getCientByChannel(final Channel channel) {
super.remove(client); return channel2Client.get(channel);
} }


public void remove(final Channel channel) {
channel2Client.remove(channel);
}

} }

0 comments on commit ff57ca3

Please sign in to comment.