Skip to content

Commit

Permalink
send the RemoteInvocationOptions into the RemoteServiceRequest for se…
Browse files Browse the repository at this point in the history
…rver side use
  • Loading branch information
pierredavidbelanger committed May 22, 2016
1 parent d1fe17c commit 9ee20f6
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 131 deletions.
24 changes: 10 additions & 14 deletions src/main/java/org/redisson/RedissonRemoteService.java
Expand Up @@ -99,8 +99,8 @@ public void operationComplete(Future<RemoteServiceRequest> future) throws Except
// subscribe(remoteInterface, requestQueue);

final RemoteServiceRequest request = future.getNow();
// negative ackTimeout means unacknowledged call, do not check the ack
if (request.getAckTimeout() >= 0 && System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) {
// check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue);
Expand All @@ -110,9 +110,9 @@ public void operationComplete(Future<RemoteServiceRequest> future) throws Except
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId();

// negative ackTimeout means unacknowledged call, do not send the ack
if (request.getAckTimeout() >= 0) {
Future<List<?>> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck());
// send the ack only if expected
if (request.getOptions().isAckExpected()) {
Future<List<?>> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck());
ackClientsFuture.addListener(new FutureListener<List<?>>() {
@Override
public void operationComplete(Future<List<?>> future) throws Exception {
Expand Down Expand Up @@ -149,9 +149,9 @@ private <T> void invokeMethod(final Class<T> remoteInterface, final RBlockingQue
log.error("Can't execute: " + request, e);
}

// negative responseTimeout means fire-and-forget call, do not send the response
if (request.getResponseTimeout() >= 0) {
Future<List<?>> clientsFuture = send(request.getResponseTimeout(), responseName, responseHolder.get());
// send the response only if expected
if (request.getOptions().isResultExpected()) {
Future<List<?>> clientsFuture = send(request.getOptions().getExecutionTimeoutInMillis(), responseName, responseHolder.get());
clientsFuture.addListener(new FutureListener<List<?>>() {
@Override
public void operationComplete(Future<List<?>> future) throws Exception {
Expand Down Expand Up @@ -189,7 +189,7 @@ public <T> T get(final Class<T> remoteInterface, final long executionTimeout, fi
.expectResultWithin(executionTimeout, executionTimeUnit));
}

public <T> T get(final Class<T> remoteInterface, RemoteInvocationOptions options) {
public <T> T get(final Class<T> remoteInterface, final RemoteInvocationOptions options) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
Expand All @@ -212,11 +212,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
String requestQueueName = name + ":{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
RemoteServiceRequest request = new RemoteServiceRequest(requestId,
method.getName(),
args,
optionsCopy.isAckExpected() ? optionsCopy.getAckTimeoutInMillis() : -1,
optionsCopy.isResultExpected() ? optionsCopy.getExecutionTimeoutInMillis() : -1,
System.currentTimeMillis());
method.getName(), args, optionsCopy, System.currentTimeMillis());
requestQueue.add(request);

RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/org/redisson/core/RemoteInvocationOptions.java
Expand Up @@ -15,6 +15,7 @@
*/
package org.redisson.core;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -55,14 +56,12 @@
*
* @see RRemoteService#get(Class, RemoteInvocationOptions)
*/
public class RemoteInvocationOptions {
public class RemoteInvocationOptions implements Serializable {

private Long ackTimeoutInMillis;
private Long executionTimeoutInMillis;

private RemoteInvocationOptions(Long ackTimeoutInMillis, Long executionTimeoutInMillis) {
this.ackTimeoutInMillis = ackTimeoutInMillis;
this.executionTimeoutInMillis = executionTimeoutInMillis;
private RemoteInvocationOptions() {
}

public RemoteInvocationOptions(RemoteInvocationOptions copy) {
Expand All @@ -75,13 +74,15 @@ public RemoteInvocationOptions(RemoteInvocationOptions copy) {
* <p/>
* This is equivalent to:
* <pre>
* RemoteInvocationOptions.defaults()
* new RemoteInvocationOptions()
* .expectAckWithin(1, TimeUnit.SECONDS)
* .expectResultWithin(30, TimeUnit.SECONDS)
* </pre>
*/
public static RemoteInvocationOptions defaults() {
return new RemoteInvocationOptions(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toMillis(30));
return new RemoteInvocationOptions()
.expectAckWithin(1, TimeUnit.SECONDS)
.expectResultWithin(20, TimeUnit.SECONDS);
}

public Long getAckTimeoutInMillis() {
Expand Down Expand Up @@ -129,4 +130,12 @@ public RemoteInvocationOptions noResult() {
executionTimeoutInMillis = null;
return this;
}

@Override
public String toString() {
return "RemoteInvocationOptions[" +
"ackTimeoutInMillis=" + ackTimeoutInMillis +
", executionTimeoutInMillis=" + executionTimeoutInMillis +
']';
}
}
26 changes: 11 additions & 15 deletions src/main/java/org/redisson/remote/RemoteServiceRequest.java
Expand Up @@ -15,6 +15,8 @@
*/
package org.redisson.remote;

import org.redisson.core.RemoteInvocationOptions;

import java.io.Serializable;
import java.util.Arrays;

Expand All @@ -23,52 +25,46 @@ public class RemoteServiceRequest implements Serializable {
private String requestId;
private String methodName;
private Object[] args;
private long ackTimeout;
private long responseTimeout;
private RemoteInvocationOptions options;
private long date;


public RemoteServiceRequest() {
}

public RemoteServiceRequest(String requestId, String methodName, Object[] args, long ackTimeout, long responseTimeout, long date) {
public RemoteServiceRequest(String requestId, String methodName, Object[] args, RemoteInvocationOptions options, long date) {
super();
this.requestId = requestId;
this.methodName = methodName;
this.args = args;
this.ackTimeout = ackTimeout;
this.responseTimeout = responseTimeout;
this.options = options;
this.date = date;
}

public long getResponseTimeout() {
return responseTimeout;
}

public long getDate() {
return date;
}

public long getAckTimeout() {
return ackTimeout;
}

public String getRequestId() {
return requestId;
}

public Object[] getArgs() {
return args;
}


public RemoteInvocationOptions getOptions() {
return options;
}

public String getMethodName() {
return methodName;
}

@Override
public String toString() {
return "RemoteServiceRequest [requestId=" + requestId + ", methodName=" + methodName + ", args="
+ Arrays.toString(args) + ", ackTimeout=" + ackTimeout + ", date=" + date + "]";
+ Arrays.toString(args) + ", options=" + options + ", date=" + date + "]";
}

}

0 comments on commit 9ee20f6

Please sign in to comment.