Skip to content

Commit

Permalink
Merge pull request openucx#1997 from brminich/topic/uct_prm_tm_prep_p2
Browse files Browse the repository at this point in the history
UCT/RC: TM preparations for accelerated tls p2
  • Loading branch information
yosefe committed Nov 14, 2017
2 parents 80672f8 + a6fc5c2 commit b339b76
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 200 deletions.
99 changes: 90 additions & 9 deletions src/uct/ib/rc/base/rc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,50 @@ void uct_rc_fc_cleanup(uct_rc_fc_t *fc)
UCS_STATS_NODE_FREE(fc->stats);
}

/*
 * Release the extra QP that the endpoint holds for tag-matching offload.
 * Does nothing unless the code is built with IBV_EXP_HW_TM and TM is enabled
 * on the endpoint's interface. The QP is first removed from the interface's
 * QP table and then destroyed; a destroy failure is only logged.
 */
static void uct_rc_ep_tag_qp_destroy(uct_rc_ep_t *ep)
{
#if IBV_EXP_HW_TM
    uct_rc_iface_t *rc_iface = ucs_derived_of(ep->super.super.iface,
                                              uct_rc_iface_t);
    int ret;

    if (!(UCT_RC_IFACE_TM_ENABLED(rc_iface))) {
        return;
    }

    uct_rc_iface_remove_qp(rc_iface, ep->tm_qp->qp_num);

    ret = ibv_destroy_qp(ep->tm_qp);
    if (ret != 0) {
        ucs_warn("failed to destroy TM RNDV QP: %m");
    }
#endif
}

/*
 * Create and initialize the extra QP used for tag-matching offload and
 * register it in the interface's QP table under its QP number.
 *
 * Returns UCS_OK when TM support is compiled out or not enabled on the
 * interface (nothing is created in that case), or when the QP was created,
 * initialized and registered. Otherwise returns the status reported by
 * uct_rc_iface_qp_create() / uct_rc_iface_qp_init(); if initialization
 * fails, the freshly created QP is destroyed before returning.
 */
static ucs_status_t uct_rc_ep_tag_qp_create(uct_rc_iface_t *iface,
                                            uct_rc_ep_t *ep)
{
#if IBV_EXP_HW_TM
    struct ibv_qp_cap qp_cap;
    ucs_status_t status;
    int destroy_ret;

    if (!(UCT_RC_IFACE_TM_ENABLED(iface))) {
        return UCS_OK;
    }

    /* Send queue of this QP will be used by FW for HW RNDV. Driver requires
     * such a QP to be initialized with zero send queue length. */
    status = uct_rc_iface_qp_create(iface, IBV_QPT_RC, &ep->tm_qp, &qp_cap, 0);
    if (status != UCS_OK) {
        return status;
    }

    status = uct_rc_iface_qp_init(iface, ep->tm_qp);
    if (status == UCS_OK) {
        uct_rc_iface_add_qp(iface, ep, ep->tm_qp->qp_num);
        return UCS_OK;
    }

    /* Initialization failed: do not leave the new QP behind */
    destroy_ret = ibv_destroy_qp(ep->tm_qp);
    if (destroy_ret) {
        ucs_warn("ibv_destroy_qp() returned %d: %m", destroy_ret);
    }
    return status;
#else
    return UCS_OK;
#endif
}

UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface)
{
struct ibv_qp_cap cap;
Expand All @@ -135,6 +179,11 @@ UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface)
goto err_txqp_cleanup;
}

status = uct_rc_ep_tag_qp_create(iface, self);
if (status != UCS_OK) {
goto err_fc_cleanup;
}

self->sl = iface->super.config.sl; /* TODO multi-rail */
self->path_bits = iface->super.path_bits[0]; /* TODO multi-rail */

Expand All @@ -148,6 +197,8 @@ UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface)
ucs_list_add_head(&iface->ep_list, &self->list);
return UCS_OK;

err_fc_cleanup:
uct_rc_fc_cleanup(&self->fc);
err_txqp_cleanup:
uct_rc_txqp_cleanup(&self->txqp);
err:
Expand All @@ -162,6 +213,7 @@ static UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t)

