Skip to content

Commit

Permalink
Changing Task to prevent completed/completedExceptionally from being …
Browse files Browse the repository at this point in the history
…called from outside the framework or task owner.
  • Loading branch information
DanielSperry committed Mar 31, 2015
1 parent 661728d commit fef6008
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public String generate()
builder.append(".");
builder.append( referenceName );
builder.append("(id);\r\n }\r\n\r\n");
if(clazz.isNoIdentity) {
if (clazz.isNoIdentity) {
builder.append(" public static ");
builder.append( interfaceFullName );
builder.append(" getReference()\r\n {\r\n return new ");
Expand Down Expand Up @@ -87,7 +87,7 @@ public String generate()
builder.append("\r\n {\r\n public ");
builder.append( referenceName );
builder.append("(String id)\r\n {\r\n super(id);\r\n");
if(clazz.isNoIdentity) {
if (clazz.isNoIdentity) {
builder.append(" if (id != null)\r\n {\r\n throw new IllegalArgumentException(\"Id must be null since this interface has @NoIdentity\");\r\n }\r\n");
}
builder.append(" }\r\n\r\n @Override\r\n protected int _interfaceId()\r\n {\r\n return ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public final Task<?> safeInvoke(T target, int methodId, Object[] params)
}
catch (Throwable ex)
{
final Task<Object> exceptionTask = new Task<>();
exceptionTask.completeExceptionally(ex);
final Task<Object> exceptionTask = Task.fromException(ex);
return exceptionTask;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -880,7 +881,7 @@ public void activationCleanup(boolean block)
{

long cutOut = clock.millis() - TimeUnit.MINUTES.toMillis(10);
final List<Task<?>> futures = block ? new ArrayList<>() : null;
final List<CompletableFuture<?>> futures = block ? new ArrayList<>() : null;
for (Iterator<Map.Entry<EntryKey, ReferenceEntry>> iterator = localActors.entrySet().iterator(); iterator.hasNext(); )
{
Map.Entry<EntryKey, ReferenceEntry> mEntry = iterator.next();
Expand All @@ -896,7 +897,7 @@ public void activationCleanup(boolean block)
}
if (act.lastAccess < cutOut)
{
Task task1 = new Task();
CompletableFuture future = new CompletableFuture();
final Supplier<Task<?>> task = () -> {
try
{
Expand All @@ -906,34 +907,34 @@ public void activationCleanup(boolean block)
res.whenComplete((r, e) -> {
if (e != null)
{
task1.completeExceptionally(e);
future.completeExceptionally(e);
}
else
{
task1.complete(r);
future.complete(r);
}
});
}
else
{
task1.complete(null);
future.complete(null);
}
return res;
}
catch (Error | RuntimeException ex)
{
task1.completeExceptionally(ex);
future.completeExceptionally(ex);
throw ex;
}
catch (Throwable ex)
{
task1.completeExceptionally(ex);
future.completeExceptionally(ex);
throw new UncheckedException(ex);
}
};
if (executionSerializer.offerJob(mEntry.getKey(), task, maxQueueSize) && block)
{
futures.add(task1);
futures.add(future);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ public boolean equals(final Object o)
return (this == o);
}

@Override
protected boolean internalComplete(Object value)
{
return super.internalComplete(value);
}

@Override
protected boolean internalCompleteExceptionally(Throwable ex)
{
return super.internalCompleteExceptionally(ex);
}
}

public void setClock(final Clock clock)
Expand Down Expand Up @@ -188,19 +199,19 @@ private void onMessageReceived(final INodeAddress from, final byte[] buff)
catch (Exception ex)
{
logger.error("Error deserializing response", ex);
pendingResponse.completeExceptionally(new UncheckedException("Error deserializing response", ex));
pendingResponse.internalCompleteExceptionally(new UncheckedException("Error deserializing response", ex));
return;
}
switch (messageType)
{
case 1:
pendingResponse.complete(res);
pendingResponse.internalComplete(res);
return;
case 2:
pendingResponse.completeExceptionally((Throwable) res);
pendingResponse.internalCompleteExceptionally((Throwable) res);
return;
case 3:
pendingResponse.completeExceptionally(new UncheckedException("Error invoking but no exception provided. Res: " + res));
pendingResponse.internalCompleteExceptionally(new UncheckedException("Error invoking but no exception provided. Res: " + res));
return;
default:
// should be impossible
Expand Down Expand Up @@ -313,14 +324,14 @@ public Task<?> sendMessage(INodeAddress to, boolean oneWay, int interfaceId, int
clusterPeer.sendMessage(to, byteArrayOutputStream.toByteArray());
if (oneWay)
{
pendingResponse.complete(NIL);
pendingResponse.internalComplete(NIL);
}
}
catch (Exception ex)
{
pendingResponseMap.remove(messageId);
pendingResponsesQueue.remove(pendingResponse);
pendingResponse.completeExceptionally(ex);
pendingResponse.internalCompleteExceptionally(ex);
}
return pendingResponse;
}
Expand All @@ -340,7 +351,7 @@ public void timeoutCleanup()
}
if (!top.isDone())
{
top.completeExceptionally(new TimeoutException("Response timeout"));
top.internalCompleteExceptionally(new TimeoutException("Response timeout"));
}
pendingResponseMap.remove(top.messageId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class FakeClusterPeer implements IClusterPeer
private AtomicLong messagesSentOk = new AtomicLong();
private AtomicLong messagesReceived = new AtomicLong();
private AtomicLong messagesReceivedOk = new AtomicLong();
private Task startFuture = new Task();
private CompletableFuture startFuture = new CompletableFuture();

public FakeClusterPeer()
{
Expand Down
97 changes: 82 additions & 15 deletions commons/src/main/java/com/ea/orbit/concurrent/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

package com.ea.orbit.concurrent;

import com.ea.orbit.exception.UncheckedException;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -45,10 +47,37 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Task are CompletableFutures were with a few changes.
* <p>
* <ul>
* <li>complete and completeExceptionally are not publicly accessible.</li>
* <li>few utility methods (thenRun, thenReturn)</li>
* <li>TODO: all callbacks are async by default using the current executor</li>
* </ul>
* </p>
*
* @param <T> the type of the returned object.
* @see java.util.concurrent.CompletableFuture
*/
public class Task<T> extends CompletableFuture<T>
{
private static Void NIL = null;

// TODO: make all callbacks async by default and using the current executor
// what "current executor' means will have to be defined.
// the idea is to use a framework supplied executor to serve
// single point to capture all activity derived from the execution
// of one application request.
// Including logs, stats, exception and timing information.

// TODO: add here or elsewhere a method to wrap a task with timeout
// example: Task result = aTask.timeout(60, TimeUnit.SECONDS);

// TODO: consider implementing CompletionStage instead of deriving from CompletableFuture.

// TODO: consider creating a public class CTask = "Completable Task"

/**
* Creates an already completed task from the given value.
*
Expand All @@ -57,10 +86,49 @@ public class Task<T> extends CompletableFuture<T>
public static <T> Task<T> fromValue(T value)
{
final Task<T> t = new Task<T>();
t.complete(value);
t.internalComplete(value);
return t;
}

public static <T> Task<T> fromException(Throwable ex)
{
final Task<T> t = new Task<T>();
t.internalCompleteExceptionally(ex);
return t;
}


protected boolean internalComplete(T value)
{
return super.complete(value);
}

protected boolean internalCompleteExceptionally(Throwable ex)
{
return super.completeExceptionally(ex);
}

/**
* This completableFuture derived method is not available for Tasks.
*/
@Override
@Deprecated
public boolean complete(T value)
{
// TODO: use another runtime exception
throw new UncheckedException("Protected");
}

/**
* This completableFuture derived method is not available for Tasks.
*/
@Override
@Deprecated
public boolean completeExceptionally(Throwable ex)
{
throw new UncheckedException("Protected");
}

/**
* Wraps a CompletionStage as a Task or just casts it if it is already a Task.
*
Expand All @@ -78,11 +146,11 @@ public static <T> Task<T> from(CompletionStage<T> stage)
stage.handle((T v, Throwable ex) -> {
if (ex != null)
{
t.completeExceptionally(ex);
t.internalCompleteExceptionally(ex);
}
else
{
t.complete(v);
t.internalComplete(v);
}
return null;
});
Expand Down Expand Up @@ -119,11 +187,11 @@ public static <T> Task<T> fromFuture(Future<T> future)
{
try
{
t.complete(future.get());
t.internalComplete(future.get());
}
catch (Throwable ex)
{
t.completeExceptionally(ex);
t.internalCompleteExceptionally(ex);
}
return t;
}
Expand Down Expand Up @@ -165,7 +233,7 @@ public void run()
{
try
{
task.complete(future.get(waitTimeout, waitTimeoutUnit));
task.internalComplete(future.get(waitTimeout, waitTimeoutUnit));
return;
}
catch (TimeoutException ex)
Expand All @@ -175,12 +243,12 @@ public void run()
// in this case something completed the future with a timeout exception.
try
{
task.complete(future.get(waitTimeout, waitTimeoutUnit));
task.internalComplete(future.get(waitTimeout, waitTimeoutUnit));
return;
}
catch (Throwable tex0)
{
task.completeExceptionally(tex0);
task.internalCompleteExceptionally(tex0);
}
return;
}
Expand All @@ -199,29 +267,28 @@ public void run()
}
catch (Throwable tex)
{
task.completeExceptionally(tex);
task.internalCompleteExceptionally(tex);
return;
}
}
catch (Throwable ex)
{
task.completeExceptionally(ex);
task.internalCompleteExceptionally(ex);
return;
}
}
}
catch (Throwable ex)
{
task.completeExceptionally(ex);
task.internalCompleteExceptionally(ex);
}
}
}


public static Task<Void> done()
{
final Task<Void> task = new Task<Void>();
task.complete(NIL);
task.internalComplete(NIL);
return task;
}

Expand All @@ -246,7 +313,7 @@ public Task<T> whenComplete(final BiConsumer<? super T, ? super Throwable> actio
/**
* Returns a new Task that is executed when this task completes normally.
* The result of the new Task will be the result of the Supplier passed as parameter.
*
* <p/>
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
Expand Down Expand Up @@ -342,7 +409,7 @@ public static <F extends CompletableFuture> Task<Object> anyOf(Collection<F> cfs
*/
public static <F extends CompletableFuture<?>> Task<Object> anyOf(Stream<F> cfs)
{
return from(CompletableFuture.anyOf((CompletableFuture[])cfs.toArray(size -> new CompletableFuture[size])));
return from(CompletableFuture.anyOf((CompletableFuture[]) cfs.toArray(size -> new CompletableFuture[size])));
}

}
Loading

0 comments on commit fef6008

Please sign in to comment.