Skip to content

Commit

Permalink
add kick remote interface
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Jan 4, 2016
1 parent 315f4f4 commit 87f9f45
Show file tree
Hide file tree
Showing 13 changed files with 239 additions and 89 deletions.
@@ -0,0 +1,16 @@
package com.shinemo.mpush.api.event;

import com.shinemo.mpush.api.router.Router;

/**
* Created by ohun on 2016/1/4.
*/
public class RouterChangeEvent implements Event {
public final String userId;
public final Router<?> router;

public RouterChangeEvent(String userId, Router<?> router) {
this.userId = userId;
this.router = router;
}
}
Expand Up @@ -3,14 +3,7 @@
import com.shinemo.mpush.api.Client; import com.shinemo.mpush.api.Client;
import com.shinemo.mpush.api.PushSender; import com.shinemo.mpush.api.PushSender;
import com.shinemo.mpush.api.connection.Connection; import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.router.ClientLocation;
import com.shinemo.mpush.common.message.gateway.GatewayPushMessage;
import com.shinemo.mpush.common.router.RemoteRouter;
import com.shinemo.mpush.common.router.RemoteRouterManager;
import com.shinemo.mpush.common.router.RouterCenter;
import com.shinemo.mpush.netty.client.NettyClientFactory; import com.shinemo.mpush.netty.client.NettyClientFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;


import java.util.Collection; import java.util.Collection;


Expand All @@ -28,7 +21,7 @@ public void init() throws Exception {
this.clientFactory = NettyClientFactory.INSTANCE; this.clientFactory = NettyClientFactory.INSTANCE;
} }


private Connection getConnection(String ip) { public Connection getConnection(String ip) {
try { try {
Client client = clientFactory.get(ip, port); Client client = clientFactory.get(ip, port);
if (client == null) { if (client == null) {
Expand Down Expand Up @@ -64,29 +57,5 @@ public void send(String content, Collection<String> userIds, Callback callback)
} }




public void send(String content, final String userId, final PushRequest callback) {
RemoteRouterManager remoteRouterManager = RouterCenter.INSTANCE.getRemoteRouterManager();
RemoteRouter router = remoteRouterManager.lookup(userId);
if (router == null) {
callback.onOffline(userId);
return;
}
ClientLocation location = router.getRouteValue();
Connection connection = getConnection(location.getHost());
if (connection == null || !connection.isConnected()) {
callback.onFailure(userId);
return;
}
GatewayPushMessage pushMessage = new GatewayPushMessage(userId, content, connection);
PushRequestBus.INSTANCE.register(pushMessage.getSessionId(), callback);
pushMessage.send(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
} else {
callback.onFailure(userId);
}
}
});
}
} }
@@ -1,18 +1,30 @@
package com.shinemo.mpush.client; package com.shinemo.mpush.client;


