diff --git a/prov/sockets/src/sock.h b/prov/sockets/src/sock.h index f761be9bbe9..d0edf5f916d 100644 --- a/prov/sockets/src/sock.h +++ b/prov/sockets/src/sock.h @@ -81,7 +81,6 @@ #define SOCK_PE_POLL_TIMEOUT (100000) #define SOCK_PE_MAX_ENTRIES (128) -#define SOCK_PE_MIN_ENTRIES (1) #define SOCK_PE_WAITTIME (10) #define SOCK_EQ_DEF_SZ (1<<8) @@ -150,6 +149,7 @@ #define SOCK_NO_COMPLETION (1ULL << 60) #define SOCK_USE_OP_FLAGS (1ULL << 61) #define SOCK_PE_COMM_BUFF_SZ (1024) +#define SOCK_PE_OVERFLOW_COMM_BUFF_SZ (128) enum { SOCK_SIGNAL_RD_FD = 0, @@ -781,7 +781,8 @@ struct sock_pe_entry { uint8_t is_complete; uint8_t is_error; uint8_t mr_checked; - uint8_t reserved[4]; + uint8_t is_pool_entry; + uint8_t reserved[3]; uint64_t done_len; uint64_t total_len; @@ -795,6 +796,7 @@ struct sock_pe_entry { struct dlist_entry entry; struct dlist_entry ctx_entry; struct ringbuf comm_buf; + size_t cache_sz; }; struct sock_pe { @@ -808,6 +810,7 @@ struct sock_pe { int signal_fds[2]; uint64_t waittime; + struct util_buf_pool *pe_rx_pool; struct dlist_entry free_list; struct dlist_entry busy_list; diff --git a/prov/sockets/src/sock_comm.c b/prov/sockets/src/sock_comm.c index 388515aa6da..e3b03679e54 100644 --- a/prov/sockets/src/sock_comm.c +++ b/prov/sockets/src/sock_comm.c @@ -95,7 +95,7 @@ ssize_t sock_comm_send(struct sock_pe_entry *pe_entry, { ssize_t ret, used; - if (len > SOCK_PE_COMM_BUFF_SZ) { + if (len > pe_entry->cache_sz) { used = rbused(&pe_entry->comm_buf); if (used == sock_comm_flush(pe_entry)) { return sock_comm_send_socket(pe_entry->conn, buf, len); @@ -167,7 +167,7 @@ ssize_t sock_comm_recv(struct sock_pe_entry *pe_entry, void *buf, size_t len) { ssize_t read_len; if (rbempty(&pe_entry->comm_buf)) { - if (len <= SOCK_PE_COMM_BUFF_SZ) { + if (len <= pe_entry->cache_sz) { sock_comm_recv_buffer(pe_entry); } else { return sock_comm_recv_socket(pe_entry->conn, buf, len); diff --git a/prov/sockets/src/sock_progress.c b/prov/sockets/src/sock_progress.c index 76b04bd52bf..825bb2704f4 100644 --- a/prov/sockets/src/sock_progress.c +++ b/prov/sockets/src/sock_progress.c @@ -55,6 +55,7 @@ #include #include +#include #include "sock.h" #include "sock_util.h" @@ -147,6 +148,12 @@ static void sock_pe_release_entry(struct sock_pe *pe, if (pe_entry->conn->rx_pe_entry == pe_entry) pe_entry->conn->rx_pe_entry = NULL; + if (pe_entry->is_pool_entry) { + rbfree(&pe_entry->comm_buf); + util_buf_release(pe->pe_rx_pool, pe_entry); + return; + } + pe->num_free_entries++; pe_entry->conn = NULL; @@ -176,16 +183,25 @@ static struct sock_pe_entry *sock_pe_acquire_entry(struct sock_pe *pe) struct dlist_entry *entry; struct sock_pe_entry *pe_entry; - if (dlist_empty(&pe->free_list)) - return NULL; - - pe->num_free_entries--; - entry = pe->free_list.next; - pe_entry = container_of(entry, struct sock_pe_entry, entry); - dlist_remove(&pe_entry->entry); - dlist_insert_tail(&pe_entry->entry, &pe->busy_list); - SOCK_LOG_DBG("progress entry %p acquired : %lu\n", pe_entry, - PE_INDEX(pe, pe_entry)); + if (dlist_empty(&pe->free_list)) { + pe_entry = util_buf_get(pe->pe_rx_pool); + SOCK_LOG_DBG("Getting rx pool entry\n"); + if (pe_entry) { + memset(pe_entry, 0, sizeof(*pe_entry)); + pe_entry->is_pool_entry = 1; + if (rbinit(&pe_entry->comm_buf, SOCK_PE_OVERFLOW_COMM_BUFF_SZ)) + SOCK_LOG_ERROR("failed to init comm-cache\n"); + pe_entry->cache_sz = SOCK_PE_OVERFLOW_COMM_BUFF_SZ; + } + } else { + pe->num_free_entries--; + entry = pe->free_list.next; + pe_entry = container_of(entry, struct sock_pe_entry, entry); + dlist_remove(&pe_entry->entry); + dlist_insert_tail(&pe_entry->entry, &pe->busy_list); + SOCK_LOG_DBG("progress entry %p acquired : %lu\n", pe_entry, + PE_INDEX(pe, pe_entry)); + } return pe_entry; } @@ -2435,11 +2451,10 @@ static int sock_pe_progress_rx_ep(struct sock_pe *pe, struct sock_ep *ep, if (!conn || conn->rx_pe_entry) continue; - if (!dlist_empty(&pe->free_list)) { - ret = sock_pe_new_rx_entry(pe, rx_ctx, ep, conn); - if (ret < 0) - goto out; - } + + ret = sock_pe_new_rx_entry(pe, rx_ctx, ep, conn); + if (ret < 0) + goto out; } out: @@ -2527,8 +2542,7 @@ int sock_pe_progress_tx_ctx(struct sock_pe *pe, struct sock_tx_ctx *tx_ctx) fastlock_acquire(&pe->lock); fastlock_acquire(&tx_ctx->rlock); - if (!rbempty(&tx_ctx->rb) && - pe->num_free_entries > SOCK_PE_MIN_ENTRIES) { + if (!rbempty(&tx_ctx->rb) && !dlist_empty(&pe->free_list)) { ret = sock_pe_new_tx_entry(pe, tx_ctx); } fastlock_release(&tx_ctx->rlock); @@ -2733,6 +2747,7 @@ static void sock_pe_init_table(struct sock_pe *pe) for (i = 0; i < SOCK_PE_MAX_ENTRIES; i++) { dlist_insert_head(&pe->pe_table[i].entry, &pe->free_list); + pe->pe_table[i].cache_sz = SOCK_PE_COMM_BUFF_SZ; if (rbinit(&pe->pe_table[i].comm_buf, SOCK_PE_COMM_BUFF_SZ)) SOCK_LOG_ERROR("failed to init comm-cache\n"); } @@ -2757,14 +2772,20 @@ struct sock_pe *sock_pe_init(struct sock_domain *domain) pthread_mutex_init(&pe->list_lock, NULL); pe->domain = domain; + pe->pe_rx_pool = util_buf_pool_create(sizeof(struct sock_pe_entry), 16, 0, 1024); + if (!pe->pe_rx_pool) { + SOCK_LOG_ERROR("failed to create buffer pool\n"); + goto err1; + } + if (sock_epoll_create(&pe->epoll_set, sock_cm_def_map_sz) < 0) { SOCK_LOG_ERROR("failed to create epoll set\n"); - goto err1; + goto err2; } if (domain->progress_mode == FI_PROGRESS_AUTO) { if (socketpair(AF_UNIX, SOCK_STREAM, 0, pe->signal_fds) < 0) - goto err2; + goto err3; fd_set_nonblock(pe->signal_fds[SOCK_SIGNAL_RD_FD]); sock_epoll_add(&pe->epoll_set, pe->signal_fds[SOCK_SIGNAL_RD_FD]); @@ -2773,17 +2794,19 @@ struct sock_pe *sock_pe_init(struct sock_domain *domain) if (pthread_create(&pe->progress_thread, NULL, sock_pe_progress_thread, (void *)pe)) { SOCK_LOG_ERROR("Couldn't create progress thread\n"); - goto err3; + goto err4; } } SOCK_LOG_DBG("PE init: OK\n"); return pe; -err3: +err4: close(pe->signal_fds[0]); close(pe->signal_fds[1]); -err2: +err3: sock_epoll_close(&pe->epoll_set); +err2: + util_buf_pool_destroy(pe->pe_rx_pool); err1: fastlock_destroy(&pe->lock); free(pe); @@ -2805,6 +2828,7 @@ void sock_pe_finalize(struct sock_pe *pe) rbfree(&pe->pe_table[i].comm_buf); } + util_buf_pool_destroy(pe->pe_rx_pool); fastlock_destroy(&pe->lock); fastlock_destroy(&pe->signal_lock); pthread_mutex_destroy(&pe->list_lock);