Skip to content

Commit

Permalink
add push client
Browse files Browse the repository at this point in the history
  • Loading branch information
ohun committed Jan 7, 2016
1 parent 2c5d7a1 commit 2b18bbd
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 79 deletions.
70 changes: 43 additions & 27 deletions mpush-client/src/main/java/com/shinemo/mpush/client/PushClient.java
Expand Up @@ -4,43 +4,62 @@
import com.shinemo.mpush.api.PushSender;
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.netty.client.NettyClientFactory;
import com.shinemo.mpush.tools.ConfigCenter;
import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkUtil;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

/**
* Created by ohun on 2015/12/30.
*/
public class PushClient implements PushSender {

public NettyClientFactory clientFactory;
private String host = "127.0.0.1";
private int port = 4000;
public NettyClientFactory clientFactory = NettyClientFactory.INSTANCE;
private int defaultTimeout = 3000;
private int port = 4000;

public void init() throws Exception {
this.clientFactory = NettyClientFactory.INSTANCE;
}

public Connection getConnection(String ip) {
public void init() {
try {
Client client = clientFactory.get(ip, port);
if (client == null) {
final Client client2 = clientFactory.createClient(ip,
port, new PushClientChannelHandler());
client2.init();
new Thread(new Runnable() {
@Override
public void run() {
client2.start();
}
}).start();
client = client2;
}
return ((PushClientChannelHandler) client.getHandler()).getConnection();
} catch (Exception e) {
ConfigCenter.INSTANCE.init();
} catch (IOException e) {
e.printStackTrace();
}
List<String> nodes = ZkUtil.instance.getChildrenKeys(PathEnum.GATEWAY_SERVER.getPath());
if (nodes == null || nodes.isEmpty()) return;
for (String name : nodes) {
String json = ZkUtil.instance.get(PathEnum.GATEWAY_SERVER.getPathByName(name));
ServerApp server = Jsons.fromJson(json, ServerApp.class);
if (server == null) continue;
createClient(server.getIp(), server.getPort());
}
}

private void createClient(String ip, int port) {
Client client = clientFactory.get(ip, port);
if (client == null) {
final Client cli = clientFactory.createGet(ip, port, new PushClientChannelHandler());
ThreadPoolUtil.newThread(new Runnable() {
@Override
public void run() {
cli.init();
cli.start();
}
}, "push-client-" + ip).start();
}
return null;
}

public Connection getConnection(String ip) {
Client client = clientFactory.get(ip, port);
if (client == null) return null;
return ((PushClientChannelHandler) client.getHandler()).getConnection();
}

@Override
Expand All @@ -55,7 +74,4 @@ public void send(String content, Collection<String> userIds, Callback callback)
.send();
}
}



}
Expand Up @@ -144,12 +144,14 @@ private void sendToConnectionServer() {
this.onOffline(userId);
return;
}

ClientLocation location = router.getRouteValue();
Connection connection = pushClient.getConnection(location.getHost());
if (connection == null || !connection.isConnected()) {
this.onFailure(userId);
return;
}

GatewayPushMessage pushMessage = new GatewayPushMessage(userId, content, connection);
pushMessage.send(new ChannelFutureListener() {
@Override
Expand All @@ -161,6 +163,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
});

this.sessionId = pushMessage.getSessionId();
PushRequestBus.INSTANCE.add(this);
}
Expand Down
@@ -0,0 +1,50 @@
package com.shinemo.mpush.client;

import com.shinemo.mpush.api.PushSender;
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.locks.LockSupport;

import static org.junit.Assert.*;

