From 347135080a307ff49121e17fa7d45c3885d4ea98 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kittywhiskers@users.noreply.github.com> Date: Fri, 21 Aug 2020 09:23:42 +0300 Subject: [PATCH] merge #18710: Add local thread pool to CCheckQueue --- src/bench/checkqueue.cpp | 12 ++--- src/checkqueue.h | 92 ++++++++++++++++++++++++----------- src/init.cpp | 5 +- src/test/checkqueue_tests.cpp | 54 ++++++-------------- src/test/test_dash.cpp | 5 +- src/validation.cpp | 11 +++-- src/validation.h | 6 ++- test/lint/lint-includes.sh | 1 - 8 files changed, 98 insertions(+), 88 deletions(-) diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp index 4d41e28db6e29b..5a563dc42c2be3 100644 --- a/src/bench/checkqueue.cpp +++ b/src/bench/checkqueue.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include @@ -37,11 +36,11 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state) void swap(PrevectorJob& x){p.swap(x.p);}; }; CCheckQueue queue {QUEUE_BATCH_SIZE}; - boost::thread_group tg; - for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) { - tg.create_thread([&]{queue.Thread();}); - } + while (state.KeepRunning()) { + // The main thread should be counted to prevent thread oversubscription, and + // to decrease the variance of benchmark results. + queue.StartWorkerThreads(GetNumCores() - 1); // Make insecure_rand here so that each iteration is identical. FastRandomContext insecure_rand(true); CCheckQueueControl control(&queue); @@ -56,7 +55,6 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state) // it is done explicitly here for clarity control.Wait(); } - tg.interrupt_all(); - tg.join_all(); + queue.StopWorkerThreads(); } BENCHMARK(CCheckQueueSpeedPrevectorJob, 1400); diff --git a/src/checkqueue.h b/src/checkqueue.h index 1061796bc32f5f..1254852e6d87fe 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -6,13 +6,12 @@ #define BITCOIN_CHECKQUEUE_H #include +#include +#include #include #include -#include -#include - template class CCheckQueueControl; @@ -31,61 +30,64 @@ class CCheckQueue { private: //! Mutex to protect the inner state - boost::mutex mutex; + Mutex m_mutex; //! Worker threads block on this when out of work - boost::condition_variable condWorker; + std::condition_variable m_worker_cv; //! Master thread blocks on this when out of work - boost::condition_variable condMaster; + std::condition_variable m_master_cv; //! The queue of elements to be processed. //! As the order of booleans doesn't matter, it is used as a LIFO (stack) - std::vector queue; + std::vector queue GUARDED_BY(m_mutex); //! The number of workers (including the master) that are idle. - int nIdle; + int nIdle GUARDED_BY(m_mutex){0}; //! The total number of workers (including the master). - int nTotal; + int nTotal GUARDED_BY(m_mutex){0}; //! The temporary evaluation result. - bool fAllOk; + bool fAllOk GUARDED_BY(m_mutex){true}; /** * Number of verifications that haven't completed yet. * This includes elements that are no longer queued, but still in the * worker's own batches. */ - unsigned int nTodo; + unsigned int nTodo GUARDED_BY(m_mutex){0}; //! The maximum number of elements to be processed in one batch - unsigned int nBatchSize; + const unsigned int nBatchSize; + + std::vector m_worker_threads; + bool m_request_stop GUARDED_BY(m_mutex){false}; /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster = false) + bool Loop(bool fMaster) { - boost::condition_variable& cond = fMaster ? condMaster : condWorker; + std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; std::vector vChecks; vChecks.reserve(nBatchSize); unsigned int nNow = 0; bool fOk = true; do { { - boost::unique_lock lock(mutex); + WAIT_LOCK(m_mutex, lock); // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) if (nNow) { fAllOk &= fOk; nTodo -= nNow; if (nTodo == 0 && !fMaster) // We processed the last element; inform the master it can exit and return the result - condMaster.notify_one(); + m_master_cv.notify_one(); } else { // first iteration nTotal++; } // logically, the do loop starts here - while (queue.empty()) { + while (queue.empty() && !m_request_stop) { if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; @@ -99,6 +101,10 @@ class CCheckQueue cond.wait(lock); // wait nIdle--; } + if (m_request_stop) { + return false; + } + // Decide how many work units to process now. // * Do not try to do everything at once, but aim for increasingly smaller batches so // all workers finish approximately simultaneously. @@ -107,7 +113,7 @@ class CCheckQueue nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); vChecks.resize(nNow); for (unsigned int i = 0; i < nNow; i++) { - // We want the lock on the mutex to be as short as possible, so swap jobs from the global + // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global // queue to the local batch vector instead of copying. vChecks[i].swap(queue.back()); queue.pop_back(); @@ -125,40 +131,68 @@ class CCheckQueue public: //! Mutex to ensure only one concurrent CCheckQueueControl - boost::mutex ControlMutex; + Mutex m_control_mutex; //! Create a new check queue - explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {} + explicit CCheckQueue(unsigned int nBatchSizeIn) + : nBatchSize(nBatchSizeIn) + { + } - //! Worker thread - void Thread() + //! Create a pool of new worker threads. + void StartWorkerThreads(const int threads_num) { - Loop(); + { + LOCK(m_mutex); + nIdle = 0; + nTotal = 0; + fAllOk = true; + } + assert(m_worker_threads.empty()); + for (int n = 0; n < threads_num; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("scriptch.%i", n)); + Loop(false /* worker thread */); + }); + } } //! Wait until execution finishes, and return whether all evaluations were successful. bool Wait() { - return Loop(true); + return Loop(true /* master thread */); } //! Add a batch of checks to the queue void Add(std::vector& vChecks) { - boost::unique_lock lock(mutex); + LOCK(m_mutex); for (T& check : vChecks) { queue.push_back(T()); check.swap(queue.back()); } nTodo += vChecks.size(); if (vChecks.size() == 1) - condWorker.notify_one(); + m_worker_cv.notify_one(); else if (vChecks.size() > 1) - condWorker.notify_all(); + m_worker_cv.notify_all(); + } + + //! Stop all of the worker threads. + void StopWorkerThreads() + { + WITH_LOCK(m_mutex, m_request_stop = true); + m_worker_cv.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); + } + m_worker_threads.clear(); + WITH_LOCK(m_mutex, m_request_stop = false); } ~CCheckQueue() { + assert(m_worker_threads.empty()); } }; @@ -182,7 +216,7 @@ class CCheckQueueControl { // passed queue is supposed to be unused, or nullptr if (pqueue != nullptr) { - ENTER_CRITICAL_SECTION(pqueue->ControlMutex); + ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); } } @@ -206,7 +240,7 @@ class CCheckQueueControl if (!fDone) Wait(); if (pqueue != nullptr) { - LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); + LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); } } }; diff --git a/src/init.cpp b/src/init.cpp index acccc633fa8ee8..18e56f4fce3d46 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -272,6 +272,7 @@ void PrepareShutdown() // CScheduler/checkqueue threadGroup threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); // After there are no more peers/RPC left to give us new data which may generate // CValidationInterface callbacks, flush them... @@ -1737,9 +1738,7 @@ bool AppInitMain() LogPrintf("Script verification uses %d additional threads\n", script_threads); if (script_threads >= 1) { g_parallel_script_checks = true; - for (int i = 0; i < script_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_threads); } std::vector vSporkAddresses; diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 04f8a2e5a659bf..444b6927bb5b1a 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -148,10 +148,7 @@ typedef CCheckQueue FrozenCleanup_Queue; static void Correct_Queue_range(std::vector range) { auto small_queue = std::unique_ptr(new Correct_Queue {QUEUE_BATCH_SIZE}); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{small_queue->Thread();}); - } + small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); // Make vChecks here to save on malloc (this test can be slow...) std::vector vChecks; for (auto i : range) { @@ -169,8 +166,7 @@ static void Correct_Queue_range(std::vector range) BOOST_TEST_MESSAGE("Failure on trial " << i << " expected, got " << FakeCheckCheckCompletion::n_calls); } } - tg.interrupt_all(); - tg.join_all(); + small_queue->StopWorkerThreads(); } /** Test that 0 checks is correct @@ -213,11 +209,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { auto fail_queue = std::unique_ptr(new Failing_Queue {QUEUE_BATCH_SIZE}); - - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1001; ++i) { CCheckQueueControl control(fail_queue.get()); @@ -238,18 +230,14 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) BOOST_REQUIRE(success); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that a block validation which fails does not interfere with // future blocks, ie, the bad state is cleared. BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { auto fail_queue = std::unique_ptr(new Failing_Queue {QUEUE_BATCH_SIZE}); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (auto times = 0; times < 10; ++times) { for (bool end_fails : {true, false}) { @@ -264,8 +252,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_REQUIRE(r != end_fails); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that unique checks are actually all called individually, rather than @@ -274,11 +261,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { auto queue = std::unique_ptr(new Unique_Queue {QUEUE_BATCH_SIZE}); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); size_t COUNT = 100000; size_t total = COUNT; @@ -301,8 +284,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) } BOOST_REQUIRE(r); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } @@ -314,10 +296,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { auto queue = std::unique_ptr(new Memory_Queue {QUEUE_BATCH_SIZE}); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1000; ++i) { size_t total = i; { @@ -336,8 +315,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) } BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } // Test that a new verification cannot occur until all checks @@ -345,11 +323,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { auto queue = std::unique_ptr(new FrozenCleanup_Queue {QUEUE_BATCH_SIZE}); - boost::thread_group tg; bool fails = false; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); std::thread t0([&]() { CCheckQueueControl control(queue.get()); std::vector vChecks(1); @@ -367,7 +342,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) } // Try to get control of the queue a bunch of times for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } { // Unfreeze (we need lock n case of spurious wakeup) @@ -378,9 +353,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) FrozenCleanupCheck::cv.notify_one(); // Wait for control to finish t0.join(); - tg.interrupt_all(); - tg.join_all(); BOOST_REQUIRE(!fails); + queue->StopWorkerThreads(); } @@ -431,7 +405,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) cv.wait(l, [&](){return has_lock;}); bool fails = false; for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } has_tried = true; cv.notify_one(); diff --git a/src/test/test_dash.cpp b/src/test/test_dash.cpp index cacdac178a6d0b..ebf6b2edc2e3c6 100644 --- a/src/test/test_dash.cpp +++ b/src/test/test_dash.cpp @@ -122,9 +122,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha } // Start script-checking threads. Set g_parallel_script_checks to true so they are used. constexpr int script_check_threads = 2; - for (int i = 0; i < script_check_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_check_threads); g_parallel_script_checks = true; peerLogic.reset(new PeerLogicValidation(connman, scheduler, /*enable_bip61=*/true)); } @@ -136,6 +134,7 @@ TestingSetup::~TestingSetup() g_txindex.reset(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); g_connman.reset(); diff --git a/src/validation.cpp b/src/validation.cpp index a9d89304036371..7544e2fe271c32 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1880,9 +1880,14 @@ static bool WriteUndoDataForBlock(const CBlockUndo& blockundo, CValidationState& static CCheckQueue scriptcheckqueue(128); -void ThreadScriptCheck(int worker_num) { - util::ThreadRename(strprintf("scriptch.%i", worker_num)); - scriptcheckqueue.Thread(); +void StartScriptCheckWorkerThreads(int threads_num) +{ + scriptcheckqueue.StartWorkerThreads(threads_num); +} + +void StopScriptCheckWorkerThreads() +{ + scriptcheckqueue.StopWorkerThreads(); } // Protected by cs_main diff --git a/src/validation.h b/src/validation.h index 2a2ac3f729c4fa..0b9d1f72399f10 100644 --- a/src/validation.h +++ b/src/validation.h @@ -277,8 +277,10 @@ bool LoadBlockIndex(const CChainParams& chainparams) EXCLUSIVE_LOCKS_REQUIRED(cs bool LoadChainTip(const CChainParams& chainparams); /** Unload database information */ void UnloadBlockIndex(); -/** Run an instance of the script checking thread */ -void ThreadScriptCheck(int worker_num); +/** Run instances of script checking worker threads */ +void StartScriptCheckWorkerThreads(int threads_num); +/** Stop all of the script checking worker threads */ +void StopScriptCheckWorkerThreads(); /** Check whether we are doing an initial block download (synchronizing from disk or network) */ bool IsInitialBlockDownload(); /** Retrieve a transaction (from memory pool, or from disk, if possible) */ diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index 8fd452edd66d7e..ac2632df328bdf 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -78,7 +78,6 @@ EXPECTED_BOOST_INCLUDES=( boost/test/unit_test_monitor.hpp boost/thread.hpp boost/thread/condition_variable.hpp - boost/thread/mutex.hpp boost/thread/thread.hpp boost/variant.hpp boost/variant/apply_visitor.hpp