Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Bujok committed Sep 1, 2015
1 parent 11ababd commit 6706c28
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 226 deletions.
Expand Up @@ -29,6 +29,7 @@
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.logging.Logger;
import com.hazelcast.mapreduce.Collator;
import com.hazelcast.mapreduce.CombinerFactory;
Expand All @@ -51,7 +52,6 @@
import com.hazelcast.mapreduce.impl.task.TransferableJobProcessInformation;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.spi.impl.AbstractCompletableFuture;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.UuidUtil;
Expand All @@ -65,12 +65,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.hazelcast.util.Preconditions.isNotNull;

public class ClientMapReduceProxy
extends ClientProxy
Expand Down Expand Up @@ -241,14 +236,10 @@ private class ClientCompletableFuture<V>
implements JobCompletableFuture<V> {

private final String jobId;
private final CountDownLatch latch;

private volatile boolean cancelled;

protected ClientCompletableFuture(String jobId) {
super(getContext().getExecutionService().getAsyncExecutor(), Logger.getLogger(ClientCompletableFuture.class));
this.jobId = jobId;
this.latch = new CountDownLatch(1);
}

@Override
Expand All @@ -257,7 +248,8 @@ public String getJobId() {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
protected boolean shouldCancel(boolean mayInterruptIfRunning) {
boolean cancelled = false;
try {
ClientMessage request = MapReduceCancelCodec.encodeRequest(getName(), jobId);
ClientMessage response = invoke(request, jobId);
Expand All @@ -269,25 +261,10 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public void setResult(Object result) {
protected void setResult(Object result) {
super.setResult(result);
latch.countDown();
}

@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
isNotNull(unit, "unit");
if (!latch.await(timeout, unit) || !isDone()) {
throw new TimeoutException("timeout reached");
}
return getResult();
}
}

private final class ClientTrackableJob<V>
Expand Down
Expand Up @@ -43,12 +43,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.hazelcast.util.Preconditions.isNotNull;

public class ClientMapReduceProxy
extends ClientProxy
Expand Down Expand Up @@ -82,14 +77,6 @@ public String toString() {
return "JobTracker{" + "name='" + getName() + '\'' + '}';
}

/*
* Removed for now since it is moved to Hazelcast 3.3
@Override
public <K, V> ProcessJob<K, V> newProcessJob(KeyValueSource<K, V> source) {
// TODO
return null;
}*/

private <T> T invoke(InvocationClientRequest request, String jobId) throws Exception {
ClientTrackableJob trackableJob = trackableJobs.get(jobId);
if (trackableJob != null) {
Expand Down Expand Up @@ -165,14 +152,10 @@ private class ClientCompletableFuture<V>
implements JobCompletableFuture<V> {

private final String jobId;
private final CountDownLatch latch;

private volatile boolean cancelled;

protected ClientCompletableFuture(String jobId) {
super(getContext().getExecutionService().getAsyncExecutor(), Logger.getLogger(ClientCompletableFuture.class));
this.jobId = jobId;
this.latch = new CountDownLatch(1);
}

@Override
Expand All @@ -181,7 +164,8 @@ public String getJobId() {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
protected boolean shouldCancel(boolean mayInterruptIfRunning) {
boolean cancelled = false;
try {
cancelled = (Boolean) invoke(new ClientCancellationRequest(getName(), jobId), jobId);
} catch (Exception ignore) {
Expand All @@ -191,23 +175,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public void setResult(Object result) {
protected void setResult(Object result) {
super.setResult(result);
latch.countDown();
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
isNotNull(unit, "unit");
if (!latch.await(timeout, unit) || !isDone()) {
throw new TimeoutException("timeout reached");
}
return getResult();
}
}

Expand Down
Expand Up @@ -369,13 +369,13 @@ public void onFailure(Throwable t) {
}

@Override
public boolean isCancelled() {
protected boolean shouldCancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
protected void setResult(Object result) {
super.setResult(result);
}

@Override
Expand Down
Expand Up @@ -52,8 +52,6 @@ public class TrackableJobFuture<V>
private final Collator collator;
private final MapReduceService mapReduceService;

private volatile boolean cancelled;

public TrackableJobFuture(String name, String jobId, JobTracker jobTracker, NodeEngine nodeEngine, Collator collator) {
super(nodeEngine, nodeEngine.getLogger(TrackableJobFuture.class));
this.name = name;
Expand Down Expand Up @@ -86,7 +84,7 @@ public void setResult(Object result) {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
protected boolean shouldCancel(boolean mayInterruptIfRunning) {
Address jobOwner = mapReduceService.getLocalAddress();
if (!mapReduceService.registerJobSupervisorCancellation(name, jobId, jobOwner)) {
return false;
Expand All @@ -96,8 +94,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
Exception exception = new CancellationException("Operation was cancelled by the user");
cancelled = supervisor.cancelAndNotify(exception) && super.cancel(mayInterruptIfRunning);
return cancelled;
return supervisor.cancelAndNotify(exception);
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.EmptyStatement;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -122,14 +123,23 @@ private static boolean isDoneState(Object state) {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
public final boolean cancel(boolean mayInterruptIfRunning) {
Boolean shouldCancel = null;

for (; ; ) {
Object currentState = this.state;

if (isDoneState(currentState)) {
return false;
}

if (shouldCancel == null) {
shouldCancel = shouldCancel(mayInterruptIfRunning);
}
if (!shouldCancel) {
return false;
}

if (STATE.compareAndSet(this, currentState, CANCELLED_STATE)) {
cancelled(mayInterruptIfRunning);
notifyThreadsWaitingOnGet();
Expand All @@ -138,6 +148,18 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}
}

/**
* Protected method invoked on cancel(). Enables aborting the cancellation.
* Useful for futures' encompassing logic that can forbid the cancellation.
* By default always returns true.
*
* @param mayInterruptIfRunning passed through from cancel call
* @return true should the cancellation proceed; false otherwise
*/
protected boolean shouldCancel(boolean mayInterruptIfRunning) {
return true;
}

/**
* Protected method invoked when this task transitions to state
* {@code isCancelled}. The default implementation does nothing.
Expand All @@ -161,17 +183,26 @@ public boolean isCancelled() {
}

@Override
public V get() throws InterruptedException, ExecutionException {
try {
return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
logger.severe("Unexpected timeout while processing " + this, e);
return null;
public final V get() throws InterruptedException, ExecutionException {
for (; ; ) {
try {
return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (TimeoutException ignored) {
// A timeout here can only be a spurious artifact.
// It should never happen and even if it does, we must retry.
EmptyStatement.ignore(ignored);
}
}
}


/**
* PLEASE NOTE: It's legal to override this method, but please bear in mind that you should call super.get() or
* implement the done() and cancelled() callbacks to be notified if this future gets done or cancelled.
* Otherwise the overridden implementation of get() may get stuck on waiting forever.
*/
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
public V get(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
final long deadlineTimeMillis = System.currentTimeMillis() + unit.toMillis(timeout);
long millisToWait;
for (; ; ) {
Expand All @@ -181,7 +212,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution
throw new CancellationException();
}
if (isDoneState(currentState)) {
return getResult();
return getResult(currentState);
}
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -200,7 +231,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution
}
}

public void setResult(Object result) {
protected void setResult(Object result) {
for (; ; ) {
Object currentState = this.state;

Expand All @@ -227,9 +258,19 @@ public void setResult(Object result) {
protected void done() {
}

/**
* Returns:
* <ul>
* <li>null - if cancelled or not done</li>
* <li>result - if done and result is NOT Throwable</li>
* <li>sneaky throws an exception - if done and result is Throwable</li>
* </ul>
*/
protected V getResult() {
Object state = this.state;
return getResult(this.state);
}

private static <V> V getResult(Object state) {
if (isCancelledState(state)) {
return null;
}
Expand All @@ -251,18 +292,18 @@ private void notifyThreadsWaitingOnGet() {
}
}

protected void runAsynchronous(ExecutionCallbackNode head, Object result) {
private void runAsynchronous(ExecutionCallbackNode head, Object result) {
while (head != INITIAL_STATE) {
runAsynchronous(head.callback, head.executor, result);
head = head.next;
}
}

private void runAsynchronous(ExecutionCallback<V> callback, Executor executor, Object result) {
executor.execute(new ExecutionCallbackRunnable<V>(result, callback));
executor.execute(new ExecutionCallbackRunnable<V>(getClass(), result, callback, logger));
}

static final class ExecutionCallbackNode<E> {
private static final class ExecutionCallbackNode<E> {
final ExecutionCallback<E> callback;
final Executor executor;
final ExecutionCallbackNode<E> next;
Expand All @@ -274,13 +315,17 @@ private ExecutionCallbackNode(ExecutionCallback<E> callback, Executor executor,
}
}

private class ExecutionCallbackRunnable<V> implements Runnable {
private static final class ExecutionCallbackRunnable<V> implements Runnable {
private final Class<?> caller;
private final Object result;
private final ExecutionCallback<V> callback;
private final ILogger logger;

public ExecutionCallbackRunnable(Object result, ExecutionCallback<V> callback) {
public ExecutionCallbackRunnable(Class<?> caller, Object result, ExecutionCallback<V> callback, ILogger logger) {
this.caller = caller;
this.result = result;
this.callback = callback;
this.logger = logger;
}

@Override
Expand All @@ -293,7 +338,7 @@ public void run() {
}
} catch (Throwable cause) {
logger.severe("Failed asynchronous execution of execution callback: " + callback
+ "for call " + AbstractCompletableFuture.this, cause);
+ "for call " + caller, cause);
}
}
}
Expand Down

0 comments on commit 6706c28

Please sign in to comment.