Skip to content

Commit

Permalink
代码微调,可能编译不通过。。。明天继续
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Jan 19, 2016
1 parent c984999 commit 1b435c9
Show file tree
Hide file tree
Showing 15 changed files with 357 additions and 155 deletions.
20 changes: 13 additions & 7 deletions mpush-api/src/main/java/com/shinemo/mpush/api/Client.java
@@ -1,19 +1,25 @@
package com.shinemo.mpush.api;

import io.netty.channel.ChannelHandler;

public interface Client {

void init();

void start();
boolean isConnected();

void stop();
String getHost();

boolean isConnected();
int getPort();

void close(String cause);

boolean isEnabled();

void resetHbTimes();

int inceaseAndGetHbTimes();

String getUri();
String getUrl();

ChannelHandler getHandler();
void startHeartBeat() throws Exception;

}
Expand Up @@ -34,4 +34,10 @@ public interface Connection {
boolean heartbeatTimeout();

void updateLastReadTime();

int inceaseAndGetHbTimes();

void resetHbTimes();

long getLastReadTime();
}
@@ -1,5 +1,7 @@
package com.shinemo.mpush.api.connection;

import java.util.List;

import io.netty.channel.Channel;

/**
Expand All @@ -12,4 +14,8 @@ public interface ConnectionManager {
void remove(Channel channel);

void add(Connection connection);

List<Connection> getConnections();

void init();
}
Expand Up @@ -28,4 +28,10 @@ public byte[] decrypt(byte[] data) {
public byte[] encrypt(byte[] data) {
return RSAUtils.encryptByPublicKey(data, publicKey);
}

@Override
public String toString() {
return "RsaCipher [privateKey=" + new String(privateKey.getEncoded()) + ", publicKey=" + new String(publicKey.getEncoded()) + "]";
}

}
@@ -1,5 +1,8 @@
package com.shinemo.mpush.core.server;

import java.util.concurrent.TimeUnit;

import com.shinemo.mpush.api.connection.ConnectionManager;
import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.common.MessageDispatcher;
import com.shinemo.mpush.core.handler.BindUserHandler;
Expand All @@ -8,6 +11,10 @@
import com.shinemo.mpush.core.handler.HeartBeatHandler;
import com.shinemo.mpush.netty.connection.NettyConnectionManager;
import com.shinemo.mpush.netty.server.NettyServer;
import com.shinemo.mpush.netty.server.ScanAllConnectionTimerTask;
import com.shinemo.mpush.netty.util.NettySharedHolder;
import com.shinemo.mpush.tools.config.ConfigCenter;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
Expand All @@ -17,9 +24,12 @@
*/
public final class ConnectionServer extends NettyServer {
private ServerChannelHandler channelHandler;

private ConnectionManager connectionManager = new NettyConnectionManager();

public ConnectionServer(int port) {
super(port);
NettySharedHolder.HASHED_WHEEL_TIMER.newTimeout(new ScanAllConnectionTimerTask(connectionManager), ConfigCenter.holder.scanConnTaskCycle()/1000, TimeUnit.SECONDS);
}

@Override
Expand All @@ -29,7 +39,6 @@ public void init() {
receiver.register(Command.HANDSHAKE, new HandshakeHandler());
receiver.register(Command.BIND, new BindUserHandler());
receiver.register(Command.FAST_CONNECT, new FastConnectHandler());
NettyConnectionManager connectionManager = new NettyConnectionManager();
connectionManager.init();
channelHandler = new ServerChannelHandler(true, connectionManager, receiver);
}
Expand Down
Expand Up @@ -11,6 +11,7 @@
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -38,6 +39,7 @@ public ServerChannelHandler(boolean security, ConnectionManager connectionManage
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Connection connection = connectionManager.get(ctx.channel());
LOGGER.warn("update currentTime:"+ctx.channel()+","+ToStringBuilder.reflectionToString(msg));
connection.updateLastReadTime();
receiver.onReceive((Packet) msg, connection);
}
Expand Down
4 changes: 2 additions & 2 deletions mpush-cs/src/main/java/com/shinemo/mpush/cs/client/Main.java
Expand Up @@ -9,7 +9,7 @@

public class Main {

public static void main(String[] args) {
public static void main(String[] args) throws Exception {
ConnectionClientMain main = new ConnectionClientMain();
main.start();

Expand All @@ -19,7 +19,7 @@ public static void main(String[] args) {
int index = (int) ((Math.random() % serverList.size()) * serverList.size());
ConnectionServerApplication server = serverList.get(index);
ClientChannelHandler handler = new ClientChannelHandler();
final Client client = NettyClientFactory.INSTANCE.createGet(server.getIp(), server.getPort(), handler);
final Client client = NettyClientFactory.INSTANCE.get(server.getIp(), server.getPort(), handler);
client.init();
Thread t = new Thread(new Runnable() {
@Override
Expand Down
@@ -1,9 +1,11 @@
package com.shinemo.mpush.netty.client;

import java.text.MessageFormat;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,21 +50,32 @@ public Client get(final String remoteHost, final int port) {
String key = String.format(format, remoteHost, port);
return cachedClients.getIfPresent(key);
}

public Client createGet(String remoteHost, int port, ChannelHandler handler) {
Client client = createClient(remoteHost, port, handler);
if (client != null) {
String key = String.format(format, remoteHost, port);
cachedClients.put(key, client);
}
return client;

public Client get(final String remoteHost,final int port,final ChannelHandler handler) throws Exception{
String key = String.format(format, remoteHost, port);
Client client = cachedClients.get(key, new Callable<Client>() {
@Override
public Client call() throws Exception {
Client client = createClient(remoteHost, port, handler);
if(client!=null){
client.startHeartBeat();
}
return client;
}
});
if(client == null || !client.isConnected()){
cachedClients.invalidate(key);
return null;
}
return client;
}


abstract Client createClient(String remoteHost, int port, ChannelHandler handler);

public void remove(Client client) {
if (client != null) {
cachedClients.invalidate(client.getUri());
cachedClients.invalidate(client.getUrl());
LOGGER.warn(MessageFormat.format("[Remoting] {0} is removed", client));
}
}
Expand Down
@@ -1,95 +1,109 @@
package com.shinemo.mpush.netty.client;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import com.shinemo.mpush.netty.codec.PacketDecoder;
import com.shinemo.mpush.netty.codec.PacketEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shinemo.mpush.api.Client;
import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.netty.util.NettySharedHolder;
import com.shinemo.mpush.tools.config.ConfigCenter;

public class NettyClient implements Client {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);

private static final String format = "%s:%s";

private static final Logger log = LoggerFactory.getLogger(NettyClient.class);

private final ChannelHandler handler;
private final String host;
private final int port;
private Channel channel;
private int hbTimes;

public NettyClient(final String host, final int port, ChannelHandler handler) {
public NettyClient(final String host, final int port, Channel channel) {
this.host = host;
this.port = port;
this.handler = handler;
this.channel = channel;
}

@Override
public void init() {
this.stop();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)//
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_REUSEADDR, true)//
.option(ChannelOption.SO_KEEPALIVE, true)//
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);

bootstrap.handler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(PacketEncoder.INSTANCE);
ch.pipeline().addLast(handler);
}
});

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
if (future.awaitUninterruptibly(4000) && future.isSuccess() && future.channel().isActive()) {
channel = future.channel();
} else {
future.cancel(true);
future.channel().close();
LOGGER.warn("[remoting] failure to connect:" + host + "," + port);
}
}
public void close(String cause) {
if (!StringUtils.isBlank(cause) && !"null".equals(cause.trim())) {
log.error("close channel:"+cause);
}
this.channel.close();
}

@Override
public void start() {
if (channel != null) {
try {
channel.closeFuture().sync();
} catch (InterruptedException e) {
}
}
}
@Override
public boolean isEnabled() {
return channel.isWritable();
}

@Override
public void stop() {
if (channel != null) {
channel.close();
}
}
@Override
public boolean isConnected() {
return channel.isActive();
}

@Override
public boolean isConnected() {
return channel.isActive();
}
@Override
public void resetHbTimes() {
hbTimes = 0;
}

@Override
public String getUri() {
return host + ":" + port;
}
@Override
public int inceaseAndGetHbTimes() {
return ++hbTimes;
}

@Override
public void startHeartBeat() throws Exception {
NettySharedHolder.HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
try {
ChannelFuture channelFuture = channel.writeAndFlush(Packet.getHBPacket());
channelFuture.addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
if (!channel.isActive()) {
log.warn("client send hb msg false:" + channel.remoteAddress().toString() + ",channel is not active");
}
log.warn("client send msg hb false:" + channel.remoteAddress().toString());
} else {
log.warn("client send msg hb success:" + channel.remoteAddress().toString());
}
}
});
} finally {
if (channel.isActive()) {
NettySharedHolder.HASHED_WHEEL_TIMER.newTimeout(this, ConfigCenter.holder.scanConnTaskCycle()/1000, TimeUnit.SECONDS);
}
}
}
}, ConfigCenter.holder.scanConnTaskCycle()/1000, TimeUnit.SECONDS);
}


@Override
public String getUrl() {
return String.format(format, host, port);
}

@Override
public String getHost() {
return host;
}

@Override
public int getPort() {
return port;
}

@Override
public ChannelHandler getHandler() {
return handler;
}
}

0 comments on commit 1b435c9

Please sign in to comment.