Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 111 additions & 75 deletions opal/runtime/opal_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ int opal_progress_spin_count = 10000;
static opal_atomic_lock_t progress_lock;

/* callbacks to progress */
static opal_progress_callback_t *callbacks = NULL;
static volatile opal_progress_callback_t *callbacks = NULL;
static size_t callbacks_len = 0;
static size_t callbacks_size = 0;

static opal_progress_callback_t *callbacks_lp = NULL;
static volatile opal_progress_callback_t *callbacks_lp = NULL;
static size_t callbacks_lp_len = 0;
static size_t callbacks_lp_size = 0;

Expand Down Expand Up @@ -93,6 +93,9 @@ static int debug_output = -1;
*/
static int fake_cb(void) { return 0; }

static int _opal_progress_unregister (opal_progress_callback_t cb, opal_progress_callback_t *callback_array,
size_t *callback_array_len);

/* init the progress engine - called from orte_init */
int
opal_progress_init(void)
Expand All @@ -109,6 +112,27 @@ opal_progress_init(void)
}
#endif

callbacks_size = callbacks_lp_size = 8;

callbacks = malloc (callbacks_size * sizeof (callbacks[0]));
callbacks_lp = malloc (callbacks_lp_size * sizeof (callbacks_lp[0]));

if (NULL == callbacks || NULL == callbacks_lp) {
free (callbacks);
free (callbacks_lp);
callbacks_size = callbacks_lp_size = 0;
callbacks = callbacks_lp = NULL;
return OPAL_ERR_OUT_OF_RESOURCE;
}

for (size_t i = 0 ; i < callbacks_size ; ++i) {
callbacks[i] = fake_cb;
}

for (size_t i = 0 ; i < callbacks_lp_size ; ++i) {
callbacks_lp[i] = fake_cb;
}

