HG: Fix HG_Reset() to reset NA resources upon NA class change (fix #272)
HG Core: when passing an address that does not match the handle's NA class,
NA resources must be freed and reallocated.

Make allocation of NA resources a separate step when allocating the HG core handle.
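
In short: instead of rejecting the reset with "Cannot reset handle to a different address NA class" as before, HG_Core_reset() now frees and re-creates the handle's NA resources. A condensed sketch of the new path, using the two helpers this commit introduces (use_sm is derived by comparing the address's NA class against the context's SM class; see the full diff below for the exact code):

    /* Sketch only -- not the literal patch */
    if (hg_core_addr
        && (hg_core_addr->core_addr.na_class != hg_core_handle->na_class)) {
        /* Target address lives on a different NA class (e.g. SM vs. remote):
         * drop the buffers and NA op IDs tied to the old class... */
        hg_core_free_na(hg_core_handle);
        /* ...and re-create them for the new class */
        ret = hg_core_alloc_na(hg_core_handle, use_sm);
    }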
soumagne committed Feb 11, 2019
1 parent 9b1e7fa commit 9512b8b
Showing 1 changed file with 153 additions and 90 deletions.
243 changes: 153 additions & 90 deletions src/mercury_core.c
@@ -448,6 +448,23 @@ hg_core_destroy(
struct hg_core_private_handle *hg_core_handle
);

+/**
+ * Allocate NA resources.
+ */
+static hg_return_t
+hg_core_alloc_na(
+struct hg_core_private_handle *hg_core_handle,
+hg_bool_t use_sm
+);
+
+/**
+ * Free NA resources.
+ */
+static void
+hg_core_free_na(
+struct hg_core_private_handle *hg_core_handle
+);

/**
* Reset handle.
*/
@@ -1668,11 +1685,8 @@ hg_core_addr_to_string(struct hg_core_private_class *hg_core_class, char *buf,

/*---------------------------------------------------------------------------*/
static struct hg_core_private_handle *
-hg_core_create(struct hg_core_private_context *context,
-hg_bool_t HG_UNUSED use_sm)
+hg_core_create(struct hg_core_private_context *context, hg_bool_t use_sm)
{
-na_class_t *na_class = context->core_context.core_class->na_class;
-na_context_t *na_context = context->core_context.na_context;
struct hg_core_private_handle *hg_core_handle = NULL;
hg_return_t ret = HG_SUCCESS;

@@ -1691,14 +1705,8 @@ hg_core_create(struct hg_core_private_context *context,
hg_core_handle->core_handle.info.addr = HG_CORE_ADDR_NULL;
hg_core_handle->core_handle.info.id = 0;
hg_core_handle->core_handle.info.context_id = 0;
-#ifdef HG_HAS_SM_ROUTING
-if (use_sm) {
-na_class = context->core_context.core_class->na_sm_class;
-na_context = context->core_context.na_sm_context;
-}
-#endif
-hg_core_handle->na_class = na_class;
-hg_core_handle->na_context = na_context;

/* Default return code */
hg_core_handle->ret = HG_SUCCESS;

/* Add handle to handle list so that we can track it */
@@ -1710,45 +1718,131 @@ hg_core_create(struct hg_core_private_context *context,
/* Handle is not in use */
hg_atomic_init32(&hg_core_handle->in_use, HG_FALSE);

-/* Initialize processing buffers and use unexpected message size */
+/* Init in/out header */
+hg_core_header_request_init(&hg_core_handle->in_header);
+hg_core_header_response_init(&hg_core_handle->out_header);

+/* Set refcount to 1 */
+hg_atomic_init32(&hg_core_handle->ref_count, 1);
+
+/* Increment N handles from HG context */
+hg_atomic_incr32(&context->n_handles);
+
+/* Alloc/init NA resources */
+ret = hg_core_alloc_na(hg_core_handle, use_sm);
+if (ret != HG_SUCCESS) {
+NA_LOG_ERROR("Could not allocate NA handle ops");
+ret = NA_NOMEM_ERROR;
+goto done;
+}
+
+done:
+if (ret != HG_SUCCESS) {
+hg_core_destroy(hg_core_handle);
+hg_core_handle = NULL;
+}
+return hg_core_handle;
+}

+/*---------------------------------------------------------------------------*/
+static void
+hg_core_destroy(struct hg_core_private_handle *hg_core_handle)
+{
+if (!hg_core_handle) goto done;
+
+if (hg_atomic_decr32(&hg_core_handle->ref_count))
+goto done; /* Cannot free yet */
+
+/* Remove handle from list */
+hg_thread_spin_lock(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->created_list_lock);
+HG_LIST_REMOVE(hg_core_handle, created);
+hg_thread_spin_unlock(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->created_list_lock);
+
+/* Decrement N handles from HG context */
+hg_atomic_decr32(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->n_handles);
+
+/* Remove reference to HG addr */
+hg_core_addr_free(HG_CORE_HANDLE_CLASS(hg_core_handle),
+(struct hg_core_private_addr *) hg_core_handle->core_handle.info.addr);
+
+hg_core_header_request_finalize(&hg_core_handle->in_header);
+hg_core_header_response_finalize(&hg_core_handle->out_header);
+
+/* Free extra data here if needed */
+if (HG_CORE_HANDLE_CLASS(hg_core_handle)->more_data_release)
+HG_CORE_HANDLE_CLASS(hg_core_handle)->more_data_release(
+(hg_core_handle_t) hg_core_handle);
+
+/* Free user data */
+if (hg_core_handle->core_handle.data_free_callback)
+hg_core_handle->core_handle.data_free_callback(
+hg_core_handle->core_handle.data);
+
+/* Free NA resources */
+hg_core_free_na(hg_core_handle);
+
+free(hg_core_handle);
+
+done:
+return;
+}

+/*---------------------------------------------------------------------------*/
+static hg_return_t
+hg_core_alloc_na(struct hg_core_private_handle *hg_core_handle,
+hg_bool_t HG_UNUSED use_sm)
+{
+hg_return_t ret = HG_SUCCESS;
+
+/* Set handle NA class/context */
+hg_core_handle->na_class =
+#ifdef HG_HAS_SM_ROUTING
+(use_sm) ? HG_CORE_HANDLE_CLASS(hg_core_handle)->core_class.na_sm_class :
+#endif
+HG_CORE_HANDLE_CLASS(hg_core_handle)->core_class.na_class;
+hg_core_handle->na_context =
+#ifdef HG_HAS_SM_ROUTING
+(use_sm) ? HG_CORE_HANDLE_CONTEXT(hg_core_handle)->core_context.na_sm_context :
+#endif
+HG_CORE_HANDLE_CONTEXT(hg_core_handle)->core_context.na_context;

+/* Initialize in/out buffers and use unexpected message size */
hg_core_handle->core_handle.in_buf_size =
-NA_Msg_get_max_unexpected_size(na_class);
+NA_Msg_get_max_unexpected_size(hg_core_handle->na_class);
hg_core_handle->core_handle.out_buf_size =
-NA_Msg_get_max_expected_size(na_class);
+NA_Msg_get_max_expected_size(hg_core_handle->na_class);
hg_core_handle->core_handle.na_in_header_offset =
-NA_Msg_get_unexpected_header_size(na_class);
+NA_Msg_get_unexpected_header_size(hg_core_handle->na_class);
hg_core_handle->core_handle.na_out_header_offset =
-NA_Msg_get_expected_header_size(na_class);
+NA_Msg_get_expected_header_size(hg_core_handle->na_class);

-hg_core_handle->core_handle.in_buf = NA_Msg_buf_alloc(na_class,
-hg_core_handle->core_handle.in_buf_size,
+hg_core_handle->core_handle.in_buf = NA_Msg_buf_alloc(
+hg_core_handle->na_class, hg_core_handle->core_handle.in_buf_size,
&hg_core_handle->in_buf_plugin_data);
if (!hg_core_handle->core_handle.in_buf) {
HG_LOG_ERROR("Could not allocate buffer for input");
ret = HG_NOMEM_ERROR;
goto done;
}
-NA_Msg_init_unexpected(na_class, hg_core_handle->core_handle.in_buf,
+NA_Msg_init_unexpected(hg_core_handle->na_class,
+hg_core_handle->core_handle.in_buf,
hg_core_handle->core_handle.in_buf_size);

-hg_core_handle->core_handle.out_buf = NA_Msg_buf_alloc(na_class,
-hg_core_handle->core_handle.out_buf_size,
+hg_core_handle->core_handle.out_buf = NA_Msg_buf_alloc(
+hg_core_handle->na_class, hg_core_handle->core_handle.out_buf_size,
&hg_core_handle->out_buf_plugin_data);
if (!hg_core_handle->core_handle.out_buf) {
HG_LOG_ERROR("Could not allocate buffer for output");
ret = HG_NOMEM_ERROR;
goto done;
}
-NA_Msg_init_expected(na_class, hg_core_handle->core_handle.out_buf,
+NA_Msg_init_expected(hg_core_handle->na_class,
+hg_core_handle->core_handle.out_buf,
hg_core_handle->core_handle.out_buf_size);

-/* Init in/out header */
-hg_core_header_request_init(&hg_core_handle->in_header);
-hg_core_header_response_init(&hg_core_handle->out_header);

/* Create NA operation IDs */
-hg_core_handle->na_send_op_id = NA_Op_create(na_class);
-hg_core_handle->na_recv_op_id = NA_Op_create(na_class);
+hg_core_handle->na_send_op_id = NA_Op_create(hg_core_handle->na_class);
+hg_core_handle->na_recv_op_id = NA_Op_create(hg_core_handle->na_class);
if (hg_core_handle->na_recv_op_id || hg_core_handle->na_send_op_id) {
if ((hg_core_handle->na_recv_op_id == NA_OP_ID_NULL)
|| (hg_core_handle->na_send_op_id == NA_OP_ID_NULL)) {
@@ -1761,55 +1855,32 @@ hg_core_create(struct hg_core_private_context *context,
hg_core_handle->na_op_count = 1; /* Default (no response) */
hg_atomic_init32(&hg_core_handle->na_op_completed_count, 0);

-/* Set refcount to 1 */
-hg_atomic_init32(&hg_core_handle->ref_count, 1);
-
-/* Increment N handles from HG context */
-hg_atomic_incr32(&context->n_handles);

done:
-if (ret != HG_SUCCESS) {
-hg_core_destroy(hg_core_handle);
-hg_core_handle = NULL;
-}
-return hg_core_handle;
+return ret;
}

/*---------------------------------------------------------------------------*/
static void
-hg_core_destroy(struct hg_core_private_handle *hg_core_handle)
+hg_core_free_na(struct hg_core_private_handle *hg_core_handle)
{
na_return_t na_ret;

-if (!hg_core_handle) goto done;

-if (hg_atomic_decr32(&hg_core_handle->ref_count)) {
-/* Cannot free yet */
-goto done;
+/* Free eventual ack buffer */
+if (hg_core_handle->ack_buf) {
+NA_Msg_buf_free(hg_core_handle->na_class, hg_core_handle->ack_buf,
+hg_core_handle->ack_buf_plugin_data);
+hg_core_handle->ack_buf = NULL;
}

-/* Remove handle from list */
-hg_thread_spin_lock(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->created_list_lock);
-HG_LIST_REMOVE(hg_core_handle, created);
-hg_thread_spin_unlock(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->created_list_lock);
-
-/* Decrement N handles from HG context */
-hg_atomic_decr32(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->n_handles);
-
-/* Remove reference to HG addr */
-hg_core_addr_free(HG_CORE_HANDLE_CLASS(hg_core_handle),
-(struct hg_core_private_addr *) hg_core_handle->core_handle.info.addr);

/* Destroy NA op IDs */
na_ret = NA_Op_destroy(hg_core_handle->na_class, hg_core_handle->na_send_op_id);
if (na_ret != NA_SUCCESS)
HG_LOG_ERROR("Could not destroy NA op ID");
-NA_Op_destroy(hg_core_handle->na_class, hg_core_handle->na_recv_op_id);
+na_ret = NA_Op_destroy(hg_core_handle->na_class, hg_core_handle->na_recv_op_id);
if (na_ret != NA_SUCCESS)
HG_LOG_ERROR("Could not destroy NA op ID");

-hg_core_header_request_finalize(&hg_core_handle->in_header);
-hg_core_header_response_finalize(&hg_core_handle->out_header);

/* Free buffers */
na_ret = NA_Msg_buf_free(hg_core_handle->na_class,
hg_core_handle->core_handle.in_buf, hg_core_handle->in_buf_plugin_data);
if (na_ret != NA_SUCCESS)
@@ -1818,24 +1889,6 @@ hg_core_destroy(struct hg_core_private_handle *hg_core_handle)
hg_core_handle->core_handle.out_buf, hg_core_handle->out_buf_plugin_data);
if (na_ret != NA_SUCCESS)
HG_LOG_ERROR("Could not destroy NA output msg buffer");

-/* Free extra data here if needed */
-if (HG_CORE_HANDLE_CLASS(hg_core_handle)->more_data_release)
-HG_CORE_HANDLE_CLASS(hg_core_handle)->more_data_release(
-(hg_core_handle_t) hg_core_handle);
-if (hg_core_handle->ack_buf)
-NA_Msg_buf_free(hg_core_handle->na_class, hg_core_handle->ack_buf,
-hg_core_handle->ack_buf_plugin_data);
-
-/* Free user data */
-if (hg_core_handle->core_handle.data_free_callback)
-hg_core_handle->core_handle.data_free_callback(
-hg_core_handle->core_handle.data);
-
-free(hg_core_handle);
-
-done:
-return;
}

/*---------------------------------------------------------------------------*/
@@ -4489,16 +4542,6 @@ HG_Core_reset(hg_core_handle_t handle, hg_core_addr_t addr, hg_id_t id)
ret = HG_INVALID_PARAM;
goto done;
}

-#ifdef HG_HAS_SM_ROUTING
-if (hg_core_addr
-&& (hg_core_addr->core_addr.na_class != hg_core_handle->na_class)) {
-HG_LOG_ERROR("Cannot reset handle to a different address NA class");
-ret = HG_INVALID_PARAM;
-goto done;
-}
-#endif

/* Not safe to reset
* TODO could add the ability to defer the reset operation */
if (hg_atomic_get32(&hg_core_handle->in_use)) {
@@ -4508,6 +4551,26 @@ HG_Core_reset(hg_core_handle_t handle, hg_core_addr_t addr, hg_id_t id)
ret = HG_PROTOCOL_ERROR;
goto done;
}

+#ifdef HG_HAS_SM_ROUTING
+if (hg_core_addr
+&& (hg_core_addr->core_addr.na_class != hg_core_handle->na_class)) {
+struct hg_core_private_context *private_context =
+(struct hg_core_private_context *) hg_core_handle->core_handle.info.context;
+hg_bool_t use_sm =
+(private_context->core_context.core_class->na_sm_class
+== hg_core_addr->core_addr.na_class);
+/* In that case, we must free and re-allocate NA resources */
+hg_core_free_na(hg_core_handle);
+ret = hg_core_alloc_na(hg_core_handle, use_sm);
+if (ret != HG_SUCCESS) {
+HG_LOG_ERROR("Could not re-allocate handle NA resources");
+goto done;
+}
+}
+#endif

/* Reset handle */
hg_core_reset(hg_core_handle, HG_FALSE);

/* Set addr / RPC ID */
