Skip to content

Commit 36d4e4c

Browse files
Chenle YuJose M Monsalve Diaz
authored andcommitted
[OpenMP] Implement task record and replay mechanism
This patch implements the "task record and replay" mechanism. The idea is to be able to store tasks and their dependencies in the runtime so that we do not pay the cost of task creation and dependency resolution for future executions. The objective is to improve fine-grained task performance, both for those from "omp task" and "taskloop". The entry point of the recording phase is __kmpc_start_record_task, and the end of record is triggered by __kmpc_end_record_task. Tasks encapsulated between a record start and a record end are saved, meaning that the runtime stores their dependencies and structures, referred to as TDG, in order to replay them in subsequent executions. In these TDG replays, we start the execution by scheduling all root tasks (tasks that do not have input dependencies), and there will be no involvement of a hash table to track the dependencies, yet tasks do not need to be created again. At the beginning of __kmpc_start_record_task, we must check if a TDG has already been recorded. If yes, the function returns 0 and starts to replay the TDG by calling __kmp_exec_tdg; if not, we start to record, and the function returns 1. An integer uniquely identifies TDGs. Currently, this identifier needs to be incremented manually in the source code. Still, depending on how this feature would eventually be used in the library, the caller function must do it; also, the caller function needs to implement a mechanism to skip the associated region, according to the return value of __kmpc_start_record_task. Reviewed By: tianshilei1992 Differential Revision: https://reviews.llvm.org/D146642
1 parent 0f1fb62 commit 36d4e4c

15 files changed

+917
-7
lines changed

