Skip to content

Commit

Permalink
RExecutorFuture and RScheduledFuture shouldn't be tracked if they wer…
Browse files Browse the repository at this point in the history
…en't stored. #1185
  • Loading branch information
Nikita committed Dec 11, 2017
1 parent 5285066 commit 2b8ec95
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 128 deletions.
14 changes: 7 additions & 7 deletions redisson/src/main/java/org/redisson/BaseRemoteService.java
Expand Up @@ -102,7 +102,7 @@ public BaseRemoteService(Codec codec, RedissonClient redisson, String name, Comm
this.responseQueueName = getResponseQueueName(executorId); this.responseQueueName = getResponseQueueName(executorId);
} }


protected String getResponseQueueName(String executorId) { public String getResponseQueueName(String executorId) {
return "{remote_response}:" + executorId; return "{remote_response}:" + executorId;
} }


Expand Down Expand Up @@ -215,7 +215,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl


final Long ackTimeout = request.getOptions().getAckTimeoutInMillis(); final Long ackTimeout = request.getOptions().getAckTimeoutInMillis();


final RemotePromise<Object> result = new RemotePromise<Object>(requestId, getParam(request)) { final RemotePromise<Object> result = new RemotePromise<Object>(requestId) {


@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
Expand Down Expand Up @@ -366,10 +366,6 @@ public void operationComplete(Future<RemoteServiceAck> future)
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler); return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
} }


protected Object getParam(RemoteServiceRequest request) {
return null;
}

private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result, private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final String ackName, final RFuture<RRemoteServiceResponse> responseFuture) { final String ackName, final RFuture<RRemoteServiceResponse> responseFuture) {
RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync(); RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
Expand Down Expand Up @@ -447,6 +443,10 @@ public void operationComplete(Future<T> future) throws Exception {
synchronized (responses) { synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName); ResponseEntry entry = responses.get(responseQueueName);
List<Result> list = entry.getResponses().get(requestId); List<Result> list = entry.getResponses().get(requestId);
if (list == null) {
return;
}

for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) { for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) {
Result result = iterator.next(); Result result = iterator.next();
if (result.getPromise() == responseFuture) { if (result.getPromise() == responseFuture) {
Expand Down Expand Up @@ -592,7 +592,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy, RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy,
System.currentTimeMillis()); System.currentTimeMillis());


RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId, null); RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId);
addAsync(requestQueueName, request, addPromise); addAsync(requestQueueName, request, addPromise);
addPromise.getAddFuture().sync(); addPromise.getAddFuture().sync();


Expand Down
110 changes: 88 additions & 22 deletions redisson/src/main/java/org/redisson/RedissonExecutorService.java
Expand Up @@ -18,10 +18,12 @@
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -57,6 +59,7 @@
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.executor.RedissonExecutorBatchFuture; import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture; import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonExecutorFutureReference;
import org.redisson.executor.RedissonScheduledFuture; import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService; import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync; import org.redisson.executor.RemoteExecutorServiceAsync;
Expand All @@ -71,6 +74,7 @@
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId; import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -123,9 +127,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {


private final String name; private final String name;
private final String requestQueueName; private final String requestQueueName;
private final String responseQueueName;
private final QueueTransferService queueTransferService; private final QueueTransferService queueTransferService;
private final String executorId; private final String executorId;
private final ConcurrentMap<String, ResponseEntry> responses; private final ConcurrentMap<String, ResponseEntry> responses;

private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<RExecutorFuture<?>>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(PlatformDependent.<RedissonExecutorFutureReference, Boolean>newConcurrentHashMap());


public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, String redissonId) { public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, String redissonId) {
super(); super();
Expand All @@ -145,6 +153,7 @@ public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Red


remoteService = redisson.getRemoteService(name, codec); remoteService = redisson.getRemoteService(name, codec);
requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class); requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class);
responseQueueName = ((RedissonRemoteService)remoteService).getResponseQueueName(executorId);
String objectName = requestQueueName; String objectName = requestQueueName;
tasksCounterName = objectName + ":counter"; tasksCounterName = objectName + ":counter";
tasksName = objectName + ":tasks"; tasksName = objectName + ":tasks";
Expand Down Expand Up @@ -261,7 +270,7 @@ public void execute(Runnable task) {
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null); RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null);
execute(promise); syncExecute(promise);
} }


