Skip to content

Commit

Permalink
add push client
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Jan 7, 2016
1 parent 4304ae5 commit 5253f3a
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 79 deletions.
70 changes: 43 additions & 27 deletions mpush-client/src/main/java/com/shinemo/mpush/client/PushClient.java
Expand Up @@ -4,43 +4,62 @@
import com.shinemo.mpush.api.PushSender; import com.shinemo.mpush.api.PushSender;
import com.shinemo.mpush.api.connection.Connection; import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.netty.client.NettyClientFactory; import com.shinemo.mpush.netty.client.NettyClientFactory;
import com.shinemo.mpush.tools.ConfigCenter;
import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkUtil;


import java.io.IOException;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.LockSupport;


/** /**
* Created by ohun on 2015/12/30. * Created by ohun on 2015/12/30.
*/ */
public class PushClient implements PushSender { public class PushClient implements PushSender {


public NettyClientFactory clientFactory; public NettyClientFactory clientFactory = NettyClientFactory.INSTANCE;
private String host = "127.0.0.1";
private int port = 4000;
private int defaultTimeout = 3000; private int defaultTimeout = 3000;
private int port = 4000;


public void init() throws Exception { public void init() {
this.clientFactory = NettyClientFactory.INSTANCE;
}

public Connection getConnection(String ip) {
try { try {
Client client = clientFactory.get(ip, port); ConfigCenter.INSTANCE.init();
if (client == null) { } catch (IOException e) {
final Client client2 = clientFactory.createClient(ip, e.printStackTrace();
port, new PushClientChannelHandler()); }
client2.init(); List<String> nodes = ZkUtil.instance.getChildrenKeys(PathEnum.GATEWAY_SERVER.getPath());
new Thread(new Runnable() { if (nodes == null || nodes.isEmpty()) return;
@Override for (String name : nodes) {
public void run() { String json = ZkUtil.instance.get(PathEnum.GATEWAY_SERVER.getPathByName(name));
client2.start(); ServerApp server = Jsons.fromJson(json, ServerApp.class);
} if (server == null) continue;
}).start(); createClient(server.getIp(), server.getPort());
client = client2; }
} }
return ((PushClientChannelHandler) client.getHandler()).getConnection();
} catch (Exception e) {


private void createClient(String ip, int port) {
Client client = clientFactory.get(ip, port);
if (client == null) {
final Client cli = clientFactory.createGet(ip, port, new PushClientChannelHandler());
ThreadPoolUtil.newThread(new Runnable() {
@Override
public void run() {
cli.init();
cli.start();
}
}, "push-client-" + ip).start();
} }
return null; }

public Connection getConnection(String ip) {
Client client = clientFactory.get(ip, port);
if (client == null) return null;
return ((PushClientChannelHandler) client.getHandler()).getConnection();
} }


@Override @Override
Expand All @@ -55,7 +74,4 @@ public void send(String content, Collection<String> userIds, Callback callback)
.send(); .send();
} }
} }



} }
Expand Up @@ -144,12 +144,14 @@ private void sendToConnectionServer() {
this.onOffline(userId); this.onOffline(userId);
return; return;
} }

ClientLocation location = router.getRouteValue(); ClientLocation location = router.getRouteValue();
Connection connection = pushClient.getConnection(location.getHost()); Connection connection = pushClient.getConnection(location.getHost());
if (connection == null || !connection.isConnected()) { if (connection == null || !connection.isConnected()) {
this.onFailure(userId); this.onFailure(userId);
return; return;
} }

GatewayPushMessage pushMessage = new GatewayPushMessage(userId, content, connection); GatewayPushMessage pushMessage = new GatewayPushMessage(userId, content, connection);
pushMessage.send(new ChannelFutureListener() { pushMessage.send(new ChannelFutureListener() {
@Override @Override
Expand All @@ -161,6 +163,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
} }
} }
}); });

this.sessionId = pushMessage.getSessionId(); this.sessionId = pushMessage.getSessionId();
PushRequestBus.INSTANCE.add(this); PushRequestBus.INSTANCE.add(this);
} }
Expand Down
@@ -0,0 +1,50 @@
package com.shinemo.mpush.client;

import com.shinemo.mpush.api.PushSender;
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.locks.LockSupport;

import static org.junit.Assert.*;

