Skip to content

Commit

Permalink
PushClient任务超时代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 16, 2016
1 parent 3390b3b commit a0955f6
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 43 deletions.
2 changes: 1 addition & 1 deletion mpush-api/src/main/java/com/mpush/api/push/PushResult.java
Expand Up @@ -98,7 +98,7 @@ public String toString() {
"resultCode=" + getResultDesc() + "resultCode=" + getResultDesc() +
", userId='" + userId + '\'' + ", userId='" + userId + '\'' +
", timeLine=" + Arrays.toString(timeLine) + ", timeLine=" + Arrays.toString(timeLine) +
", location=" + location + ", " + location +
'}'; '}';
} }
} }
Expand Up @@ -21,6 +21,7 @@


import com.mpush.api.push.PushContext; import com.mpush.api.push.PushContext;
import com.mpush.api.push.PushException; import com.mpush.api.push.PushException;
import com.mpush.api.push.PushResult;
import com.mpush.api.push.PushSender; import com.mpush.api.push.PushSender;
import com.mpush.api.service.BaseService; import com.mpush.api.service.BaseService;
import com.mpush.api.service.Listener; import com.mpush.api.service.Listener;
Expand All @@ -42,15 +43,15 @@
/*package*/ final class PushClient extends BaseService implements PushSender { /*package*/ final class PushClient extends BaseService implements PushSender {
private final GatewayConnectionFactory factory = GatewayConnectionFactory.create(); private final GatewayConnectionFactory factory = GatewayConnectionFactory.create();


private FutureTask<Boolean> send0(PushContext ctx) { private FutureTask<PushResult> send0(PushContext ctx) {
if (ctx.isBroadcast()) { if (ctx.isBroadcast()) {
return PushRequest.build(factory, ctx).broadcast(); return PushRequest.build(factory, ctx).broadcast();
} else { } else {
Set<RemoteRouter> remoteRouters = CachedRemoteRouterManager.I.lookupAll(ctx.getUserId()); Set<RemoteRouter> remoteRouters = CachedRemoteRouterManager.I.lookupAll(ctx.getUserId());
if (remoteRouters == null || remoteRouters.isEmpty()) { if (remoteRouters == null || remoteRouters.isEmpty()) {
return PushRequest.build(factory, ctx).offline(); return PushRequest.build(factory, ctx).offline();
} }
FutureTask<Boolean> task = null; FutureTask<PushResult> task = null;
for (RemoteRouter remoteRouter : remoteRouters) { for (RemoteRouter remoteRouter : remoteRouters) {
task = PushRequest.build(factory, ctx).send(remoteRouter); task = PushRequest.build(factory, ctx).send(remoteRouter);
} }
Expand All @@ -59,13 +60,13 @@ private FutureTask<Boolean> send0(PushContext ctx) {
} }


@Override @Override
public FutureTask<Boolean> send(PushContext ctx) { public FutureTask<PushResult> send(PushContext ctx) {
if (ctx.isBroadcast()) { if (ctx.isBroadcast()) {
return send0(ctx.setUserId(null)); return send0(ctx.setUserId(null));
} else if (ctx.getUserId() != null) { } else if (ctx.getUserId() != null) {
return send0(ctx); return send0(ctx);
} else if (ctx.getUserIds() != null) { } else if (ctx.getUserIds() != null) {
FutureTask<Boolean> task = null; FutureTask<PushResult> task = null;
for (String userId : ctx.getUserIds()) { for (String userId : ctx.getUserIds()) {
task = send0(ctx.setUserId(userId)); task = send0(ctx.setUserId(userId));
} }
Expand Down
72 changes: 50 additions & 22 deletions mpush-client/src/main/java/com/mpush/client/push/PushRequest.java
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -43,10 +44,10 @@
* *
* @author ohun@live.cn * @author ohun@live.cn
*/ */
public final class PushRequest extends FutureTask<Boolean> { public final class PushRequest extends FutureTask<PushResult> {
private static final Logger LOGGER = LoggerFactory.getLogger(PushRequest.class); private static final Logger LOGGER = LoggerFactory.getLogger(PushRequest.class);


private static final Callable<Boolean> NONE = () -> Boolean.FALSE; private static final Callable<PushResult> NONE = () -> new PushResult(PushResult.CODE_FAILURE);


private enum Status {init, success, failure, offline, timeout} private enum Status {init, success, failure, offline, timeout}


Expand All @@ -63,7 +64,9 @@ private enum Status {init, success, failure, offline, timeout}
private byte[] content; private byte[] content;
private int timeout; private int timeout;
private ClientLocation location; private ClientLocation location;
private int sessionId;
private Future<?> future; private Future<?> future;
private PushResult result;


private void sendToConnServer(RemoteRouter remoteRouter) { private void sendToConnServer(RemoteRouter remoteRouter) {
timeLine.addTimePoint("lookup-remote"); timeLine.addTimePoint("lookup-remote");
Expand Down Expand Up @@ -103,7 +106,8 @@ private void sendToConnServer(RemoteRouter remoteRouter) {
} }
}); });
PushRequest.this.content = null;//释放内存 PushRequest.this.content = null;//释放内存
future = PushRequestBus.I.put(pushMessage.getSessionId(), PushRequest.this); sessionId = pushMessage.getSessionId();
future = PushRequestBus.I.put(sessionId, PushRequest.this);
} }
); );


Expand All @@ -115,27 +119,38 @@ private void sendToConnServer(RemoteRouter remoteRouter) {


private void submit(Status status) { private void submit(Status status) {
if (this.status.compareAndSet(Status.init, status)) {//防止重复调用 if (this.status.compareAndSet(Status.init, status)) {//防止重复调用
timeLine.end(); boolean isTimeoutEnd = status == Status.timeout;//任务是否超时结束
if (future != null) future.cancel(true);
if (callback != null) { if (future != null && !isTimeoutEnd) {//是超时结束任务不用再取消一次
PushRequestBus.I.asyncCall(this); future.cancel(true);//取消超时任务
}

this.timeLine.end();//结束时间流统计
super.set(getResult());//设置同步调用的返回结果

if (callback != null) {//回调callback
if (isTimeoutEnd) {//超时结束时,当前线程已经是线程池里的线程,直接调用callback
callback.onResult(getResult());
} else {//非超时结束时,当前线程为Netty线程池,要异步执行callback
PushRequestBus.I.asyncCall(this);//会执行run方法
}
} }
super.set(this.status.get() == Status.success);
} }
LOGGER.info("push request {} end, userId={}, content={}, location={}, timeLine={}" LOGGER.info("push request {} end, {}, {}, {}", status, userId, location, timeLine);
, status, userId, content, location, timeLine);
} }


/**
* run方法会有两个地方的线程调用
* 1. 任务超时时会调用,见PushRequestBus.I.put(sessionId, PushRequest.this);
* 2. 异步执行callback的时候,见PushRequestBus.I.asyncCall(this);
*/
@Override @Override
public void run() { public void run() {
if (status.get() == Status.init) {//从定时任务过来的,超时时间到了 //判断任务是否超时,如果超时了此时状态是init,否则应该是其他状态, 因为从submit方法过来的状态都不是init
submit(Status.timeout); if (status.get() == Status.init) {
timeout();
} else { } else {
callback.onResult(new PushResult(status.get().ordinal()) callback.onResult(getResult());
.setUserId(userId)
.setLocation(location)
.setTimeLine(timeLine.getTimePoints())
);
} }
} }


Expand All @@ -144,7 +159,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


public FutureTask<Boolean> send(RemoteRouter router) { public FutureTask<PushResult> send(RemoteRouter router) {
timeLine.begin(); timeLine.begin();
sendToConnServer(router); sendToConnServer(router);
return this; return this;
Expand All @@ -160,13 +175,13 @@ public void redirect() {
} }
} }


public FutureTask<Boolean> offline() { public FutureTask<PushResult> offline() {
CachedRemoteRouterManager.I.invalidateLocalCache(userId); CachedRemoteRouterManager.I.invalidateLocalCache(userId);
submit(Status.offline); submit(Status.offline);
return this; return this;
} }


public FutureTask<Boolean> broadcast() { public FutureTask<PushResult> broadcast() {
timeLine.begin(); timeLine.begin();


boolean success = connectionFactory.broadcast( boolean success = connectionFactory.broadcast(
Expand All @@ -189,7 +204,8 @@ public FutureTask<Boolean> broadcast() {
}); });


if (pushMessage.taskId == null) { if (pushMessage.taskId == null) {
future = PushRequestBus.I.put(pushMessage.getSessionId(), PushRequest.this); sessionId = pushMessage.getSessionId();
future = PushRequestBus.I.put(sessionId, PushRequest.this);
} else { } else {
success(); success();
} }
Expand All @@ -205,7 +221,9 @@ public FutureTask<Boolean> broadcast() {
} }


public void timeout() { public void timeout() {
submit(Status.timeout); if (PushRequestBus.I.getAndRemove(sessionId) != null) {
submit(Status.timeout);
}
} }


public void success() { public void success() {
Expand Down Expand Up @@ -248,6 +266,16 @@ public static PushRequest build(GatewayConnectionFactory factory, PushContext ct


} }


private PushResult getResult() {
if (result == null) {
result = new PushResult(status.get().ordinal())
.setUserId(userId)
.setLocation(location)
.setTimeLine(timeLine.getTimePoints());
}
return result;
}

public PushRequest setCallback(PushCallback callback) { public PushRequest setCallback(PushCallback callback) {
this.callback = callback; this.callback = callback;
return this; return this;
Expand Down
Expand Up @@ -26,10 +26,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/** /**
* Created by ohun on 2015/12/30. * Created by ohun on 2015/12/30.
Expand Down
Expand Up @@ -45,18 +45,13 @@ public void testPush() throws Exception {
Logs.init(); Logs.init();
PushSender sender = PushSender.create(); PushSender sender = PushSender.create();
sender.start().join(); sender.start().join();
Thread.sleep(1000);


for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {


PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push."); PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push.");
msg.setMsgId("msgId_" + i); msg.setMsgId("msgId_" + i);


try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

PushContext context = PushContext.build(msg) PushContext context = PushContext.build(msg)
.setAckModel(AckModel.AUTO_ACK) .setAckModel(AckModel.AUTO_ACK)
.setUserId("user-" + i) .setUserId("user-" + i)
Expand All @@ -71,7 +66,9 @@ public void onResult(PushResult result) {
System.err.println("\n\n" + result); System.err.println("\n\n" + result);
} }
}); });
FutureTask<Boolean> future = sender.send(context); FutureTask<PushResult> future = sender.send(context);

//System.err.println("\n\n" + future.get());
} }


LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(30)); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(30));
Expand Down
Expand Up @@ -27,7 +27,7 @@
* @author ohun@live.cn (夜色) * @author ohun@live.cn (夜色)
*/ */
public final class TimeLine { public final class TimeLine {
private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); private static final SimpleDateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
private final TimePoint root = new TimePoint("root"); private final TimePoint root = new TimePoint("root");
private final String name; private final String name;
private int pointCount; private int pointCount;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void clean() {
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(name); StringBuilder sb = new StringBuilder(name);
if (root.next != null) { if (root.next != null) {
sb.append('[').append(current.time - root.next.time).append(']'); sb.append('[').append(current.time - root.next.time).append(']').append("(ms)");
} }
sb.append('{'); sb.append('{');
TimePoint next = root; TimePoint next = root;
Expand Down Expand Up @@ -99,9 +99,8 @@ public void setNext(TimePoint next) {


@Override @Override
public String toString() { public String toString() {
String header = name + "[" + formatter.format(new Date(time)) + "]"; if (next == null) return name;
if (next == null) return header; return name + " --(" + (next.time - time) + "ms) --> ";
return header + " --" + (next.time - time) + "(ms)--> ";
} }
} }
} }

0 comments on commit a0955f6

Please sign in to comment.