Skip to content

Commit

Permalink
Ack name generation fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 3, 2016
1 parent 86f7ee2 commit 78f8bcf
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
22 changes: 11 additions & 11 deletions src/main/java/org/redisson/RedissonRemoteService.java
Expand Up @@ -29,14 +29,14 @@
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.PromiseDelegator;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBlockingQueueAsync;
import org.redisson.core.RRemoteService;
import org.redisson.core.RScript;
import org.redisson.core.RScript.Mode;
import org.redisson.core.RemoteInvocationOptions;
import org.redisson.misc.PromiseDelegator;
import org.redisson.remote.RRemoteAsync;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
Expand Down Expand Up @@ -163,7 +163,7 @@ public void operationComplete(Future<RemoteServiceRequest> future) throws Except

// send the ack only if expected
if (request.getOptions().isAckExpected()) {
String ackName = getAckName(remoteInterface.getName(), request.getRequestId());
String ackName = getAckName(remoteInterface, request.getRequestId());
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
Expand Down Expand Up @@ -273,14 +273,14 @@ public <T> T get(Class<T> remoteInterface, RemoteInvocationOptions options) {
throw new IllegalArgumentException(m.getReturnType().getClass() + " isn't allowed as return type");
}
}
return async(remoteInterface, options, syncInterface.getName());
return async(remoteInterface, options, syncInterface);
}
}

return sync(remoteInterface, options);
}

private <T> T async(final Class<T> remoteInterface, final RemoteInvocationOptions options, final String interfaceName) {
private <T> T async(final Class<T> remoteInterface, final RemoteInvocationOptions options, final Class<?> syncInterface) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
Expand All @@ -304,9 +304,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl

final String requestId = generateRequestId();

final String requestQueueName = name + ":{" + interfaceName + "}";
final String responseName = name + ":{" + interfaceName + "}:" + requestId;
final String ackName = getAckName(remoteInterface.getName(), requestId);
final String requestQueueName = name + ":{" + syncInterface.getName() + "}";
final String responseName = name + ":{" + syncInterface.getName() + "}:" + requestId;
final String ackName = getAckName(syncInterface, requestId);

final RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
final RemoteServiceRequest request = new RemoteServiceRequest(requestId,
Expand Down Expand Up @@ -452,7 +452,7 @@ public void operationComplete(Future<RemoteServiceResponse> future)
}
}

private <T> T sync(Class<T> remoteInterface, final RemoteInvocationOptions options) {
private <T> T sync(final Class<T> remoteInterface, final RemoteInvocationOptions options) {
final String interfaceName = remoteInterface.getName();
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
Expand Down Expand Up @@ -487,7 +487,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl

// poll for the ack only if expected
if (optionsCopy.isAckExpected()) {
String ackName = getAckName(interfaceName, requestId);
String ackName = getAckName(remoteInterface, requestId);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (ack == null) {
ack = tryPollAckAgain(optionsCopy, responseQueue, ackName);
Expand Down Expand Up @@ -517,8 +517,8 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
}

private String getAckName(String interfaceName, String requestId) {
return name + ":{" + interfaceName + "}:" + requestId + ":ack";
private String getAckName(Class<?> remoteInterface, String requestId) {
return name + ":{" + remoteInterface.getName() + "}:" + requestId + ":ack";
}

private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
Expand Down
7 changes: 3 additions & 4 deletions src/test/java/org/redisson/RedissonRemoteServiceTest.java
Expand Up @@ -393,7 +393,7 @@ public void testInvocationWithFstCodec() {
assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test");
Assert.fail("FstCodec should not be able to serialize a not serializable class");
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(EncoderException.class);
assertThat(e.getCause()).isInstanceOf(RuntimeException.class);
assertThat(e.getCause().getMessage()).contains("Pojo does not implement Serializable");
}
} finally {
Expand Down Expand Up @@ -429,9 +429,8 @@ public void testInvocationWithSerializationCodec() {
Assert.fail("SerializationCodec should not be able to serialize a not serializable class");
} catch (Exception e) {
e.printStackTrace();
assertThat(e.getCause()).isInstanceOf(EncoderException.class);
assertThat(e.getCause().getCause()).isInstanceOf(NotSerializableException.class);
assertThat(e.getCause().getCause().getMessage()).contains("Pojo");
assertThat(e.getCause()).isInstanceOf(NotSerializableException.class);
assertThat(e.getCause().getMessage()).contains("Pojo");
}
} finally {
client.shutdown();
Expand Down

0 comments on commit 78f8bcf

Please sign in to comment.