Skip to content

Commit

Permalink
prov/efa: Get efa_rdm_ep from wc's qpn
Browse files Browse the repository at this point in the history
Currently, we get ep pointer from the pkt_entry (wc's wr_id),
which may be not reliable because some wc can be unsolicited
(there is no pkts that the wc consumes). This patch changes
this mechanism to be deriving ep from the wc's qpn, which is
always available and more reliable. This is also what we
are doing for dgram cq which is closest to rdma-core interface.

Signed-off-by: Shi Jin <sjina@amazon.com>
  • Loading branch information
shijin-aws committed May 17, 2024
1 parent 4ee2cbe commit 25c4ef0
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 10 deletions.
26 changes: 16 additions & 10 deletions prov/efa/src/rdm/efa_rdm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,26 @@ static struct fi_ops efa_rdm_cq_fi_ops = {
* remote endpoint. These do not have a packet context, nor do they have a
* connid available.
*
* @param[in] ibv_cq_ex extended ibv cq
* @param[in] flags flags (such as FI_REMOTE_CQ_DATA)
* @param[in] ep efa_rdm_ep
* @param[in] pkt_entry packet entry
* @param[in] ibv_cq_ex extended ibv cq
*/
static
void efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
struct ibv_cq_ex *ibv_cq_ex,
uint64_t flags,
struct efa_rdm_pke *pkt_entry,
struct ibv_cq_ex *ibv_cq_ex)
struct efa_rdm_ep *ep,
struct efa_rdm_pke *pkt_entry
)
{
struct util_cq *target_cq;
int ret;
fi_addr_t src_addr;
struct efa_av *efa_av;
struct efa_rdm_ep *ep = pkt_entry->ep;
uint32_t imm_data = ibv_wc_read_imm_data(ibv_cq_ex);
uint32_t len = ibv_wc_read_byte_len(ibv_cq_ex);

assert(ep);

target_cq = ep->base_ep.util_ep.rx_cq;
efa_av = ep->base_ep.av;

Expand All @@ -123,6 +123,7 @@ void efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
/* Recv with immediate will consume a pkt_entry, but the pkt is not
filled, so free the pkt_entry and record we have one less posted
packet now. */
assert(pkt_entry);
ep->efa_rx_pkts_posted--;
efa_rdm_pke_release_rx(pkt_entry);
}
Expand Down Expand Up @@ -300,15 +301,20 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
struct efa_rdm_ep *ep = NULL;
struct fi_cq_err_entry err_entry;
struct efa_rdm_cq *efa_rdm_cq;
struct efa_domain *efa_domain;
struct efa_qp *qp;

efa_rdm_cq = container_of(ibv_cq, struct efa_rdm_cq, ibv_cq);
efa_domain = container_of(efa_rdm_cq->util_cq.domain, struct efa_domain, util_domain);

/* Call ibv_start_poll only once */
err = ibv_start_poll(ibv_cq->ibv_cq_ex, &poll_cq_attr);
should_end_poll = !err;

while (!err) {
pkt_entry = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id;
ep = pkt_entry->ep;
assert(ep);
qp = efa_domain->qp_table[ibv_wc_read_qp_num(ibv_cq->ibv_cq_ex) & efa_domain->qp_table_sz_m1];
ep = container_of(qp->base_ep, struct efa_rdm_ep, base_ep);
efa_av = ep->base_ep.av;
efa_rdm_tracepoint(poll_cq, (size_t) ibv_cq->ibv_cq_ex->wr_id);
opcode = ibv_wc_read_opcode(ibv_cq->ibv_cq_ex);
Expand Down Expand Up @@ -358,8 +364,9 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
break;
case IBV_WC_RECV_RDMA_WITH_IMM:
efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
ibv_cq->ibv_cq_ex,
FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE,
pkt_entry, ibv_cq->ibv_cq_ex);
ep, pkt_entry);
break;
default:
EFA_WARN(FI_LOG_EP_CTRL,
Expand Down Expand Up @@ -389,7 +396,6 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
.prov_errno = prov_errno,
.op_context = NULL
};
efa_rdm_cq = container_of(ibv_cq, struct efa_rdm_cq, ibv_cq);
ofi_cq_write_error(&efa_rdm_cq->util_cq, &err_entry);
}