/**
* Created by ohun on 2016/1/7.
*/
public class PushClientTest {

@Test
public void testSend() throws Exception {

}

public static void main(String[] args) throws InterruptedException {
PushClient client = new PushClient();
client.init();
Thread.sleep(5000);
client.send("this a first push", Arrays.asList("user-0", "user-1", "user-2", "user-3", "user-4"),
new PushSender.Callback() {
@Override
public void onSuccess(String userId) {
System.err.println("push onSuccess userId=" + userId);
}

@Override
public void onFailure(String userId) {
System.err.println("push onFailure userId=" + userId);
}

@Override
public void onOffline(String userId) {
System.err.println("push onOffline userId=" + userId);
}

@Override
public void onTimeout(String userId) {
System.err.println("push onTimeout userId=" + userId);
}
}
);
LockSupport.park();
}
}
8 changes: 8 additions & 0 deletions mpush-client/src/test/resources/config.properties
@@ -0,0 +1,8 @@
ZK_SERVER=10.1.20.74:2181
MAX_PACKET_SIZE=10240
COMPRESS_LIMIT=10240
MIN_HEARTBEAT=10000
MAX_HEARTBEAT=1800000
MAX_HB_TIMEOUT_TIMES=2
PRIVATE_KEY=MIIBNgIBADANBgkqhkiG9w0BAQEFAASCASAwggEcAgEAAoGBAKCE8JYKhsbydMPbiO7BJVq1pbuJWJHFxOR7L8Hv3ZVkSG4eNC8DdwAmDHYu/wadfw0ihKFm2gKDcLHp5yz5UQ8PZ8FyDYvgkrvGV0ak4nc40QDJWws621dm01e/INlGKOIStAAsxOityCLv0zm5Vf3+My/YaBvZcB5mGUsPbx8fAgEAAoGAAy0+WanRqwRHXUzt89OsupPXuNNqBlCEqgTqGAt4Nimq6Ur9u2R1KXKXUotxjp71Ubw6JbuUWvJg+5Rmd9RjT0HOUEQF3rvzEepKtaraPhV5ejEIrB+nJWNfGye4yzLdfEXJBGUQzrG+wNe13izfRNXI4dN/6Q5npzqaqv0E1CkCAQACAQACAQACAQACAQA=
PUBLIC_KEY=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCghPCWCobG8nTD24juwSVataW7iViRxcTkey/B792VZEhuHjQvA3cAJgx2Lv8GnX8NIoShZtoCg3Cx6ecs+VEPD2fBcg2L4JK7xldGpOJ3ONEAyVsLOttXZtNXvyDZRijiErQALMTorcgi79M5uVX9/jMv2Ggb2XAeZhlLD28fHwIDAQAB
30 changes: 30 additions & 0 deletions mpush-client/src/test/resources/logback.xml
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoding>UTF-8</encoding>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - [%thread] %-5level - %logger{35} - %msg%n</pattern>
</layout>
</appender>

<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoding>UTF-8</encoding>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - [%thread] %-5level - %logger{35} - %msg%n</pattern>
</layout>
</appender>


<root>
<level value="INFO" />
<appender-ref ref="STDOUT" />
</root>
</configuration>
15 changes: 5 additions & 10 deletions mpush-core/src/main/java/com/shinemo/mpush/core/App.java
Expand Up @@ -9,6 +9,7 @@
import com.shinemo.mpush.tools.Jsons; import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.redis.RedisGroup; import com.shinemo.mpush.tools.redis.RedisGroup;
import com.shinemo.mpush.tools.redis.RedisNode; import com.shinemo.mpush.tools.redis.RedisNode;
import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import com.shinemo.mpush.tools.zk.PathEnum; import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp; import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkUtil; import com.shinemo.mpush.tools.zk.ZkUtil;
Expand Down Expand Up @@ -40,7 +41,7 @@ private void init() throws IOException {
} }


public void startConnectionServer() { public void startConnectionServer() {
Thread t = new Thread(new Runnable() { ThreadPoolUtil.newThread(new Runnable() {
@Override @Override
public void run() { public void run() {
final int port = ConfigCenter.INSTANCE.getConnectionServerPort(); final int port = ConfigCenter.INSTANCE.getConnectionServerPort();
Expand All @@ -60,14 +61,11 @@ public void onFailure(String message) {
} }
}); });
} }
}); }, "conn-server", false).start();
t.setDaemon(false);
t.setName("conn-server-thread");
t.start();
} }


public void startGatewayServer() { public void startGatewayServer() {
Thread t = new Thread(new Runnable() { ThreadPoolUtil.newThread(new Runnable() {
@Override @Override
public void run() { public void run() {
final int port = ConfigCenter.INSTANCE.getGatewayServerPort(); final int port = ConfigCenter.INSTANCE.getGatewayServerPort();
Expand All @@ -87,10 +85,7 @@ public void onFailure(String message) {
} }
}); });
} }
}); }, "gateway-server", false).start();
t.setDaemon(false);
t.setName("gateway-server-thread");
t.start();
} }