ucs_list_del(&self->list);
uct_rc_iface_remove_qp(iface, self->txqp.qp->qp_num);
uct_rc_ep_tag_qp_destroy(self);
uct_rc_ep_pending_purge(&self->super.super, NULL, NULL);
uct_rc_fc_cleanup(&self->fc);
uct_rc_txqp_cleanup(&self->txqp);
Expand All @@ -171,12 +223,19 @@ UCS_CLASS_DEFINE(uct_rc_ep_t, uct_base_ep_t)

/**
 * Pack the endpoint address into the caller-provided buffer.
 *
 * @param tl_ep  Endpoint whose address is requested.
 * @param addr   Output buffer, written as a uct_rc_ep_address_t.
 *
 * The address always carries the number of the endpoint's send QP and the
 * atomic MR id of the interface. When built with IBV_EXP_HW_TM and tag
 * matching is enabled on the interface, it also carries the number of the
 * endpoint's TM QP; otherwise tm_qp_num is not written by this function.
 *
 * @return UCS_OK.
 */
ucs_status_t uct_rc_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
    uct_rc_ep_t *ep              = ucs_derived_of(tl_ep, uct_rc_ep_t);
    uct_rc_ep_address_t *rc_addr = (uct_rc_ep_address_t*)addr;
    uct_rc_iface_t *iface        = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);

    uct_ib_pack_uint24(rc_addr->qp_num, ep->txqp.qp->qp_num);
    rc_addr->atomic_mr_id = uct_ib_iface_get_atomic_mr_id(&iface->super);

#if IBV_EXP_HW_TM
    if (UCT_RC_IFACE_TM_ENABLED(iface)) {
        uct_ib_pack_uint24(rc_addr->tm_qp_num, ep->tm_qp->qp_num);
    }
#endif

    return UCS_OK;
}

Expand All @@ -187,19 +246,41 @@ ucs_status_t uct_rc_ep_connect_to_ep(uct_ep_h tl_ep, const uct_device_addr_t *de
uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t *)dev_addr;
const uct_rc_ep_address_t *rc_addr = (const uct_rc_ep_address_t*)ep_addr;
uint32_t qp_num;
struct ibv_ah_attr ah_attr;
ucs_status_t status;

uct_ib_iface_fill_ah_attr_from_addr(&iface->super, ib_addr, ep->path_bits, &ah_attr);

status = uct_rc_iface_qp_connect(iface, ep->txqp.qp,
uct_ib_unpack_uint24(rc_addr->qp_num),
&ah_attr);
if (status == UCS_OK) {
ep->atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id);
#if IBV_EXP_HW_TM
if (UCT_RC_IFACE_TM_ENABLED(iface)) {
/* For HW TM we need 2 QPs, one of which will be used by the device for
* RNDV offload (for issuing RDMA reads and sending RNDV ACK). No WQEs
* should be posted to the send side of the QP which is owned by device. */
status = uct_rc_iface_qp_connect(iface, ep->tm_qp,
uct_ib_unpack_uint24(rc_addr->qp_num),
&ah_attr);
if (status != UCS_OK) {
return status;
}

/* Need to connect local ep QP to the one owned by device
* (and bound to XRQ) on the peer. */
qp_num = uct_ib_unpack_uint24(rc_addr->tm_qp_num);
} else
#endif
{
qp_num = uct_ib_unpack_uint24(rc_addr->qp_num);
}

return status;
status = uct_rc_iface_qp_connect(iface, ep->txqp.qp, qp_num, &ah_attr);
if (status != UCS_OK) {
return status;
}

ep->atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id);

return UCS_OK;
}