openmp/runtime/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,10 @@ if(LIBOMP_OMPD_SUPPORT AND ((NOT LIBOMP_OMPT_SUPPORT) OR (NOT "${CMAKE_SYSTEM_NA
342342
set(LIBOMP_OMPD_SUPPORT FALSE)
343343
endif()
344344

345+
# OMPX Taskgraph support
346+
# Whether to build with OMPX Taskgraph (e.g. task record & replay)
347+
set(LIBOMP_OMPX_TASKGRAPH FALSE CACHE BOOL "OMPX-taskgraph (task record & replay)?")
348+
345349
# Error check hwloc support after config-ix has run
346350
if(LIBOMP_USE_HWLOC AND (NOT LIBOMP_HAVE_HWLOC))
347351
libomp_error_say("Hwloc requested but not available")
@@ -411,6 +415,7 @@ if(${OPENMP_STANDALONE_BUILD})
411415
libomp_say("Use Adaptive locks -- ${LIBOMP_USE_ADAPTIVE_LOCKS}")
412416
libomp_say("Use quad precision -- ${LIBOMP_USE_QUAD_PRECISION}")
413417
libomp_say("Use Hwloc library -- ${LIBOMP_USE_HWLOC}")
418+
libomp_say("Use OMPX-taskgraph -- ${LIBOMP_OMPX_TASKGRAPH}")
414419
endif()
415420

416421
add_subdirectory(src)

openmp/runtime/src/kmp.h

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2487,6 +2487,62 @@ typedef struct {
24872487
} ed;
24882488
} kmp_event_t;
24892489

2490+
#if OMPX_TASKGRAPH
2491+
// Initial number of allocated nodes while recording
2492+
#define INIT_MAPSIZE 50
2493+
2494+
typedef struct kmp_taskgraph_flags { /*This needs to be exactly 32 bits */
2495+
unsigned nowait : 1;
2496+
unsigned re_record : 1;
2497+
unsigned reserved : 30;
2498+
} kmp_taskgraph_flags_t;
2499+
2500+
/// Represents a TDG node
2501+
typedef struct kmp_node_info {
2502+
kmp_task_t *task; // Pointer to the actual task
2503+
kmp_int32 *successors; // Array of the succesors ids
2504+
kmp_int32 nsuccessors; // Number of succesors of the node
2505+
std::atomic<kmp_int32>
2506+
npredecessors_counter; // Number of predessors on the fly
2507+
kmp_int32 npredecessors; // Total number of predecessors
2508+
kmp_int32 successors_size; // Number of allocated succesors ids
2509+
kmp_taskdata_t *parent_task; // Parent implicit task
2510+
} kmp_node_info_t;
2511+
2512+
/// Represent a TDG's current status
2513+
typedef enum kmp_tdg_status {
2514+
KMP_TDG_NONE = 0,
2515+
KMP_TDG_RECORDING = 1,
2516+
KMP_TDG_READY = 2
2517+
} kmp_tdg_status_t;
2518+
2519+
/// Structure that contains a TDG
2520+
typedef struct kmp_tdg_info {
2521+
kmp_int32 tdg_id; // Unique idenfifier of the TDG
2522+
kmp_taskgraph_flags_t tdg_flags; // Flags related to a TDG
2523+
kmp_int32 map_size; // Number of allocated TDG nodes
2524+
kmp_int32 num_roots; // Number of roots tasks int the TDG
2525+
kmp_int32 *root_tasks; // Array of tasks identifiers that are roots
2526+
kmp_node_info_t *record_map; // Array of TDG nodes
2527+
kmp_tdg_status_t tdg_status =
2528+
KMP_TDG_NONE; // Status of the TDG (recording, ready...)
2529+
std::atomic<kmp_int32> num_tasks; // Number of TDG nodes
2530+
kmp_bootstrap_lock_t
2531+
graph_lock; // Protect graph attributes when updated via taskloop_recur
2532+
// Taskloop reduction related
2533+
void *rec_taskred_data; // Data to pass to __kmpc_task_reduction_init or
2534+
// __kmpc_taskred_init
2535+
kmp_int32 rec_num_taskred;
2536+
} kmp_tdg_info_t;
2537+
2538+
extern kmp_int32 __kmp_max_tdgs;
2539+
extern kmp_tdg_info_t **__kmp_global_tdgs;
2540+
extern kmp_int32 __kmp_curr_tdg_idx;
2541+
extern kmp_int32 __kmp_successors_size;
2542+
extern std::atomic<kmp_int32> __kmp_tdg_task_id;
2543+
extern kmp_int32 __kmp_num_tdg;
2544+
#endif
2545+
24902546
#ifdef BUILD_TIED_TASK_STACK
24912547

24922548
/* Tied Task stack definitions */
@@ -2534,7 +2590,12 @@ typedef struct kmp_tasking_flags { /* Total struct must be exactly 32 bits */
25342590
unsigned complete : 1; /* 1==complete, 0==not complete */
25352591
unsigned freed : 1; /* 1==freed, 0==allocated */
25362592
unsigned native : 1; /* 1==gcc-compiled task, 0==intel */
2593+
#if OMPX_TASKGRAPH
2594+
unsigned onced : 1; /* 1==ran once already, 0==never ran, record & replay purposes */
2595+
unsigned reserved31 : 6; /* reserved for library use */
2596+
#else
25372597
unsigned reserved31 : 7; /* reserved for library use */
2598+
#endif
25382599

25392600
} kmp_tasking_flags_t;
25402601

@@ -2583,6 +2644,10 @@ struct kmp_taskdata { /* aligned during dynamic allocation */
25832644
kmp_event_t td_allow_completion_event;
25842645
#if OMPT_SUPPORT
25852646
ompt_task_info_t ompt_task_info;
2647+
#endif
2648+
#if OMPX_TASKGRAPH
2649+
bool is_taskgraph = 0; // whether the task is within a TDG
2650+
kmp_tdg_info_t *tdg; // used to associate task with a TDG
25862651
#endif
25872652
kmp_target_data_t td_target_data;
25882653
}; // struct kmp_taskdata
@@ -4124,6 +4189,20 @@ KMP_EXPORT void __kmpc_init_nest_lock_with_hint(ident_t *loc, kmp_int32 gtid,
41244189
void **user_lock,
41254190
uintptr_t hint);
41264191

4192+
#if OMPX_TASKGRAPH
4193+
// Taskgraph's Record & Replay mechanism
4194+
// __kmp_tdg_is_recording: check whether a given TDG is recording
4195+
// status: the tdg's current status
4196+
static inline bool __kmp_tdg_is_recording(kmp_tdg_status_t status) {
4197+
return status == KMP_TDG_RECORDING;
4198+
}
4199+
4200+
KMP_EXPORT kmp_int32 __kmpc_start_record_task(ident_t *loc, kmp_int32 gtid,
4201+
kmp_int32 input_flags,
4202+
kmp_int32 tdg_id);
4203+
KMP_EXPORT void __kmpc_end_record_task(ident_t *loc, kmp_int32 gtid,
4204+
kmp_int32 input_flags, kmp_int32 tdg_id);
4205+
#endif
41274206
/* Interface to fast scalable reduce methods routines */
41284207

41294208
KMP_EXPORT kmp_int32 __kmpc_reduce_nowait(

openmp/runtime/src/kmp_config.h.cmake

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
#define OMPT_SUPPORT LIBOMP_OMPT_SUPPORT
4747
#cmakedefine01 LIBOMP_OMPD_SUPPORT
4848
#define OMPD_SUPPORT LIBOMP_OMPD_SUPPORT
49+
#cmakedefine01 LIBOMP_OMPX_TASKGRAPH
50+
#define OMPX_TASKGRAPH LIBOMP_OMPX_TASKGRAPH
4951
#cmakedefine01 LIBOMP_PROFILING_SUPPORT
5052
#define OMP_PROFILING_SUPPORT LIBOMP_PROFILING_SUPPORT
5153
#cmakedefine01 LIBOMP_OMPT_OPTIONAL

openmp/runtime/src/kmp_global.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,4 +557,16 @@ int __kmp_nesting_mode = 0;
557557
int __kmp_nesting_mode_nlevels = 1;
558558
int *__kmp_nesting_nth_level;
559559

560+
#if OMPX_TASKGRAPH
561+
// TDG record & replay
562+
kmp_int32 __kmp_max_tdgs = 100;
563+
kmp_tdg_info_t **__kmp_global_tdgs = NULL;
564+
kmp_int32 __kmp_curr_tdg_idx =
565+
0; // Id of the current TDG being recorded or executed
566+
kmp_int32 __kmp_num_tdg = 0;
567+
kmp_int32 __kmp_successors_size = 10; // Initial succesor size list for
568+
// recording
569+
std::atomic<kmp_int32> __kmp_tdg_task_id = 0;
570+
#endif
560571
// end of file //
572+

openmp/runtime/src/kmp_settings.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,6 +1238,18 @@ static void __kmp_stg_parse_num_threads(char const *name, char const *value,
12381238
K_DIAG(1, ("__kmp_dflt_team_nth == %d\n", __kmp_dflt_team_nth));
12391239
} // __kmp_stg_parse_num_threads
12401240

1241+
#if OMPX_TASKGRAPH
1242+
static void __kmp_stg_parse_max_tdgs(char const *name, char const *value,
1243+
void *data) {
1244+
__kmp_stg_parse_int(name, value, 0, INT_MAX, &__kmp_max_tdgs);
1245+
} // __kmp_stg_parse_max_tdgs
1246+
1247+
static void __kmp_std_print_max_tdgs(kmp_str_buf_t *buffer, char const *name,
1248+
void *data) {
1249+
__kmp_stg_print_int(buffer, name, __kmp_max_tdgs);
1250+
} // __kmp_std_print_max_tdgs
1251+
#endif
1252+
12411253
static void __kmp_stg_parse_num_hidden_helper_threads(char const *name,
12421254
char const *value,
12431255
void *data) {
@@ -5592,6 +5604,10 @@ static kmp_setting_t __kmp_stg_table[] = {
55925604
{"LIBOMP_NUM_HIDDEN_HELPER_THREADS",
55935605
__kmp_stg_parse_num_hidden_helper_threads,
55945606
__kmp_stg_print_num_hidden_helper_threads, NULL, 0, 0},
5607+
#if OMPX_TASKGRAPH
5608+
{"KMP_MAX_TDGS", __kmp_stg_parse_max_tdgs, __kmp_std_print_max_tdgs, NULL,
5609+
0, 0},
5610+
#endif
55955611

55965612
#if OMPT_SUPPORT
55975613
{"OMP_TOOL", __kmp_stg_parse_omp_tool, __kmp_stg_print_omp_tool, NULL, 0,

openmp/runtime/src/kmp_taskdeps.cpp

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,44 @@ static kmp_depnode_list_t *__kmp_add_node(kmp_info_t *thread,
218218
static inline void __kmp_track_dependence(kmp_int32 gtid, kmp_depnode_t *source,
219219
kmp_depnode_t *sink,
220220
kmp_task_t *sink_task) {
221+
#if OMPX_TASKGRAPH
222+
kmp_taskdata_t *task_source = KMP_TASK_TO_TASKDATA(source->dn.task);
223+
kmp_taskdata_t *task_sink = KMP_TASK_TO_TASKDATA(sink_task);
224+
if (source->dn.task && sink_task) {
225+
// Not supporting dependency between two tasks that one is within the TDG
226+
// and the other is not
227+
KMP_ASSERT(task_source->is_taskgraph == task_sink->is_taskgraph);
228+
}
229+
if (task_sink->is_taskgraph &&
230+
__kmp_tdg_is_recording(task_sink->tdg->tdg_status)) {
231+
kmp_node_info_t *source_info =
232+
&task_sink->tdg->record_map[task_source->td_task_id];
233+
bool exists = false;
234+
for (int i = 0; i < source_info->nsuccessors; i++) {
235+
if (source_info->successors[i] == task_sink->td_task_id) {
236+
exists = true;
237+
break;
238+
}
239+
}
240+
if (!exists) {
241+
if (source_info->nsuccessors >= source_info->successors_size) {
242+
source_info->successors_size = 2 * source_info->successors_size;
243+
kmp_int32 *old_succ_ids = source_info->successors;
244+
kmp_int32 *new_succ_ids = (kmp_int32 *)__kmp_allocate(
245+
source_info->successors_size * sizeof(kmp_int32));
246+
source_info->successors = new_succ_ids;
247+
__kmp_free(old_succ_ids);
248+
}
249+
250+
source_info->successors[source_info->nsuccessors] = task_sink->td_task_id;
251+
source_info->nsuccessors++;
252+
253+
kmp_node_info_t *sink_info =
254+
&(task_sink->tdg->record_map[task_sink->td_task_id]);
255+
sink_info->npredecessors++;
256+
}
257+
}
258+
#endif
221259
#ifdef KMP_SUPPORT_GRAPH_OUTPUT
222260
kmp_taskdata_t *task_source = KMP_TASK_TO_TASKDATA(source->dn.task);
223261
// do not use sink->dn.task as that is only filled after the dependences
@@ -256,10 +294,23 @@ __kmp_depnode_link_successor(kmp_int32 gtid, kmp_info_t *thread,
256294
// link node as successor of list elements
257295
for (kmp_depnode_list_t *p = plist; p; p = p->next) {
258296
kmp_depnode_t *dep = p->node;
297+
#if OMPX_TASKGRAPH
298+
kmp_tdg_status tdg_status = KMP_TDG_NONE;
299+
if (task) {
300+
kmp_taskdata_t *td = KMP_TASK_TO_TASKDATA(task);
301+
if (td->is_taskgraph)
302+
tdg_status = KMP_TASK_TO_TASKDATA(task)->tdg->tdg_status;
303+
if (__kmp_tdg_is_recording(tdg_status))
304+
__kmp_track_dependence(gtid, dep, node, task);
305+
}
306+
#endif
259307
if (dep->dn.task) {
260308
KMP_ACQUIRE_DEPNODE(gtid, dep);
261309
if (dep->dn.task) {
262-
__kmp_track_dependence(gtid, dep, node, task);
310+
#if OMPX_TASKGRAPH
311+
if (!(__kmp_tdg_is_recording(tdg_status)) && task)
312+
#endif
313+
__kmp_track_dependence(gtid, dep, node, task);
263314
dep->dn.successors = __kmp_add_node(thread, dep->dn.successors, node);
264315
KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to "
265316
"%p\n",
@@ -281,16 +332,42 @@ static inline kmp_int32 __kmp_depnode_link_successor(kmp_int32 gtid,
281332
if (!sink)
282333
return 0;
283334
kmp_int32 npredecessors = 0;
335+
#if OMPX_TASKGRAPH
336+
kmp_tdg_status tdg_status = KMP_TDG_NONE;
337+
kmp_taskdata_t *td = KMP_TASK_TO_TASKDATA(task);
338+
if (task) {
339+
if (td->is_taskgraph)
340+
tdg_status = KMP_TASK_TO_TASKDATA(task)->tdg->tdg_status;
341+
if (__kmp_tdg_is_recording(tdg_status) && sink->dn.task)
342+
__kmp_track_dependence(gtid, sink, source, task);
343+
}
344+
#endif
284345
if (sink->dn.task) {
285346
// synchronously add source to sink' list of successors
286347
KMP_ACQUIRE_DEPNODE(gtid, sink);
287348
if (sink->dn.task) {
288-
__kmp_track_dependence(gtid, sink, source, task);
349+
#if OMPX_TASKGRAPH
350+
if (!(__kmp_tdg_is_recording(tdg_status)) && task)
351+
#endif
352+
__kmp_track_dependence(gtid, sink, source, task);
289353
sink->dn.successors = __kmp_add_node(thread, sink->dn.successors, source);
290354
KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to "
291355
"%p\n",
292356
gtid, KMP_TASK_TO_TASKDATA(sink->dn.task),
293357
KMP_TASK_TO_TASKDATA(task)));
358+
#if OMPX_TASKGRAPH
359+
if (__kmp_tdg_is_recording(tdg_status)) {
360+
kmp_taskdata_t *tdd = KMP_TASK_TO_TASKDATA(sink->dn.task);
361+
if (tdd->is_taskgraph) {
362+
if (tdd->td_flags.onced)
363+
// decrement npredecessors if sink->dn.task belongs to a taskgraph
364+
// and
365+
// 1) the task is reset to its initial state (by kmp_free_task) or
366+
// 2) the task is complete but not yet reset
367+
npredecessors--;
368+
}
369+
}
370+
#endif
294371
npredecessors++;
295372
}
296373
KMP_RELEASE_DEPNODE(gtid, sink);
@@ -595,6 +672,48 @@ kmp_int32 __kmpc_omp_task_with_deps(ident_t *loc_ref, kmp_int32 gtid,
595672
kmp_info_t *thread = __kmp_threads[gtid];
596673
kmp_taskdata_t *current_task = thread->th.th_current_task;
597674

675+
#if OMPX_TASKGRAPH
676+
// record TDG with deps
677+
if (new_taskdata->is_taskgraph &&
678+
__kmp_tdg_is_recording(new_taskdata->tdg->tdg_status)) {
679+
kmp_tdg_info_t *tdg = new_taskdata->tdg;
680+
// extend record_map if needed
681+
if (new_taskdata->td_task_id >= tdg->map_size) {
682+
__kmp_acquire_bootstrap_lock(&tdg->graph_lock);
683+
if (new_taskdata->td_task_id >= tdg->map_size) {
684+
kmp_uint old_size = tdg->map_size;
685+
kmp_uint new_size = old_size * 2;
686+
kmp_node_info_t *old_record = tdg->record_map;
687+
kmp_node_info_t *new_record = (kmp_node_info_t *)__kmp_allocate(
688+
new_size * sizeof(kmp_node_info_t));
689+
KMP_MEMCPY(new_record, tdg->record_map,
690+
old_size * sizeof(kmp_node_info_t));
691+
tdg->record_map = new_record;
692+
693+
__kmp_free(old_record);
694+
695+
for (kmp_int i = old_size; i < new_size; i++) {
696+
kmp_int32 *successorsList = (kmp_int32 *)__kmp_allocate(
697+
__kmp_successors_size * sizeof(kmp_int32));
698+
new_record[i].task = nullptr;
699+
new_record[i].successors = successorsList;
700+
new_record[i].nsuccessors = 0;
701+
new_record[i].npredecessors = 0;
702+
new_record[i].successors_size = __kmp_successors_size;
703+
KMP_ATOMIC_ST_REL(&new_record[i].npredecessors_counter, 0);
704+
}
705+
// update the size at the end, so that we avoid other
706+
// threads use old_record while map_size is already updated
707+
tdg->map_size = new_size;
708+
}
709+
__kmp_release_bootstrap_lock(&tdg->graph_lock);
710+
}
711+
tdg->record_map[new_taskdata->td_task_id].task = new_task;
712+
tdg->record_map[new_taskdata->td_task_id].parent_task =
713+
new_taskdata->td_parent;
714+
KMP_ATOMIC_INC(&tdg->num_tasks);
715+
}
716+
#endif
598717
#if OMPT_SUPPORT
599718
if (ompt_enabled.enabled) {
600719
if (!current_task->ompt_task_info.frame.enter_frame.ptr)

openmp/runtime/src/kmp_taskdeps.h

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,23 @@ static inline void __kmp_dephash_free(kmp_info_t *thread, kmp_dephash_t *h) {
9292
extern void __kmpc_give_task(kmp_task_t *ptask, kmp_int32 start);
9393

9494
static inline void __kmp_release_deps(kmp_int32 gtid, kmp_taskdata_t *task) {
95+
96+
#if OMPX_TASKGRAPH
97+
if (task->is_taskgraph && !(__kmp_tdg_is_recording(task->tdg->tdg_status))) {
98+
kmp_node_info_t *TaskInfo = &(task->tdg->record_map[task->td_task_id]);
99+
100+
for (int i = 0; i < TaskInfo->nsuccessors; i++) {
101+
kmp_int32 successorNumber = TaskInfo->successors[i];
102+
kmp_node_info_t *successor = &(task->tdg->record_map[successorNumber]);
103+
kmp_int32 npredecessors = KMP_ATOMIC_DEC(&successor->npredecessors_counter) - 1;
104+
if (successor->task != nullptr && npredecessors == 0) {
105+
__kmp_omp_task(gtid, successor->task, false);
106+
}
107+
}
108+
return;
109+
}
110+
#endif
111+
95112
kmp_info_t *thread = __kmp_threads[gtid];
96113
kmp_depnode_t *node = task->td_depnode;
97114

@@ -120,8 +137,12 @@ static inline void __kmp_release_deps(kmp_int32 gtid, kmp_taskdata_t *task) {
120137
gtid, task));
121138

122139
KMP_ACQUIRE_DEPNODE(gtid, node);
123-
node->dn.task =
124-
NULL; // mark this task as finished, so no new dependencies are generated
140+
#if OMPX_TASKGRAPH
141+
if (!task->is_taskgraph ||
142+
(task->is_taskgraph && !__kmp_tdg_is_recording(task->tdg->tdg_status)))
143+
#endif
144+
node->dn.task =
145+
NULL; // mark this task as finished, so no new dependencies are generated
125146
KMP_RELEASE_DEPNODE(gtid, node);
126147

127148
kmp_depnode_list_t *next;

0 commit comments

Comments
 (0)