Skip to content

Commit d24bd89

Browse files
authored
fix: make get_task_state_core thread safe (#14108)
This PR changes the `m_imp` field of `lean_task` to an atomic. This is necessary because in `get_task_state_core` we access the `m_imp` to see if the task is finished *before* taking the mutex. Thus the memory access as it is done currently is UB. Using relaxed everywhere here is okay as: 1. Reading that a task is not yet finished even though it is is sound because the thread that sets the task to finished will release the mutex which happens-before us taking the mutex on the cold path. Once we take the mutex we re-check and observe that the task is done, then exit. 2. Reading that a task is finished even though it isn't is sound. This could happen when our allocator reuses memory for a previously finished task for a new, not yet finished task. However, this condition can never happen in Lean. There are again two cases to consider: 1. We are the thread that allocated the task, in this case we are guaranteed to observe the store we made during allocation. 2. We are another thread. In this case all of Lean's concurrency primitives currently guarantee that the store that set the task state to not-finished happens-before us even getting access to the task pointer. Thus we are guaranteed to observe the store from the allocating thread.
1 parent 518dc66 commit d24bd89

2 files changed

Lines changed: 56 additions & 45 deletions

File tree

src/include/lean/lean.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,10 @@ typedef struct {
294294
* invariant: m_imp == nullptr
295295
* transition: RC becomes 0 ==> freed (`deactivate_task` lock) */
296296
typedef struct lean_task {
297-
lean_object m_header;
298-
_Atomic(lean_object *) m_value;
299-
lean_task_imp * m_imp;
297+
lean_object m_header;
298+
_Atomic(lean_object *) m_value;
299+
// This field is atomic as we access it both with and without holding the task_manager mutex.
300+
_Atomic(lean_task_imp *) m_imp;
300301
} lean_task_object;
301302

302303
typedef struct lean_promise {

src/runtime/object.cpp

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Released under Apache 2.0 license as described in the file LICENSE.
44
55
Author: Leonardo de Moura
66
*/
7+
#include <atomic>
78
#include <string>
89
#include <algorithm>
910
#include <vector>
@@ -716,7 +717,8 @@ static void free_task_imp(lean_task_imp * imp) {
716717
}
717718

718719
static void free_task(lean_task_object * t) {
719-
if (t->m_imp) free_task_imp(t->m_imp);
720+
lean_task_imp* imp = t->m_imp.load(std::memory_order_relaxed);
721+
if (imp) free_task_imp(imp);
720722
lean_free_small_object((lean_object*)t);
721723
}
722724

@@ -756,8 +758,9 @@ class task_manager {
756758
}
757759

758760
void enqueue_core(unique_lock<mutex> & lock, lean_task_object * t) {
759-
lean_assert(t->m_imp);
760-
unsigned prio = t->m_imp->m_prio;
761+
lean_task_imp* imp = t->m_imp.load(std::memory_order_relaxed);
762+
lean_assert(imp);
763+
unsigned prio = imp->m_prio;
761764
if (prio == LEAN_SYNC_PRIO) {
762765
run_task(lock, t);
763766
return;
@@ -777,16 +780,18 @@ class task_manager {
777780
}
778781

779782
void deactivate_task_core(unique_lock<mutex> & lock, lean_task_object * t) {
780-
object * c = t->m_imp->m_closure;
781-
lean_task_object * it = t->m_imp->m_head_dep;
782-
t->m_imp->m_closure = nullptr;
783-
t->m_imp->m_head_dep = nullptr;
784-
t->m_imp->m_canceled = true;
785-
t->m_imp->m_deleted = true;
783+
lean_task_imp* imp = t->m_imp.load(std::memory_order_relaxed);
784+
object * c = imp->m_closure;
785+
lean_task_object * it = imp->m_head_dep;
786+
imp->m_closure = nullptr;
787+
imp->m_head_dep = nullptr;
788+
imp->m_canceled = true;
789+
imp->m_deleted = true;
786790
lock.unlock();
787791
while (it) {
788-
lean_assert(it->m_imp->m_deleted);
789-
lean_task_object * next_it = it->m_imp->m_next_dep;
792+
lean_task_imp* imp = it->m_imp.load(std::memory_order_relaxed);
793+
lean_assert(imp->m_deleted);
794+
lean_task_object * next_it = imp->m_next_dep;
790795
free_task(it);
791796
it = next_it;
792797
}
@@ -849,40 +854,41 @@ class task_manager {
849854
}
850855

851856
void run_task(unique_lock<mutex> & lock, lean_task_object * t) {
852-
lean_assert(t->m_imp);
853-
if (t->m_imp->m_deleted) {
857+
lean_task_imp* imp = t->m_imp.load(std::memory_order_relaxed);
858+
lean_assert(imp);
859+
if (imp->m_deleted) {
854860
free_task(t);
855861
return;
856862
}
857863
reset_heartbeat();
858864
object * v = nullptr;
859865
{
860866
scoped_current_task_object scope_cur_task(t);
861-
object * c = t->m_imp->m_closure;
862-
t->m_imp->m_closure = nullptr;
867+
object * c = imp->m_closure;
868+
imp->m_closure = nullptr;
863869
lock.unlock();
864870
v = lean_apply_1(c, box(0));
865871
// If deactivation was delayed by `m_keep_alive`, deactivate after the final execution (`v != nulltpr`)
866-
if (v != nullptr && t->m_imp->m_keep_alive) {
872+
if (v != nullptr && imp->m_keep_alive) {
867873
lean_dec_ref((lean_object*)t);
868874
}
869875
lock.lock();
870876
}
871-
lean_assert(t->m_imp);
872-
if (t->m_imp->m_deleted) {
877+
lean_assert(imp);
878+
if (imp->m_deleted) {
873879
lock.unlock();
874880
if (v) lean_dec(v);
875881
free_task(t);
876882
lock.lock();
877883
} else if (v != nullptr) {
878-
lean_assert(t->m_imp->m_closure == nullptr);
884+
lean_assert(imp->m_closure == nullptr);
879885
resolve_core(lock, t, v);
880886
} else {
881887
// `bind` task has not finished yet, re-add as dependency of nested task
882888
// NOTE: closure MUST be extracted before unlocking the mutex as otherwise
883889
// another thread could deactivate the task and empty `m_clousure` in
884890
// between.
885-
object * c = t->m_imp->m_closure;
891+
object * c = imp->m_closure;
886892
lock.unlock();
887893
add_dep(lean_to_task(closure_arg_cptr(c)[0]), t);
888894
lock.lock();
@@ -892,8 +898,7 @@ class task_manager {
892898
void resolve_core(unique_lock<mutex> & lock, lean_task_object * t, object * v) {
893899
mark_mt(v);
894900
t->m_value = v;
895-
lean_task_imp * imp = t->m_imp;
896-
t->m_imp = nullptr;
901+
lean_task_imp * imp = t->m_imp.exchange(nullptr, std::memory_order_relaxed);
897902
handle_finished(lock, t, imp);
898903
/* After the task has been finished and we propagated
899904
dependencies, we can release `imp` and keep just the value */
@@ -905,11 +910,12 @@ class task_manager {
905910
lean_task_object * it = imp->m_head_dep;
906911
imp->m_head_dep = nullptr;
907912
while (it) {
913+
lean_task_imp* it_imp = it->m_imp.load(std::memory_order_relaxed);
908914
if (imp->m_canceled)
909-
it->m_imp->m_canceled = true;
910-
lean_task_object * next_it = it->m_imp->m_next_dep;
911-
it->m_imp->m_next_dep = nullptr;
912-
if (it->m_imp->m_deleted) {
915+
it_imp->m_canceled = true;
916+
lean_task_object * next_it = it_imp->m_next_dep;
917+
it_imp->m_next_dep = nullptr;
918+
if (it_imp->m_deleted) {
913919
free_task(it);
914920
} else {
915921
enqueue_core(lock, it);
@@ -983,8 +989,8 @@ class task_manager {
983989
enqueue_core(lock, t2);
984990
return;
985991
}
986-
t2->m_imp->m_next_dep = t1->m_imp->m_head_dep;
987-
t1->m_imp->m_head_dep = t2;
992+
t2->m_imp.load(std::memory_order_relaxed)->m_next_dep = t1->m_imp.load(std::memory_order_relaxed)->m_head_dep;
993+
t1->m_imp.load(std::memory_order_relaxed)->m_head_dep = t2;
988994
}
989995

990996
void wait_for(lean_task_object * t) {
@@ -994,8 +1000,8 @@ class task_manager {
9941000
if (t->m_value)
9951001
return;
9961002
// see `Task.get`
997-
bool in_pool = g_current_task_object && g_current_task_object->m_imp->m_prio <= LEAN_MAX_PRIO;
998-
if (g_current_task_object && g_current_task_object->m_imp->m_prio == LEAN_SYNC_PRIO) {
1003+
bool in_pool = g_current_task_object && g_current_task_object->m_imp.load(std::memory_order_relaxed)->m_prio <= LEAN_MAX_PRIO;
1004+
if (g_current_task_object && g_current_task_object->m_imp.load(std::memory_order_relaxed)->m_prio == LEAN_SYNC_PRIO) {
9991005
lean_panic("`Task.get` called from a `(sync := true)` task");
10001006
}
10011007
if (in_pool) {
@@ -1025,21 +1031,22 @@ class task_manager {
10251031
void deactivate_task(lean_task_object * t) {
10261032
unique_lock<mutex> lock(m_mutex);
10271033
if (object * v = t->m_value) {
1028-
lean_assert(t->m_imp == nullptr);
1034+
lean_assert(t->m_imp.load(std::memory_order_relaxed) == nullptr);
10291035
lock.unlock();
10301036
lean_dec(v);
10311037
free_task(t);
10321038
return;
10331039
} else {
1034-
lean_assert(t->m_imp);
1040+
lean_assert(t->m_imp.load(std::memory_order_relaxed));
10351041
deactivate_task_core(lock, t);
10361042
}
10371043
}
10381044

10391045
void cancel(lean_task_object * t) {
10401046
unique_lock<mutex> lock(m_mutex);
1041-
if (t->m_imp)
1042-
t->m_imp->m_canceled = true;
1047+
lean_task_imp* imp = t->m_imp.load(std::memory_order_relaxed);
1048+
if (imp)
1049+
imp->m_canceled = true;
10431050
}
10441051

10451052
bool shutting_down() const {
@@ -1048,8 +1055,9 @@ class task_manager {
10481055

10491056
uint8_t get_task_state(lean_task_object * t) {
10501057
unique_lock<mutex> lock(m_mutex);
1051-
if (t->m_imp) {
1052-
if (t->m_imp->m_closure) {
1058+
lean_task_imp* imp = t->m_imp.load(std::memory_order_relaxed);
1059+
if (imp) {
1060+
if (imp->m_closure) {
10531061
return 0; // waiting (waiting/queued)
10541062
} else {
10551063
return 1; // running (running/promised)
@@ -1223,11 +1231,12 @@ static obj_res task_bind_fn1(obj_arg x, obj_arg f, obj_arg) {
12231231
lean_dec_ref(new_task);
12241232
return v;
12251233
} else {
1226-
lean_assert(g_current_task_object->m_imp);
1227-
lean_assert(g_current_task_object->m_imp->m_closure == nullptr);
1234+
lean_task_imp* imp = g_current_task_object->m_imp.load(std::memory_order::relaxed);
1235+
lean_assert(imp);
1236+
lean_assert(imp->m_closure == nullptr);
12281237
obj_res c = mk_closure_2_1(task_bind_fn2, new_task);
12291238
mark_mt(c);
1230-
g_current_task_object->m_imp->m_closure = c;
1239+
imp->m_closure = c;
12311240
return nullptr; /* notify queue that task did not finish yet. */
12321241
}
12331242
}
@@ -1245,8 +1254,9 @@ extern "C" LEAN_EXPORT obj_res lean_task_bind_core(obj_arg x, obj_arg f, unsigne
12451254

12461255
extern "C" LEAN_EXPORT bool lean_io_check_canceled_core() {
12471256
if (lean_task_object * t = g_current_task_object) {
1248-
lean_assert(t->m_imp); // task is being executed
1249-
return t->m_imp->m_canceled || g_task_manager->shutting_down();
1257+
lean_task_imp* imp = t->m_imp.load(std::memory_order_relaxed);
1258+
lean_assert(imp); // task is being executed
1259+
return imp->m_canceled || g_task_manager->shutting_down();
12501260
}
12511261
return false;
12521262
}
@@ -1259,7 +1269,7 @@ extern "C" LEAN_EXPORT void lean_io_cancel_core(b_obj_arg t) {
12591269

12601270
extern "C" LEAN_EXPORT uint8_t lean_io_get_task_state_core(b_obj_arg t) {
12611271
lean_task_object * o = lean_to_task(t);
1262-
if (!o->m_imp)
1272+
if (!o->m_imp.load(std::memory_order::relaxed))
12631273
return 2; // finished
12641274
return g_task_manager->get_task_state(o);
12651275
}

0 commit comments

Comments
 (0)