Skip to content

Commit

Permalink
glow runtime: use folly cpu thread pool (#4260)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #4260

In glow we have our own implementation of a ThreadPool. Glow's implementation is a pool of threads, each having its own work queue. New work is assigned to the threads round robin.

This kind of implementation is usually good if and only if
1. all the work items are guaranteed to complete with similar amount of time. Or
2. we implement work stealing.

Otherwise we can end up with load imbalance where some threads have a heavy workload and long queue while other threads are idle.

Now that we have folly, we can just use folly's thread pool, which is implemented with a shared queue using high-performance data structures and primitives (an MPMC queue + LifoSem). Using this implementation can reduce load imbalance and tail latency.

In my testing, the avg latency and throughput don't really change, but the tail latency is much better. The reason is the shorter queuing time between the device manager thread finishing an inference and the host manager executor thread handling the result.

Previously, the handling of the device manager result is always scheduled onto a thread that was pre-determined via round robin. If this thread happens to be busy (say handling some other input), then the result handling has to wait while there may be other threads that are totally idle. After the change, the result will be handled by any worker thread that's free.

You can observe this change in the glow trace.

Reviewed By: yinghai

Differential Revision: D20203927

fbshipit-source-id: 7ac2ada194837bcfc1395aec996dc6be13d9d2f6
  • Loading branch information
tracelogfb authored and facebook-github-bot committed Mar 11, 2020
1 parent 2aa0e5d commit 476c7bd
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 45 deletions.
4 changes: 1 addition & 3 deletions .circleci/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,11 @@ fi
CMAKE_ARGS+=("-DCMAKE_CXX_FLAGS=-Werror")
CMAKE_ARGS+=("-DGLOW_WITH_CPU=ON")
CMAKE_ARGS+=("-DGLOW_WITH_HABANA=OFF")

if [[ "${CIRCLE_JOB}" == "ASAN" ]]; then
CMAKE_ARGS+=("-DGLOW_USE_SANITIZER='Address;Undefined'")
CMAKE_ARGS+=("-DGLOW_WITH_OPENCL=OFF")
CMAKE_ARGS+=("-DCMAKE_BUILD_TYPE=Release")
elif [[ "${CIRCLE_JOB}" == "TSAN" ]]; then
CMAKE_ARGS+=("-DGLOW_USE_SANITIZER='Thread'")
CMAKE_ARGS+=("-DGLOW_WITH_OPENCL=OFF")
elif [[ "$CIRCLE_JOB" == "RELEASE_WITH_EXPENSIVE_TESTS" ]]; then
# Download the models and tell cmake where to find them.
MODELS_DIR="$GLOW_DIR/downloaded_models"
Expand Down
5 changes: 0 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ jobs:
environment:
DOCKER_IMAGE: "308535385114.dkr.ecr.us-east-1.amazonaws.com/caffe2/py2-clang8-ubuntu16.04:283"
<<: *linux_default
TSAN:
environment:
DOCKER_IMAGE: "308535385114.dkr.ecr.us-east-1.amazonaws.com/caffe2/py2-clang8-ubuntu16.04:283"
<<: *linux_default
32B_DIM_T:
environment:
DOCKER_IMAGE: "308535385114.dkr.ecr.us-east-1.amazonaws.com/caffe2/py2-clang8-ubuntu16.04:283"
Expand All @@ -108,7 +104,6 @@ workflows:
- DEBUG
- OPENCL
- ASAN
- TSAN
- 32B_DIM_T
- RELEASE_WITH_EXPENSIVE_TESTS
- PYTORCH
Expand Down
File renamed without changes.
6 changes: 1 addition & 5 deletions .circleci/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set -euxo pipefail
export GLOW_SRC=$PWD
export GLOW_BUILD_DIR=${GLOW_SRC}/build
export LOADER=${GLOW_BUILD_DIR}/bin/image-classifier
export LSAN_OPTIONS="suppressions=$GLOW_SRC/.circleci/suppressions.txt"
export LSAN_OPTIONS="suppressions=$GLOW_SRC/.circleci/lsan_suppressions.txt"
export ASAN_SYMBOLIZER_PATH=/usr/bin/llvm-symbolizer
export IMAGES_DIR=${GLOW_SRC}/tests/images/

Expand Down Expand Up @@ -69,10 +69,6 @@ case ${CIRCLE_JOB} in
OPENCL)
run_unit_tests check
;;
TSAN)
# Run only Glow tests.
run_unit_tests check
;;
DEBUG)
run_unit_tests check
run_unit_tests test_unopt
Expand Down
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,12 @@ else()
endif()
set(SAVE_CMAKE_CXX_STANDARD "${CMAKE_CXX_STANDARD}")
unset(CMAKE_CXX_STANDARD)
set(CXX_STD "c++14" CACHE STRING "Force c++14 for folly")
set(COMPILER_HAS_F_ALIGNED_NEW OFF CACHE BOOL "turn off -faligned_new for folly")
add_subdirectory("${GLOW_THIRDPARTY_DIR}/folly" EXCLUDE_FROM_ALL)
target_include_directories(folly PUBLIC "${OPENSSL_INCLUDE_DIR}")
set(CMAKE_CXX_FLAGS "${SAVE_CMAKE_CXX_FLAGS}")
set(CMAKE_CXX_STANDARD "${SAVE_CMAKE_CXX_STANDARD}")

