Skip to content

Commit

Permalink
follow parent project
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslansennov committed Dec 28, 2017
1 parent 0078abc commit 9063d3a
Showing 1 changed file with 27 additions and 40 deletions.
67 changes: 27 additions & 40 deletions src/main/resources/super/io/vavr/concurrent/FutureImpl.java
Expand Up @@ -38,7 +38,7 @@ final class FutureImpl<T> implements Future<T> {
/**
* Used to start new threads.
*/
private final ExecutorService executorService;
private final Executor executor;

/**
* Used to synchronize state changes.
Expand All @@ -64,24 +64,15 @@ final class FutureImpl<T> implements Future<T> {
@GuardedBy("lock")
private Queue<Consumer<Try<T>>> actions;

/**
* Once a computation is started via run(), job is defined and used to control the lifecycle of the computation.
* <p>
* The {@code java.util.concurrent.Future} is not intended to store the result of the computation, it is stored in
* {@code value} instead.
*/
@GuardedBy("lock")
private java.util.concurrent.Future<?> job;

// single constructor
private FutureImpl(ExecutorService executorService, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, CheckedFunction1<FutureImpl<T>, java.util.concurrent.Future<?>> jobFactory) {
this.executorService = executorService;
private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, Computation<T> computation) {
this.executor = executor;
synchronized (lock) {
this.cancelled = false;
this.value = value;
this.actions = actions;
try {
this.job = jobFactory.apply(this);
computation.execute(this::tryComplete, this::updateThread);
} catch(Throwable x) {
tryComplete(Try.failure(x));
}
Expand All @@ -91,46 +82,44 @@ private FutureImpl(ExecutorService executorService, Option<Try<T>> value, Queue<
/**
* Creates a {@code FutureImpl} that is immediately completed with the given value. No task will be started.
*
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
* @param executor An {@link Executor} to run and control the computation and to perform the actions.
* @param value the result of this Future
*/
@SuppressWarnings("unchecked")
static <T> FutureImpl<T> of(ExecutorService executorService, Try<? extends T> value) {
return new FutureImpl<>(executorService, Option.some(Try.narrow(value)), null, ignored -> null);
static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, (tryComplete, updateThread) -> {});
}

/**
* Creates a {@code FutureImpl} that is eventually completed.
* The given {@code computation} is <em>synchronously</em> executed, no thread is started.
*
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
* @param executor An {@link Executor} to run and control the computation and to perform the actions.
* @param computation A non-blocking computation
* @param <T> value type of the Future
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> sync(ExecutorService executorService, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
return new FutureImpl<>(executorService, Option.none(), Queue.empty(), future -> {
computation.accept(future::tryComplete);
return null;
static <T> FutureImpl<T> sync(Executor executor, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
return new FutureImpl<>(executor, Option.none(), Queue.empty(), (tryComplete, updateThread) -> {
computation.accept(tryComplete);
});
}

/**
* Creates a {@code FutureImpl} that is eventually completed.
* The given {@code computation} is <em>asynchronously</em> executed, a new thread is started.
*
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
* @param executor An {@link Executor} to run and control the computation and to perform the actions.
* @param computation A (possibly blocking) computation
* @param <T> value type of the Future
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> async(ExecutorService executorService, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
// In a single-threaded context this Future may already have been completed during initialization.
return new FutureImpl<>(executorService, Option.none(), Queue.empty(), future -> executorService.submit(() -> {
static <T> FutureImpl<T> async(Executor executor, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
return new FutureImpl<>(executor, Option.none(), Queue.empty(), (tryComplete, updateThread) -> executor.execute(() -> {
try {
computation.accept(future::tryComplete);
computation.accept(tryComplete);
} catch (Throwable x) {
future.tryComplete(Try.failure(x));
tryComplete.test(Try.failure(x));
}
}));
}
Expand All @@ -148,22 +137,17 @@ public Future<T> await(long timeout, TimeUnit unit) {
@Override
public Future<T> cancel(boolean mayInterruptIfRunning) {
if (!isCompleted()) {
synchronized (lock) {
Try.of(() -> job == null || job.cancel(mayInterruptIfRunning))
.recover(ignored -> job != null && job.isCancelled())
.onSuccess(cancelled -> {
if (cancelled) {
this.cancelled = tryComplete(Try.failure(new CancellationException()));
}
});
}
this.cancelled = tryComplete(Try.failure(new CancellationException()));
}
return this;
}

private void updateThread() {
}

@Override
public ExecutorService executorService() {
return executorService;
public Executor executor() {
return executor;
}

@Override
Expand Down Expand Up @@ -235,7 +219,6 @@ private boolean tryComplete(Try<? extends T> value) {
actions = this.actions;
this.value = Option.some(Try.narrow(value));
this.actions = null;
this.job = null;
}
}
if (actions != null) {
Expand All @@ -249,9 +232,13 @@ private boolean tryComplete(Try<? extends T> value) {

private void perform(Consumer<? super Try<T>> action) {
try {
executorService.execute(() -> action.accept(value.get()));
executor.execute(() -> action.accept(value.get()));
} catch(Throwable x) {
// ignored // TODO: tell UncaughtExceptionHandler?
}
}

private interface Computation<T> {
void execute(Predicate<Try<? extends T>> tryComplete, Runnable updateThread) throws Throwable;
}
}

0 comments on commit 9063d3a

Please sign in to comment.