import com.shinemo.mpush.api.PushSender; import com.shinemo.mpush.api.PushSender;
import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.router.ClientLocation;
import com.shinemo.mpush.common.message.gateway.GatewayPushMessage;
import com.shinemo.mpush.common.router.ConnectionRouterManager;
import com.shinemo.mpush.common.router.RemoteRouter;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/** /**
* Created by ohun on 2015/12/30. * Created by ohun on 2015/12/30.
*/ */
public class PushRequest implements PushSender.Callback, Runnable { public class PushRequest implements PushSender.Callback, Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(PushRequest.class);
private PushSender.Callback callback; private PushSender.Callback callback;
private String userId; private String userId;
private String content; private String content;
private long timeout; private long timeout;
private PushClient pushClient; private PushClient pushClient;
private int status = 0; private int status = 0;
private long timeout_; private long timeout_;
private int sessionId;
private long sendTime;


public PushRequest(PushClient pushClient) { public PushRequest(PushClient pushClient) {
this.pushClient = pushClient; this.pushClient = pushClient;
Expand Down Expand Up @@ -42,6 +54,14 @@ public PushRequest setTimeout(long timeout) {
return this; return this;
} }


public void setSessionId(int sessionId) {
this.sessionId = sessionId;
}

public int getSessionId() {
return sessionId;
}

@Override @Override
public void onSuccess(String userId) { public void onSuccess(String userId) {
submit(1); submit(1);
Expand All @@ -64,10 +84,11 @@ public void onTimeout(String userId) {


private void submit(int status) { private void submit(int status) {
this.status = status; this.status = status;
if (sessionId > 0) PushRequestBus.INSTANCE.remove(sessionId);
if (callback != null) { if (callback != null) {
PushRequestBus.INSTANCE.getExecutor().execute(this); PushRequestBus.INSTANCE.getExecutor().execute(this);
} else { } else {

LOGGER.warn("callback is null");
} }
} }


Expand All @@ -85,6 +106,10 @@ public void run() {
} }
} }


public boolean isTimeout() {
return System.currentTimeMillis() > timeout_;
}

public void timeout() { public void timeout() {
onTimeout(userId); onTimeout(userId);
} }
Expand All @@ -103,15 +128,40 @@ public void offline() {


public void send() { public void send() {
this.timeout_ = timeout + System.currentTimeMillis(); this.timeout_ = timeout + System.currentTimeMillis();
pushClient.send(content, userId, this); sendToConnectionServer();
} }


public void redirect() { public void redirect() {
this.timeout_ = timeout + System.currentTimeMillis(); this.timeout_ = timeout + System.currentTimeMillis();
pushClient.send(content, userId, this); ConnectionRouterManager.INSTANCE.invalidateLocalCache(userId);
sendToConnectionServer();
LOGGER.warn("user route has changed, userId={}, content={}", userId, content);
} }


public boolean isTimeout() { private void sendToConnectionServer() {
return System.currentTimeMillis() > timeout_; RemoteRouter router = ConnectionRouterManager.INSTANCE.lookup(userId);
if (router == null) {
this.onOffline(userId);
return;
}
ClientLocation location = router.getRouteValue();
Connection connection = pushClient.getConnection(location.getHost());
if (connection == null || !connection.isConnected()) {
this.onFailure(userId);
return;
}
GatewayPushMessage pushMessage = new GatewayPushMessage(userId, content, connection);
pushMessage.send(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
sendTime = System.currentTimeMillis();
} else {
PushRequest.this.onFailure(userId);
}
}
});
this.sessionId = pushMessage.getSessionId();
PushRequestBus.INSTANCE.add(this);
} }
} }
Expand Up @@ -16,8 +16,8 @@ public PushRequestBus() {
scheduledExecutor.scheduleAtFixedRate(this, 1, 3, TimeUnit.SECONDS); scheduledExecutor.scheduleAtFixedRate(this, 1, 3, TimeUnit.SECONDS);
} }


public void register(int reqId, PushRequest callback) { public void add(PushRequest request) {
requests.put(reqId, callback); requests.put(request.getSessionId(), request);
} }