@Override @Override
Expand Down Expand Up @@ -437,8 +446,8 @@ public void onMessage(String channel, Integer msg) {
@Override @Override
public <T> RExecutorFuture<T> submit(Callable<T> task) { public <T> RExecutorFuture<T> submit(Callable<T> task) {
RemotePromise<T> promise = (RemotePromise<T>) ((PromiseDelegator<T>) submitAsync(task)).getInnerPromise(); RemotePromise<T> promise = (RemotePromise<T>) ((PromiseDelegator<T>) submitAsync(task)).getInnerPromise();
execute(promise); syncExecute(promise);
return new RedissonExecutorFuture<T>(promise, promise.getRequestId()); return createFuture(promise);
} }


@Override @Override
Expand All @@ -448,7 +457,7 @@ public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(task.getClass().getName(), classBody, state, null); RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
addListener(result); addListener(result);
return new RedissonExecutorFuture<T>(result, result.getRequestId()); return createFuture(result);
} }


@Override @Override
Expand All @@ -465,7 +474,7 @@ public RExecutorBatchFuture submit(Callable<?> ...tasks) {
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null); RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture); result.add(executorFuture);
} }


Expand All @@ -491,7 +500,7 @@ public RExecutorBatchFuture submitAsync(Callable<?> ...tasks) {
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null); RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture); result.add(executorFuture);
} }


Expand Down Expand Up @@ -553,7 +562,7 @@ private void check(Object task) {
} }
} }


private <T> void execute(RemotePromise<T> promise) { private <T> void syncExecute(RemotePromise<T> promise) {
RFuture<Boolean> addFuture = promise.getAddFuture(); RFuture<Boolean> addFuture = promise.getAddFuture();
addFuture.syncUninterruptibly(); addFuture.syncUninterruptibly();
Boolean res = addFuture.getNow(); Boolean res = addFuture.getNow();
Expand Down Expand Up @@ -593,7 +602,7 @@ public RExecutorBatchFuture submit(Runnable ...tasks) {
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null); RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId()); RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture); result.add(executorFuture);
} }


Expand All @@ -619,7 +628,7 @@ public RExecutorBatchFuture submitAsync(Runnable ...tasks) {
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null); RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId()); RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture); result.add(executorFuture);
} }


Expand Down Expand Up @@ -653,8 +662,8 @@ public void operationComplete(io.netty.util.concurrent.Future<List<Boolean>> fut
@Override @Override
public RExecutorFuture<?> submit(Runnable task) { public RExecutorFuture<?> submit(Runnable task) {
RemotePromise<Void> promise = (RemotePromise<Void>) ((PromiseDelegator<Void>) submitAsync(task)).getInnerPromise(); RemotePromise<Void> promise = (RemotePromise<Void>) ((PromiseDelegator<Void>) submitAsync(task)).getInnerPromise();
execute(promise); syncExecute(promise);
return new RedissonExecutorFuture<Void>(promise, promise.getRequestId()); return createFuture(promise);
} }


@Override @Override
Expand All @@ -664,15 +673,61 @@ public RExecutorFuture<?> submitAsync(Runnable task) {
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(task.getClass().getName(), classBody, state, null); RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
addListener(result); addListener(result);
return new RedissonExecutorFuture<Void>(result, result.getRequestId()); return createFuture(result);
}

private void cancelResponseHandling(RequestId requestId) {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}

List<Result> list = entry.getResponses().remove(requestId);
if (list != null) {
for (Result result : list) {
result.getScheduledFuture().cancel(true);
}
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
} }


@Override @Override
public RScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) { public RScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, delay, unit); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, delay, unit);
execute((RemotePromise<?>)future.getInnerPromise()); RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future; return future;
} }

private <T> RExecutorFuture<T> createFuture(RemotePromise<T> promise) {
RExecutorFuture<T> f = new RedissonExecutorFuture<T>(promise);
storeReference(f, promise.getRequestId());
return f;
}

