Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/sockets: Use buf-pool for progress engine overflow entries #1734

Merged
merged 1 commit into from
Feb 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions prov/sockets/src/sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@

#define SOCK_PE_POLL_TIMEOUT (100000)
#define SOCK_PE_MAX_ENTRIES (128)
#define SOCK_PE_MIN_ENTRIES (1)
#define SOCK_PE_WAITTIME (10)

#define SOCK_EQ_DEF_SZ (1<<8)
Expand Down Expand Up @@ -150,6 +149,7 @@
#define SOCK_NO_COMPLETION (1ULL << 60)
#define SOCK_USE_OP_FLAGS (1ULL << 61)
#define SOCK_PE_COMM_BUFF_SZ (1024)
#define SOCK_PE_OVERFLOW_COMM_BUFF_SZ (128)

enum {
SOCK_SIGNAL_RD_FD = 0,
Expand Down Expand Up @@ -781,7 +781,8 @@ struct sock_pe_entry {
uint8_t is_complete;
uint8_t is_error;
uint8_t mr_checked;
uint8_t reserved[4];
uint8_t is_pool_entry;
uint8_t reserved[3];

uint64_t done_len;
uint64_t total_len;
Expand All @@ -795,6 +796,7 @@ struct sock_pe_entry {
struct dlist_entry entry;
struct dlist_entry ctx_entry;
struct ringbuf comm_buf;
size_t cache_sz;
};

struct sock_pe {
Expand All @@ -808,6 +810,7 @@ struct sock_pe {
int signal_fds[2];
uint64_t waittime;

struct util_buf_pool *pe_rx_pool;
struct dlist_entry free_list;
struct dlist_entry busy_list;

Expand Down
4 changes: 2 additions & 2 deletions prov/sockets/src/sock_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ ssize_t sock_comm_send(struct sock_pe_entry *pe_entry,
{
ssize_t ret, used;

if (len > SOCK_PE_COMM_BUFF_SZ) {
if (len > pe_entry->cache_sz) {
used = rbused(&pe_entry->comm_buf);
if (used == sock_comm_flush(pe_entry)) {
return sock_comm_send_socket(pe_entry->conn, buf, len);
Expand Down Expand Up @@ -167,7 +167,7 @@ ssize_t sock_comm_recv(struct sock_pe_entry *pe_entry, void *buf, size_t len)
{
ssize_t read_len;
if (rbempty(&pe_entry->comm_buf)) {
if (len <= SOCK_PE_COMM_BUFF_SZ) {
if (len <= pe_entry->cache_sz) {
sock_comm_recv_buffer(pe_entry);
} else {
return sock_comm_recv_socket(pe_entry->conn, buf, len);
Expand Down
68 changes: 46 additions & 22 deletions prov/sockets/src/sock_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include <arpa/inet.h>
#include <net/if.h>

#include <fi_mem.h>
#include "sock.h"
#include "sock_util.h"

Expand Down Expand Up @@ -147,6 +148,12 @@ static void sock_pe_release_entry(struct sock_pe *pe,
if (pe_entry->conn->rx_pe_entry == pe_entry)
pe_entry->conn->rx_pe_entry = NULL;

if (pe_entry->is_pool_entry) {
rbfree(&pe_entry->comm_buf);
util_buf_release(pe->pe_rx_pool, pe_entry);
return;
}

pe->num_free_entries++;
pe_entry->conn = NULL;

Expand Down Expand Up @@ -176,16 +183,25 @@ static struct sock_pe_entry *sock_pe_acquire_entry(struct sock_pe *pe)
struct dlist_entry *entry;
struct sock_pe_entry *pe_entry;

if (dlist_empty(&pe->free_list))
return NULL;

pe->num_free_entries--;
entry = pe->free_list.next;
pe_entry = container_of(entry, struct sock_pe_entry, entry);
dlist_remove(&pe_entry->entry);
dlist_insert_tail(&pe_entry->entry, &pe->busy_list);
SOCK_LOG_DBG("progress entry %p acquired : %lu\n", pe_entry,
PE_INDEX(pe, pe_entry));
if (dlist_empty(&pe->free_list)) {
pe_entry = util_buf_get(pe->pe_rx_pool);
SOCK_LOG_DBG("Getting rx pool entry\n");
if (pe_entry) {
memset(pe_entry, 0, sizeof(*pe_entry));
pe_entry->is_pool_entry = 1;
if (rbinit(&pe_entry->comm_buf, SOCK_PE_OVERFLOW_COMM_BUFF_SZ))
SOCK_LOG_ERROR("failed to init comm-cache\n");
pe_entry->cache_sz = SOCK_PE_OVERFLOW_COMM_BUFF_SZ;
}
} else {
pe->num_free_entries--;
entry = pe->free_list.next;
pe_entry = container_of(entry, struct sock_pe_entry, entry);
dlist_remove(&pe_entry->entry);
dlist_insert_tail(&pe_entry->entry, &pe->busy_list);
SOCK_LOG_DBG("progress entry %p acquired : %lu\n", pe_entry,
PE_INDEX(pe, pe_entry));
}
return pe_entry;
}

Expand Down Expand Up @@ -2435,11 +2451,10 @@ static int sock_pe_progress_rx_ep(struct sock_pe *pe, struct sock_ep *ep,

if (!conn || conn->rx_pe_entry)
continue;
if (!dlist_empty(&pe->free_list)) {
ret = sock_pe_new_rx_entry(pe, rx_ctx, ep, conn);
if (ret < 0)
goto out;
}

ret = sock_pe_new_rx_entry(pe, rx_ctx, ep, conn);
if (ret < 0)
goto out;
}

out:
Expand Down Expand Up @@ -2527,8 +2542,7 @@ int sock_pe_progress_tx_ctx(struct sock_pe *pe, struct sock_tx_ctx *tx_ctx)
fastlock_acquire(&pe->lock);

fastlock_acquire(&tx_ctx->rlock);
if (!rbempty(&tx_ctx->rb) &&
pe->num_free_entries > SOCK_PE_MIN_ENTRIES) {
if (!rbempty(&tx_ctx->rb) && !dlist_empty(&pe->free_list)) {
ret = sock_pe_new_tx_entry(pe, tx_ctx);
}
fastlock_release(&tx_ctx->rlock);
Expand Down Expand Up @@ -2733,6 +2747,7 @@ static void sock_pe_init_table(struct sock_pe *pe)

for (i = 0; i < SOCK_PE_MAX_ENTRIES; i++) {
dlist_insert_head(&pe->pe_table[i].entry, &pe->free_list);
pe->pe_table[i].cache_sz = SOCK_PE_COMM_BUFF_SZ;
if (rbinit(&pe->pe_table[i].comm_buf, SOCK_PE_COMM_BUFF_SZ))
SOCK_LOG_ERROR("failed to init comm-cache\n");
}
Expand All @@ -2757,14 +2772,20 @@ struct sock_pe *sock_pe_init(struct sock_domain *domain)
pthread_mutex_init(&pe->list_lock, NULL);
pe->domain = domain;

pe->pe_rx_pool = util_buf_pool_create(sizeof(struct sock_pe_entry), 16, 0, 1024);
if (!pe->pe_rx_pool) {
SOCK_LOG_ERROR("failed to create buffer pool\n");
goto err1;
}

if (sock_epoll_create(&pe->epoll_set, sock_cm_def_map_sz) < 0) {
SOCK_LOG_ERROR("failed to create epoll set\n");
goto err1;
goto err2;
}

if (domain->progress_mode == FI_PROGRESS_AUTO) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pe->signal_fds) < 0)
goto err2;
goto err3;

fd_set_nonblock(pe->signal_fds[SOCK_SIGNAL_RD_FD]);
sock_epoll_add(&pe->epoll_set, pe->signal_fds[SOCK_SIGNAL_RD_FD]);
Expand All @@ -2773,17 +2794,19 @@ struct sock_pe *sock_pe_init(struct sock_domain *domain)
if (pthread_create(&pe->progress_thread, NULL,
sock_pe_progress_thread, (void *)pe)) {
SOCK_LOG_ERROR("Couldn't create progress thread\n");
goto err3;
goto err4;
}
}
SOCK_LOG_DBG("PE init: OK\n");
return pe;

err3:
err4:
close(pe->signal_fds[0]);
close(pe->signal_fds[1]);
err2:
err3:
sock_epoll_close(&pe->epoll_set);
err2:
util_buf_pool_destroy(pe->pe_rx_pool);
err1:
fastlock_destroy(&pe->lock);
free(pe);
Expand All @@ -2805,6 +2828,7 @@ void sock_pe_finalize(struct sock_pe *pe)
rbfree(&pe->pe_table[i].comm_buf);
}

util_buf_pool_destroy(pe->pe_rx_pool);
fastlock_destroy(&pe->lock);
fastlock_destroy(&pe->signal_lock);
pthread_mutex_destroy(&pe->list_lock);
Expand Down