Skip to content

Commit

Permalink
PLANNER-1090 Multithreaded Local Search: accurate score calculation s…
Browse files Browse the repository at this point in the history
…peed + shutdown + extract common classes to solver/thread package
  • Loading branch information
ge0ffrey committed Jul 2, 2018
1 parent 764e1c0 commit 8ea37c7
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 46 deletions.
Expand Up @@ -16,6 +16,8 @@

package org.optaplanner.core.impl.localsearch.decider.multithreaded;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
Expand All @@ -38,6 +40,8 @@
import org.optaplanner.core.impl.score.director.InnerScoreDirector;
import org.optaplanner.core.impl.solver.ChildThreadType;
import org.optaplanner.core.impl.solver.termination.Termination;
import org.optaplanner.core.impl.solver.thread.OrderByMoveIndexBlockingQueue;
import org.optaplanner.core.impl.solver.thread.ThreadUtils;

/**
* @param <Solution_> the solution type, the class with the {@link PlanningSolution} annotation
Expand All @@ -52,8 +56,9 @@ public class MultiThreadedLocalSearchDecider<Solution_> extends LocalSearchDecid
protected boolean assertExpectedStepScore = false;
protected boolean assertShadowVariablesAreNotStaleAfterStep = false;

protected List<MoveThreadRunner> moveThreadRunnerList;
protected BlockingQueue<MoveThreadOperation<Solution_>> operationQueue;
protected RearrangingBlockingQueue<Solution_> resultQueue;
protected OrderByMoveIndexBlockingQueue<Solution_> resultQueue;
protected ExecutorService executor;
protected CyclicBarrier moveThreadBarrier;

Expand Down Expand Up @@ -84,12 +89,14 @@ public void phaseStarted(LocalSearchPhaseScope<Solution_> phaseScope) {
// Capacity: number of moves in circulation + number of setup xor step operations + number of destroy operations
operationQueue = new ArrayBlockingQueue<>(selectedMoveBufferSize + moveThreadCount + moveThreadCount);
// Capacity: number of moves in circulation + number of exception handling results
resultQueue = new RearrangingBlockingQueue<>(selectedMoveBufferSize + moveThreadCount);
resultQueue = new OrderByMoveIndexBlockingQueue<>(selectedMoveBufferSize + moveThreadCount);
executor = createThreadPoolExecutor();
moveThreadBarrier = new CyclicBarrier(moveThreadCount);
InnerScoreDirector<Solution_> scoreDirector = phaseScope.getScoreDirector();
moveThreadRunnerList = new ArrayList<>(moveThreadCount);
for (int moveThreadIndex = 0; moveThreadIndex < moveThreadCount; moveThreadIndex++) {
MoveThreadRunner moveThreadRunner = new MoveThreadRunner(moveThreadIndex); // TODO pass Queue's so phaseEnded can null them
MoveThreadRunner moveThreadRunner = new MoveThreadRunner(moveThreadIndex);
moveThreadRunnerList.add(moveThreadRunner);
executor.submit(moveThreadRunner);
operationQueue.add(new SetupOperation<>(scoreDirector));
}
Expand All @@ -98,10 +105,21 @@ public void phaseStarted(LocalSearchPhaseScope<Solution_> phaseScope) {
/**
 * Ends the phase and tears down the move threads that were started for it.
 * <p>
 * After delegating to the superclass, this enqueues a {@code DestroyOperation} once per move thread,
 * shuts the executor down through {@code ThreadUtils.shutdownAwaitOrKill}, adds the score calculation counts
 * of all move thread runners to the phase scope, and finally clears the runner list and both queues.
 *
 * @param phaseScope the scope of the phase that is ending; receives the summed child thread calculation count
 */
