Skip to content

Implementation for task_group dynamic dependencies - part 3 - get_current_task entry point and transfer_this_task_completion_to #1727

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: dev/kboyarinov/poc-dynamic-dependencies-part2
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
301563e
Introduce current_task and add_successor
kboyarinov Mar 20, 2025
192eadc
Merge branch 'dev/kboyarinov/poc-dynamic-dependencies-part2' into dev…
kboyarinov Mar 24, 2025
c8a8f25
postmerge cleanup
kboyarinov Mar 24, 2025
0279512
Cleanup
kboyarinov Mar 24, 2025
ef42cd6
Merge branch 'dev/kboyarinov/poc-dynamic-dependencies-part2' into dev…
kboyarinov May 6, 2025
90402ce
Add transfer_successors_to
kboyarinov May 9, 2025
c02b11f
Enable pre-commit CI checks
kboyarinov May 9, 2025
7ec5de2
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov May 16, 2025
02382c6
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov May 16, 2025
ce83086
Fix copyrights
kboyarinov May 16, 2025
84dfc44
Rewrite to avoid memory leak on transferring
kboyarinov May 19, 2025
4d20eb3
Fix the dynamic state lifetime issue after transferring
kboyarinov May 27, 2025
ea86b3a
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov May 27, 2025
797aa5e
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov May 30, 2025
a73093f
Fix merge issue in test
kboyarinov May 30, 2025
4f87b58
Remove assert from transfer_successors_to
kboyarinov May 30, 2025
acbc7bd
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov Jul 11, 2025
02148af
Fix merging issues
kboyarinov Jul 11, 2025
1fb5e4f
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov Jul 11, 2025
6381564
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov Jul 11, 2025
d0aff72
Add UXL copyrights
kboyarinov Jul 11, 2025
acbeaa0
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov Jul 21, 2025
cf9dbf9
Rename entry point and API
kboyarinov Jul 21, 2025
ae0b12d
Merge remote-tracking branch 'origin/dev/kboyarinov/poc-dynamic-depen…
kboyarinov Jul 22, 2025
70f8fd3
Separate transfer and completion flags
kboyarinov Jul 22, 2025
d8175b9
Reorder the checks
kboyarinov Jul 22, 2025
b60226a
Remove redundant comment
kboyarinov Jul 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ on:
branches: [master]

pull_request:
branches: [master, dev/kboyarinov/poc-dynamic-dependencies-part1]
branches: [master, dev/kboyarinov/poc-dynamic-dependencies-part1, dev/kboyarinov/poc-dynamic-dependencies-part2]
types:
- opened
- synchronize
2 changes: 2 additions & 0 deletions include/oneapi/tbb/detail/_task.h
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ TBB_EXPORT d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_da
TBB_EXPORT d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::task_arena_base&);
TBB_EXPORT d1::task_group_context* __TBB_EXPORTED_FUNC current_context();
TBB_EXPORT d1::wait_tree_vertex_interface* get_thread_reference_vertex(d1::wait_tree_vertex_interface* wc);
TBB_EXPORT d1::task* __TBB_EXPORTED_FUNC get_current_task();

// Do not place under __TBB_RESUMABLE_TASKS. It is a stub for unsupported platforms.
struct suspend_point_type;
@@ -271,6 +272,7 @@ inline void wait(wait_context& wait_ctx, task_group_context& ctx) {
call_itt_task_notify(destroy, &wait_ctx);
}

using r1::get_current_task;
using r1::current_context;

