Skip to content

Commit

Permalink
增加PushCenter消息流转时间线, 方便监控消息的各个生命周期的耗时PushClient -> GatewayClient -> G…
Browse files Browse the repository at this point in the history
…atewayServer -> PushCenter -> ConnServer -> Client
  • Loading branch information
夜色 committed Dec 29, 2016
1 parent e7e6c8c commit d0eb910
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 149 deletions.
16 changes: 8 additions & 8 deletions mpush-api/src/main/java/com/mpush/api/spi/push/PushListener.java
Expand Up @@ -8,47 +8,47 @@ public interface PushListener<T extends IPushMessage> {
* *
* @param message 要下发的消息 * @param message 要下发的消息
*/ */
void onSuccess(T message); void onSuccess(T message, Object[] timePoints);


/** /**
* 收到客户端ACK后回调 * 收到客户端ACK后回调
* *
* @param message 要下发的消息 * @param message 要下发的消息
*/ */
void onAckSuccess(T message); void onAckSuccess(T message, Object[] timePoints);


/** /**
* 广播消息推送全部结束后回调 * 广播消息推送全部结束后回调
* *
* @param message 要下发的消息 * @param message 要下发的消息
*/ */
void onBroadcastComplete(T message); void onBroadcastComplete(T message, Object[] timePoints);


/** /**
* 消息下发失败后回调 * 消息下发失败后回调
* *
* @param message 要下发的消息 * @param message 要下发的消息
*/ */
void onFailure(T message); void onFailure(T message, Object[] timePoints);


/** /**
* 推送消息发现用户不在线时回调 * 推送消息发现用户不在线时回调
* *
* @param message 要下发的消息 * @param message 要下发的消息
*/ */
void onOffline(T message); void onOffline(T message, Object[] timePoints);


/** /**
* 推送消息发现用户不在当前机器时回调 * 推送消息发现用户不在当前机器时回调
* *
* @param message 要下发的消息 * @param message 要下发的消息
*/ */
void onRedirect(T message); void onRedirect(T message, Object[] timePoints);


/** /**
* 等待客户端ACK超时时回调 * 发送消息超时或等待客户端ACK超时时回调
* *
* @param message 要下发的消息 * @param message 要下发的消息
*/ */
void onAckTimeout(T message); void onTimeout(T message, Object[] timePoints);
} }
Expand Up @@ -55,11 +55,11 @@ public void handle(ErrorMessage message) {


Logs.PUSH.warn("receive an error gateway response, message={}", message); Logs.PUSH.warn("receive an error gateway response, message={}", message);
if (message.code == OFFLINE.errorCode) {//用户离线 if (message.code == OFFLINE.errorCode) {//用户离线
request.offline(); request.onOffline();
} else if (message.code == PUSH_CLIENT_FAILURE.errorCode) {//下发到客户端失败 } else if (message.code == PUSH_CLIENT_FAILURE.errorCode) {//下发到客户端失败
request.failure(); request.onFailure();
} else if (message.code == ROUTER_CHANGE.errorCode) {//用户路由信息更改 } else if (message.code == ROUTER_CHANGE.errorCode) {//用户路由信息更改
request.redirect(); request.onRedirect();
} }
} }
} }
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.mpush.client.push.PushRequestBus; import com.mpush.client.push.PushRequestBus;
import com.mpush.common.handler.BaseMessageHandler; import com.mpush.common.handler.BaseMessageHandler;
import com.mpush.common.message.OkMessage; import com.mpush.common.message.OkMessage;
import com.mpush.common.push.GatewayPushResult;
import com.mpush.tools.log.Logs; import com.mpush.tools.log.Logs;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -50,7 +51,7 @@ public void handle(OkMessage message) {
Logs.PUSH.warn("receive a gateway response, but request has timeout. message={}", message); Logs.PUSH.warn("receive a gateway response, but request has timeout. message={}", message);
return; return;
} }
request.success();//推送成功 request.onSuccess(GatewayPushResult.fromJson(message.data));//推送成功
} }
} }
} }
Expand Up @@ -46,7 +46,7 @@ private FutureTask<PushResult> send0(PushContext ctx) {
} else { } else {
Set<RemoteRouter> remoteRouters = CachedRemoteRouterManager.I.lookupAll(ctx.getUserId()); Set<RemoteRouter> remoteRouters = CachedRemoteRouterManager.I.lookupAll(ctx.getUserId());
if (remoteRouters == null || remoteRouters.isEmpty()) { if (remoteRouters == null || remoteRouters.isEmpty()) {
return PushRequest.build(factory, ctx).offline(); return PushRequest.build(factory, ctx).onOffline();
} }
FutureTask<PushResult> task = null; FutureTask<PushResult> task = null;
for (RemoteRouter remoteRouter : remoteRouters) { for (RemoteRouter remoteRouter : remoteRouters) {
Expand Down
52 changes: 33 additions & 19 deletions mpush-client/src/main/java/com/mpush/client/push/PushRequest.java
Expand Up @@ -24,6 +24,7 @@
import com.mpush.api.router.ClientLocation; import com.mpush.api.router.ClientLocation;
import com.mpush.client.gateway.connection.GatewayConnectionFactory; import com.mpush.client.gateway.connection.GatewayConnectionFactory;
import com.mpush.common.message.gateway.GatewayPushMessage; import com.mpush.common.message.gateway.GatewayPushMessage;
import com.mpush.common.push.GatewayPushResult;
import com.mpush.common.router.CachedRemoteRouterManager; import com.mpush.common.router.CachedRemoteRouterManager;
import com.mpush.common.router.RemoteRouter; import com.mpush.common.router.RemoteRouter;
import com.mpush.tools.Jsons; import com.mpush.tools.Jsons;
Expand Down Expand Up @@ -165,22 +166,6 @@ public FutureTask<PushResult> send(RemoteRouter router) {
return this; return this;
} }


public void redirect() {
timeLine.addTimePoint("redirect");
LOGGER.warn("user route has changed, userId={}, location={}", userId, location);
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
if (status.get() == Status.init) {//表示任务还没有完成,还可以重新发送
RemoteRouter remoteRouter = CachedRemoteRouterManager.I.lookup(userId, location.getClientType());
send(remoteRouter);
}
}

public FutureTask<PushResult> offline() {
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
submit(Status.offline);
return this;
}

public FutureTask<PushResult> broadcast() { public FutureTask<PushResult> broadcast() {
timeLine.begin(); timeLine.begin();


Expand Down Expand Up @@ -220,20 +205,49 @@ public FutureTask<PushResult> broadcast() {
return this; return this;
} }


public void timeout() { private void offline() {
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
submit(Status.offline);
}

private void timeout() {
if (PushRequestBus.I.getAndRemove(sessionId) != null) { if (PushRequestBus.I.getAndRemove(sessionId) != null) {
submit(Status.timeout); submit(Status.timeout);
} }
} }


public void success() { private void success() {
submit(Status.success); submit(Status.success);
} }


public void failure() { private void failure() {
submit(Status.failure); submit(Status.failure);
} }


public void onFailure() {
failure();
}

public void onRedirect() {
timeLine.addTimePoint("redirect");
LOGGER.warn("user route has changed, userId={}, location={}", userId, location);
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
if (status.get() == Status.init) {//表示任务还没有完成,还可以重新发送
RemoteRouter remoteRouter = CachedRemoteRouterManager.I.lookup(userId, location.getClientType());
send(remoteRouter);
}
}

public FutureTask<PushResult> onOffline() {
offline();
return this;
}

public void onSuccess(GatewayPushResult result) {
if (result != null) timeLine.addTimePoints(result.timePoints);
submit(Status.success);
}

public long getTimeout() { public long getTimeout() {
return timeout; return timeout;
} }
Expand Down
Expand Up @@ -30,6 +30,7 @@


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


/** /**
Expand Down Expand Up @@ -156,7 +157,7 @@ public BaseMessage setRecipient(InetSocketAddress recipient) {
return this; return this;
} }


public ExecutorService getExecutor() { public ScheduledExecutorService getExecutor() {
return connection.getChannel().eventLoop(); return connection.getChannel().eventLoop();
} }


Expand Down
@@ -0,0 +1,49 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/

package com.mpush.common.push;

import com.mpush.common.message.gateway.GatewayPushMessage;
import com.mpush.tools.Jsons;

/**
* Created by ohun on 2016/12/29.
*
* @author ohun@live.cn (夜色)
*/
public final class GatewayPushResult {
public String userId;
public Integer clientType;
public Object[] timePoints;

public GatewayPushResult(String userId, Integer clientType, Object[] timePoints) {
this.userId = userId;
this.timePoints = timePoints;
if (clientType > 0) this.clientType = clientType;
}

public static String toJson(GatewayPushMessage message, Object[] timePoints) {
return Jsons.toJson(new GatewayPushResult(message.userId, message.clientType, timePoints));
}

public static GatewayPushResult fromJson(String json) {
if (json == null) return null;
return Jsons.fromJson(json, GatewayPushResult.class);
}
}
14 changes: 7 additions & 7 deletions mpush-core/src/main/java/com/mpush/core/mq/MQPushListener.java
Expand Up @@ -38,43 +38,43 @@ public MQPushListener() {
} }


@Override @Override
public void onSuccess(MQPushMessage message) { public void onSuccess(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[success/queue] //publish messageId to mq:[success/queue]
mqClient.publish("/mpush/push/success", message); mqClient.publish("/mpush/push/success", message);
} }


@Override @Override
public void onAckSuccess(MQPushMessage message) { public void onAckSuccess(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[success/queue] //publish messageId to mq:[success/queue]
mqClient.publish("/mpush/push/success", message); mqClient.publish("/mpush/push/success", message);
} }


@Override @Override
public void onBroadcastComplete(MQPushMessage message) { public void onBroadcastComplete(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[broadcast/finish/queue] //publish messageId to mq:[broadcast/finish/queue]
mqClient.publish("/mpush/push/broadcast_finish", message); mqClient.publish("/mpush/push/broadcast_finish", message);
} }


@Override @Override
public void onFailure(MQPushMessage message) { public void onFailure(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[failure/queue], client can retry //publish messageId to mq:[failure/queue], client can retry
mqClient.publish("/mpush/push/failure", message); mqClient.publish("/mpush/push/failure", message);
} }


@Override @Override
public void onOffline(MQPushMessage message) { public void onOffline(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[offline/queue], client persist offline message to db //publish messageId to mq:[offline/queue], client persist offline message to db
mqClient.publish("/mpush/push/offline", message); mqClient.publish("/mpush/push/offline", message);
} }


@Override @Override
public void onRedirect(MQPushMessage message) { public void onRedirect(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[route/change/queue], client should be try again //publish messageId to mq:[route/change/queue], client should be try again
mqClient.publish("/mpush/push/route_change", message); mqClient.publish("/mpush/push/route_change", message);
} }


@Override @Override
public void onAckTimeout(MQPushMessage message) { public void onTimeout(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[ack/timeout/queue], client can retry //publish messageId to mq:[ack/timeout/queue], client can retry
mqClient.publish("/mpush/push/ack_timeout", message); mqClient.publish("/mpush/push/ack_timeout", message);
} }
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.mpush.common.qps.OverFlowException; import com.mpush.common.qps.OverFlowException;
import com.mpush.core.router.LocalRouter; import com.mpush.core.router.LocalRouter;
import com.mpush.core.router.RouterCenter; import com.mpush.core.router.RouterCenter;
import com.mpush.tools.common.TimeLine;
import com.mpush.tools.log.Logs; import com.mpush.tools.log.Logs;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -57,6 +58,8 @@ public final class BroadcastPushTask implements PushTask, ChannelFutureListener


private final Condition condition; private final Condition condition;


private final TimeLine timeLine = new TimeLine();

//使用Iterator, 记录任务遍历到的位置,因为有流控,一次任务可能会被分批发送,而且还有在推送过程中上/下线的用户 //使用Iterator, 记录任务遍历到的位置,因为有流控,一次任务可能会被分批发送,而且还有在推送过程中上/下线的用户
private final Iterator<Map.Entry<String, Map<Integer, LocalRouter>>> iterator; private final Iterator<Map.Entry<String, Map<Integer, LocalRouter>>> iterator;


Expand All @@ -65,6 +68,7 @@ public BroadcastPushTask(IPushMessage message, FlowControl flowControl) {
this.flowControl = flowControl; this.flowControl = flowControl;
this.condition = message.getCondition(); this.condition = message.getCondition();
this.iterator = RouterCenter.I.getLocalRouterManager().routers().entrySet().iterator(); this.iterator = RouterCenter.I.getLocalRouterManager().routers().entrySet().iterator();
this.timeLine.begin("push-center-begin");
} }


@Override @Override
Expand Down Expand Up @@ -120,7 +124,7 @@ private boolean broadcast() {


private void report() { private void report() {
Logs.PUSH.info("[Broadcast] task finished, cost={}, message={}", (System.currentTimeMillis() - begin), message); Logs.PUSH.info("[Broadcast] task finished, cost={}, message={}", (System.currentTimeMillis() - begin), message);
PushCenter.I.getPushListener().onBroadcastComplete(message);//通知发送方,广播推送完毕 PushCenter.I.getPushListener().onBroadcastComplete(message, timeLine.end().getTimePoints());//通知发送方,广播推送完毕
} }


private boolean checkCondition(Condition condition, Connection connection) { private boolean checkCondition(Condition condition, Connection connection) {
Expand Down

0 comments on commit d0eb910

Please sign in to comment.