OPAL_OUTPUT((debug_output, "progress: initialized event flag to: %x",
opal_progress_event_flag));
OPAL_OUTPUT((debug_output, "progress: initialized yield_when_idle to: %s",
Expand All @@ -130,10 +154,13 @@ opal_progress_finalize(void)

callbacks_len = 0;
callbacks_size = 0;
if (NULL != callbacks) {
free(callbacks);
callbacks = NULL;
}
free(callbacks);
callbacks = NULL;

callbacks_lp_len = 0;
callbacks_lp_size = 0;
free(callbacks_lp);
callbacks_lp = NULL;

opal_atomic_unlock(&progress_lock);

Expand Down Expand Up @@ -322,38 +349,73 @@ opal_progress_set_event_poll_rate(int polltime)
#endif
}

static int opal_progress_find_cb (opal_progress_callback_t cb, opal_progress_callback_t *cbs,
size_t cbs_len)
{
for (size_t i = 0 ; i < cbs_len ; ++i) {
if (cbs[i] == cb) {
return (int) i;
}
}

int
opal_progress_register(opal_progress_callback_t cb)
return OPAL_ERR_NOT_FOUND;
}

static int _opal_progress_register (opal_progress_callback_t cb, opal_progress_callback_t **cbs,
size_t *cbs_size, size_t *cbs_len)
{
int ret = OPAL_SUCCESS;
size_t index;

/* just in case there is a low-priority callback remove it */
(void) opal_progress_unregister (cb);

opal_atomic_lock(&progress_lock);
if (OPAL_ERR_NOT_FOUND != opal_progress_find_cb (cb, *cbs, *cbs_len)) {
return OPAL_SUCCESS;
}

/* see if we need to allocate more space */
if (callbacks_len + 1 > callbacks_size) {
opal_progress_callback_t *tmp;
tmp = (opal_progress_callback_t*)realloc(callbacks, sizeof(opal_progress_callback_t) * (callbacks_size + 4));
if (*cbs_len + 1 > *cbs_size) {
opal_progress_callback_t *tmp, *old;

tmp = (opal_progress_callback_t *) malloc (sizeof (tmp[0]) * 2 * *cbs_size);
if (tmp == NULL) {
ret = OPAL_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
return OPAL_ERR_TEMP_OUT_OF_RESOURCE;
}
/* registering fake callbacks to fill callbacks[] */
for( index = callbacks_len + 1 ; index < callbacks_size + 4 ; index++) {
tmp[index] = &fake_cb;

if (*cbs) {
/* copy old callbacks */
memcpy (tmp, *cbs, sizeof(tmp[0]) * *cbs_size);
}

callbacks = tmp;
callbacks_size += 4;
for (size_t i = *cbs_len ; i < 2 * *cbs_size ; ++i) {
tmp[i] = fake_cb;
}

opal_atomic_wmb ();

/* swap out callback array */
old = opal_atomic_swap_ptr (cbs, tmp);

opal_atomic_wmb ();

free (old);
*cbs_size *= 2;
}

callbacks[callbacks_len++] = cb;
cbs[0][*cbs_len] = cb;
++*cbs_len;

opal_atomic_wmb ();

return ret;
}

int opal_progress_register (opal_progress_callback_t cb)
{
int ret;

opal_atomic_lock(&progress_lock);

(void) _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);

cleanup:
ret = _opal_progress_register (cb, &callbacks, &callbacks_size, &callbacks_len);

opal_atomic_unlock(&progress_lock);

Expand All @@ -362,84 +424,58 @@ opal_progress_register(opal_progress_callback_t cb)

int opal_progress_register_lp (opal_progress_callback_t cb)
{
int ret = OPAL_SUCCESS;
size_t index;

/* just in case there is a high-priority callback remove it */
(void) opal_progress_unregister (cb);
int ret;

opal_atomic_lock(&progress_lock);

/* see if we need to allocate more space */
if (callbacks_lp_len + 1 > callbacks_lp_size) {
opal_progress_callback_t *tmp;
tmp = (opal_progress_callback_t*)realloc(callbacks_lp, sizeof(opal_progress_callback_t) * (callbacks_lp_size + 4));
if (tmp == NULL) {
ret = OPAL_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
/* registering fake callbacks_lp to fill callbacks_lp[] */
for( index = callbacks_lp_len + 1 ; index < callbacks_lp_size + 4 ; index++) {
tmp[index] = &fake_cb;
}
(void) _opal_progress_unregister (cb, callbacks, &callbacks_len);

callbacks_lp = tmp;
callbacks_lp_size += 4;
}

callbacks_lp[callbacks_lp_len++] = cb;

cleanup:
ret = _opal_progress_register (cb, &callbacks_lp, &callbacks_lp_size, &callbacks_lp_len);

opal_atomic_unlock(&progress_lock);

return ret;
}

static int _opal_progress_unregister (opal_progress_callback_t cb, opal_progress_callback_t *callback_array,
size_t callback_array_len)
size_t *callback_array_len)
{
size_t i;
int ret = OPAL_ERR_NOT_FOUND;

opal_atomic_lock(&progress_lock);

for (i = 0 ; i < callback_array_len ; ++i) {
if (cb == callback_array[i]) {
callback_array[i] = &fake_cb;
ret = OPAL_SUCCESS;
break;
}
int ret = opal_progress_find_cb (cb, callback_array, *callback_array_len);
if (OPAL_ERR_NOT_FOUND == ret) {
return ret;
}

/* If we found the function we're unregistering: If callbacks_len
is 0, we're not goig to do anything interesting anyway, so
skip. If callbacks_len is 1, it will soon be 0, so no need to
do any repacking. size_t can be unsigned, so 0 - 1 is bad for
a loop condition :). */
if (OPAL_SUCCESS == ret) {
if (i < callback_array_len - 1) {
memmove (callback_array + i, callback_array + i + 1,
(callback_array_len - i - 1) * sizeof (callback_array[0]));
}

callback_array[callback_array_len - 1] = &fake_cb;
callback_array_len--;
do any repacking. */
for (size_t i = (size_t) ret ; i < *callback_array_len - 1 ; ++i) {
/* copy callbacks atomically since another thread may be in
* opal_progress(). */
(void) opal_atomic_swap_ptr (callback_array + i, callback_array[i+1]);
}

opal_atomic_unlock(&progress_lock);
callback_array[*callback_array_len] = fake_cb;
--*callback_array_len;

return ret;
return OPAL_SUCCESS;
}

int opal_progress_unregister (opal_progress_callback_t cb)
{
int ret = _opal_progress_unregister (cb, callbacks, callbacks_len);
int ret;

opal_atomic_lock(&progress_lock);

ret = _opal_progress_unregister (cb, callbacks, &callbacks_len);

if (OPAL_SUCCESS != ret) {
/* if not in the high-priority array try to remove from the lp array.
* a callback will never be in both. */
return _opal_progress_unregister (cb, callbacks_lp, callbacks_lp_len);
ret = _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);
}

opal_atomic_unlock(&progress_lock);

return ret;
}