diff --git a/mpush-api/src/main/java/com/mpush/api/push/PushResult.java b/mpush-api/src/main/java/com/mpush/api/push/PushResult.java index f5e85e36..55dc23c7 100644 --- a/mpush-api/src/main/java/com/mpush/api/push/PushResult.java +++ b/mpush-api/src/main/java/com/mpush/api/push/PushResult.java @@ -98,7 +98,7 @@ public String toString() { "resultCode=" + getResultDesc() + ", userId='" + userId + '\'' + ", timeLine=" + Arrays.toString(timeLine) + - ", location=" + location + + ", " + location + '}'; } } diff --git a/mpush-client/src/main/java/com/mpush/client/push/PushClient.java b/mpush-client/src/main/java/com/mpush/client/push/PushClient.java index b0112e08..ee92d056 100644 --- a/mpush-client/src/main/java/com/mpush/client/push/PushClient.java +++ b/mpush-client/src/main/java/com/mpush/client/push/PushClient.java @@ -21,6 +21,7 @@ import com.mpush.api.push.PushContext; import com.mpush.api.push.PushException; +import com.mpush.api.push.PushResult; import com.mpush.api.push.PushSender; import com.mpush.api.service.BaseService; import com.mpush.api.service.Listener; @@ -42,7 +43,7 @@ /*package*/ final class PushClient extends BaseService implements PushSender { private final GatewayConnectionFactory factory = GatewayConnectionFactory.create(); - private FutureTask send0(PushContext ctx) { + private FutureTask send0(PushContext ctx) { if (ctx.isBroadcast()) { return PushRequest.build(factory, ctx).broadcast(); } else { @@ -50,7 +51,7 @@ private FutureTask send0(PushContext ctx) { if (remoteRouters == null || remoteRouters.isEmpty()) { return PushRequest.build(factory, ctx).offline(); } - FutureTask task = null; + FutureTask task = null; for (RemoteRouter remoteRouter : remoteRouters) { task = PushRequest.build(factory, ctx).send(remoteRouter); } @@ -59,13 +60,13 @@ private FutureTask send0(PushContext ctx) { } @Override - public FutureTask send(PushContext ctx) { + public FutureTask send(PushContext ctx) { if (ctx.isBroadcast()) { return send0(ctx.setUserId(null)); } else if (ctx.getUserId() != null) { return send0(ctx); } else if (ctx.getUserIds() != null) { - FutureTask task = null; + FutureTask task = null; for (String userId : ctx.getUserIds()) { task = send0(ctx.setUserId(userId)); } diff --git a/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java b/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java index 186a68f7..ea1dd97b 100644 --- a/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java +++ b/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicReference; @@ -43,10 +44,10 @@ * * @author ohun@live.cn */ -public final class PushRequest extends FutureTask { +public final class PushRequest extends FutureTask { private static final Logger LOGGER = LoggerFactory.getLogger(PushRequest.class); - private static final Callable NONE = () -> Boolean.FALSE; + private static final Callable NONE = () -> new PushResult(PushResult.CODE_FAILURE); private enum Status {init, success, failure, offline, timeout} @@ -63,7 +64,9 @@ private enum Status {init, success, failure, offline, timeout} private byte[] content; private int timeout; private ClientLocation location; + private int sessionId; private Future future; + private PushResult result; private void sendToConnServer(RemoteRouter remoteRouter) { timeLine.addTimePoint("lookup-remote"); @@ -103,7 +106,8 @@ private void sendToConnServer(RemoteRouter remoteRouter) { } }); PushRequest.this.content = null;//释放内存 - future = PushRequestBus.I.put(pushMessage.getSessionId(), PushRequest.this); + sessionId = pushMessage.getSessionId(); + future = PushRequestBus.I.put(sessionId, PushRequest.this); } ); @@ -115,27 +119,38 @@ private void sendToConnServer(RemoteRouter remoteRouter) { private void submit(Status status) { if (this.status.compareAndSet(Status.init, status)) {//防止重复调用 - timeLine.end(); - if (future != null) future.cancel(true); - if (callback != null) { - PushRequestBus.I.asyncCall(this); + boolean isTimeoutEnd = status == Status.timeout;//任务是否超时结束 + + if (future != null && !isTimeoutEnd) {//是超时结束任务不用再取消一次 + future.cancel(true);//取消超时任务 + } + + this.timeLine.end();//结束时间流统计 + super.set(getResult());//设置同步调用的返回结果 + + if (callback != null) {//回调callback + if (isTimeoutEnd) {//超时结束时,当前线程已经是线程池里的线程,直接调用callback + callback.onResult(getResult()); + } else {//非超时结束时,当前线程为Netty线程池,要异步执行callback + PushRequestBus.I.asyncCall(this);//会执行run方法 + } } - super.set(this.status.get() == Status.success); } - LOGGER.info("push request {} end, userId={}, content={}, location={}, timeLine={}" - , status, userId, content, location, timeLine); + LOGGER.info("push request {} end, {}, {}, {}", status, userId, location, timeLine); } + /** + * run方法会有两个地方的线程调用 + * 1. 任务超时时会调用,见PushRequestBus.I.put(sessionId, PushRequest.this); + * 2. 异步执行callback的时候,见PushRequestBus.I.asyncCall(this); + */ @Override public void run() { - if (status.get() == Status.init) {//从定时任务过来的,超时时间到了 - submit(Status.timeout); + //判断任务是否超时,如果超时了此时状态是init,否则应该是其他状态, 因为从submit方法过来的状态都不是init + if (status.get() == Status.init) { + timeout(); } else { - callback.onResult(new PushResult(status.get().ordinal()) - .setUserId(userId) - .setLocation(location) - .setTimeLine(timeLine.getTimePoints()) - ); + callback.onResult(getResult()); } } @@ -144,7 +159,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { throw new UnsupportedOperationException(); } - public FutureTask send(RemoteRouter router) { + public FutureTask send(RemoteRouter router) { timeLine.begin(); sendToConnServer(router); return this; @@ -160,13 +175,13 @@ public void redirect() { } } - public FutureTask offline() { + public FutureTask offline() { CachedRemoteRouterManager.I.invalidateLocalCache(userId); submit(Status.offline); return this; } - public FutureTask broadcast() { + public FutureTask broadcast() { timeLine.begin(); boolean success = connectionFactory.broadcast( @@ -189,7 +204,8 @@ public FutureTask broadcast() { }); if (pushMessage.taskId == null) { - future = PushRequestBus.I.put(pushMessage.getSessionId(), PushRequest.this); + sessionId = pushMessage.getSessionId(); + future = PushRequestBus.I.put(sessionId, PushRequest.this); } else { success(); } @@ -205,7 +221,9 @@ public FutureTask broadcast() { } public void timeout() { - submit(Status.timeout); + if (PushRequestBus.I.getAndRemove(sessionId) != null) { + submit(Status.timeout); + } } public void success() { @@ -248,6 +266,16 @@ public static PushRequest build(GatewayConnectionFactory factory, PushContext ct } + private PushResult getResult() { + if (result == null) { + result = new PushResult(status.get().ordinal()) + .setUserId(userId) + .setLocation(location) + .setTimeLine(timeLine.getTimePoints()); + } + return result; + } + public PushRequest setCallback(PushCallback callback) { this.callback = callback; return this; diff --git a/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java b/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java index d82c9a08..9adb4320 100644 --- a/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java +++ b/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java @@ -26,10 +26,7 @@ import org.slf4j.LoggerFactory; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Created by ohun on 2015/12/30. diff --git a/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java b/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java index bee7de11..c89d2122 100644 --- a/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java +++ b/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java @@ -45,18 +45,13 @@ public void testPush() throws Exception { Logs.init(); PushSender sender = PushSender.create(); sender.start().join(); + Thread.sleep(1000); for (int i = 0; i < 10; i++) { PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push."); msg.setMsgId("msgId_" + i); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - PushContext context = PushContext.build(msg) .setAckModel(AckModel.AUTO_ACK) .setUserId("user-" + i) @@ -71,7 +66,9 @@ public void onResult(PushResult result) { System.err.println("\n\n" + result); } }); - FutureTask future = sender.send(context); + FutureTask future = sender.send(context); + + //System.err.println("\n\n" + future.get()); } LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(30)); diff --git a/mpush-tools/src/main/java/com/mpush/tools/common/TimeLine.java b/mpush-tools/src/main/java/com/mpush/tools/common/TimeLine.java index eb1923a0..ff52a9c3 100644 --- a/mpush-tools/src/main/java/com/mpush/tools/common/TimeLine.java +++ b/mpush-tools/src/main/java/com/mpush/tools/common/TimeLine.java @@ -27,7 +27,7 @@ * @author ohun@live.cn (夜色) */ public final class TimeLine { - private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static final SimpleDateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS"); private final TimePoint root = new TimePoint("root"); private final String name; private int pointCount; @@ -62,7 +62,7 @@ public void clean() { public String toString() { StringBuilder sb = new StringBuilder(name); if (root.next != null) { - sb.append('[').append(current.time - root.next.time).append(']'); + sb.append('[').append(current.time - root.next.time).append(']').append("(ms)"); } sb.append('{'); TimePoint next = root; @@ -99,9 +99,8 @@ public void setNext(TimePoint next) { @Override public String toString() { - String header = name + "[" + formatter.format(new Date(time)) + "]"; - if (next == null) return header; - return header + " --" + (next.time - time) + "(ms)--> "; + if (next == null) return name; + return name + " --(" + (next.time - time) + "ms) --> "; } } }