Skip to content

Commit e36cb90

Browse files
authored
chore: revert "feat: expire idle task pool threads after 5 seconds" (#14043)
Reverts #13123
1 parent 9f1e802 commit e36cb90

1 file changed

Lines changed: 21 additions & 36 deletions

File tree

src/runtime/object.cpp

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -726,17 +726,16 @@ struct scoped_current_task_object : flet<lean_task_object *> {
726726

727727
class task_manager {
728728
mutex m_mutex;
729-
unsigned m_num_std_workers{0};
729+
std::vector<std::unique_ptr<lthread>> m_std_workers;
730730
unsigned m_idle_std_workers{0};
731731
unsigned m_max_std_workers{0};
732732
unsigned m_num_dedicated_workers{0};
733733
std::deque<lean_task_object *> m_queues[LEAN_MAX_PRIO+1];
734734
unsigned m_queues_size{0};
735735
unsigned m_max_prio{0};
736-
condition_variable m_queue_cv; // notified on work arrival or shutdown
736+
condition_variable m_queue_cv;
737737
condition_variable m_task_finished_cv;
738-
condition_variable m_capacity_cv; // notified when std-worker capacity may have opened up, or shutdown
739-
condition_variable m_worker_finished_cv; // notified on std/dedicated worker exit or shutdown
738+
condition_variable m_dedicated_finished_cv;
740739
bool m_shutting_down{false};
741740

742741
lean_task_object * dequeue() {
@@ -771,7 +770,7 @@ class task_manager {
771770
m_max_prio = prio;
772771
m_queues[prio].push_back(t);
773772
m_queues_size++;
774-
if (!m_idle_std_workers && m_num_std_workers < m_max_std_workers)
773+
if (!m_idle_std_workers && m_std_workers.size() < m_max_std_workers)
775774
spawn_worker();
776775
else
777776
m_queue_cv.notify_one();
@@ -795,58 +794,46 @@ class task_manager {
795794
lock.lock();
796795
}
797796

798-
static constexpr unsigned WORKER_IDLE_TIMEOUT_MS = 5000;
799-
800797
void spawn_worker() {
801798
if (m_shutting_down)
802799
return;
803800

804-
// NOTE: always called inside lock
805-
m_num_std_workers++;
806-
// The `lthread` object is immediately destroyed, which detaches the thread.
807-
lthread([this]() {
801+
m_std_workers.emplace_back(new lthread([this]() {
808802
save_stack_info(false);
809803
unique_lock<mutex> lock(m_mutex);
810804
m_idle_std_workers++;
811805
while (true) {
812806
if (m_queues_size == 0) {
813807
if (m_shutting_down) {
808+
// We're done
814809
break;
815810
}
816-
// Wait for new tasks, with a timeout so idle threads can exit
817-
if (!m_queue_cv.wait_for(lock, chrono::milliseconds(WORKER_IDLE_TIMEOUT_MS),
818-
[&]() { return m_queues_size > 0 || m_shutting_down; })) {
819-
break; // Exit due to timeout
820-
}
811+
// Wait for new tasks
812+
m_queue_cv.wait(lock);
821813
continue;
822814
}
823815

824816
// There's work to be done.
825817
// If we have reached the maximum number of standard workers (because the
826818
// maximum was decreased by `task_get`), wait for someone else to become
827819
// idle before picking up new work.
828-
// During shutdown we skip this throttling so remaining queued work drains
829-
// promptly rather than blocking on capacity that will not free up.
820+
// But during shutdown, we skip this throttling:
821+
// because the finalizer might have called m_queue_cv.notify_all() for the last
822+
// time, we don't want to get stuck behind the wait().
830823
if (!m_shutting_down &&
831-
m_num_std_workers - m_idle_std_workers >= m_max_std_workers) {
832-
m_capacity_cv.wait(lock, [&]() {
833-
return m_shutting_down ||
834-
m_num_std_workers - m_idle_std_workers < m_max_std_workers;
835-
});
824+
m_std_workers.size() - m_idle_std_workers >= m_max_std_workers) {
825+
m_queue_cv.wait(lock);
836826
continue;
837827
}
838828

839829
lean_task_object * t = dequeue();
840830
m_idle_std_workers--;
841831
run_task(lock, t);
842832
m_idle_std_workers++;
843-
m_capacity_cv.notify_one();
844833
reset_heartbeat();
845834
}
846835
m_idle_std_workers--;
847-
m_num_std_workers--;
848-
m_worker_finished_cv.notify_all();
849-
});
836+
}));
850837
}
851838

852839
void spawn_dedicated_worker(lean_task_object * t) {
@@ -856,7 +843,7 @@ class task_manager {
856843
unique_lock<mutex> lock(m_mutex);
857844
run_task(lock, t);
858845
m_num_dedicated_workers--;
859-
m_worker_finished_cv.notify_all();
846+
m_dedicated_finished_cv.notify_all();
860847
});
861848
// `lthread` will be implicitly freed, which frees up its control resources but does not terminate the thread
862849
}
@@ -951,17 +938,16 @@ class task_manager {
951938
{
952939
unique_lock<mutex> lock(m_mutex);
953940
m_shutting_down = true;
941+
// we can assume that `m_std_workers` will not be changed after this line
954942
}
955943
m_queue_cv.notify_all();
956-
m_capacity_cv.notify_all();
957944
#ifndef LEAN_EMSCRIPTEN
958945
// wait for all workers to finish
959-
{
960-
unique_lock<mutex> lock(m_mutex);
961-
m_worker_finished_cv.wait(lock, [&]() {
962-
return m_num_std_workers == 0 && m_num_dedicated_workers == 0;
963-
});
964-
}
946+
for (auto & t : m_std_workers)
947+
t->join();
948+
949+
unique_lock<mutex> lock(m_mutex);
950+
m_dedicated_finished_cv.wait(lock, [&]() { return m_num_dedicated_workers == 0; });
965951
// never seems to terminate under Emscripten
966952
#endif
967953
}
@@ -1018,7 +1004,6 @@ class task_manager {
10181004
spawn_worker();
10191005
else
10201006
m_queue_cv.notify_one();
1021-
m_capacity_cv.notify_one();
10221007
}
10231008
m_task_finished_cv.wait(lock, [&]() { return t->m_value != nullptr; });
10241009
if (in_pool) {

0 commit comments

Comments
 (0)