Skip to content

Commit

Permalink
ACK代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Nov 28, 2016
1 parent 1b48631 commit c27cd87
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
28 changes: 18 additions & 10 deletions mpush-core/src/main/java/com/mpush/core/ack/AckContext.java
Expand Up @@ -19,50 +19,58 @@


package com.mpush.core.ack; package com.mpush.core.ack;


import com.mpush.common.message.BaseMessage; import java.util.concurrent.Future;

import java.util.concurrent.atomic.AtomicBoolean;


/** /**
* Created by ohun on 16/9/5. * Created by ohun on 16/9/5.
* *
* @author ohun@live.cn (夜色) * @author ohun@live.cn (夜色)
*/ */
public final class AckContext implements Runnable { public final class AckContext implements Runnable {
private final AtomicBoolean done = new AtomicBoolean(false);

private AckCallback callback; private AckCallback callback;
/*package*/ int pushMessageId;
private int sessionId;
private Future<?> future;


public AckContext() { public AckContext() {
} }


public boolean tryDone() { public void setSessionId(int sessionId) {
return done.compareAndSet(false, true); this.sessionId = sessionId;
}

public void setFuture(Future<?> future) {
this.future = future;
} }


public AckContext setCallback(AckCallback callback) { public AckContext setCallback(AckCallback callback) {
this.callback = callback; this.callback = callback;
return this; return this;
} }


private boolean tryDone() {
return future.cancel(true);
}

public void success() { public void success() {
if (tryDone()) { if (tryDone()) {
callback.onSuccess(this); callback.onSuccess(this);
callback = null;
} }
} }


public void timeout() { public void timeout() {
AckContext context = AckMessageQueue.I.getAndRemove(pushMessageId); AckContext context = AckMessageQueue.I.getAndRemove(sessionId);
if (context != null && tryDone()) { if (context != null && tryDone()) {
callback.onTimeout(this); callback.onTimeout(this);
callback = null;
} }
} }


@Override @Override
public String toString() { public String toString() {
return "AckContext{" + return "AckContext{" +
", pushMessageId=" + pushMessageId + ", sessionId=" + sessionId +
'}'; '}';
} }


Expand Down
17 changes: 11 additions & 6 deletions mpush-core/src/main/java/com/mpush/core/ack/AckMessageQueue.java
Expand Up @@ -34,22 +34,27 @@
*/ */
public final class AckMessageQueue { public final class AckMessageQueue {
private final Logger logger = LoggerFactory.getLogger(AckMessageQueue.class); private final Logger logger = LoggerFactory.getLogger(AckMessageQueue.class);

private static final int DEFAULT_TIMEOUT = 3000; private static final int DEFAULT_TIMEOUT = 3000;
public static final AckMessageQueue I = new AckMessageQueue(); public static final AckMessageQueue I = new AckMessageQueue();


private final ConcurrentMap<Integer, AckContext> queue = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, AckContext> queue = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduledExecutor; private final ScheduledExecutorService scheduledExecutor;


private AckMessageQueue() { private AckMessageQueue() {
scheduledExecutor = new ScheduledThreadPoolExecutor(1, new NamedPoolThreadFactory(T_ARK_REQ_TIMER), (r, e) -> { scheduledExecutor = new ScheduledThreadPoolExecutor(1,
logger.error("one ack context was rejected, context=" + r); new NamedPoolThreadFactory(T_ARK_REQ_TIMER),
}); (r, e) -> logger.error("one ack context was rejected, context=" + r)
);
} }


public void put(int sessionId, AckContext context, int timeout) { public void add(int sessionId, AckContext context, int timeout) {
queue.put(sessionId, context); queue.put(sessionId, context);
context.pushMessageId = sessionId; context.setSessionId(sessionId);
scheduledExecutor.schedule(context, timeout > 0 ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); context.setFuture(scheduledExecutor.schedule(context,
timeout > 0 ? timeout : DEFAULT_TIMEOUT,
TimeUnit.MILLISECONDS
));
} }


public AckContext getAndRemove(int sessionId) { public AckContext getAndRemove(int sessionId) {
Expand Down
Expand Up @@ -105,7 +105,7 @@ private boolean checkLocal(final GatewayPushMessage message) {
if (future.isSuccess()) {//推送成功 if (future.isSuccess()) {//推送成功


if (message.needAck()) {//需要客户端ACK, 消息进队列等待客户端响应ACK if (message.needAck()) {//需要客户端ACK, 消息进队列等待客户端响应ACK
AckMessageQueue.I.put(pushMessage.getSessionId(), buildAckContext(message), message.timeout); AckMessageQueue.I.add(pushMessage.getSessionId(), buildAckContext(message), message.timeout);
} else { } else {
OkMessage.from(message).setData(userId + ',' + clientType).sendRaw(); OkMessage.from(message).setData(userId + ',' + clientType).sendRaw();
} }
Expand Down

0 comments on commit c27cd87

Please sign in to comment.