Expand Down
6 changes: 6 additions & 0 deletions prov/efa/test/efa_unit_test_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ static void test_rdm_cq_read_bad_send_status(struct efa_resource *resource,
ibv_cqx->end_poll = &efa_mock_ibv_end_poll_check_mock;
ibv_cqx->read_opcode = &efa_mock_ibv_read_opcode_return_mock;
ibv_cqx->read_vendor_err = &efa_mock_ibv_read_vendor_err_return_mock;
ibv_cqx->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;
will_return(efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status, IBV_WC_GENERAL_ERR);
will_return(efa_mock_ibv_end_poll_check_mock, NULL);
will_return(efa_mock_ibv_read_opcode_return_mock, IBV_WC_SEND);
will_return(efa_mock_ibv_read_vendor_err_return_mock, vendor_error);
will_return(efa_mock_ibv_read_qp_num_return_mock, 0);
ret = fi_cq_read(resource->cq, &cq_entry, 1);
/* fi_cq_read() called efa_mock_ibv_start_poll_use_saved_send_wr(), which pulled one send_wr from g_ibv_submitted_wr_idv=_vec */
assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
Expand Down Expand Up @@ -307,6 +309,7 @@ void test_ibv_cq_ex_read_bad_recv_status(struct efa_resource **state)
efa_rdm_cq->ibv_cq.ibv_cq_ex->end_poll = &efa_mock_ibv_end_poll_check_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_opcode = &efa_mock_ibv_read_opcode_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_vendor_err = &efa_mock_ibv_read_vendor_err_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;

will_return(efa_mock_ibv_start_poll_return_mock, 0);
will_return(efa_mock_ibv_end_poll_check_mock, NULL);
Expand All @@ -315,6 +318,7 @@ void test_ibv_cq_ex_read_bad_recv_status(struct efa_resource **state)
* therefore use will_return_always()
*/
will_return_always(efa_mock_ibv_read_opcode_return_mock, IBV_WC_RECV);
will_return_always(efa_mock_ibv_read_qp_num_return_mock, 0);
will_return(efa_mock_ibv_read_vendor_err_return_mock, EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
/* the recv error will not populate to application cq because it's an EFA internal error and
* and not related to any application recv. Currently we can only read the error from eq.
Expand Down Expand Up @@ -568,6 +572,7 @@ static void test_impl_ibv_cq_ex_read_unknow_peer_ah(struct efa_resource *resourc
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_slid = &efa_mock_ibv_read_slid_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_byte_len = &efa_mock_ibv_read_byte_len_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_opcode = &efa_mock_ibv_read_opcode_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock;

if (support_efadv_cq) {
Expand All @@ -588,6 +593,7 @@ static void test_impl_ibv_cq_ex_read_unknow_peer_ah(struct efa_resource *resourc
will_return(efa_mock_ibv_read_slid_return_mock, 0xffff); // slid=0xffff(-1) indicates an unknown AH
will_return(efa_mock_ibv_read_byte_len_return_mock, pkt_entry->pkt_size);
will_return_maybe(efa_mock_ibv_read_opcode_return_mock, IBV_WC_RECV);
will_return_maybe(efa_mock_ibv_read_qp_num_return_mock, 0);
will_return_maybe(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn);

/* Post receive buffer */
Expand Down
3 changes: 3 additions & 0 deletions prov/efa/test/efa_unit_test_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ void test_efa_rdm_ep_handshake_exchange_host_id(struct efa_resource **state, uin
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_opcode = &efa_mock_ibv_read_opcode_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_slid = &efa_mock_ibv_read_slid_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_vendor_err = &efa_mock_ibv_read_vendor_err_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->start_poll = &efa_mock_ibv_start_poll_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->status = IBV_WC_SUCCESS;
Expand All @@ -189,6 +190,7 @@ void test_efa_rdm_ep_handshake_exchange_host_id(struct efa_resource **state, uin
will_return(efa_mock_ibv_next_poll_check_function_called_and_return_mock, ENOENT);
will_return(efa_mock_ibv_read_byte_len_return_mock, pkt_entry->pkt_size);
will_return(efa_mock_ibv_read_opcode_return_mock, IBV_WC_RECV);
will_return(efa_mock_ibv_read_qp_num_return_mock, 0);
will_return(efa_mock_ibv_read_slid_return_mock, efa_rdm_ep_get_peer_ahn(efa_rdm_ep, peer_addr));
will_return(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn);
will_return(efa_mock_ibv_start_poll_return_mock, IBV_WC_SUCCESS);
Expand All @@ -199,6 +201,7 @@ void test_efa_rdm_ep_handshake_exchange_host_id(struct efa_resource **state, uin
*/
will_return(efa_mock_ibv_end_poll_check_mock, NULL);
will_return(efa_mock_ibv_read_opcode_return_mock, IBV_WC_SEND);
will_return(efa_mock_ibv_read_qp_num_return_mock, 0);
will_return(efa_mock_ibv_read_vendor_err_return_mock, FI_EFA_ERR_OTHER);
will_return(efa_mock_ibv_start_poll_return_mock, IBV_WC_SUCCESS);

Expand Down
5 changes: 5 additions & 0 deletions prov/efa/test/efa_unit_test_mocks.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ uint32_t efa_mock_ibv_read_vendor_err_return_mock(struct ibv_cq_ex *current)
return mock();
}

uint32_t efa_mock_ibv_read_qp_num_return_mock(struct ibv_cq_ex *current)
{
return mock();
}

int g_ofi_copy_from_hmem_iov_call_counter;
ssize_t efa_mock_ofi_copy_from_hmem_iov_inc_counter(void *dest, size_t size,
enum fi_hmem_iface hmem_iface, uint64_t device,
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/test/efa_unit_test_mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ uint32_t efa_mock_ibv_read_opcode_return_mock(struct ibv_cq_ex *current);

uint32_t efa_mock_ibv_read_vendor_err_return_mock(struct ibv_cq_ex *current);

uint32_t efa_mock_ibv_read_qp_num_return_mock(struct ibv_cq_ex *current);

ssize_t __real_ofi_copy_from_hmem_iov(void *dest, size_t size,
enum fi_hmem_iface hmem_iface, uint64_t device,
const struct iovec *hmem_iov,
Expand Down

0 comments on commit 25c4ef0

Please sign in to comment.