Skip to content

Commit

Permalink
Merge pull request #1734 from jithinjosepkl/master
Browse files Browse the repository at this point in the history
prov/sockets: Use buf-pool for progress engine overflow entries
  • Loading branch information
jithinjose committed Feb 11, 2016
2 parents 1151900 + abc7e8f commit ac6c40c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 26 deletions.
7 changes: 5 additions & 2 deletions prov/sockets/src/sock.h
Expand Up @@ -81,7 +81,6 @@

#define SOCK_PE_POLL_TIMEOUT (100000)
#define SOCK_PE_MAX_ENTRIES (128)
#define SOCK_PE_MIN_ENTRIES (1)
#define SOCK_PE_WAITTIME (10)

#define SOCK_EQ_DEF_SZ (1<<8)
Expand Down Expand Up @@ -150,6 +149,7 @@
#define SOCK_NO_COMPLETION (1ULL << 60)
#define SOCK_USE_OP_FLAGS (1ULL << 61)
#define SOCK_PE_COMM_BUFF_SZ (1024)
#define SOCK_PE_OVERFLOW_COMM_BUFF_SZ (128)

enum {
SOCK_SIGNAL_RD_FD = 0,
Expand Down Expand Up @@ -781,7 +781,8 @@ struct sock_pe_entry {
uint8_t is_complete;
uint8_t is_error;
uint8_t mr_checked;
uint8_t reserved[4];
uint8_t is_pool_entry;
uint8_t reserved[3];

uint64_t done_len;
uint64_t total_len;
Expand All @@ -795,6 +796,7 @@ struct sock_pe_entry {
struct dlist_entry entry;
struct dlist_entry ctx_entry;
struct ringbuf comm_buf;
size_t cache_sz;
};

struct sock_pe {
Expand All @@ -808,6 +810,7 @@ struct sock_pe {
int signal_fds[2];
uint64_t waittime;

struct util_buf_pool *pe_rx_pool;
struct dlist_entry free_list;
struct dlist_entry busy_list;

Expand Down
4 changes: 2 additions & 2 deletions prov/sockets/src/sock_comm.c
Expand Up @@ -95,7 +95,7 @@ ssize_t sock_comm_send(struct sock_pe_entry *pe_entry,
{
ssize_t ret, used;

if (len > SOCK_PE_COMM_BUFF_SZ) {
if (len > pe_entry->cache_sz) {
used = rbused(&pe_entry->comm_buf);
if (used == sock_comm_flush(pe_entry)) {
return sock_comm_send_socket(pe_entry->conn, buf, len);
Expand Down Expand Up @@ -167,7 +167,7 @@ ssize_t sock_comm_recv(struct sock_pe_entry *pe_entry, void *buf, size_t len)
{
ssize_t read_len;
if (rbempty(&pe_entry->comm_buf)) {
if (len <= SOCK_PE_COMM_BUFF_SZ) {
if (len <= pe_entry->cache_sz) {
sock_comm_recv_buffer(pe_entry);
} else {
return sock_comm_recv_socket(pe_entry->conn, buf, len);
Expand Down
68 changes: 46 additions & 22 deletions prov/sockets/src/sock_progress.c
Expand Up @@ -55,6 +55,7 @@
#include <arpa/inet.h>
#include <net/if.h>

#include <fi_mem.h>
#include "sock.h"
#include "sock_util.h"

Expand Down Expand Up @@ -147,6 +148,12 @@ static void sock_pe_release_entry(struct sock_pe *pe,
if (pe_entry->conn->rx_pe_entry == pe_entry)
pe_entry->conn->rx_pe_entry = NULL;

if (pe_entry->is_pool_entry) {
rbfree(&pe_entry->comm_buf);
util_buf_release(pe->pe_rx_pool, pe_entry);
return;
}

pe->num_free_entries++;
pe_entry->conn = NULL;

Expand Down Expand Up @@ -176,16 +183,25 @@ static struct sock_pe_entry *sock_pe_acquire_entry(struct sock_pe *pe)
struct dlist_entry *entry;
struct sock_pe_entry *pe_entry;

if (dlist_empty(&pe->free_list))
return NULL;

pe->num_free_entries--;
entry = pe->free_list.next;
pe_entry = container_of(entry, struct sock_pe_entry, entry);
dlist_remove(&pe_entry->entry);
dlist_insert_tail(&pe_entry->entry, &pe->busy_list);
SOCK_LOG_DBG("progress entry %p acquired : %lu\n", pe_entry,
PE_INDEX(pe, pe_entry));
if (dlist_empty(&pe->free_list)) {
pe_entry = util_buf_get(pe->pe_rx_pool);
SOCK_LOG_DBG("Getting rx pool entry\n");
if (pe_entry) {
memset(pe_entry, 0, sizeof(*pe_entry));
pe_entry->is_pool_entry = 1;
if (rbinit(&pe_entry->comm_buf, SOCK_PE_OVERFLOW_COMM_BUFF_SZ))
SOCK_LOG_ERROR("failed to init comm-cache\n");
pe_entry->cache_sz = SOCK_PE_OVERFLOW_COMM_BUFF_SZ;
}
} else {
pe->num_free_entries--;
entry = pe->free_list.next;
pe_entry = container_of(entry, struct sock_pe_entry, entry);
dlist_remove(&pe_entry->entry);
dlist_insert_tail(&pe_entry->entry, &pe->busy_list);
SOCK_LOG_DBG("progress entry %p acquired : %lu\n", pe_entry,
PE_INDEX(pe, pe_entry));
}
return pe_entry;
}

Expand Down Expand Up @@ -2435,11 +2451,10 @@ static int sock_pe_progress_rx_ep(struct sock_pe *pe, struct sock_ep *ep,

if (!conn || conn->rx_pe_entry)
continue;
if (!dlist_empty(&pe->free_list)) {
ret = sock_pe_new_rx_entry(pe, rx_ctx, ep, conn);
if (ret < 0)
goto out;
}

ret = sock_pe_new_rx_entry(pe, rx_ctx, ep, conn);
if (ret < 0)
goto out;
}

out:
Expand Down Expand Up @@ -2527,8 +2542,7 @@ int sock_pe_progress_tx_ctx(struct sock_pe *pe, struct sock_tx_ctx *tx_ctx)
fastlock_acquire(&pe->lock);

fastlock_acquire(&tx_ctx->rlock);
if (!rbempty(&tx_ctx->rb) &&
pe->num_free_entries > SOCK_PE_MIN_ENTRIES) {
if (!rbempty(&tx_ctx->rb) && !dlist_empty(&pe->free_list)) {
ret = sock_pe_new_tx_entry(pe, tx_ctx);
}
fastlock_release(&tx_ctx->rlock);
Expand Down Expand Up @@ -2733,6 +2747,7 @@ static void sock_pe_init_table(struct sock_pe *pe)

for (i = 0; i < SOCK_PE_MAX_ENTRIES; i++) {
dlist_insert_head(&pe->pe_table[i].entry, &pe->free_list);
pe->pe_table[i].cache_sz = SOCK_PE_COMM_BUFF_SZ;
if (rbinit(&pe->pe_table[i].comm_buf, SOCK_PE_COMM_BUFF_SZ))
SOCK_LOG_ERROR("failed to init comm-cache\n");
}
Expand All @@ -2757,14 +2772,20 @@ struct sock_pe *sock_pe_init(struct sock_domain *domain)
pthread_mutex_init(&pe->list_lock, NULL);
pe->domain = domain;

pe->pe_rx_pool = util_buf_pool_create(sizeof(struct sock_pe_entry), 16, 0, 1024);
if (!pe->pe_rx_pool) {
SOCK_LOG_ERROR("failed to create buffer pool\n");
goto err1;
}

if (sock_epoll_create(&pe->epoll_set, sock_cm_def_map_sz) < 0) {
SOCK_LOG_ERROR("failed to create epoll set\n");
goto err1;
goto err2;
}

if (domain->progress_mode == FI_PROGRESS_AUTO) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pe->signal_fds) < 0)
goto err2;
goto err3;

fd_set_nonblock(pe->signal_fds[SOCK_SIGNAL_RD_FD]);
sock_epoll_add(&pe->epoll_set, pe->signal_fds[SOCK_SIGNAL_RD_FD]);
Expand All @@ -2773,17 +2794,19 @@ struct sock_pe *sock_pe_init(struct sock_domain *domain)
if (pthread_create(&pe->progress_thread, NULL,
sock_pe_progress_thread, (void *)pe)) {
SOCK_LOG_ERROR("Couldn't create progress thread\n");
goto err3;
goto err4;
}
}
SOCK_LOG_DBG("PE init: OK\n");
return pe;

err3:
err4:
close(pe->signal_fds[0]);
close(pe->signal_fds[1]);
err2:
err3:
sock_epoll_close(&pe->epoll_set);
err2:
util_buf_pool_destroy(pe->pe_rx_pool);
err1:
fastlock_destroy(&pe->lock);
free(pe);
Expand All @@ -2805,6 +2828,7 @@ void sock_pe_finalize(struct sock_pe *pe)
rbfree(&pe->pe_table[i].comm_buf);
}

util_buf_pool_destroy(pe->pe_rx_pool);
fastlock_destroy(&pe->lock);
fastlock_destroy(&pe->signal_lock);
pthread_mutex_destroy(&pe->list_lock);
Expand Down

0 comments on commit ac6c40c

Please sign in to comment.