Skip to content

Commit

Permalink
增加ack超时时间设置
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Oct 12, 2016
1 parent 5ef5034 commit fddd9ac
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
6 changes: 3 additions & 3 deletions mpush-api/src/main/java/com/mpush/api/push/PushContext.java
Expand Up @@ -73,7 +73,7 @@ public class PushContext {
/** /**
* 推送超时时间 * 推送超时时间
*/ */
private long timeout = 3000; private int timeout = 3000;


public PushContext(byte[] context) { public PushContext(byte[] context) {
this.context = context; this.context = context;
Expand Down Expand Up @@ -144,11 +144,11 @@ public PushContext setBroadcast(boolean broadcast) {
return this; return this;
} }


public long getTimeout() { public int getTimeout() {
return timeout; return timeout;
} }


public PushContext setTimeout(long timeout) { public PushContext setTimeout(int timeout) {
this.timeout = timeout; this.timeout = timeout;
return this; return this;
} }
Expand Down
Expand Up @@ -57,7 +57,7 @@ private enum Status {init, success, failure, offline, timeout}
private PushCallback callback; private PushCallback callback;
private String userId; private String userId;
private byte[] content; private byte[] content;
private long timeout; private int timeout;
private ClientLocation location; private ClientLocation location;
private Future<?> future; private Future<?> future;
private String result; private String result;
Expand Down Expand Up @@ -91,6 +91,7 @@ private void sendToConnServer(RemoteRouter remoteRouter) {
GatewayPushMessage pushMessage = GatewayPushMessage pushMessage =
new GatewayPushMessage(userId, content, gatewayConn) new GatewayPushMessage(userId, content, gatewayConn)
.setClientType(location.getClientType()) .setClientType(location.getClientType())
.setTimeout(timeout - 500)
.addFlag(ackModel.flag); .addFlag(ackModel.flag);


pushMessage.sendRaw(f -> { pushMessage.sendRaw(f -> {
Expand Down Expand Up @@ -228,7 +229,7 @@ public PushRequest setContent(byte[] content) {
return this; return this;
} }


public PushRequest setTimeout(long timeout) { public PushRequest setTimeout(int timeout) {
this.timeout = timeout; this.timeout = timeout;
return this; return this;
} }
Expand Down
Expand Up @@ -40,6 +40,7 @@ public class GatewayPushMessage extends ByteBufMessage {
public String userId; public String userId;
public Set<String> tags; public Set<String> tags;
public int clientType; public int clientType;
public int timeout;
public byte[] content; public byte[] content;


public GatewayPushMessage(String userId, byte[] content, Connection connection) { public GatewayPushMessage(String userId, byte[] content, Connection connection) {
Expand All @@ -57,6 +58,7 @@ public void decode(ByteBuf body) {
userId = decodeString(body); userId = decodeString(body);
tags = decodeSet(body); tags = decodeSet(body);
clientType = decodeInt(body); clientType = decodeInt(body);
timeout = decodeInt(body);
content = decodeBytes(body); content = decodeBytes(body);
} }


Expand All @@ -65,6 +67,7 @@ public void encode(ByteBuf body) {
encodeString(body, userId); encodeString(body, userId);
encodeSet(body, tags); encodeSet(body, tags);
encodeInt(body, clientType); encodeInt(body, clientType);
encodeInt(body, timeout);
encodeBytes(body, content); encodeBytes(body, content);
} }


Expand All @@ -90,6 +93,11 @@ public GatewayPushMessage addFlag(byte flag) {
return this; return this;
} }


public GatewayPushMessage setTimeout(int timeout) {
this.timeout = timeout;
return this;
}

public boolean isBroadcast() { public boolean isBroadcast() {
return userId == null; return userId == null;
} }
Expand All @@ -113,6 +121,7 @@ public String toString() {
return "GatewayPushMessage{" + return "GatewayPushMessage{" +
"userId='" + userId + '\'' + "userId='" + userId + '\'' +
", clientType='" + clientType + '\'' + ", clientType='" + clientType + '\'' +
", timeout='" + timeout + '\'' +
", content='" + content.length + '\'' + ", content='" + content.length + '\'' +
'}'; '}';
} }
Expand Down
Expand Up @@ -42,7 +42,9 @@
*/ */
public final class AckMessageQueue { public final class AckMessageQueue {
private final Logger logger = LoggerFactory.getLogger(AckMessageQueue.class); private final Logger logger = LoggerFactory.getLogger(AckMessageQueue.class);
private static final int DEFAULT_TIMEOUT = 3000;
public static final AckMessageQueue I = new AckMessageQueue(); public static final AckMessageQueue I = new AckMessageQueue();

private final ConcurrentMap<Integer, AckContext> queue = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, AckContext> queue = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduledExecutor; private final ScheduledExecutorService scheduledExecutor;


Expand All @@ -52,10 +54,10 @@ private AckMessageQueue() {
}); });
} }


public void put(int sessionId, AckContext context) { public void put(int sessionId, AckContext context, int timeout) {
queue.put(sessionId, context); queue.put(sessionId, context);
context.pushMessageId = sessionId; context.pushMessageId = sessionId;
scheduledExecutor.schedule(context, 3, TimeUnit.SECONDS); scheduledExecutor.schedule(context, timeout > 0 ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
} }


public AckContext getAndRemove(int sessionId) { public AckContext getAndRemove(int sessionId) {
Expand Down

0 comments on commit fddd9ac

Please sign in to comment.