Avoid deadlock with COI
When an asynchronous offload task is completed, COI calls the runtime to queue
a "destructor task". When the task deques are full, a deadlock arises: the
OpenMP threads are already inside the runtime but cannot make progress, because
the COI thread is stuck inside the runtime trying to find a free slot in a deque.

This patch implements a solution in which the task deques are doubled in size
when a task is queued from a COI thread.

Differential Revision: http://reviews.llvm.org/D20733

llvm-svn: 271319
jpeyton52 committed May 31, 2016
1 parent 067325f commit f4f9695
Showing 4 changed files with 82 additions and 28 deletions.
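Editor's note: the following is a minimal, standalone sketch of the technique the commit message describes, not the runtime's actual code. The names (deque_t, deque_grow, deque_push_external) are illustrative only, locking is omitted, and the capacity is assumed to stay a power of two; it mirrors the copy-and-rebase scheme of __kmp_realloc_task_deque in the diff below.

#include <stdlib.h>

typedef struct {
    void **buf;     /* ring buffer of task pointers */
    int    size;    /* capacity; always a power of two */
    int    head;    /* index of the oldest queued task */
    int    tail;    /* index one past the newest queued task */
    int    ntasks;  /* number of queued tasks */
} deque_t;

static void deque_init(deque_t *d, int initial_size) {
    /* initial_size must be a power of two, e.g. 1 << 8 */
    d->buf = (void **) calloc(initial_size, sizeof(void *));
    d->size = initial_size;
    d->head = d->tail = d->ntasks = 0;
}

static void deque_grow(deque_t *d) {
    int new_size = 2 * d->size;
    void **new_buf = (void **) calloc(new_size, sizeof(void *));
    /* copy in queue order, starting at head, so tasks land in slots 0..ntasks-1 */
    for (int i = d->head, j = 0; j < d->ntasks; i = (i + 1) & (d->size - 1), j++)
        new_buf[j] = d->buf[i];
    free(d->buf);
    d->buf  = new_buf;
    d->head = 0;
    d->tail = d->ntasks;
    d->size = new_size;
}

/* A push coming from an external producer (the COI thread in this commit):
 * grow the deque instead of failing, so the producer can never get stuck. */
static void deque_push_external(deque_t *d, void *task) {
    if (d->ntasks >= d->size)
        deque_grow(d);
    d->buf[d->tail] = task;
    d->tail = (d->tail + 1) & (d->size - 1);
    d->ntasks++;
}

With deque_init(&d, 1 << 8) this matches INITIAL_TASK_DEQUE_SIZE; OpenMP worker threads would still treat a full deque as "try elsewhere", and only the external-producer path grows it.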
11 changes: 7 additions & 4 deletions openmp/runtime/src/kmp.h
@@ -35,10 +35,6 @@
#define TASK_CURRENT_NOT_QUEUED 0
#define TASK_CURRENT_QUEUED 1

#define TASK_DEQUE_BITS 8 // Used solely to define TASK_DEQUE_SIZE and TASK_DEQUE_MASK.
#define TASK_DEQUE_SIZE ( 1 << TASK_DEQUE_BITS )
#define TASK_DEQUE_MASK ( TASK_DEQUE_SIZE - 1 )

#ifdef BUILD_TIED_TASK_STACK
#define TASK_STACK_EMPTY 0 // entries when the stack is empty

@@ -2223,6 +2219,7 @@ typedef struct kmp_base_thread_data {
// Used only in __kmp_execute_tasks_template, maybe not avail until task is queued?
kmp_bootstrap_lock_t td_deque_lock; // Lock for accessing deque
kmp_taskdata_t ** td_deque; // Deque of tasks encountered by td_thr, dynamically allocated
kmp_int32 td_deque_size; // Size of deque
kmp_uint32 td_deque_head; // Head of deque (will wrap)
kmp_uint32 td_deque_tail; // Tail of deque (will wrap)
kmp_int32 td_deque_ntasks; // Number of tasks in deque
@@ -2233,6 +2230,12 @@ typedef struct kmp_base_thread_data {
#endif // BUILD_TIED_TASK_STACK
} kmp_base_thread_data_t;

#define TASK_DEQUE_BITS 8 // Used solely to define INITIAL_TASK_DEQUE_SIZE
#define INITIAL_TASK_DEQUE_SIZE ( 1 << TASK_DEQUE_BITS )

#define TASK_DEQUE_SIZE(td) ((td).td_deque_size)
#define TASK_DEQUE_MASK(td) ((td).td_deque_size - 1)

typedef union KMP_ALIGN_CACHE kmp_thread_data {
kmp_base_thread_data_t td;
double td_align; /* use worst case alignment */
3 changes: 2 additions & 1 deletion openmp/runtime/src/kmp_debugger.c
@@ -93,7 +93,7 @@ __kmp_omp_debug_struct_info = {
sizeof( void * ),
OMP_LOCK_T_SIZE < sizeof(void *),
bs_last_barrier,
TASK_DEQUE_SIZE,
INITIAL_TASK_DEQUE_SIZE,

// thread structure information
sizeof( kmp_base_info_t ),
@@ -222,6 +222,7 @@ __kmp_omp_debug_struct_info = {
// thread_data_t.
sizeof( kmp_thread_data_t ),
offset_and_size_of( kmp_base_thread_data_t, td_deque ),
offset_and_size_of( kmp_base_thread_data_t, td_deque_size ),
offset_and_size_of( kmp_base_thread_data_t, td_deque_head ),
offset_and_size_of( kmp_base_thread_data_t, td_deque_tail ),
offset_and_size_of( kmp_base_thread_data_t, td_deque_ntasks ),
1 change: 1 addition & 0 deletions openmp/runtime/src/kmp_omp.h
@@ -218,6 +218,7 @@ typedef struct {
/* kmp_thread_data_t */
kmp_int32 hd_sizeof_struct;
offset_and_size_t hd_deque;
offset_and_size_t hd_deque_size;
offset_and_size_t hd_deque_head;
offset_and_size_t hd_deque_tail;
offset_and_size_t hd_deque_ntasks;
95 changes: 72 additions & 23 deletions openmp/runtime/src/kmp_tasking.c
@@ -305,7 +305,7 @@ __kmp_push_task(kmp_int32 gtid, kmp_task_t * task )
}

// Check if deque is full
if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE )
if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE(thread_data->td) )
{
KA_TRACE(20, ( "__kmp_push_task: T#%d deque is full; returning TASK_NOT_PUSHED for task %p\n",
gtid, taskdata ) );
@@ -317,7 +317,7 @@ __kmp_push_task(kmp_int32 gtid, kmp_task_t * task )

#if OMP_41_ENABLED
// Need to recheck as we can get a proxy task from a thread outside of OpenMP
if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE )
if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE(thread_data->td) )
{
__kmp_release_bootstrap_lock( & thread_data -> td.td_deque_lock );
KA_TRACE(20, ( "__kmp_push_task: T#%d deque is full on 2nd check; returning TASK_NOT_PUSHED for task %p\n",
@@ -326,12 +326,12 @@ __kmp_push_task(kmp_int32 gtid, kmp_task_t * task )
}
#else
// Must have room since no thread can add tasks but calling thread
KMP_DEBUG_ASSERT( TCR_4(thread_data -> td.td_deque_ntasks) < TASK_DEQUE_SIZE );
KMP_DEBUG_ASSERT( TCR_4(thread_data -> td.td_deque_ntasks) < TASK_DEQUE_SIZE(thread_data->td) );
#endif

thread_data -> td.td_deque[ thread_data -> td.td_deque_tail ] = taskdata; // Push taskdata
// Wrap index.
thread_data -> td.td_deque_tail = ( thread_data -> td.td_deque_tail + 1 ) & TASK_DEQUE_MASK;
thread_data -> td.td_deque_tail = ( thread_data -> td.td_deque_tail + 1 ) & TASK_DEQUE_MASK(thread_data->td);
TCW_4(thread_data -> td.td_deque_ntasks, TCR_4(thread_data -> td.td_deque_ntasks) + 1); // Adjust task count

__kmp_release_bootstrap_lock( & thread_data -> td.td_deque_lock );
@@ -1641,7 +1641,7 @@ __kmp_remove_my_task( kmp_info_t * thread, kmp_int32 gtid, kmp_task_team_t *task
return NULL;
}

tail = ( thread_data -> td.td_deque_tail - 1 ) & TASK_DEQUE_MASK; // Wrap index.
tail = ( thread_data -> td.td_deque_tail - 1 ) & TASK_DEQUE_MASK(thread_data->td); // Wrap index.
taskdata = thread_data -> td.td_deque[ tail ];

if (is_constrained) {
@@ -1735,10 +1735,10 @@ __kmp_steal_task( kmp_info_t *victim, kmp_int32 gtid, kmp_task_team_t *task_team
if ( !is_constrained ) {
taskdata = victim_td -> td.td_deque[ victim_td -> td.td_deque_head ];
// Bump head pointer and Wrap.
victim_td -> td.td_deque_head = ( victim_td -> td.td_deque_head + 1 ) & TASK_DEQUE_MASK;
victim_td -> td.td_deque_head = ( victim_td -> td.td_deque_head + 1 ) & TASK_DEQUE_MASK(victim_td->td);
} else {
// While we have postponed tasks let's steal from tail of the deque (smaller tasks)
kmp_int32 tail = ( victim_td -> td.td_deque_tail - 1 ) & TASK_DEQUE_MASK; // Wrap index.
kmp_int32 tail = ( victim_td -> td.td_deque_tail - 1 ) & TASK_DEQUE_MASK(victim_td->td); // Wrap index.
taskdata = victim_td -> td.td_deque[ tail ];
// we need to check if the candidate obeys task scheduling constraint:
// only child of current task can be scheduled
@@ -2267,14 +2267,42 @@ __kmp_alloc_task_deque( kmp_info_t *thread, kmp_thread_data_t *thread_data )
KMP_DEBUG_ASSERT( thread_data -> td.td_deque_tail == 0 );

KE_TRACE( 10, ( "__kmp_alloc_task_deque: T#%d allocating deque[%d] for thread_data %p\n",
__kmp_gtid_from_thread( thread ), TASK_DEQUE_SIZE, thread_data ) );
__kmp_gtid_from_thread( thread ), INITIAL_TASK_DEQUE_SIZE, thread_data ) );
// Allocate space for task deque, and zero the deque
// Cannot use __kmp_thread_calloc() because threads not around for
// kmp_reap_task_team( ).
thread_data -> td.td_deque = (kmp_taskdata_t **)
__kmp_allocate( TASK_DEQUE_SIZE * sizeof(kmp_taskdata_t *));
__kmp_allocate( INITIAL_TASK_DEQUE_SIZE * sizeof(kmp_taskdata_t *));
thread_data -> td.td_deque_size = INITIAL_TASK_DEQUE_SIZE;
}

//------------------------------------------------------------------------------
// __kmp_realloc_task_deque:
// Re-allocates a task deque for a particular thread, copies the content from the old deque
// and adjusts the necessary data structures relating to the deque.
// This operation must be done with the deque_lock held

static void __kmp_realloc_task_deque ( kmp_info_t *thread, kmp_thread_data_t *thread_data )
{
kmp_int32 size = TASK_DEQUE_SIZE(thread_data->td);
kmp_int32 new_size = 2 * size;

KE_TRACE( 10, ( "__kmp_realloc_task_deque: T#%d reallocating deque[from %d to %d] for thread_data %p\n",
__kmp_gtid_from_thread( thread ), size, new_size, thread_data ) );

kmp_taskdata_t ** new_deque = (kmp_taskdata_t **) __kmp_allocate( new_size * sizeof(kmp_taskdata_t *));

int i,j;
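// copy the old contents in queue order, starting at the old head, so the tasks
// land in slots 0..size-1 of the new deque (head is then reset to 0 and tail to size below)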
for ( i = thread_data->td.td_deque_head, j = 0; j < size; i = (i+1) & TASK_DEQUE_MASK(thread_data->td), j++ )
new_deque[j] = thread_data->td.td_deque[i];

__kmp_free(thread_data->td.td_deque);

thread_data -> td.td_deque_head = 0;
thread_data -> td.td_deque_tail = size;
thread_data -> td.td_deque = new_deque;
thread_data -> td.td_deque_size = new_size;
}

//------------------------------------------------------------------------------
// __kmp_free_task_deque:
@@ -2769,7 +2797,7 @@ __kmp_tasking_barrier( kmp_team_t *team, kmp_info_t *thread, int gtid )
Because of this, __kmp_push_task needs to check if there's space after getting the lock
*/
static bool __kmp_give_task ( kmp_info_t *thread, kmp_int32 tid, kmp_task_t * task )
static bool __kmp_give_task ( kmp_info_t *thread, kmp_int32 tid, kmp_task_t * task, kmp_int32 pass )
{
kmp_taskdata_t * taskdata = KMP_TASK_TO_TASKDATA(task);
kmp_task_team_t * task_team = taskdata->td_task_team;
@@ -2789,23 +2817,37 @@ static bool __kmp_give_task ( kmp_info_t *thread, kmp_int32 tid, kmp_task_t * ta
return result;
}

if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE )
if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE(thread_data->td) )
{
KA_TRACE(30, ("__kmp_give_task: queue is full while giving task %p to thread %d.\n", taskdata, tid ) );
return result;
}

__kmp_acquire_bootstrap_lock( & thread_data-> td.td_deque_lock );
// if this deque is bigger than the pass ratio give a chance to another thread
if ( TASK_DEQUE_SIZE(thread_data->td)/INITIAL_TASK_DEQUE_SIZE >= pass ) return result;

if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE )
{
KA_TRACE(30, ("__kmp_give_task: queue is full while giving task %p to thread %d.\n", taskdata, tid ) );
goto release_and_exit;
__kmp_acquire_bootstrap_lock( & thread_data-> td.td_deque_lock );
__kmp_realloc_task_deque(thread,thread_data);

} else {

__kmp_acquire_bootstrap_lock( & thread_data-> td.td_deque_lock );

if ( TCR_4(thread_data -> td.td_deque_ntasks) >= TASK_DEQUE_SIZE(thread_data->td) )
{
KA_TRACE(30, ("__kmp_give_task: queue is full while giving task %p to thread %d.\n", taskdata, tid ) );

// if this deque is bigger than the pass ratio give a chance to another thread
if ( TASK_DEQUE_SIZE(thread_data->td)/INITIAL_TASK_DEQUE_SIZE >= pass )
goto release_and_exit;

__kmp_realloc_task_deque(thread,thread_data);
}
}

// lock is held here, and there is space in the deque

thread_data -> td.td_deque[ thread_data -> td.td_deque_tail ] = taskdata;
// Wrap index.
thread_data -> td.td_deque_tail = ( thread_data -> td.td_deque_tail + 1 ) & TASK_DEQUE_MASK;
thread_data -> td.td_deque_tail = ( thread_data -> td.td_deque_tail + 1 ) & TASK_DEQUE_MASK(thread_data->td);
TCW_4(thread_data -> td.td_deque_ntasks, TCR_4(thread_data -> td.td_deque_ntasks) + 1);

result = true;
@@ -2919,14 +2961,21 @@ void __kmpc_proxy_task_completed_ooo ( kmp_task_t *ptask )
kmp_team_t * team = taskdata->td_team;
kmp_int32 nthreads = team->t.t_nproc;
kmp_info_t *thread;
kmp_int32 k = 0;

//This should be similar to start_k = __kmp_get_random( thread ) % nthreads but we cannot use __kmp_get_random here
kmp_int32 start_k = 0;
kmp_int32 pass = 1;
kmp_int32 k = start_k;

do {
//This should be similar to k = __kmp_get_random( thread ) % nthreads but we cannot use __kmp_get_random here
//For now we're just linearly trying to find a thread
k = (k+1) % nthreads;
thread = team->t.t_threads[k];
} while ( !__kmp_give_task( thread, k, ptask ) );
k = (k+1) % nthreads;

// we did a full pass through all the threads
if ( k == start_k ) pass = pass << 1;
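// doubling 'pass' raises the size threshold checked in __kmp_give_task, so deques
// that were previously too large to grow become eligible for reallocation on the next sweep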

} while ( !__kmp_give_task( thread, k, ptask, pass ) );

__kmp_second_top_half_finish_proxy(taskdata);