/**
* Created by ohun on 2016/1/7.
*/
public class PushClientTest {

@Test
public void testSend() throws Exception {

}

public static void main(String[] args) throws InterruptedException {
PushClient client = new PushClient();
client.init();
Thread.sleep(5000);
client.send("this a first push", Arrays.asList("user-0", "user-1", "user-2", "user-3", "user-4"),
new PushSender.Callback() {
@Override
public void onSuccess(String userId) {
System.err.println("push onSuccess userId=" + userId);
}

@Override
public void onFailure(String userId) {
System.err.println("push onFailure userId=" + userId);
}

@Override
public void onOffline(String userId) {
System.err.println("push onOffline userId=" + userId);
}

@Override
public void onTimeout(String userId) {
System.err.println("push onTimeout userId=" + userId);
}
}
);
LockSupport.park();
}
}
8 changes: 8 additions & 0 deletions mpush-client/src/test/resources/config.properties
@@ -0,0 +1,8 @@
ZK_SERVER=10.1.20.74:2181
MAX_PACKET_SIZE=10240
COMPRESS_LIMIT=10240
MIN_HEARTBEAT=10000
MAX_HEARTBEAT=1800000
MAX_HB_TIMEOUT_TIMES=2
PRIVATE_KEY=MIIBNgIBADANBgkqhkiG9w0BAQEFAASCASAwggEcAgEAAoGBAKCE8JYKhsbydMPbiO7BJVq1pbuJWJHFxOR7L8Hv3ZVkSG4eNC8DdwAmDHYu/wadfw0ihKFm2gKDcLHp5yz5UQ8PZ8FyDYvgkrvGV0ak4nc40QDJWws621dm01e/INlGKOIStAAsxOityCLv0zm5Vf3+My/YaBvZcB5mGUsPbx8fAgEAAoGAAy0+WanRqwRHXUzt89OsupPXuNNqBlCEqgTqGAt4Nimq6Ur9u2R1KXKXUotxjp71Ubw6JbuUWvJg+5Rmd9RjT0HOUEQF3rvzEepKtaraPhV5ejEIrB+nJWNfGye4yzLdfEXJBGUQzrG+wNe13izfRNXI4dN/6Q5npzqaqv0E1CkCAQACAQACAQACAQACAQA=
PUBLIC_KEY=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCghPCWCobG8nTD24juwSVataW7iViRxcTkey/B792VZEhuHjQvA3cAJgx2Lv8GnX8NIoShZtoCg3Cx6ecs+VEPD2fBcg2L4JK7xldGpOJ3ONEAyVsLOttXZtNXvyDZRijiErQALMTorcgi79M5uVX9/jMv2Ggb2XAeZhlLD28fHwIDAQAB
30 changes: 30 additions & 0 deletions mpush-client/src/test/resources/logback.xml
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoding>UTF-8</encoding>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - [%thread] %-5level - %logger{35} - %msg%n</pattern>
</layout>
</appender>

<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoding>UTF-8</encoding>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - [%thread] %-5level - %logger{35} - %msg%n</pattern>
</layout>
</appender>


<root>
<level value="INFO" />
<appender-ref ref="STDOUT" />
</root>
</configuration>
15 changes: 5 additions & 10 deletions mpush-core/src/main/java/com/shinemo/mpush/core/App.java
Expand Up @@ -9,6 +9,7 @@
import com.shinemo.mpush.tools.Jsons;
import com.shinemo.mpush.tools.redis.RedisGroup;
import com.shinemo.mpush.tools.redis.RedisNode;
import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import com.shinemo.mpush.tools.zk.PathEnum;
import com.shinemo.mpush.tools.zk.ServerApp;
import com.shinemo.mpush.tools.zk.ZkUtil;
Expand Down Expand Up @@ -40,7 +41,7 @@ private void init() throws IOException {
}

public void startConnectionServer() {
Thread t = new Thread(new Runnable() {
ThreadPoolUtil.newThread(new Runnable() {
@Override
public void run() {
final int port = ConfigCenter.INSTANCE.getConnectionServerPort();
Expand All @@ -60,14 +61,11 @@ public void onFailure(String message) {
}
});
}
});
t.setDaemon(false);
t.setName("conn-server-thread");
t.start();
}, "conn-server", false).start();
}

