Skip to content

Commit

Permalink
调整push server
Browse files Browse the repository at this point in the history
  • Loading branch information
黄志磊 committed Jan 17, 2016
1 parent 9fe7d6f commit 03d36b1
Show file tree
Hide file tree
Showing 13 changed files with 496 additions and 509 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.shinemo.mpush.common;

import java.util.List;


import com.google.common.collect.Lists;
import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.config.ConfigCenter;
import com.shinemo.mpush.tools.redis.RedisGroup;
import com.shinemo.mpush.tools.spi.ServiceContainer;
import com.shinemo.mpush.tools.zk.ZKPath;
import com.shinemo.mpush.tools.zk.ZkRegister;
import com.shinemo.mpush.tools.zk.listener.DataChangeListener;
import com.shinemo.mpush.tools.zk.listener.impl.RedisPathListener;

public abstract class AbstractClient {

protected List<DataChangeListener> dataChangeListeners = Lists.newArrayList();

protected ZkRegister zkRegister = ServiceContainer.getInstance(ZkRegister.class);

public AbstractClient() {
registerListener(new RedisPathListener());
}


public void registerListener(DataChangeListener listener){
dataChangeListeners.add(listener);
}

//step1 启动 zk
private void initZK(){
zkRegister.init();
}

//step2 获取redis
private void initRedis(){
boolean exist = zkRegister.isExisted(ZKPath.REDIS_SERVER.getPath());
if (!exist) {
List<RedisGroup> groupList = ConfigCenter.holder.redisGroups();
zkRegister.registerPersist(ZKPath.REDIS_SERVER.getPath(), Jsons.toJson(groupList));
}
}

//step3 注册listener
private void registerListeners(){
for(DataChangeListener listener:dataChangeListeners){
zkRegister.registerListener(listener);
}
}

//step4 初始化 listener data
private void initListenerData(){
for(DataChangeListener listener:dataChangeListeners){
listener.initData();
}
}

public void start(){
initZK();
initRedis();
registerListeners();
initListenerData();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ public void start(){
registerServerToZk(application.getServerRegisterZkPath(),Jsons.toJson(application));
}

public void startClient(){
initZK();
initRedis();
registerListeners();
initListenerData();
}

public void stopServer(Server server){
if(server!=null){
server.stop(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ private void printList(){
}
}

public GatewayServerApplication get(String fullpath){
return holder.get(fullpath);
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.shinemo.mpush.netty.client;

import io.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shinemo.mpush.api.Client;

public class NettyClientFactory extends AbstractNettyClientFactory {

public static final NettyClientFactory INSTANCE = new NettyClientFactory();

Client createClient(String host, int port, ChannelHandler handler) {
public Client createClient(String host, int port, ChannelHandler handler) {
return new NettyClient(host, port, handler);
}
}
Original file line number Diff line number Diff line change
@@ -1,64 +1,63 @@
//package com.shinemo.mpush.ps;
//
//import com.shinemo.mpush.api.connection.Connection;
//import com.shinemo.mpush.api.protocol.Command;
//import com.shinemo.mpush.api.protocol.Packet;
//import com.shinemo.mpush.common.ErrorCode;
//import com.shinemo.mpush.common.message.ErrorMessage;
//import com.shinemo.mpush.netty.connection.NettyConnection;
//import io.netty.channel.ChannelHandlerAdapter;
//import io.netty.channel.ChannelHandlerContext;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import static com.shinemo.mpush.common.ErrorCode.OFFLINE;
//import static com.shinemo.mpush.common.ErrorCode.PUSH_CLIENT_FAILURE;
//import static com.shinemo.mpush.common.ErrorCode.ROUTER_CHANGE;
//
///**
// * Created by ohun on 2015/12/30.
// */
//public class GatewayClientChannelHandler extends ChannelHandlerAdapter {
// private static final Logger LOGGER = LoggerFactory.getLogger(GatewayClientChannelHandler.class);
// private final Connection connection = new NettyConnection();
//
// @Override
// public void channelActive(ChannelHandlerContext ctx) throws Exception {
// connection.init(ctx.channel(), false);
// }
//
// @Override
// public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// connection.close();
// }
//
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// if (msg instanceof Packet) {
// Packet packet = ((Packet) msg);
// PushRequest request = PushRequestBus.INSTANCE.remove(packet.sessionId);
// if (request == null) {
// LOGGER.warn("receive a gateway response, but request timeout. packet={}", packet);
// return;
// }
//
// if (packet.cmd == Command.OK.cmd) {
// request.success();
// } else {
// ErrorMessage message = new ErrorMessage(packet, connection);
// if (message.code == OFFLINE.errorCode) {
// request.offline();
// } else if (message.code == PUSH_CLIENT_FAILURE.errorCode) {
// request.failure();
// } else if (message.code == ROUTER_CHANGE.errorCode) {
// request.redirect();
// }
// LOGGER.warn("receive an error gateway response, message={}", message);
// }
// }
// }
//
// public Connection getConnection() {
// return connection;
// }
//}
package com.shinemo.mpush.ps;

import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.protocol.Command;
import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.common.message.ErrorMessage;
import com.shinemo.mpush.netty.connection.NettyConnection;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.shinemo.mpush.common.ErrorCode.OFFLINE;
import static com.shinemo.mpush.common.ErrorCode.PUSH_CLIENT_FAILURE;
import static com.shinemo.mpush.common.ErrorCode.ROUTER_CHANGE;

/**
* Created by ohun on 2015/12/30.
*/
public class GatewayClientChannelHandler extends ChannelHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(GatewayClientChannelHandler.class);
private final Connection connection = new NettyConnection();

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
connection.init(ctx.channel(), false);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
connection.close();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Packet) {
Packet packet = ((Packet) msg);
PushRequest request = PushRequestBus.INSTANCE.remove(packet.sessionId);
if (request == null) {
LOGGER.warn("receive a gateway response, but request timeout. packet={}", packet);
return;
}

if (packet.cmd == Command.OK.cmd) {
request.success();
} else {
ErrorMessage message = new ErrorMessage(packet, connection);
if (message.code == OFFLINE.errorCode) {
request.offline();
} else if (message.code == PUSH_CLIENT_FAILURE.errorCode) {
request.failure();
} else if (message.code == ROUTER_CHANGE.errorCode) {
request.redirect();
}
LOGGER.warn("receive an error gateway response, message={}", message);
}
}
}

public Connection getConnection() {
return connection;
}
}
56 changes: 23 additions & 33 deletions mpush-ps/src/main/java/com/shinemo/mpush/ps/GatewayClientMain.java
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
//package com.shinemo.mpush.ps;
//
//
//
//import com.shinemo.mpush.api.Server;
//import com.shinemo.mpush.common.AbstractServer;
//import com.shinemo.mpush.common.app.impl.GatewayServerApplication;
//import com.shinemo.mpush.tools.Jsons;
//
//public class GatewayClientMain extends AbstractServer<GatewayServerApplication>{
//
//
// private GatewayServerApplication gatewayServerApplication;
//
// public GatewayClientMain(){
//
// registerListener(new GatewayServerPathListener());
//
// connectionServerApplication = (ConnectionServerApplication)application;
// gatewayServerApplication = new GatewayServerApplication();
// connectionServerApplication.setGatewayServerApplication(gatewayServerApplication);
// gatewayServer = new GatewayServer(gatewayServerApplication.getPort());
// }
//
// @Override
// public Server getServer() {
// final int port = application.getPort();
// ConnectionServer connectionServer = new ConnectionServer(port);
// return connectionServer;
// }
//
//
//}
package com.shinemo.mpush.ps;

import java.util.Collection;

import com.shinemo.mpush.api.PushSender.Callback;
import com.shinemo.mpush.common.AbstractClient;
import com.shinemo.mpush.common.zk.listener.impl.GatewayServerPathListener;

public class GatewayClientMain extends AbstractClient {

private static final int defaultTimeout = 3000;

public GatewayClientMain() {
registerListener(new GatewayServerPathListener());
}

public void send(String content, Collection<String> userIds, Callback callback) {
for (String userId : userIds) {
PushRequest.build().setCallback(callback).setUserId(userId).setContent(content).setTimeout(defaultTimeout).send();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.shinemo.mpush.ps;

import java.util.Map;

import com.google.common.collect.Maps;
import com.shinemo.mpush.api.Client;
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.common.app.impl.GatewayServerApplication;
import com.shinemo.mpush.common.manage.impl.GatewayServerManage;
import com.shinemo.mpush.netty.client.NettyClientFactory;

public class GatewayClientManage extends GatewayServerManage{

private final Map<GatewayServerApplication, Client> application2Client = Maps.newConcurrentMap();

private final Map<String,Client> ip2Client = Maps.newConcurrentMap();

@Override
public void addOrUpdate(String fullPath, GatewayServerApplication application) {
super.addOrUpdate(fullPath, application);
Client client = NettyClientFactory.INSTANCE.createClient(application.getIp(), application.getPort(),new GatewayClientChannelHandler());
application2Client.put(application, client);
ip2Client.put(application.getIp()+":"+application.getPort(), client);
}


@Override
public void remove(String fullPath) {
GatewayServerApplication application = super.get(fullPath);
super.remove(fullPath);

if(application!=null){
Client client = application2Client.get(application);
if(client!=null){
client.stop();
}
}
ip2Client.remove(application.getIp()+":"+application.getPort());
}

public Client getClient(GatewayServerApplication application){
return application2Client.get(application);
}

public Connection getConnection(String ipAndPort) {
Client client = ip2Client.get(ipAndPort);
if (client == null) return null;
return ((GatewayClientChannelHandler) client.getHandler()).getConnection();
}


}

0 comments on commit 03d36b1

Please sign in to comment.