private void registerServerToZK(int port, PathEnum path) { private void registerServerToZK(int port, PathEnum path) {
Expand Down
Expand Up @@ -49,7 +49,7 @@ public ClientChannelHandler() {
deviceId = "test-device-id-100" + new Random().nextInt(5); deviceId = "test-device-id-100" + new Random().nextInt(5);
} }
if (userId == null) { if (userId == null) {
userId = "user_" + new Random().nextInt(5); userId = "user-" + new Random().nextInt(5);
} }
} }


Expand Down
Expand Up @@ -32,7 +32,7 @@ public void testClient() throws Exception {
String json = ZkUtil.instance.get(PathEnum.CONNECTION_SERVER.getPathByName(name)); String json = ZkUtil.instance.get(PathEnum.CONNECTION_SERVER.getPathByName(name));
ServerApp server = Jsons.fromJson(json, ServerApp.class); ServerApp server = Jsons.fromJson(json, ServerApp.class);
ClientChannelHandler handler = new ClientChannelHandler(); ClientChannelHandler handler = new ClientChannelHandler();
final Client client = NettyClientFactory.INSTANCE.get(server.getIp(), server.getPort(), handler); final Client client = NettyClientFactory.INSTANCE.createGet(server.getIp(), server.getPort(), handler);
client.init(); client.init();
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@Override @Override
Expand Down
Expand Up @@ -42,43 +42,29 @@ public void onRemoval(RemovalNotification<String, Client> notification) {
* *
* @param remoteHost * @param remoteHost
* @param port * @param port
* @param handler
* @return * @return
* @throws Exception * @throws Exception
*/ */
public Client get(final String remoteHost, final int port, final ChannelHandler handler) throws Exception { public Client get(final String remoteHost, final int port) {
final String key = String.format(format, remoteHost, port); String key = String.format(format, remoteHost, port);
Client client = cachedClients.get(key, new Callable<Client>() { return cachedClients.getIfPresent(key);
@Override
public Client call() throws Exception {
Client client = createClient(remoteHost, port, handler);
return client;
}
});
if (client == null) {
cachedClients.invalidate(key);
return null;
}
return client;
} }


public Client get(final String remoteHost, final int port) throws Exception { public Client createGet(String remoteHost, int port, ChannelHandler handler) {
return get(remoteHost, port, null); Client client = createClient(remoteHost, port, handler);
} if (client != null) {

String key = String.format(format, remoteHost, port);

cachedClients.put(key, client);
protected Client createClient(final String remoteHost, final int port) throws Exception { }
return createClient(remoteHost, port, null); return client;
} }


public abstract Client createClient(final String remoteHost, final int port, ChannelHandler handler) throws Exception; abstract Client createClient(String remoteHost, int port, ChannelHandler handler);



public void remove(Client client) { public void remove(Client client) {
if (client != null) { if (client != null) {
cachedClients.invalidate(client.getUri()); cachedClients.invalidate(client.getUri());
LOGGER.warn(MessageFormat.format("[Remoting] {0} is removed", client)); LOGGER.warn(MessageFormat.format("[Remoting] {0} is removed", client));
} }
} }

} }
Expand Up @@ -8,16 +8,9 @@


public class NettyClientFactory extends AbstractNettyClientFactory { public class NettyClientFactory extends AbstractNettyClientFactory {


private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientFactory.class);

public static final NettyClientFactory INSTANCE = new NettyClientFactory(); public static final NettyClientFactory INSTANCE = new NettyClientFactory();


public Client createClient(final String host, final int port, final ChannelHandler handler) throws Exception { Client createClient(String host, int port, ChannelHandler handler) {
return new NettyClient(host, port, handler); return new NettyClient(host, port, handler);
} }

public void remove(final Client client) {
super.remove(client);
}

} }
Expand Up @@ -5,6 +5,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;


import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -473,16 +474,14 @@ public static <T> void publish(RedisNode node, String channel, T message) {
} }


public static void subscribe(Set<RedisNode> nodeList, final JedisPubSub pubsub, final String... channels) { public static void subscribe(Set<RedisNode> nodeList, final JedisPubSub pubsub, final String... channels) {
int i = 0;
for (final RedisNode node : nodeList) { for (final RedisNode node : nodeList) {
Thread t = new Thread(new Runnable() { ThreadPoolUtil.newThread(new Runnable() {
@Override @Override
public void run() { public void run() {
subscribe(node, pubsub, channels); subscribe(node, pubsub, channels);
} }
}); }, ("redis-subscribe-" + i++)).start();
t.setDaemon(true);
t.setName("redis-subscribe-thread");
t.start();
} }
} }


Expand Down

0 comments on commit 5253f3a

Please sign in to comment.