private <T> RScheduledFuture<T> createFuture(RemotePromise<T> promise, long scheduledExecutionTime) {
RedissonScheduledFuture<T> f = new RedissonScheduledFuture<T>(promise, scheduledExecutionTime);
storeReference(f, promise.getRequestId());
return f;
}

private void storeReference(RExecutorFuture<?> future, RequestId requestId) {
while (true) {
RedissonExecutorFutureReference r = (RedissonExecutorFutureReference) referenceDueue.poll();
if (r == null) {
break;
}
references.remove(r);
cancelResponseHandling(r.getRequestId());
}
RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue);
references.add(reference);
}


@Override @Override
public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) { public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) {
Expand All @@ -682,13 +737,16 @@ public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit uni
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(task.getClass().getName(), classBody, state, startTime, null); RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(task.getClass().getName(), classBody, state, startTime, null);
addListener(result); addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime);
return createFuture(result, startTime);
} }


@Override @Override
public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) { public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit); RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit);
execute((RemotePromise<V>)future.getInnerPromise()); RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future; return future;
} }


Expand All @@ -700,13 +758,15 @@ public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeU
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(task.getClass().getName(), classBody, state, startTime, null); RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(task.getClass().getName(), classBody, state, startTime, null);
addListener(result); addListener(result);
return new RedissonScheduledFuture<V>(result, startTime); return createFuture(result, startTime);
} }


@Override @Override
public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit);
execute((RemotePromise<?>)future.getInnerPromise()); RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future; return future;
} }


Expand All @@ -718,13 +778,15 @@ public RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialD
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(task.getClass().getName(), classBody, state, startTime, unit.toMillis(period), executorId, null); RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(task.getClass().getName(), classBody, state, startTime, unit.toMillis(period), executorId, null);
addListener(result); addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime); return createFuture(result, startTime);
} }


@Override @Override
public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) { public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, cronSchedule); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, cronSchedule);
execute((RemotePromise<?>)future.getInnerPromise()); RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future; return future;
} }


Expand All @@ -737,17 +799,21 @@ public RScheduledFuture<?> scheduleAsync(Runnable task, CronSchedule cronSchedul
long startTime = startDate.getTime(); long startTime = startDate.getTime();
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression(), executorId, null); RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression(), executorId, null);
addListener(result); addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime) { RedissonScheduledFuture<Void> f = new RedissonScheduledFuture<Void>(result, startTime) {
public long getDelay(TimeUnit unit) { public long getDelay(TimeUnit unit) {
return unit.convert(startDate.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); return unit.convert(startDate.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}; };
}; };
storeReference(f, result.getRequestId());
return f;
} }


@Override @Override
public RScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { public RScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit);
execute((RemotePromise<?>)future.getInnerPromise()); RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future; return future;
} }


Expand All @@ -759,7 +825,7 @@ public RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initi
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(task.getClass().getName(), classBody, state, startTime, unit.toMillis(delay), executorId, null); RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(task.getClass().getName(), classBody, state, startTime, unit.toMillis(delay), executorId, null);
addListener(result); addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime); return createFuture(result, startTime);
} }


@Override @Override
Expand Down
Expand Up @@ -30,6 +30,10 @@ public class RedissonExecutorFuture<V> extends PromiseDelegator<V> implements RE


private final RequestId taskId; private final RequestId taskId;


public RedissonExecutorFuture(RemotePromise<V> promise) {
this(promise, promise.getRequestId());
}

public RedissonExecutorFuture(RPromise<V> promise, RequestId taskId) { public RedissonExecutorFuture(RPromise<V> promise, RequestId taskId) {
super(promise); super(promise);
this.taskId = taskId; this.taskId = taskId;
Expand Down
@@ -0,0 +1,42 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

import org.redisson.api.RExecutorFuture;
import org.redisson.remote.RequestId;

/**
*
* @author Nikita Koksharov
*
*/
public class RedissonExecutorFutureReference extends WeakReference<RExecutorFuture<?>> {

private RequestId requestId;

public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q) {
super(referent, q);
this.requestId = requestId;
}

public RequestId getRequestId() {
return requestId;
}

}

0 comments on commit 2b8ec95

Please sign in to comment.