add_library(folly_jemalloc INTERFACE)
target_link_libraries(folly_jemalloc INTERFACE folly jemalloc)
endif()
Expand Down
6 changes: 1 addition & 5 deletions cmake/modules/GlowDefaults.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ if(MSVC OR CMAKE_CXX_SIMULATE_ID STREQUAL "MSVC")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /STACK:4194304")
else()
include(CheckCXXCompilerFlag)
if(CMAKE_BUILD_RTTI STREQUAL "ON")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wnon-virtual-dtor -fno-exceptions")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wnon-virtual-dtor -fno-exceptions -fno-rtti")
endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wnon-virtual-dtor")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-omit-frame-pointer -O0")
CHECK_CXX_COMPILER_FLAG("-Wno-psabi" HAS_W_NO_PSABI)
if(HAS_W_NO_PSABI)
Expand Down
4 changes: 2 additions & 2 deletions include/glow/Runtime/Executor/ThreadPoolExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include <unordered_map>

#include "NetworkExecutionState.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "glow/Runtime/Executor/Executor.h"
#include "glow/Support/ThreadPool.h"

namespace glow {
namespace runtime {
Expand Down Expand Up @@ -100,7 +100,7 @@ class ThreadPoolExecutor final : public Executor {
/// The default number of workers in the thread pool.
constexpr static unsigned kNumWorkers = 3;
/// The thread pool used to drive execution.
ThreadPool threadPool_;
folly::CPUThreadPoolExecutor threadPool_;

/// Map of networkExecutionState pools for each network.
std::unordered_map<const DAGNode *,
Expand Down
14 changes: 10 additions & 4 deletions lib/Backends/NNPI/InferencePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ Error InferencePoolEnv::init(unsigned numWorkers, NNPIAdapter adapter,
return MAKE_ERR("InferencePool already initialized!");
}
numWorkers_ = numWorkers;
workersPool_ = glow::make_unique<ThreadPool>(numWorkers_, "NNPI-worker");
workersPool_ = glow::make_unique<folly::CPUThreadPoolExecutor>(
numWorkers_, std::make_shared<folly::NamedThreadFactory>("NNPI-worker"));
deviceTracing_ = deviceTracing;

inferenceContexts_.resize(numWorkers_);
Expand Down Expand Up @@ -137,13 +138,18 @@ Error InferencePoolEnv::init(unsigned numWorkers, NNPIAdapter adapter,
return Error::success();
}

void InferencePoolEnv::stop(bool block) { workersPool_->stop(block); }
void InferencePoolEnv::stop(bool block) {
workersPool_->stop();
if (block) {
workersPool_->join();
}
}

void InferencePoolEnv::execute(RunIdentifierTy runId,
std::unique_ptr<ExecutionContext> ctx,
runtime::ResultCBTy resultCB) {
workersPool_->submit([this, runId, ctx = std::move(ctx),
resultCB = std::move(resultCB)]() mutable {
workersPool_->add([this, runId, ctx = std::move(ctx),
resultCB = std::move(resultCB)]() mutable {
InferenceContext *infCtx = nullptr;
{
const std::lock_guard<std::mutex> lock(freeContextsLock_);
Expand Down
4 changes: 2 additions & 2 deletions lib/Backends/NNPI/InferencePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "InferenceContext.h"
#include "NNPICompiledFunction.h"
#include "NNPITracing.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "glow/Runtime/RuntimeTypes.h"
#include "glow/Support/ThreadPool.h"
#include "nnpi_inference.h"
#include "nnpi_transformer.h"
#include <atomic>
Expand All @@ -33,7 +33,7 @@ namespace runtime {

class InferencePoolEnv {
unsigned numWorkers_;
std::unique_ptr<ThreadPool> workersPool_;
std::unique_ptr<folly::CPUThreadPoolExecutor> workersPool_;
std::vector<InferenceContext> inferenceContexts_;
std::vector<InferenceContext *> freeContexts_;
std::mutex freeContextsLock_;
Expand Down
3 changes: 3 additions & 0 deletions lib/Onnxifi/Base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ onnxStatus Graph::setIOAndRun(uint32_t inputsCount,
}
TRACE_EVENT_SCOPE_END_NAMED(soEvent);

if (ctx->getTraceContext()) {
ctx->getTraceContext()->setThreadName("Caller");
}
auto ret = run(std::move(ctx), outputEvent, traceEvents);
if (GlowSaveOnnxifiIO) {
// We need to wait for the execution to finish in order to extract output
Expand Down
35 changes: 20 additions & 15 deletions lib/Runtime/Executor/ThreadPoolExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ void InflightBarrier::wait() {
ThreadPoolExecutor::ThreadPoolExecutor(const DeviceManagerMapTy &deviceManagers,
unsigned numWorkers,
const std::string &name)
: threadPool_(numWorkers, name), deviceManagers_(deviceManagers) {}
: threadPool_(numWorkers,
std::make_shared<folly::NamedThreadFactory>(name)),
deviceManagers_(deviceManagers) {}

void ThreadPoolExecutor::shutdown() {
// Prevent more requests from being processed.
Expand All @@ -72,6 +74,9 @@ void ThreadPoolExecutor::shutdown() {
// processed before starting to destroy state that is used in
// handleDeviceManagerResult().
inflightBarrier_.wait();

threadPool_.stop();
threadPool_.join();
}

void ThreadPoolExecutor::run(const DAGNode *root,
Expand All @@ -83,8 +88,9 @@ void ThreadPoolExecutor::run(const DAGNode *root,
"ThreadPoolExecutor::run");

if (context->getTraceContext()) {
for (auto id : threadPool_.getThreadIds()) {
context->getTraceContext()->setThreadName(id, "ThreadPoolExecutor");
auto tid = threads::getThreadId();
if (!context->getTraceContext()->getThreadNames().count(tid)) {
context->getTraceContext()->setThreadName(tid, "ThreadPoolExecutor");
}
}

Expand Down Expand Up @@ -169,18 +175,17 @@ void ThreadPoolExecutor::executeDAGNode(NetworkExecutionState *executionState,

// Immediately move the handling of the result onto this run's executor
// to avoid doing work on the DeviceManager thread.
threadPool_.getExecutor()->submit(
[this, executionState, node, err = std::move(err), currentDevice,
id, ctx = std::move(resultCtx)]() mutable {
TRACE_EVENT_LOG_ID(ctx->getTraceContext(), TraceLevel::REQUEST,
"handle result queuing",
TraceEvent::AsyncEndType, TraceEvent::now(),
id);

node->markFinished(currentDevice);
this->handleDeviceManagerResult(executionState, std::move(err),
std::move(ctx), node);
});
threadPool_.add([this, executionState, node, err = std::move(err),
currentDevice, id,
ctx = std::move(resultCtx)]() mutable {
TRACE_EVENT_LOG_ID(ctx->getTraceContext(), TraceLevel::REQUEST,
"handle result queuing", TraceEvent::AsyncEndType,
TraceEvent::now(), id);

node->markFinished(currentDevice);
this->handleDeviceManagerResult(executionState, std::move(err),
std::move(ctx), node);
});
});
}

Expand Down
3 changes: 0 additions & 3 deletions lib/Runtime/HostManager/HostManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,6 @@ HostManager::runNetwork(llvm::StringRef networkName,

TRACE_EVENT_SCOPE(context->getTraceContext(), TraceLevel::RUNTIME,
"HostManager::runNetwork");
if (context->getTraceContext()) {
context->getTraceContext()->setThreadName("Caller");
}
auto currentRun = totalRequestCount_++;

NetworkData *network = nullptr;
Expand Down

0 comments on commit 476c7bd

Please sign in to comment.