public PushRequest remove(int reqId) { public PushRequest remove(int reqId) {
Expand Down
@@ -0,0 +1,49 @@
package com.shinemo.mpush.common.router;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.TimeUnit;

/**
* Created by ohun on 2016/1/4.
*/
public class ConnectionRouterManager extends RemoteRouterManager {
public static final ConnectionRouterManager INSTANCE = new ConnectionRouterManager();
// TODO: 2015/12/30 可以增加一层本地缓存,防止疯狂查询redis, 但是要注意失效问题及数据不一致问题
private final Cache<String, RemoteRouter> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterAccess(5, TimeUnit.MINUTES)
.build();


@Override
public RemoteRouter register(String userId, RemoteRouter route) {
RemoteRouter old = cache.getIfPresent(userId);
cache.put(userId, route);
return old;
}

@Override
public boolean unRegister(String userId) {
cache.invalidate(userId);
return true;
}

@Override
public RemoteRouter lookup(String userId) {
return cache.getIfPresent(userId);
}

/**
* 如果推送失败,可能是缓存不一致了,可以让本地缓存失效
* <p>
* 失效对应的本地缓存
*
* @param userId
*/
public void invalidateLocalCache(String userId) {
cache.invalidate(userId);
}
}
@@ -1,49 +1,24 @@
package com.shinemo.mpush.common.router; package com.shinemo.mpush.common.router;


import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.shinemo.mpush.api.router.RouterManager; import com.shinemo.mpush.api.router.RouterManager;


import java.util.concurrent.TimeUnit;

/** /**
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
*/ */
public class RemoteRouterManager implements RouterManager<RemoteRouter> { public class RemoteRouterManager implements RouterManager<RemoteRouter> {
// TODO: 2015/12/30 可以增加一层本地缓存,防止疯狂查询redis, 但是要注意失效问题及数据不一致问题
private final Cache<String, RemoteRouter> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterAccess(5, TimeUnit.MINUTES)
.build();



@Override @Override
public RemoteRouter register(String userId, RemoteRouter route) { public RemoteRouter register(String userId, RemoteRouter route) {
RemoteRouter old = cache.getIfPresent(userId); return null;
cache.put(userId, route);
return old;
} }


@Override @Override
public boolean unRegister(String userId) { public boolean unRegister(String userId) {
cache.invalidate(userId);
return true; return true;
} }


@Override @Override
public RemoteRouter lookup(String userId) { public RemoteRouter lookup(String userId) {
return cache.getIfPresent(userId); return null;
}

/**
* 如果推送失败,可能是缓存不一致了,可以让本地缓存失效
* <p>
* 失效对应的本地缓存
*
* @param userId
*/
public void invalidateLocalCache(String userId) {
cache.invalidate(userId);
} }
} }
Expand Up @@ -8,7 +8,7 @@
import com.shinemo.mpush.common.message.OkMessage; import com.shinemo.mpush.common.message.OkMessage;
import com.shinemo.mpush.api.protocol.Packet; import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.common.handler.BaseMessageHandler; import com.shinemo.mpush.common.handler.BaseMessageHandler;
import com.shinemo.mpush.common.router.RouterCenter; import com.shinemo.mpush.core.router.RouterCenter;


/** /**
* Created by ohun on 2015/12/23. * Created by ohun on 2015/12/23.
Expand Down
Expand Up @@ -9,7 +9,7 @@
import com.shinemo.mpush.api.protocol.Packet; import com.shinemo.mpush.api.protocol.Packet;
import com.shinemo.mpush.api.router.Router; import com.shinemo.mpush.api.router.Router;
import com.shinemo.mpush.common.handler.BaseMessageHandler; import com.shinemo.mpush.common.handler.BaseMessageHandler;
import com.shinemo.mpush.common.router.RouterCenter; import com.shinemo.mpush.core.router.RouterCenter;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;


Expand Down
@@ -0,0 +1,19 @@
package com.shinemo.mpush.core.router;

/**
* Created by ohun on 2016/1/4.
*/
public class KickRemoteMsg {
public String userId;
public String deviceId;
public String srcServer;

@Override
public String toString() {
return "KickRemoteMsg{" +
"userId='" + userId + '\'' +
", deviceId='" + deviceId + '\'' +
", srcServer='" + srcServer + '\'' +
'}';
}
}
@@ -1,4 +1,4 @@
package com.shinemo.mpush.common.router; package com.shinemo.mpush.core.router;


import com.shinemo.mpush.api.connection.Connection; import com.shinemo.mpush.api.connection.Connection;
import com.shinemo.mpush.api.router.Router; import com.shinemo.mpush.api.router.Router;
Expand Down
@@ -1,4 +1,4 @@
package com.shinemo.mpush.common.router; package com.shinemo.mpush.core.router;


import com.shinemo.mpush.api.router.RouterManager; import com.shinemo.mpush.api.router.RouterManager;


Expand Down

0 comments on commit 87f9f45

Please sign in to comment.