@Override
public void phaseEnded(LocalSearchPhaseScope<Solution_> phaseScope) {
super.phaseEnded(phaseScope);
// Tell the move thread runners to stop: the same destroy operation instance is enqueued moveThreadCount times
DestroyOperation<Solution_> destroyOperation = new DestroyOperation<>();
for (int i = 0; i < moveThreadCount; i++) {
operationQueue.add(destroyOperation);
}
// TODO This should probably be in a finally that spans at least the entire phase, maybe even the entire solve
ThreadUtils.shutdownAwaitOrKill(executor, logIndentation, "Multithreaded Local Search");
// Only read the runners' score directors after the executor shutdown call above has returned
long childThreadsScoreCalculationCount = 0;
for (MoveThreadRunner moveThreadRunner : moveThreadRunnerList) {
childThreadsScoreCalculationCount += moveThreadRunner.scoreDirector.getCalculationCount();
}
phaseScope.addChildThreadsScoreCalculationCount(childThreadsScoreCalculationCount);
// Drop the references to the runner list and the queues; executor and moveThreadBarrier are not cleared here
moveThreadRunnerList = null;
operationQueue = null;
resultQueue = null;
// TODO

}

private ExecutorService createThreadPoolExecutor() {
Expand Down Expand Up @@ -129,7 +147,7 @@ public void decideNextStep(LocalSearchStepScope<Solution_> stepScope) {
// even if some of those moves won't be evaluated or foraged
continue;
}
RearrangingBlockingQueue.MoveResult<Solution_> result;
OrderByMoveIndexBlockingQueue.MoveResult<Solution_> result;
try {
result = resultQueue.take();
} catch (InterruptedException e) {
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.optaplanner.core.api.domain.solution.PlanningSolution;
import org.optaplanner.core.config.heuristic.policy.HeuristicConfigPolicy;
Expand All @@ -46,6 +45,7 @@
import org.optaplanner.core.impl.solver.termination.ChildThreadPlumbingTermination;
import org.optaplanner.core.impl.solver.termination.OrCompositeTermination;
import org.optaplanner.core.impl.solver.termination.Termination;
import org.optaplanner.core.impl.solver.thread.ThreadUtils;

/**
* Default implementation of {@link PartitionedSearchPhase}.
Expand Down Expand Up @@ -141,45 +141,11 @@ public void solve(DefaultSolverScope<Solution_> solverScope) {
}
phaseScope.addChildThreadsScoreCalculationCount(partitionQueue.getPartsCalculationCount());
} finally {
// 1. In case one of the partition threads threw an Exception, it is propagated here
// In case one of the partition threads threw an Exception, it is propagated here
// but the other partition threads are not aware of the failure and may continue solving for a long time,
// so we need to ask them to terminate. In case no exception was thrown, this does nothing.
childThreadPlumbingTermination.terminateChildren();

// Intentionally clearing the interrupted flag so that awaitTermination() in step 3 works.
if (Thread.interrupted()) {
// 2a. If the current thread is interrupted, propagate interrupt signal to children by initiating
// an abrupt shutdown.
executor.shutdownNow();
} else {
// 2b. Otherwise, initiate an orderly shutdown of the executor. This allows partition solvers to finish
// solving upon detecting the termination issued previously (step 1). Shutting down the executor
// service is important because the JVM cannot exit until all non-daemon threads have terminated.
executor.shutdown();
}

// 3. Finally, wait until the executor finishes shutting down.
try {
final int awaitingSeconds = 1;
if (!executor.awaitTermination(awaitingSeconds, TimeUnit.SECONDS)) {
// Some solvers refused to complete. Busy threads will be interrupted in the finally block.
// We're only logging the error instead throwing an exception to prevent eating the original
// exception.
logger.error(
"{}Partitioned Search threadPoolExecutor didn't terminate within timeout ({} second).",
logIndentation,
awaitingSeconds);
}
} catch (InterruptedException e) {
// Interrupted while waiting for thread pool termination. Busy threads will be interrupted
// in the finally block.
Thread.currentThread().interrupt();
// If there is an original exception it will be eaten by this.
throw new IllegalStateException("Thread pool termination was interrupted.", e);
} finally {
// Initiate an abrupt shutdown for the case when any of the previous measures failed.
executor.shutdownNow();
}
ThreadUtils.shutdownAwaitOrKill(executor, logIndentation, "Partitioned Search");
}
phaseEnded(phaseScope);
}
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.optaplanner.core.impl.localsearch.decider.multithreaded;
package org.optaplanner.core.impl.solver.thread;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -24,15 +24,15 @@
import org.optaplanner.core.api.score.Score;
import org.optaplanner.core.impl.heuristic.move.Move;

public class RearrangingBlockingQueue<Solution_> {
public class OrderByMoveIndexBlockingQueue<Solution_> {

private final BlockingQueue<MoveResult<Solution_>> innerQueue;
private final Map<Integer, MoveResult<Solution_>> backlog;

private int filterStepIndex = Integer.MIN_VALUE;
private int nextMoveIndex = Integer.MIN_VALUE;

public RearrangingBlockingQueue(int capacity) {
public OrderByMoveIndexBlockingQueue(int capacity) {
innerQueue = new ArrayBlockingQueue<>(capacity);
backlog = new HashMap<>(capacity);
}
Expand Down
@@ -0,0 +1,74 @@
/*
* Copyright 2018 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.optaplanner.core.impl.solver.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for shutting down an {@link ExecutorService}.
 * Not instantiable: the only constructor is private.
 */
public class ThreadUtils {

    protected static final transient Logger logger = LoggerFactory.getLogger(ThreadUtils.class);

    /**
     * Shuts the given executor down, waits briefly for it to terminate and then forces termination.
     * <p>
     * The sequence is:
     * <ol>
     * <li>Read and clear the calling thread's interrupt flag. If it was set, the executor is shut down abruptly
     * ({@link ExecutorService#shutdownNow()}); otherwise an orderly {@link ExecutorService#shutdown()} is
     * requested.</li>
     * <li>Wait up to 1 second for termination; if the executor has not terminated by then, an error is logged
     * (no exception is thrown for the timeout).</li>
     * <li>In all cases, {@link ExecutorService#shutdownNow()} is called last.</li>
     * </ol>
     * Note that an interrupt flag that was set on entry is cleared and is not set again when this method
     * returns normally.
     *
     * @param executor the executor to shut down
     * @param logIndentation prefix placed in front of the timeout error message
     * @param name human readable name of the executor's owner, used in the timeout error message
     * @throws IllegalStateException if the calling thread is interrupted while waiting for termination;
     * the interrupt flag is set again before throwing
     */
    public static void shutdownAwaitOrKill(ExecutorService executor, String logIndentation, String name) {
        // Thread.interrupted() deliberately clears the flag, otherwise awaitTermination() below
        // would throw immediately instead of waiting.
        boolean interruptedOnEntry = Thread.interrupted();
        if (interruptedOnEntry) {
            // Pass the interruption on to the worker threads.
            executor.shutdownNow();
        } else {
            // Let already submitted tasks finish on their own. Shutting down matters because
            // the JVM cannot exit while non-daemon threads are still alive.
            executor.shutdown();
        }

        try {
            awaitTerminationOrLog(executor, logIndentation, name);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller. Still running tasks are interrupted in the finally block.
            Thread.currentThread().interrupt();
            // Caveat: an exception that is already propagating through the caller is replaced by this one.
            throw new IllegalStateException("Thread pool termination was interrupted.", e);
        } finally {
            // Last resort for tasks that are still running at this point.
            executor.shutdownNow();
        }
    }

    /**
     * Waits up to 1 second for the executor to terminate and logs an error if it does not.
     * The timeout is only logged, not thrown, so that it cannot replace an exception
     * that is already propagating through the caller.
     */
    private static void awaitTerminationOrLog(ExecutorService executor, String logIndentation, String name)
            throws InterruptedException {
        final int timeoutInSeconds = 1;
        boolean terminated = executor.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS);
        if (terminated) {
            return;
        }
        logger.error(
                "{}{}'s ExecutorService didn't terminate within timeout ({} seconds).",
                logIndentation, name,
                timeoutInSeconds);
    }

    // ************************************************************************
    // Private constructor
    // ************************************************************************

    private ThreadUtils() {
    }

}

0 comments on commit 8ea37c7

Please sign in to comment.