ucs_status_t uct_rc_modify_qp(uct_rc_txqp_t *txqp, enum ibv_qp_state state)
Expand Down
9 changes: 9 additions & 0 deletions src/uct/ib/rc/base/rc_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ typedef struct uct_rc_fc {
struct uct_rc_ep {
uct_base_ep_t super;
uct_rc_txqp_t txqp;
#if IBV_EXP_HW_TM
struct ibv_qp *tm_qp;
#endif
uint16_t atomic_mr_offset;
uint8_t sl;
uint8_t path_bits;
Expand All @@ -205,6 +208,12 @@ UCS_CLASS_DECLARE(uct_rc_ep_t, uct_rc_iface_t*);

/*
 * Packed RC endpoint address. It is filled by uct_rc_ep_get_address() and
 * read by uct_rc_ep_connect_to_ep(). The layout depends on whether the code
 * is compiled with IBV_EXP_HW_TM.
 */
typedef struct uct_rc_ep_address {
    /* Number of the endpoint's send QP (txqp) */
    uct_ib_uint24_t  qp_num;
#if IBV_EXP_HW_TM
    /* To enable RNDV tag matching, two QPs are created per endpoint: one for
     * sending WRs and another one owned by the HW (the device uses it for
     * RDMA reads and for sending RNDV Complete messages). This is the number
     * of the latter; it is packed only when TM is enabled on the interface. */
    uct_ib_uint24_t  tm_qp_num;
#endif
    /* Atomic MR id of the interface, as returned by
     * uct_ib_iface_get_atomic_mr_id() */
    uint8_t          atomic_mr_id;
} UCS_S_PACKED uct_rc_ep_address_t;

Expand Down
29 changes: 29 additions & 0 deletions src/uct/ib/rc/base/rc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,35 @@ ucs_status_t uct_rc_iface_query(uct_rc_iface_t *iface,
return UCS_OK;
}

/*
 * Write the interface address, which is a single byte holding the address
 * type: UCT_RC_IFACE_ADDR_TYPE_TM when tag matching is enabled on the
 * interface, UCT_RC_IFACE_ADDR_TYPE_BASIC otherwise. Always returns UCS_OK.
 */
ucs_status_t uct_rc_iface_get_address(uct_iface_h tl_iface,
                                      uct_iface_addr_t *addr)
{
    uct_rc_iface_t UCS_V_UNUSED *rc_iface = ucs_derived_of(tl_iface,
                                                           uct_rc_iface_t);
    uint8_t *addr_type                    = (uint8_t*)addr;

    if (UCT_RC_IFACE_TM_ENABLED(rc_iface)) {
        *addr_type = UCT_RC_IFACE_ADDR_TYPE_TM;
    } else {
        *addr_type = UCT_RC_IFACE_ADDR_TYPE_BASIC;
    }

    return UCS_OK;
}

/**
 * Check whether a remote RC interface is reachable from this one.
 *
 * @param tl_iface    Local interface.
 * @param dev_addr    Remote device address, forwarded to the IB-level check.
 * @param iface_addr  Remote interface address, or NULL. When non-NULL, its
 *                    first byte is taken as the remote address type.
 *
 * @return 0 if iface_addr is given and its type differs from the local one
 *         (TM vs. basic); otherwise the result of
 *         uct_ib_iface_is_reachable().
 */
int uct_rc_iface_is_reachable(const uct_iface_h tl_iface,
                              const uct_device_addr_t *dev_addr,
                              const uct_iface_addr_t *iface_addr)
{
    uct_rc_iface_t UCS_V_UNUSED *iface = ucs_derived_of(tl_iface,
                                                        uct_rc_iface_t);
    uint8_t my_type                    = UCT_RC_IFACE_TM_ENABLED(iface) ?
                                         UCT_RC_IFACE_ADDR_TYPE_TM :
                                         UCT_RC_IFACE_ADDR_TYPE_BASIC;

    /* Read the remote type through a const pointer: the address is an input
     * and must not lose its const qualifier */
    if ((iface_addr != NULL) && (my_type != *(const uint8_t*)iface_addr)) {
        return 0;
    }

    return uct_ib_iface_is_reachable(tl_iface, dev_addr, iface_addr);
}

void uct_rc_iface_add_qp(uct_rc_iface_t *iface, uct_rc_ep_t *ep,
unsigned qp_num)
{
Expand Down
17 changes: 17 additions & 0 deletions src/uct/ib/rc/base/rc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@
_desc->super.user_comp = _comp;


/*
 * RC interface address types. The value is written as the single-byte
 * interface address by uct_rc_iface_get_address() and compared against the
 * peer's value in uct_rc_iface_is_reachable().
 */
enum {
    UCT_RC_IFACE_ADDR_TYPE_BASIC,

    /* Tag Matching address. It additionally contains the QP number which
     * is used for hardware offloads. */
    UCT_RC_IFACE_ADDR_TYPE_TM,
    UCT_RC_IFACE_ADDR_TYPE_LAST
};


enum {
UCT_RC_IFACE_STAT_RX_COMPLETION,
UCT_RC_IFACE_STAT_TX_COMPLETION,
Expand Down Expand Up @@ -385,6 +395,13 @@ extern ucs_config_field_t uct_rc_fc_config_table[];
ucs_status_t uct_rc_iface_query(uct_rc_iface_t *iface,
uct_iface_attr_t *iface_attr);

/**
 * Write the RC interface address: one byte holding the address type
 * (UCT_RC_IFACE_ADDR_TYPE_TM if tag matching is enabled on the interface,
 * UCT_RC_IFACE_ADDR_TYPE_BASIC otherwise).
 *
 * @param tl_iface  Interface to query.
 * @param addr      Output buffer; its first byte is written.
 *
 * @return UCS_OK.
 */
ucs_status_t uct_rc_iface_get_address(uct_iface_h tl_iface,
                                      uct_iface_addr_t *addr);

/**
 * Check whether a remote RC interface is reachable.
 *
 * @param tl_iface    Local interface.
 * @param dev_addr    Remote device address.
 * @param iface_addr  Remote interface address, or NULL to skip the address
 *                    type comparison.
 *
 * @return 0 if iface_addr is non-NULL and its address type differs from the
 *         local one; otherwise the result of uct_ib_iface_is_reachable().
 */
int uct_rc_iface_is_reachable(const uct_iface_h tl_iface,
                              const uct_device_addr_t *dev_addr,
                              const uct_iface_addr_t *iface_addr);

ucs_status_t uct_rc_iface_tag_init(uct_rc_iface_t *iface,
uct_rc_iface_config_t *config,
struct ibv_exp_create_srq_attr *srq_init_attr,
Expand Down
29 changes: 0 additions & 29 deletions src/uct/ib/rc/verbs/rc_verbs.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,12 @@
#include "rc_verbs_common.h"


/* RC verbs interface address types */
enum {
    UCT_RC_VERBS_IFACE_ADDR_TYPE_BASIC,

    /* Tag Matching address. It additionally contains the QP number which
     * is used for hardware offloads. */
    UCT_RC_VERBS_IFACE_ADDR_TYPE_TM,
    UCT_RC_VERBS_IFACE_ADDR_TYPE_LAST
};


/**
 * RC verbs communication context.
 *
 * Extends the common RC endpoint (super) with verbs-specific state.
 */
typedef struct uct_rc_verbs_ep {
    uct_rc_ep_t            super;   /* Common RC endpoint, must be first */
    uct_rc_verbs_txcnt_t   txcnt;
#if IBV_EXP_HW_TM
    /* Extra QP, present only when built with IBV_EXP_HW_TM */
    struct ibv_qp          *tm_qp;
#endif
} uct_rc_verbs_ep_t;


Expand Down Expand Up @@ -62,14 +49,6 @@ typedef struct uct_rc_verbs_iface {

#if IBV_EXP_HW_TM

/* To enable RNDV tag matching, two QPs are created per endpoint: one for
 * sending WRs and another one for the HW (the device uses it for RDMA reads
 * and for sending RNDV Complete messages). This address extends the common
 * RC endpoint address with the number of the second QP. */
typedef struct uct_rc_verbs_ep_tm_address {
    uct_rc_ep_address_t  super;
    uct_ib_uint24_t      tm_qp_num;
} UCS_S_PACKED uct_rc_verbs_ep_tm_address_t;

# define UCT_RC_VERBS_CHECK_RES_PTR(_iface, _ep) \
UCT_RC_CHECK_CQE_RET(_iface, _ep, &(_ep)->txqp, \
UCS_STATUS_PTR(UCS_ERR_NO_RESOURCE)) \
Expand All @@ -96,8 +75,6 @@ ucs_status_ptr_t uct_rc_verbs_ep_tag_rndv_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
size_t iovcnt,
uct_completion_t *comp);

ucs_status_t uct_rc_verbs_ep_tag_rndv_cancel(uct_ep_h tl_ep, void *op);

ucs_status_t uct_rc_verbs_ep_tag_rndv_request(uct_ep_h tl_ep, uct_tag_t tag,
const void* header,
unsigned header_length);
Expand Down Expand Up @@ -184,12 +161,6 @@ ucs_status_t uct_rc_verbs_ep_atomic_cswap32(uct_ep_h tl_ep, uint32_t compare, ui
ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp);

ucs_status_t uct_rc_verbs_ep_connect_to_ep(uct_ep_h tl_ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr);

ucs_status_t uct_rc_verbs_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr);

ucs_status_t uct_rc_verbs_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
uct_rc_fc_request_t *req);

Expand Down
Loading

0 comments on commit b339b76

Please sign in to comment.