Skip to content

Commit

Permalink
Improved timers (#272)
Browse files Browse the repository at this point in the history
* improved timers

* improved cancellation and destruction of timers

* added missing codecov token in github action

* trying to fix code coverage workflow

* fixing typo

* disallow margo_timer_start when timer is being canceled

* complete overhaul of the timer system

* added margo_timer_cancel_many
  • Loading branch information
mdorier authored Apr 12, 2024
1 parent 765fef2 commit 64ad2f8
Show file tree
Hide file tree
Showing 8 changed files with 513 additions and 174 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ jobs:
- name: Build code and run unit tests
run: |
spack env activate -d tests
eval `spack env activate --sh tests`
./prepare.sh
./configure --enable-coverage --prefix=`pwd`
eval `spack env activate --sh tests` &&
./prepare.sh &&
./configure --enable-coverage --prefix=`pwd` &&
make check
make clean
- uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: true
gcov: true
63 changes: 60 additions & 3 deletions include/margo-timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct margo_timer* margo_timer_t;

/**
* @brief Creates a timer object.
* The callback will be submitted to the handler pool.
*
* @param mid Margo instance
* @param cb_fn Callback to call when the timer finishes
Expand All @@ -39,6 +40,33 @@ int margo_timer_create(margo_instance_id mid,
void* cb_dat,
margo_timer_t* timer);

/**
* @brief Creates a timer object and specifies the pool
* in which to run the callback.
*
* @note Passing ABT_POOL_NULL as the pool is allowed.
* In this case, the callback will be invoked directly
* within the ULT that runs the progress loop. This should
* generally be avoided unless the callback is very short,
* and does not call any margo_* or HG_* function. A typical
* example of a valid callback would be one that simply
* sets the value of an ABT_eventual, or one that submits
* a ULT and returns.
*
* @param mid Margo instance
* @param cb_fn Callback to call when the timer finishes
* @param cb_dat Callback data
* @param pool Pool in which to run the callback
* @param timer Resulting timer
*
* @return 0 on success, negative value on failure
*/
int margo_timer_create_with_pool(margo_instance_id mid,
margo_timer_callback_fn cb_fn,
void* cb_dat,
ABT_pool pool,
margo_timer_t* timer);

/**
* @brief Start the timer with a provided timeout.
*
Expand All @@ -50,7 +78,15 @@ int margo_timer_create(margo_instance_id mid,
int margo_timer_start(margo_timer_t timer, double timeout_ms);

/**
* @brief Cancel a started timer.
* @brief Cancel a started timer. If the timer's callback
* has already been submitted as a ULT, this function will
* block until the ULT has executed. If the ULT hasn't started
* yet when margo_timer_cancel is called, the ULT won't run
* the callback and will simply return. If the ULT had started,
* it will run the callback to completion.
*
* This function guarantees that there won't be any invokation
* of the callback after margo_timer_cancel returns.
*
* @param timer Timer to cancel
*
Expand All @@ -59,8 +95,29 @@ int margo_timer_start(margo_timer_t timer, double timeout_ms);
int margo_timer_cancel(margo_timer_t timer);

/**
* @brief Destroys a timer. If the timer was started,
* this function will also cancel it.
* @brief Cancel n timers, blocking until it can ensure that
* no callback associated with any of the timers will be called.
*
* This function is more efficient than calling margo_timer_cancel
* in a loop because it requests the cancelation of all the timers
* before blocking.
*
* @warning All the timers must be associated with the same margo
* instance.
*
* @param n Number of timers
* @param timer Array of timers to cancel
*
* @return 0 on success, negative value on failure
*/
int margo_timer_cancel_many(size_t n, margo_timer_t* timers);

/**
* @brief Destroys a timer.
*
* @important This function will not cancel the timer.
* If it was started, it will still fire, and the timer's
* memory will be freed afterward.
*
* @param timer Timer to destroy.
*
Expand Down
45 changes: 26 additions & 19 deletions src/margo-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ static void margo_cleanup(margo_instance_id mid)

/* shut down pending timers */
MARGO_TRACE(mid, "Cleaning up pending timers");
__margo_timer_list_free(mid, mid->timer_list);
__margo_timer_list_free(mid);

MARGO_TRACE(mid, "Destroying mutex and condition variables");
ABT_mutex_free(&mid->finalize_mutex);
Expand Down Expand Up @@ -883,12 +883,11 @@ static hg_return_t margo_cb(const struct hg_cb_info* info)

if (hret == HG_CANCELED && req->timer) { hret = HG_TIMEOUT; }

/* remove timer if there is one and it is still in place (i.e., not timed
* out) */
if (hret != HG_TIMEOUT && req->timer && req->handle) {
__margo_timer_destroy(mid, req->timer);
/* remove timer if there is one and it is still in place */
if (req->timer) {
margo_timer_cancel(req->timer);
margo_timer_destroy(req->timer);
}
if (req->timer) { free(req->timer); }

if (req->kind == MARGO_REQ_CALLBACK) {
if (req->u.callback.cb) req->u.callback.cb(req->u.callback.uargs, hret);
Expand Down Expand Up @@ -955,7 +954,6 @@ static void margo_forward_timeout_cb(void* arg)
margo_request req = (margo_request)arg;
/* cancel the Mercury op if the forward timed out */
HG_Cancel(req->handle);
return;
}

static hg_return_t margo_provider_iforward_internal(
Expand Down Expand Up @@ -1084,18 +1082,24 @@ static hg_return_t margo_provider_iforward_internal(

if (timeout_ms > 0) {
/* set a timer object to expire when this forward times out */
req->timer = calloc(1, sizeof(*(req->timer)));
if (!(req->timer)) {
hret = margo_timer_create_with_pool(mid, margo_forward_timeout_cb, req,
ABT_POOL_NULL, &req->timer);
if (hret != HG_SUCCESS) {
// LCOV_EXCL_START
MARGO_EVENTUAL_FREE(&eventual);
margo_error(mid, "in %s: could not allocate memory for timer",
__func__);
hret = HG_NOMEM_ERROR;
margo_error(mid, "in %s: could not create timer", __func__);
goto finish;
// LCOV_EXCL_END
}
hret = margo_timer_start(req->timer, timeout_ms);
if (hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_timer_destroy(req->timer);
MARGO_EVENTUAL_FREE(&eventual);
margo_error(mid, "in %s: could not start timer", __func__);
goto finish;
// LCOV_EXCL_END
}
__margo_timer_init(mid, req->timer, margo_forward_timeout_cb, req,
timeout_ms);
}

// get parent RPC id
Expand All @@ -1120,8 +1124,8 @@ static hg_return_t margo_provider_iforward_internal(
/* remove timer if HG_Forward failed */
if (hret != HG_SUCCESS && req->timer) {
// LCOV_EXCL_START
__margo_timer_destroy(mid, req->timer);
free(req->timer);
margo_timer_cancel(req->timer);
margo_timer_destroy(req->timer);
req->timer = NULL;
// LCOV_EXCL_END
}
Expand Down Expand Up @@ -1805,7 +1809,7 @@ static void margo_thread_sleep_cb(void* arg)

void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
{
margo_timer sleep_timer;
margo_timer_t sleep_timer;
margo_thread_sleep_cb_dat sleep_cb_dat;

/* monitoring */
Expand All @@ -1822,8 +1826,9 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
sleep_cb_dat.is_asleep = 1;

/* initialize the sleep timer */
__margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb, &sleep_cb_dat,
timeout_ms);
margo_timer_create_with_pool(mid, margo_thread_sleep_cb, &sleep_cb_dat,
ABT_POOL_NULL, &sleep_timer);
margo_timer_start(sleep_timer, timeout_ms);

/* yield thread for specified timeout */
ABT_mutex_lock(sleep_cb_dat.mutex);
Expand All @@ -1835,6 +1840,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
ABT_mutex_free(&sleep_cb_dat.mutex);
ABT_cond_free(&sleep_cb_dat.cond);

margo_timer_destroy(sleep_timer);

/* monitoring */
__MARGO_MONITOR(mid, FN_END, sleep, monitoring_args);
}
Expand Down
2 changes: 1 addition & 1 deletion src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ margo_instance_id margo_init_ext(const char* address,
error:
if (mid) {
__margo_handle_cache_destroy(mid);
__margo_timer_list_free(mid, mid->timer_list);
__margo_timer_list_free(mid);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
Expand Down
14 changes: 8 additions & 6 deletions src/margo-instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <math.h>

#include "margo.h"
#include "margo-timer.h"
#include "margo-config.h"
#include "margo-abt-config.h"
#include "margo-hg-config.h"
Expand Down Expand Up @@ -129,13 +130,14 @@ struct margo_instance {

#define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool

typedef enum margo_request_kind {
typedef enum margo_request_kind
{
MARGO_REQ_EVENTUAL,
MARGO_REQ_CALLBACK
} margo_request_kind;

struct margo_request_struct {
margo_timer* timer;
margo_timer_t timer;
margo_instance_id mid;
hg_handle_t handle;
margo_monitor_data_t monitor_data;
Expand All @@ -157,10 +159,10 @@ struct margo_request_struct {
struct margo_rpc_data {
margo_instance_id mid;
_Atomic(ABT_pool) pool;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
char* rpc_name;
hg_proc_cb_t in_proc_cb; /* user-provided input proc */
hg_proc_cb_t out_proc_cb; /* user-provided output proc */
void* user_data;
void (*user_free_callback)(void*);
};

Expand Down
43 changes: 4 additions & 39 deletions src/margo-timer-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,19 @@
extern "C" {
#endif

typedef struct margo_timer {
margo_instance_id mid;
margo_timer_callback_fn cb_fn;
void* cb_dat;
double expiration;
struct margo_timer* next;
struct margo_timer* prev;
} margo_timer;
struct margo_timer_list;

/**
* Creates a margo_timer_list.
* @returns a new margo_timer_list, or NULL if failed
* Creates and initializes the margo_timer_list associated with the
* margo instance.
*/
struct margo_timer_list* __margo_timer_list_create();

/**
* Frees the timer list
* @param [in] timer_lst timer list to free
*/
void __margo_timer_list_free(margo_instance_id mid,
struct margo_timer_list* timer_lst);

/**
* Initializes a margo timer object which will perform some action
* after a specified time duration
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be initialized
* @param [in] cb_fn callback function for timeout action
* @param [in] cb_dat callback data passed to the callback function
* @param [in] timeout_ms timeout duration in milliseconds
*/
void __margo_timer_init(margo_instance_id mid,
margo_timer* timer,
margo_timer_callback_fn cb_fn,
void* cb_dat,
double timeout_ms);

/**
* Destroys a margo timer object which was previously initialized
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be destroyed
*/
void __margo_timer_destroy(margo_instance_id mid, margo_timer* timer);
void __margo_timer_list_free(margo_instance_id mid);

/**
* Checks for expired timers and performs specified timeout action
Expand All @@ -73,11 +43,6 @@ void __margo_check_timers(margo_instance_id mid);
int __margo_timer_get_next_expiration(margo_instance_id mid,
double* next_timer_exp);

/**
* Gets the margo_timer_list from the margo instance.
*/
struct margo_timer_list* __margo_get_timer_list(margo_instance_id mid);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit 64ad2f8

Please sign in to comment.