class task_traits {
126 changes: 108 additions & 18 deletions include/oneapi/tbb/detail/_task_handle.h
Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@ class task_dynamic_state {
: m_task(task)
, m_successor_list_head(nullptr)
, m_continuation_vertex(nullptr)
, m_new_dynamic_state(nullptr)
, m_num_references(1) // reserves a task co-ownership for dynamic state
, m_allocator(alloc)
{}
@@ -83,13 +84,22 @@ class task_dynamic_state {

void release() {
if (--m_num_references == 0) {
task_dynamic_state* new_state = m_new_dynamic_state.load(std::memory_order_relaxed);
// There was a new state assigned to the current one by transferring the successors
// Need to unregister the current dynamic state as a co-owner
if (new_state) new_state->release();
m_allocator.delete_object(this);
}
}

task_handle_task* complete_task() {
task_handle_task* next_task = nullptr;

successor_list_node* list = fetch_successor_list(COMPLETED_FLAG);
return release_successor_list(list);
if (!is_transferred(list)) {
next_task = release_successor_list(list);
}
return next_task;
}

bool has_dependencies() const {
@@ -100,7 +110,10 @@ class task_dynamic_state {
m_continuation_vertex.store(nullptr, std::memory_order_release);
}

bool check_transfer_or_completion(successor_list_node* current_list_head, successor_list_node* successor_node);
void add_successor(continuation_vertex* successor);
void add_successor_node(successor_list_node* successor_node);
void add_successor_list(successor_list_node* successor_list);
void release_continuation();

continuation_vertex* get_continuation_vertex() {
@@ -124,19 +137,35 @@ class task_dynamic_state {

using successor_list_state_flag = std::uintptr_t;
static constexpr successor_list_state_flag COMPLETED_FLAG = ~std::uintptr_t(0);
static constexpr successor_list_state_flag TRANSFERRED_FLAG = ~std::uintptr_t(0) - 1;

static bool is_completed(successor_list_node* node) {
return node == reinterpret_cast<successor_list_node*>(COMPLETED_FLAG);
static bool is_completed(successor_list_node* list) {
return list == reinterpret_cast<successor_list_node*>(COMPLETED_FLAG);
}

static bool is_transferred(successor_list_node* list) {
return list == reinterpret_cast<successor_list_node*>(TRANSFERRED_FLAG);
}

successor_list_node* fetch_successor_list(successor_list_state_flag new_list_state_flag) {
return m_successor_list_head.exchange(reinterpret_cast<successor_list_node*>(new_list_state_flag));
}

void transfer_successors_to(task_dynamic_state* new_dynamic_state) {
__TBB_ASSERT(new_dynamic_state != nullptr, nullptr);
// Register current dynamic state as a co-owner of the new dynamic state
// to prevent it's early destruction
new_dynamic_state->reserve();
m_new_dynamic_state.store(new_dynamic_state, std::memory_order_relaxed);
successor_list_node* successor_list = fetch_successor_list(TRANSFERRED_FLAG);
new_dynamic_state->add_successor_list(successor_list);
}

private:
task_handle_task* m_task;
std::atomic<successor_list_node*> m_successor_list_head;
std::atomic<continuation_vertex*> m_continuation_vertex;
std::atomic<task_dynamic_state*> m_new_dynamic_state;
std::atomic<std::size_t> m_num_references;
d1::small_object_allocator m_allocator;
};
@@ -234,6 +263,16 @@ class task_handle_task : public d1::task {
task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);
return current_state ? current_state->has_dependencies() : false;
}

void transfer_successors_to(task_dynamic_state* other_task_state) {
__TBB_ASSERT(other_task_state != nullptr, nullptr);
task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);

// If dynamic state was not created for currently executing task, it cannot have successors
if (current_state != nullptr) {
current_state->transfer_successors_to(other_task_state);
}
}
#endif
};

@@ -363,28 +402,79 @@ inline task_handle_task* release_successor_list(successor_list_node* node) {
return next_task;
}

inline bool task_dynamic_state::check_transfer_or_completion(successor_list_node* current_list_head,
successor_list_node* new_successor_node)
{
bool result = false;
if (is_completed(current_list_head)) {
// Task completed while reading the successor list, no need to add extra dependencies
new_successor_node->get_continuation_vertex()->release();
new_successor_node->finalize();
result = true;
} else {
if (is_transferred(current_list_head)) {
// Originally tracker task transferred successors to other task, add new successor to the receiving task
task_dynamic_state* new_state = m_new_dynamic_state.load(std::memory_order_relaxed);
__TBB_ASSERT(new_state, "successor list is marked as transferred, but new dynamic state is not set");
new_state->add_successor_node(new_successor_node);
result = true;
}
}
return result;
}

inline void task_dynamic_state::add_successor_node(successor_list_node* new_successor_node) {{
__TBB_ASSERT(new_successor_node != nullptr, nullptr);

successor_list_node* current_successor_list_head = m_successor_list_head.load(std::memory_order_acquire);

if (!check_transfer_or_completion(current_successor_list_head, new_successor_node)) {
// Task is not completed and did not transfer the successors
new_successor_node->set_next_node(current_successor_list_head);

while (!m_successor_list_head.compare_exchange_strong(current_successor_list_head, new_successor_node) &&
!check_transfer_or_completion(current_successor_list_head, new_successor_node))
{
// Other thread inserted the successor before us, update the new node
new_successor_node->set_next_node(current_successor_list_head);
}
}
}}

inline void task_dynamic_state::add_successor(continuation_vertex* successor) {
__TBB_ASSERT(successor != nullptr, nullptr);
successor_list_node* current_successor_list_head = m_successor_list_head.load(std::memory_order_acquire);

if (!is_completed(current_successor_list_head)) {
successor->reserve();
if (is_transferred(current_successor_list_head)) {
// Originally tracked task transferred successors to other task, add new successor to the receiving task
task_dynamic_state* new_state = m_new_dynamic_state.load(std::memory_order_relaxed);
__TBB_ASSERT(new_state, "successor list is marked as transferred, but new dynamic state is not set");
new_state->add_successor(successor);
} else {
successor->reserve();
d1::small_object_allocator alloc;
successor_list_node* new_successor_node = alloc.new_object<successor_list_node>(successor, alloc);
add_successor_node(new_successor_node);
}
}
}

d1::small_object_allocator alloc;
successor_list_node* new_successor_node = alloc.new_object<successor_list_node>(successor, alloc);
new_successor_node->set_next_node(current_successor_list_head);
inline void task_dynamic_state::add_successor_list(successor_list_node* successor_list) {
if (successor_list == nullptr) return;

while (!m_successor_list_head.compare_exchange_strong(current_successor_list_head, new_successor_node)) {
// Other thread updated the head of the list
successor_list_node* last_node = successor_list;

if (is_completed(current_successor_list_head)) {
// Current task has completed while we tried to insert the successor to the list
new_successor_node->finalize();
successor->release();
break;
}
new_successor_node->set_next_node(current_successor_list_head);
}
while (last_node->get_next_node() != nullptr) {
last_node = last_node->get_next_node();
}

successor_list_node* current_successor_list_head = m_successor_list_head.load(std::memory_order_acquire);
last_node->set_next_node(current_successor_list_head);

while (!m_successor_list_head.compare_exchange_strong(current_successor_list_head, successor_list)) {
// Other thread updated the head of the list
last_node->set_next_node(current_successor_list_head);
}
}

10 changes: 10 additions & 0 deletions include/oneapi/tbb/task_group.h
Original file line number Diff line number Diff line change
@@ -649,6 +649,16 @@ class task_group : public task_group_base {
internal_set_task_order(task_completion_handle_accessor::get_task_dynamic_state(pred),
task_handle_accessor::get_task_dynamic_state(succ));
}

static void transfer_this_task_completion_to(d2::task_handle& new_task) {
d1::task* curr_task = d1::get_current_task();
__TBB_ASSERT(curr_task != nullptr, "this_task_completion_to was called outside of task body");
task_handle_task* curr_th_task = dynamic_cast<task_handle_task*>(curr_task);
// Not using __TBB_ASSERT(curr_th_task) to allow function_stack_task body to use this method
if (curr_th_task != nullptr) {
curr_th_task->transfer_successors_to(task_handle_accessor::get_task_dynamic_state(new_task));
}
}
#endif
}; // class task_group

2 changes: 2 additions & 0 deletions src/tbb/def/lin32-tbb.def
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -78,6 +79,7 @@ _ZN3tbb6detail2r16resumeEPNS1_18suspend_point_typeE;
_ZN3tbb6detail2r121current_suspend_pointEv;
_ZN3tbb6detail2r114notify_waitersEj;
_ZN3tbb6detail2r127get_thread_reference_vertexEPNS0_2d126wait_tree_vertex_interfaceE;
_ZN3tbb6detail2r116get_current_taskEv;

/* Task dispatcher (task_dispatcher.cpp) */
_ZN3tbb6detail2r114execution_slotEPKNS0_2d114execution_dataE;
2 changes: 2 additions & 0 deletions src/tbb/def/lin64-tbb.def
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -78,6 +79,7 @@ _ZN3tbb6detail2r16resumeEPNS1_18suspend_point_typeE;
_ZN3tbb6detail2r121current_suspend_pointEv;
_ZN3tbb6detail2r114notify_waitersEm;
_ZN3tbb6detail2r127get_thread_reference_vertexEPNS0_2d126wait_tree_vertex_interfaceE;
_ZN3tbb6detail2r116get_current_taskEv;

/* Task dispatcher (task_dispatcher.cpp) */
_ZN3tbb6detail2r114execution_slotEPKNS0_2d114execution_dataE;
2 changes: 2 additions & 0 deletions src/tbb/def/mac64-tbb.def
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2005-2025 Intel Corporation
# Copyright (c) 2025 UXL Foundation Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -80,6 +81,7 @@ __ZN3tbb6detail2r16resumeEPNS1_18suspend_point_typeE
__ZN3tbb6detail2r121current_suspend_pointEv
__ZN3tbb6detail2r114notify_waitersEm
__ZN3tbb6detail2r127get_thread_reference_vertexEPNS0_2d126wait_tree_vertex_interfaceE
__ZN3tbb6detail2r116get_current_taskEv

# Task dispatcher (task_dispatcher.cpp)
__ZN3tbb6detail2r114execution_slotEPKNS0_2d114execution_dataE
2 changes: 2 additions & 0 deletions src/tbb/def/win32-tbb.def
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
; Copyright (c) 2005-2025 Intel Corporation
; Copyright (c) 2025 UXL Foundation Contributors
;
; Licensed under the Apache License, Version 2.0 (the "License");
; you may not use this file except in compliance with the License.
@@ -72,6 +73,7 @@ EXPORTS
?suspend@r1@detail@tbb@@YAXP6AXPAXPAUsuspend_point_type@123@@Z0@Z
?notify_waiters@r1@detail@tbb@@YAXI@Z
?get_thread_reference_vertex@r1@detail@tbb@@YAPAVwait_tree_vertex_interface@d1@23@PAV4523@@Z
?get_current_task@r1@detail@tbb@@YAPAVtask@d1@23@XZ

; Task dispatcher (task_dispatcher.cpp)
?spawn@r1@detail@tbb@@YAXAAVtask@d1@23@AAVtask_group_context@523@G@Z
2 changes: 2 additions & 0 deletions src/tbb/def/win64-tbb.def
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
; Copyright (c) 2005-2025 Intel Corporation
; Copyright (c) 2025 UXL Foundation Contributors
;
; Licensed under the Apache License, Version 2.0 (the "License");
; you may not use this file except in compliance with the License.
@@ -72,6 +73,7 @@ EXPORTS
?current_suspend_point@r1@detail@tbb@@YAPEAUsuspend_point_type@123@XZ
?notify_waiters@r1@detail@tbb@@YAX_K@Z
?get_thread_reference_vertex@r1@detail@tbb@@YAPEAVwait_tree_vertex_interface@d1@23@PEAV4523@@Z
?get_current_task@r1@detail@tbb@@YAPEAVtask@d1@23@XZ

; Task dispatcher (task_dispatcher.cpp)
?spawn@r1@detail@tbb@@YAXAEAVtask@d1@23@AEAVtask_group_context@523@@Z
5 changes: 4 additions & 1 deletion src/tbb/scheduler_common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -483,6 +484,8 @@ class alignas (max_nfs_size) task_dispatcher {
>
m_reference_vertex_map;

d1::task* m_innermost_running_task{ nullptr };

//! Attempt to get a task from the mailbox.
/** Gets a task only if it has not been executed by its sender or a thief
that has stolen it from the sender's task pool. Otherwise returns nullptr.
5 changes: 5 additions & 0 deletions src/tbb/task.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright (c) 2005-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -253,6 +254,10 @@ d1::wait_tree_vertex_interface* get_thread_reference_vertex(d1::wait_tree_vertex
return ref_counter;
}

d1::task* get_current_task() {
return governor::get_thread_data()->get_current_task();
}

} // namespace r1
} // namespace detail
} // namespace tbb
6 changes: 5 additions & 1 deletion src/tbb/task_dispatcher.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright (c) 2020-2025 Intel Corporation
Copyright (c) 2025 UXL Foundation Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -249,11 +250,13 @@ d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {
task_dispatcher& task_disp;
execution_data_ext old_execute_data_ext;
properties old_properties;
d1::task* old_innermost_running_task;
bool is_initially_registered;

~dispatch_loop_guard() {
task_disp.m_execute_data_ext = old_execute_data_ext;
task_disp.m_properties = old_properties;
task_disp.m_innermost_running_task = old_innermost_running_task;

if (!is_initially_registered) {
task_disp.m_thread_data->my_arena->my_tc_client.get_pm_client()->unregister_thread();
@@ -263,7 +266,7 @@ d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {
__TBB_ASSERT(task_disp.m_thread_data && governor::is_thread_data_set(task_disp.m_thread_data), nullptr);
__TBB_ASSERT(task_disp.m_thread_data->my_task_dispatcher == &task_disp, nullptr);
}
} dl_guard{ *this, m_execute_data_ext, m_properties, m_thread_data->my_is_registered };
} dl_guard{ *this, m_execute_data_ext, m_properties, m_innermost_running_task, m_thread_data->my_is_registered };

// The context guard to track fp setting and itt tasks.
context_guard_helper</*report_tasks=*/ITTPossible> context_guard;
@@ -328,6 +331,7 @@ d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {

ITT_CALLEE_ENTER(ITTPossible, t, itt_caller);

m_innermost_running_task = t;
if (ed.context->is_group_execution_cancelled()) {
t = t->cancel(ed);
} else {
Loading