Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions oshmem/mca/atomic/ucx/atomic_ucx_cswap.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx,
UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);

if (OPAL_LIKELY(!UCS_PTR_IS_ERR(status_ptr))) {
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
}

return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker,
"ucp_atomic_fetch_nb");
}
5 changes: 5 additions & 0 deletions oshmem/mca/atomic/ucx/atomic_ucx_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ int mca_atomic_ucx_op(shmem_ctx_t ctx,
status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn,
op, value, size, rva,
ucx_mkey->rkey);

if (OPAL_LIKELY(UCS_OK == status)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
}

return ucx_status_to_oshmem(status);
}

Expand Down
85 changes: 82 additions & 3 deletions oshmem/mca/spml/ucx/spml_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ mca_spml_ucx_t mca_spml_ucx = {
.num_disconnect = 1,
.heap_reg_nb = 0,
.enabled = 0,
.get_mkey_slow = NULL
.get_mkey_slow = NULL,
.synchronized_quiet = false
};

mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
Expand Down Expand Up @@ -216,6 +217,40 @@ static void dump_address(int pe, char *addr, size_t len)

static char spml_ucx_transport_ids[1] = { 0 };

int mca_spml_ucx_init_put_op_mask(mca_spml_ucx_ctx_t *ctx, size_t nprocs)
{
int res;

if (mca_spml_ucx.synchronized_quiet) {
ctx->put_proc_indexes = malloc(nprocs * sizeof(*ctx->put_proc_indexes));
if (NULL == ctx->put_proc_indexes) {
return OSHMEM_ERR_OUT_OF_RESOURCE;
}

OBJ_CONSTRUCT(&ctx->put_op_bitmap, opal_bitmap_t);
res = opal_bitmap_init(&ctx->put_op_bitmap, nprocs);
if (OPAL_SUCCESS != res) {
free(ctx->put_proc_indexes);
ctx->put_proc_indexes = NULL;
return res;
}

ctx->put_proc_count = 0;
}

return OSHMEM_SUCCESS;
}

int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx)
{
if (mca_spml_ucx.synchronized_quiet && ctx->put_proc_indexes) {
OBJ_DESTRUCT(&ctx->put_op_bitmap);
free(ctx->put_proc_indexes);
}

return OSHMEM_SUCCESS;
}

int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
{
size_t i, j, n;
Expand All @@ -235,6 +270,11 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
goto error;
}

rc = mca_spml_ucx_init_put_op_mask(&mca_spml_ucx_ctx_default, nprocs);
if (OSHMEM_SUCCESS != rc) {
goto error;
}

err = ucp_worker_get_address(mca_spml_ucx_ctx_default.ucp_worker, &wk_local_addr, &wk_addr_len);
if (err != UCS_OK) {
goto error;
Expand Down Expand Up @@ -297,6 +337,8 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
free(mca_spml_ucx.remote_addrs_tbl[i]);
}
}

mca_spml_ucx_clear_put_op_mask(&mca_spml_ucx_ctx_default);
if (mca_spml_ucx_ctx_default.ucp_peers)
free(mca_spml_ucx_ctx_default.ucp_peers);
if (mca_spml_ucx.remote_addrs_tbl)
Expand Down Expand Up @@ -583,6 +625,11 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
goto error;
}

rc = mca_spml_ucx_init_put_op_mask(ucx_ctx, nprocs);
if (OSHMEM_SUCCESS != rc) {
goto error2;
}

for (i = 0; i < nprocs; i++) {
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
Expand Down Expand Up @@ -621,6 +668,8 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
}
}

mca_spml_ucx_clear_put_op_mask(ucx_ctx);

if (ucx_ctx->ucp_peers)
free(ucx_ctx->ucp_peers);

Expand Down Expand Up @@ -715,6 +764,7 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
void *rva;
spml_ucx_mkey_t *ucx_mkey;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
int res;
#if HAVE_DECL_UCP_PUT_NB
ucs_status_ptr_t request;
#else
Expand All @@ -725,12 +775,18 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
#if HAVE_DECL_UCP_PUT_NB
request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker, "ucp_put_nb");
res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker, "ucp_put_nb");
#else
status = ucp_put(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
return ucx_status_to_oshmem(status);
res = ucx_status_to_oshmem(status);
#endif

if (OPAL_LIKELY(OSHMEM_SUCCESS == res)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
}

return res;
}

