prov/efa: Avoid iterating cq->ep_list in cq read
This patch includes two changes that get rid of the ep_list iteration
in cq and cntr progress:

1. Pre-post the internal rx pkts and grow the pools during ep enable,
and only repost rx pkts when the ep gets a receive work completion.

2. Move the RNR backoff peer list and the other queued ope/pkt lists
into efa_domain, so that fi_cq_read can progress these lists once,
no matter how many eps are bound.

Together, these two changes make efa_rdm_ep_progress.c obsolete,
because there is no longer a dedicated progress function per ep.
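As a sketch, the new cq-read path boils down to the following (the two
efa_* calls are real functions from this diff; the wrapper function and
its parameters are illustrative only):

```c
/* Illustrative sketch, not verbatim provider code: a cq read drains the
 * associated device CQs once and then makes a single pass over the
 * domain-level queues, regardless of how many eps are bound to the cq. */
static void efa_rdm_cq_progress_sketch(struct efa_domain *domain,
				       struct dlist_entry *ibv_cq_poll_list)
{
	struct efa_ibv_cq_poll_list_entry *poll_list_entry;
	struct dlist_entry *item;

	/* poll every device cq associated with this util cq */
	dlist_foreach(ibv_cq_poll_list, item) {
		poll_list_entry = container_of(item, struct efa_ibv_cq_poll_list_entry, entry);
		efa_rdm_cq_poll_ibv_cq(efa_env.efa_cq_read_size, poll_list_entry->cq);
	}

	/* one pass over the domain's peer/ope queues; no ep_list walk */
	efa_domain_progress_rdm_peers_and_queues(domain);
}
```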

Signed-off-by: Shi Jin <sjina@amazon.com>
shijin-aws committed Jun 11, 2024
1 parent 7f7c94d commit f589580
Showing 19 changed files with 609 additions and 131 deletions.
1 change: 0 additions & 1 deletion libfabric.vcxproj
@@ -886,7 +886,6 @@
<ClCompile Include="prov\efa\src\rdm\efa_rdm_atomic.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_ep_utils.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_ep_fiops.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_ep_progress.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_msg.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke_cmd.c" />
1 change: 0 additions & 1 deletion prov/efa/Makefile.include
@@ -55,7 +55,6 @@ _efa_files = \
prov/efa/src/rdm/efa_rdm_cq.c \
prov/efa/src/rdm/efa_rdm_ep_utils.c \
prov/efa/src/rdm/efa_rdm_ep_fiops.c \
-prov/efa/src/rdm/efa_rdm_ep_progress.c \
prov/efa/src/rdm/efa_rdm_rma.c \
prov/efa/src/rdm/efa_rdm_msg.c \
prov/efa/src/rdm/efa_rdm_pke.c \
6 changes: 3 additions & 3 deletions prov/efa/docs/pkt-processing.md
@@ -31,7 +31,7 @@ keep track of whether long message receives are completed. Just like the txe,
when a receive operation is completed a receive completion is written to the app
and the `rxe` (RX entry) is released.

-`efa_rdm_ep_progress` is the progress handler we register when the completion queue
+`efa_rdm_cq_progress` is the progress handler we register when the completion queue
is created and is called via the util completion queue functions. While the EFA
device will progress sends and receives posted to it, the Libfabric provider
has to process those device completions, potentially copy data out of a bounce
@@ -55,7 +55,7 @@ those cases.

We also may queue an rxe/txe if we're unable to continue sending segments
or if we fail to post a control message for that entry. You'll find the lists
-where those are queued and progressed in `efa_rdm_ep_progress_internal`.
+where those are queued and progressed in `efa_domain_progress_rdm_peers_and_queues`.
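As a minimal sketch of that queue-then-retry pattern (the field and flag
names are taken from this commit; the helper itself is hypothetical):

```c
/* Sketch: park an ope whose ctrl send hit -FI_EAGAIN on the domain-level
 * queue; efa_domain_progress_rdm_peers_and_queues() retries it on the
 * next cq/cntr read. */
static int post_ctrl_or_queue(struct efa_domain *domain,
			      struct efa_rdm_ope *ope, int ctrl_type)
{
	int ret = efa_rdm_ope_post_send(ope, ctrl_type);

	if (ret == -FI_EAGAIN) {
		ope->internal_flags |= EFA_RDM_OPE_QUEUED_CTRL;
		ope->queued_ctrl_type = ctrl_type;
		dlist_insert_tail(&ope->queued_ctrl_entry,
				  &domain->ope_queued_ctrl_list);
		ret = 0; /* treated as success; retried on next progress pass */
	}
	return ret;
}
```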

### Dealing with receiver not ready errors (RNR)

@@ -77,5 +77,5 @@ to that peer until the peer exits backoff, meaning we either received a
successful send completion for that peer or the backoff timer expires.

See `efa_rdm_ep_queue_rnr_pkt` for where the packets are queued and backoff timers are
-set, and see `efa_rdm_ep_check_peer_backoff_timer` for where those timers are
+set, and see `efa_domain_progress_rdm_peers_and_queues` for where those timers are
checked and we allow sends to that remote peer again.
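The expiry check itself is plain microsecond bookkeeping; distilled from
the code added in this commit:

```c
/* A peer leaves backoff once its wait time has elapsed; sends to that
 * peer are then allowed again. */
if (ofi_gettime_us() >= peer->rnr_backoff_begin_ts +
			peer->rnr_backoff_wait_time) {
	peer->flags &= ~EFA_RDM_PEER_IN_BACKOFF;
	dlist_remove(&peer->rnr_backoff_entry);
}
```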
11 changes: 4 additions & 7 deletions prov/efa/src/efa_cntr.c
@@ -143,23 +143,20 @@ static struct fi_ops efa_cntr_fi_ops = {

static void efa_rdm_cntr_progress(struct util_cntr *cntr)
{
-struct util_ep *ep;
-struct fid_list_entry *fid_entry;
struct dlist_entry *item;
struct efa_cntr *efa_cntr;
+struct efa_domain *efa_domain;
struct efa_ibv_cq_poll_list_entry *poll_list_entry;

ofi_genlock_lock(&cntr->ep_list_lock);
efa_cntr = container_of(cntr, struct efa_cntr, util_cntr);
+efa_domain = container_of(efa_cntr->util_cntr.domain, struct efa_domain, util_domain);

dlist_foreach(&efa_cntr->ibv_cq_poll_list, item) {
poll_list_entry = container_of(item, struct efa_ibv_cq_poll_list_entry, entry);
efa_rdm_cq_poll_ibv_cq(efa_env.efa_cq_read_size, poll_list_entry->cq);
}
-dlist_foreach(&cntr->ep_list, item) {
-fid_entry = container_of(item, struct fid_list_entry, entry);
-ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
-ep->progress(ep);
-}
+efa_domain_progress_rdm_peers_and_queues(efa_domain);
ofi_genlock_unlock(&cntr->ep_list_lock);
}
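For context, a hedged usage sketch: with this change, one counter read
drives one domain-wide progress pass rather than one pass per bound ep
(standard libfabric API; error handling omitted):

```c
/* Reading the counter polls the device CQs and progresses the
 * domain-level queues exactly once. */
uint64_t completions = fi_cntr_read(cntr_fid); /* cntr_fid: a bound struct fid_cntr * */
```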

207 changes: 207 additions & 0 deletions prov/efa/src/efa_domain.c
@@ -160,6 +160,13 @@ static int efa_domain_init_rdm(struct efa_domain *efa_domain, struct fi_info *in
efa_domain->rdm_cq_size = MAX(info->rx_attr->size + info->tx_attr->size,
efa_env.cq_size);
efa_domain->num_read_msg_in_flight = 0;

dlist_init(&efa_domain->ope_queued_rnr_list);
dlist_init(&efa_domain->ope_queued_ctrl_list);
dlist_init(&efa_domain->ope_queued_read_list);
dlist_init(&efa_domain->ope_longcts_send_list);
dlist_init(&efa_domain->peer_backoff_list);
dlist_init(&efa_domain->handshake_queued_peer_list);
return 0;
}

@@ -430,3 +437,203 @@

return ret;
}


void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
{
struct efa_rdm_peer *peer;
struct dlist_entry *tmp;
struct efa_rdm_ope *ope;
int ret;

assert(domain->info->ep_attr->type == FI_EP_RDM);

/* Update timers for peers that are on the backoff list */
dlist_foreach_container_safe(&domain->peer_backoff_list, struct efa_rdm_peer,
peer, rnr_backoff_entry, tmp) {
if (ofi_gettime_us() >= peer->rnr_backoff_begin_ts +
peer->rnr_backoff_wait_time) {
peer->flags &= ~EFA_RDM_PEER_IN_BACKOFF;
dlist_remove(&peer->rnr_backoff_entry);
}
}

/*
* Resend handshake packet for any peers where the first
* handshake send failed.
*/
dlist_foreach_container_safe(&domain->handshake_queued_peer_list,
struct efa_rdm_peer, peer,
handshake_queued_entry, tmp) {
if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
continue;

ret = efa_rdm_ep_post_handshake(peer->ep, peer);
if (ret == -FI_EAGAIN)
break;

if (OFI_UNLIKELY(ret)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Failed to post HANDSHAKE to peer %ld: %s\n",
peer->efa_fiaddr, fi_strerror(-ret));
efa_base_ep_write_eq_error(&peer->ep->base_ep, -ret, FI_EFA_ERR_PEER_HANDSHAKE);
return;
}

dlist_remove(&peer->handshake_queued_entry);
peer->flags &= ~EFA_RDM_PEER_HANDSHAKE_QUEUED;
peer->flags |= EFA_RDM_PEER_HANDSHAKE_SENT;
}

/*
* Resend queued RNR pkts
*/
dlist_foreach_container_safe(&domain->ope_queued_rnr_list,
struct efa_rdm_ope,
ope, queued_rnr_entry, tmp) {
peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
assert(peer);

if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
continue;

assert(ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR);
assert(!dlist_empty(&ope->queued_pkts));
ret = efa_rdm_ep_post_queued_pkts(ope->ep, &ope->queued_pkts);

if (ret == -FI_EAGAIN)
break;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_RXE || ope->type == EFA_RDM_TXE);
if (ope->type == EFA_RDM_RXE)
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
else
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
return;
}

dlist_remove(&ope->queued_rnr_entry);
ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_RNR;
}

/*
* Send any queued ctrl packets.
*/
dlist_foreach_container_safe(&domain->ope_queued_ctrl_list,
struct efa_rdm_ope,
ope, queued_ctrl_entry, tmp) {
peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
assert(peer);

if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
continue;

assert(ope->internal_flags & EFA_RDM_OPE_QUEUED_CTRL);
ret = efa_rdm_ope_post_send(ope, ope->queued_ctrl_type);
if (ret == -FI_EAGAIN)
break;

if (OFI_UNLIKELY(ret)) {
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
return;
}

/* efa_rdm_ope_post_send() may have released the ope (when the ope is
 * an rxe and the packet type is EOR and inject is used). In that case
 * the rxe's state has been set to EFA_RDM_OPE_FREE and it has been
 * removed from ep->op_queued_entry_list, so nothing is left to do.
 */
if (ope->state == EFA_RDM_OPE_FREE)
continue;

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_CTRL;
dlist_remove(&ope->queued_ctrl_entry);
}

/*
* Send data packets until window or data queue is exhausted.
*/
dlist_foreach_container(&domain->ope_longcts_send_list, struct efa_rdm_ope,
ope, entry) {
peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
assert(peer);
if (peer->flags & EFA_RDM_PEER_IN_BACKOFF)
continue;

/*
 * Do not send a DATA packet until we have received a HANDSHAKE packet
 * from the peer, because the endpoint does not know whether the peer
 * needs a connid in the header until it gets the HANDSHAKE packet.
 *
 * We only do this for DATA packets because all other packet types
 * always carry the connid in their packet headers. If the peer does
 * not make use of the connid, the connid can be safely ignored.
 *
 * DATA packets are different because their connid is an optional
 * header inserted between the mandatory header and the application
 * data. Therefore, if the peer does not use/understand connid, it
 * will take the connid as application data, causing data corruption.
 *
 * This does not cause deadlock, because the peer will send a
 * HANDSHAKE packet back upon receiving the 1st packet from the
 * endpoint, and in all 3 sub-protocols (long-CTS message, emulated
 * long-CTS write and emulated long-CTS read) where DATA packets are
 * used, the endpoint will send other packet types to the peer before
 * sending DATA packets. The workflow of the 3 sub-protocols can be
 * found in chapter 3 of the protocol v4 document.
 */
if (!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
continue;

if (ope->window > 0) {
ret = efa_rdm_ope_post_send(ope, EFA_RDM_CTSDATA_PKT);
if (OFI_UNLIKELY(ret)) {
if (ret == -FI_EAGAIN)
break;

efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
return;
}
}
}

/*
 * Send remote read requests until finished or an error is encountered.
 */
dlist_foreach_container_safe(&domain->ope_queued_read_list, struct efa_rdm_ope,
ope, queued_read_entry, tmp) {
peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
/*
 * The peer can be NULL when the read request is a local read
 * request. Local read requests are used to copy data from host
 * memory to device memory within the same process.
 */
if (peer && (peer->flags & EFA_RDM_PEER_IN_BACKOFF))
continue;

/*
* The core's TX queue is full so we can't do any
* additional work.
*/
if (ope->ep->efa_outstanding_tx_ops == ope->ep->efa_max_outstanding_tx_ops)
return;

ret = efa_rdm_ope_post_read(ope);
if (ret == -FI_EAGAIN)
break;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
if (ope->type == EFA_RDM_TXE)
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
else
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);

return;
}

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_READ;
dlist_remove(&ope->queued_read_entry);
}
}
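All of the queue-draining loops above follow one convention, distilled
below (`some_queued_list`, `queued_entry`, `post_one`, and `handle_error`
are placeholders, not real symbols):

```c
/* Skeleton of the per-queue convention in the function above. */
dlist_foreach_container_safe(&domain->some_queued_list, struct efa_rdm_ope,
			     ope, queued_entry, tmp) {
	peer = efa_rdm_ep_get_peer(ope->ep, ope->addr);
	if (peer && (peer->flags & EFA_RDM_PEER_IN_BACKOFF))
		continue;	/* peer still backing off: skip this entry */

	ret = post_one(ope);
	if (ret == -FI_EAGAIN)
		break;		/* device queue full: retry on next progress pass */

	if (OFI_UNLIKELY(ret)) {
		handle_error(ope, -ret);	/* hard failure: report it */
		return;
	}

	dlist_remove(&ope->queued_entry);	/* success: dequeue the ope */
}
```

The `_safe` iterator is needed because completed entries are removed from
the list mid-walk, and `break` (rather than `continue`) on `-FI_EAGAIN`
ends the pass early because the device queue is full, so further posts
would also fail.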
23 changes: 21 additions & 2 deletions prov/efa/src/efa_domain.h
@@ -26,11 +26,28 @@ struct efa_domain {
size_t mtu_size;
size_t addrlen;
bool mr_local;
uint64_t rdm_mode;
size_t rdm_cq_size;
struct dlist_entry list_entry; /* linked to g_efa_domain_list */
struct ofi_genlock srx_lock; /* shared among peer providers */

/* Only valid for RDM EP type */
uint64_t rdm_mode;
size_t rdm_cq_size;
/* number of rdma-read messages in flight */
uint64_t num_read_msg_in_flight;
/* op entries with queued rnr packets */
struct dlist_entry ope_queued_rnr_list;
/* op entries with queued ctrl packets */
struct dlist_entry ope_queued_ctrl_list;
/* op entries with queued read requests */
struct dlist_entry ope_queued_read_list;
/* tx/rx entries used by the long-CTS msg/write/read protocols
 * which have data to be sent */
struct dlist_entry ope_longcts_send_list;
/* list of #efa_rdm_peer that are in backoff due to RNR */
struct dlist_entry peer_backoff_list;
/* list of #efa_rdm_peer that will retry posting handshake pkt */
struct dlist_entry handshake_queued_peer_list;

};

extern struct dlist_entry g_efa_domain_list;
@@ -88,4 +105,6 @@ bool efa_domain_support_rnr_retry_modify(struct efa_domain *domain)
int efa_domain_open(struct fid_fabric *fabric_fid, struct fi_info *info,
struct fid_domain **domain_fid, void *context);

void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain);

#endif