diff --git a/optaplanner-core/src/main/java/org/optaplanner/core/impl/localsearch/decider/multithreaded/MultiThreadedLocalSearchDecider.java b/optaplanner-core/src/main/java/org/optaplanner/core/impl/localsearch/decider/multithreaded/MultiThreadedLocalSearchDecider.java index e635833abc..4cabf49172 100644 --- a/optaplanner-core/src/main/java/org/optaplanner/core/impl/localsearch/decider/multithreaded/MultiThreadedLocalSearchDecider.java +++ b/optaplanner-core/src/main/java/org/optaplanner/core/impl/localsearch/decider/multithreaded/MultiThreadedLocalSearchDecider.java @@ -16,6 +16,8 @@ package org.optaplanner.core.impl.localsearch.decider.multithreaded; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; @@ -38,6 +40,8 @@ import org.optaplanner.core.impl.score.director.InnerScoreDirector; import org.optaplanner.core.impl.solver.ChildThreadType; import org.optaplanner.core.impl.solver.termination.Termination; +import org.optaplanner.core.impl.solver.thread.OrderByMoveIndexBlockingQueue; +import org.optaplanner.core.impl.solver.thread.ThreadUtils; /** * @param the solution type, the class with the {@link PlanningSolution} annotation @@ -52,8 +56,9 @@ public class MultiThreadedLocalSearchDecider extends LocalSearchDecid protected boolean assertExpectedStepScore = false; protected boolean assertShadowVariablesAreNotStaleAfterStep = false; + protected List moveThreadRunnerList; protected BlockingQueue> operationQueue; - protected RearrangingBlockingQueue resultQueue; + protected OrderByMoveIndexBlockingQueue resultQueue; protected ExecutorService executor; protected CyclicBarrier moveThreadBarrier; @@ -84,12 +89,14 @@ public void phaseStarted(LocalSearchPhaseScope phaseScope) { // Capacity: number of moves in circulation + number of setup xor step operations + number of destroy operations operationQueue = new ArrayBlockingQueue<>(selectedMoveBufferSize + moveThreadCount + moveThreadCount); // Capacity: number of moves in circulation + number of exception handling results - resultQueue = new RearrangingBlockingQueue<>(selectedMoveBufferSize + moveThreadCount); + resultQueue = new OrderByMoveIndexBlockingQueue<>(selectedMoveBufferSize + moveThreadCount); executor = createThreadPoolExecutor(); moveThreadBarrier = new CyclicBarrier(moveThreadCount); InnerScoreDirector scoreDirector = phaseScope.getScoreDirector(); + moveThreadRunnerList = new ArrayList<>(moveThreadCount); for (int moveThreadIndex = 0; moveThreadIndex < moveThreadCount; moveThreadIndex++) { - MoveThreadRunner moveThreadRunner = new MoveThreadRunner(moveThreadIndex); // TODO pass Queue's so phaseEnded can null them + MoveThreadRunner moveThreadRunner = new MoveThreadRunner(moveThreadIndex); + moveThreadRunnerList.add(moveThreadRunner); executor.submit(moveThreadRunner); operationQueue.add(new SetupOperation<>(scoreDirector)); } @@ -98,10 +105,21 @@ public void phaseStarted(LocalSearchPhaseScope phaseScope) { @Override public void phaseEnded(LocalSearchPhaseScope phaseScope) { super.phaseEnded(phaseScope); + // Tell the move thread runners to stop + DestroyOperation destroyOperation = new DestroyOperation<>(); + for (int i = 0; i < moveThreadCount; i++) { + operationQueue.add(destroyOperation); + } + // TODO This should probably be in a finally that spawns at least the entire phase, maybe even the entire solve + ThreadUtils.shutdownAwaitOrKill(executor, logIndentation, "Multithreaded Local Search"); + long childThreadsScoreCalculationCount = 0; + for (MoveThreadRunner moveThreadRunner : moveThreadRunnerList) { + childThreadsScoreCalculationCount += moveThreadRunner.scoreDirector.getCalculationCount(); + } + phaseScope.addChildThreadsScoreCalculationCount(childThreadsScoreCalculationCount); + moveThreadRunnerList = null; operationQueue = null; resultQueue = null; - // TODO - } private ExecutorService createThreadPoolExecutor() { @@ -129,7 +147,7 @@ public void decideNextStep(LocalSearchStepScope stepScope) { // even if some of those moves won't be evaluated or foraged continue; } - RearrangingBlockingQueue.MoveResult result; + OrderByMoveIndexBlockingQueue.MoveResult result; try { result = resultQueue.take(); } catch (InterruptedException e) { diff --git a/optaplanner-core/src/main/java/org/optaplanner/core/impl/partitionedsearch/DefaultPartitionedSearchPhase.java b/optaplanner-core/src/main/java/org/optaplanner/core/impl/partitionedsearch/DefaultPartitionedSearchPhase.java index a82b07d5be..62c88d49e7 100644 --- a/optaplanner-core/src/main/java/org/optaplanner/core/impl/partitionedsearch/DefaultPartitionedSearchPhase.java +++ b/optaplanner-core/src/main/java/org/optaplanner/core/impl/partitionedsearch/DefaultPartitionedSearchPhase.java @@ -24,7 +24,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.optaplanner.core.api.domain.solution.PlanningSolution; import org.optaplanner.core.config.heuristic.policy.HeuristicConfigPolicy; @@ -46,6 +45,7 @@ import org.optaplanner.core.impl.solver.termination.ChildThreadPlumbingTermination; import org.optaplanner.core.impl.solver.termination.OrCompositeTermination; import org.optaplanner.core.impl.solver.termination.Termination; +import org.optaplanner.core.impl.solver.thread.ThreadUtils; /** * Default implementation of {@link PartitionedSearchPhase}. @@ -141,45 +141,11 @@ public void solve(DefaultSolverScope solverScope) { } phaseScope.addChildThreadsScoreCalculationCount(partitionQueue.getPartsCalculationCount()); } finally { - // 1. In case one of the partition threads threw an Exception, it is propagated here + // In case one of the partition threads threw an Exception, it is propagated here // but the other partition threads are not aware of the failure and may continue solving for a long time, // so we need to ask them to terminate. In case no exception was thrown, this does nothing. childThreadPlumbingTermination.terminateChildren(); - - // Intentionally clearing the interrupted flag so that awaitTermination() in step 3 works. - if (Thread.interrupted()) { - // 2a. If the current thread is interrupted, propagate interrupt signal to children by initiating - // an abrupt shutdown. - executor.shutdownNow(); - } else { - // 2b. Otherwise, initiate an orderly shutdown of the executor. This allows partition solvers to finish - // solving upon detecting the termination issued previously (step 1). Shutting down the executor - // service is important because the JVM cannot exit until all non-daemon threads have terminated. - executor.shutdown(); - } - - // 3. Finally, wait until the executor finishes shutting down. - try { - final int awaitingSeconds = 1; - if (!executor.awaitTermination(awaitingSeconds, TimeUnit.SECONDS)) { - // Some solvers refused to complete. Busy threads will be interrupted in the finally block. - // We're only logging the error instead throwing an exception to prevent eating the original - // exception. - logger.error( - "{}Partitioned Search threadPoolExecutor didn't terminate within timeout ({} second).", - logIndentation, - awaitingSeconds); - } - } catch (InterruptedException e) { - // Interrupted while waiting for thread pool termination. Busy threads will be interrupted - // in the finally block. - Thread.currentThread().interrupt(); - // If there is an original exception it will be eaten by this. - throw new IllegalStateException("Thread pool termination was interrupted.", e); - } finally { - // Initiate an abrupt shutdown for the case when any of the previous measures failed. - executor.shutdownNow(); - } + ThreadUtils.shutdownAwaitOrKill(executor, logIndentation, "Partitioned Search"); } phaseEnded(phaseScope); } diff --git a/optaplanner-core/src/main/java/org/optaplanner/core/impl/localsearch/decider/multithreaded/RearrangingBlockingQueue.java b/optaplanner-core/src/main/java/org/optaplanner/core/impl/solver/thread/OrderByMoveIndexBlockingQueue.java similarity index 97% rename from optaplanner-core/src/main/java/org/optaplanner/core/impl/localsearch/decider/multithreaded/RearrangingBlockingQueue.java rename to optaplanner-core/src/main/java/org/optaplanner/core/impl/solver/thread/OrderByMoveIndexBlockingQueue.java index 34518e9ebd..eb0f3549f6 100644 --- a/optaplanner-core/src/main/java/org/optaplanner/core/impl/localsearch/decider/multithreaded/RearrangingBlockingQueue.java +++ b/optaplanner-core/src/main/java/org/optaplanner/core/impl/solver/thread/OrderByMoveIndexBlockingQueue.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.optaplanner.core.impl.localsearch.decider.multithreaded; +package org.optaplanner.core.impl.solver.thread; import java.util.HashMap; import java.util.Map; @@ -24,7 +24,7 @@ import org.optaplanner.core.api.score.Score; import org.optaplanner.core.impl.heuristic.move.Move; -public class RearrangingBlockingQueue { +public class OrderByMoveIndexBlockingQueue { private final BlockingQueue> innerQueue; private final Map> backlog; @@ -32,7 +32,7 @@ public class RearrangingBlockingQueue { private int filterStepIndex = Integer.MIN_VALUE; private int nextMoveIndex = Integer.MIN_VALUE; - public RearrangingBlockingQueue(int capacity) { + public OrderByMoveIndexBlockingQueue(int capacity) { innerQueue = new ArrayBlockingQueue<>(capacity); backlog = new HashMap<>(capacity); } diff --git a/optaplanner-core/src/main/java/org/optaplanner/core/impl/solver/thread/ThreadUtils.java b/optaplanner-core/src/main/java/org/optaplanner/core/impl/solver/thread/ThreadUtils.java new file mode 100644 index 0000000000..91d065aac6 --- /dev/null +++ b/optaplanner-core/src/main/java/org/optaplanner/core/impl/solver/thread/ThreadUtils.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.optaplanner.core.impl.solver.thread; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThreadUtils { + + protected static final transient Logger logger = LoggerFactory.getLogger(ThreadUtils.class); + + public static void shutdownAwaitOrKill(ExecutorService executor, String logIndentation, String name) { + // Intentionally clearing the interrupted flag so that awaitTermination() in step 3 works. + if (Thread.interrupted()) { + // 2a. If the current thread is interrupted, propagate interrupt signal to children by initiating + // an abrupt shutdown. + executor.shutdownNow(); + } else { + // 2b. Otherwise, initiate an orderly shutdown of the executor. This allows partition solvers to finish + // solving upon detecting the termination issued previously (step 1). Shutting down the executor + // service is important because the JVM cannot exit until all non-daemon threads have terminated. + executor.shutdown(); + } + + // 3. Finally, wait until the executor finishes shutting down. + try { + final int awaitingSeconds = 1; + if (!executor.awaitTermination(awaitingSeconds, TimeUnit.SECONDS)) { + // Some solvers refused to complete. Busy threads will be interrupted in the finally block. + // We're only logging the error instead throwing an exception to prevent eating the original + // exception. + logger.error( + "{}{}'s ExecutorService didn't terminate within timeout ({} seconds).", + logIndentation, name, + awaitingSeconds); + } + } catch (InterruptedException e) { + // Interrupted while waiting for thread pool termination. Busy threads will be interrupted + // in the finally block. + Thread.currentThread().interrupt(); + // If there is an original exception it will be eaten by this. + throw new IllegalStateException("Thread pool termination was interrupted.", e); + } finally { + // Initiate an abrupt shutdown for the case when any of the previous measures failed. + executor.shutdownNow(); + } + } + + + // ************************************************************************ + // Private constructor + // ************************************************************************ + + private ThreadUtils() { + } + +}