Skip to content

Commit

Permalink
#480 AndInThreads fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
yegor256 committed Nov 30, 2017
1 parent 81c7b24 commit ed0b73f
Showing 1 changed file with 37 additions and 2 deletions.
39 changes: 37 additions & 2 deletions src/main/java/org/cactoos/scalar/AndInThreads.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.cactoos.Func;
import org.cactoos.Proc;
import org.cactoos.Scalar;
Expand Down Expand Up @@ -74,6 +75,11 @@ public final class AndInThreads implements Scalar<Boolean> {
*/
private final Iterable<Scalar<Boolean>> iterable;

/**
* Shut down the service when it's done.
*/
private final boolean shut;

/**
* Ctor.
* @param proc Proc to map
Expand Down Expand Up @@ -165,7 +171,7 @@ public AndInThreads(final Iterator<Scalar<Boolean>> src) {
* @param src The iterable
*/
public AndInThreads(final Iterable<Scalar<Boolean>> src) {
this(Executors.newCachedThreadPool(), src);
this(Executors.newCachedThreadPool(), src, true);
}

/**
Expand Down Expand Up @@ -275,8 +281,20 @@ public AndInThreads(final ExecutorService svc,
*/
public AndInThreads(final ExecutorService svc,
final Iterable<Scalar<Boolean>> src) {
this(svc, src, false);
}

/**
* Ctor.
* @param svc Executable service to run thread in
* @param src The iterable
* @param sht Shut it down
*/
private AndInThreads(final ExecutorService svc,
final Iterable<Scalar<Boolean>> src, final boolean sht) {
this.service = svc;
this.iterable = src;
this.shut = sht;
}

@Override
Expand All @@ -285,10 +303,27 @@ public Boolean value() throws Exception {
for (final Scalar<Boolean> item : this.iterable) {
futures.add(this.service.submit(item::value));
}
return new And(
final boolean result = new And(
(Func<Future<Boolean>, Boolean>) Future::get,
futures
).value();
if (this.shut) {
this.service.shutdown();
try {
if (!this.service.awaitTermination(1L, TimeUnit.MINUTES)) {
throw new IllegalStateException(
String.format(
"Can't terminate the service, result=%b",
result
)
);
}
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}
return result;
}

}

0 comments on commit ed0b73f

Please sign in to comment.