public void startGatewayServer() {
Thread t = new Thread(new Runnable() {
ThreadPoolUtil.newThread(new Runnable() {
@Override
public void run() {
final int port = ConfigCenter.INSTANCE.getGatewayServerPort();
Expand All @@ -87,10 +85,7 @@ public void onFailure(String message) {
}
});
}
});
t.setDaemon(false);
t.setName("gateway-server-thread");
t.start();
}, "gateway-server", false).start();
}

private void registerServerToZK(int port, PathEnum path) {
Expand Down
Expand Up @@ -49,7 +49,7 @@ public ClientChannelHandler() {
deviceId = "test-device-id-100" + new Random().nextInt(5);
}
if (userId == null) {
userId = "user_" + new Random().nextInt(5);
userId = "user-" + new Random().nextInt(5);
}
}

Expand Down
Expand Up @@ -32,7 +32,7 @@ public void testClient() throws Exception {
String json = ZkUtil.instance.get(PathEnum.CONNECTION_SERVER.getPathByName(name));
ServerApp server = Jsons.fromJson(json, ServerApp.class);
ClientChannelHandler handler = new ClientChannelHandler();
final Client client = NettyClientFactory.INSTANCE.get(server.getIp(), server.getPort(), handler);
final Client client = NettyClientFactory.INSTANCE.createGet(server.getIp(), server.getPort(), handler);
client.init();
Thread t = new Thread(new Runnable() {
@Override
Expand Down
Expand Up @@ -42,43 +42,29 @@ public void onRemoval(RemovalNotification<String, Client> notification) {
*
* @param remoteHost
* @param port
* @param handler
* @return
* @throws Exception
*/
public Client get(final String remoteHost, final int port, final ChannelHandler handler) throws Exception {
final String key = String.format(format, remoteHost, port);
Client client = cachedClients.get(key, new Callable<Client>() {
@Override
public Client call() throws Exception {
Client client = createClient(remoteHost, port, handler);
return client;
}
});
if (client == null) {
cachedClients.invalidate(key);
return null;
}
return client;
public Client get(final String remoteHost, final int port) {
String key = String.format(format, remoteHost, port);
return cachedClients.getIfPresent(key);
}

public Client get(final String remoteHost, final int port) throws Exception {
return get(remoteHost, port, null);
}


protected Client createClient(final String remoteHost, final int port) throws Exception {
return createClient(remoteHost, port, null);
public Client createGet(String remoteHost, int port, ChannelHandler handler) {
Client client = createClient(remoteHost, port, handler);
if (client != null) {
String key = String.format(format, remoteHost, port);
cachedClients.put(key, client);
}
return client;
}

public abstract Client createClient(final String remoteHost, final int port, ChannelHandler handler) throws Exception;

abstract Client createClient(String remoteHost, int port, ChannelHandler handler);

public void remove(Client client) {
if (client != null) {
cachedClients.invalidate(client.getUri());
LOGGER.warn(MessageFormat.format("[Remoting] {0} is removed", client));
}
}

}
Expand Up @@ -8,16 +8,9 @@

public class NettyClientFactory extends AbstractNettyClientFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientFactory.class);

public static final NettyClientFactory INSTANCE = new NettyClientFactory();

public Client createClient(final String host, final int port, final ChannelHandler handler) throws Exception {
Client createClient(String host, int port, ChannelHandler handler) {
return new NettyClient(host, port, handler);
}

public void remove(final Client client) {
super.remove(client);
}

}
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Set;

import com.shinemo.mpush.tools.thread.ThreadPoolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -473,16 +474,14 @@ public static <T> void publish(RedisNode node, String channel, T message) {
}

public static void subscribe(Set<RedisNode> nodeList, final JedisPubSub pubsub, final String... channels) {
int i = 0;
for (final RedisNode node : nodeList) {
Thread t = new Thread(new Runnable() {
ThreadPoolUtil.newThread(new Runnable() {
@Override
public void run() {
subscribe(node, pubsub, channels);
}
});
t.setDaemon(true);
t.setName("redis-subscribe-thread");
t.start();
}, ("redis-subscribe-" + i++)).start();
}
}

Expand Down

0 comments on commit 2b18bbd

Please sign in to comment.