int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst, void **handle)
Expand All @@ -744,6 +800,10 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_
status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);

if (OPAL_LIKELY(status >= 0)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
}

return ucx_status_to_oshmem_nb(status);
}

Expand All @@ -767,9 +827,28 @@ int mca_spml_ucx_fence(shmem_ctx_t ctx)

int mca_spml_ucx_quiet(shmem_ctx_t ctx)
{
int flush_get_data;
int ret;
unsigned i;
int idx;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;

if (mca_spml_ucx.synchronized_quiet) {
for (i = 0; i < ucx_ctx->put_proc_count; i++) {
idx = ucx_ctx->put_proc_indexes[i];
ret = mca_spml_ucx_get_nb(ctx,
ucx_ctx->ucp_peers[idx].mkeys->super.super.va_base,
sizeof(flush_get_data), &flush_get_data, idx, NULL);
if (OMPI_SUCCESS != ret) {
oshmem_shmem_abort(-1);
return ret;
}

opal_bitmap_clear_bit(&ucx_ctx->put_op_bitmap, idx);
}
ucx_ctx->put_proc_count = 0;
}

opal_atomic_wmb();

ret = opal_common_ucx_worker_flush(ucx_ctx->ucp_worker);
Expand Down
19 changes: 18 additions & 1 deletion oshmem/mca/spml/ucx/spml_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "opal/class/opal_free_list.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_bitmap.h"

#include "orte/runtime/orte_globals.h"
#include "opal/mca/common/ucx/common_ucx.h"
Expand Down Expand Up @@ -70,6 +71,9 @@ struct mca_spml_ucx_ctx {
ucp_worker_h ucp_worker;
ucp_peer_t *ucp_peers;
long options;
opal_bitmap_t put_op_bitmap;
int *put_proc_indexes;
unsigned put_proc_count;
};
typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;

Expand Down Expand Up @@ -104,7 +108,7 @@ struct mca_spml_ucx {
mca_spml_ucx_ctx_t *aux_ctx;
pthread_spinlock_t async_lock;
int aux_refcnt;

bool synchronized_quiet;
};
typedef struct mca_spml_ucx mca_spml_ucx_t;

Expand Down Expand Up @@ -171,6 +175,9 @@ extern int spml_ucx_ctx_progress(void);
extern int spml_ucx_progress_aux_ctx(void);
void mca_spml_ucx_async_cb(int fd, short event, void *cbdata);

int mca_spml_ucx_init_put_op_mask(mca_spml_ucx_ctx_t *ctx, size_t nprocs);
int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx);

static inline void mca_spml_ucx_aux_lock(void)
{
if (mca_spml_ucx.async_progress) {
Expand Down Expand Up @@ -224,6 +231,16 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
#endif
}

static inline void mca_spml_ucx_remote_op_posted(mca_spml_ucx_ctx_t *ctx, int dst)
{
if (OPAL_UNLIKELY(mca_spml_ucx.synchronized_quiet)) {
if (!opal_bitmap_is_set_bit(&ctx->put_op_bitmap, dst)) {
ctx->put_proc_indexes[ctx->put_proc_count++] = dst;
opal_bitmap_set_bit(&ctx->put_op_bitmap, dst);
}
}
}

#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64

Expand Down
5 changes: 5 additions & 0 deletions oshmem/mca/spml/ucx/spml_ucx_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ static int mca_spml_ucx_component_register(void)
"Asynchronous progress tick granularity (in usec)",
&mca_spml_ucx.async_tick);

mca_spml_ucx_param_register_bool("synchronized_quiet", 0,
"Use synchronized quiet on shmem_quiet or shmem_barrier_all operations",
&mca_spml_ucx.synchronized_quiet);

opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);

return OSHMEM_SUCCESS;
Expand Down Expand Up @@ -329,6 +333,7 @@ static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
mca_spml_ucx.num_disconnect,
ctx->ucp_worker);
free(del_procs);
mca_spml_ucx_clear_put_op_mask(ctx);
free(ctx->ucp_peers);
}

Expand Down