core: io_queue_t flow mode
Instead of passing ownership of (io_queue_t)*q to the side thread, ownership of the individual IO objects is passed to the side thread, and each is returned on its own. The worker thread runs return_cb() on each returned IO, determining when it's done with the response batch.

This interface could use more explicit functions to make it clearer: ownership of *q isn't actually "passed" anywhere; it's just used or not used depending on which return function the owner wants.
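
A minimal sketch of the resulting flow (illustrative only: my_return_cb is a hypothetical module callback, while return_io_pending() and conn_io_queue_return() are the functions this commit adds):

    // Hypothetical module callback; runs on the worker thread once for
    // each IO object a side thread hands back via return_io_pending().
    static int my_return_cb(io_pending_t *pending) {
        // ... module-specific handling for this single IO ...
        return 1; // nonzero: count this IO against the queue's batch.
    }

    // Side thread, after finishing one IO object:
    //   return_io_pending(io->c, io);
    // The owning worker then runs conn_io_queue_return(io), which calls
    // q->return_cb(io) and re-adds the conn once q->count reaches zero.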
dormando committed Aug 10, 2021
1 parent: c25d0bd · commit: 57493bf
Showing 5 changed files with 67 additions and 21 deletions.
25 changes: 18 additions & 7 deletions memcached.c
@@ -558,7 +558,7 @@ void conn_worker_readd(conn *c) {
     }
 }
 
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb) {
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
     io_queue_t *q = c->io_queues;
     while (q->type != IO_QUEUE_NONE) {
         q++;
@@ -569,6 +569,7 @@ void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu
     q->submit_cb = cb;
     q->complete_cb = com_cb;
     q->finalize_cb = fin_cb;
+    q->return_cb = ret_cb;
     return;
 }
 
@@ -589,17 +590,27 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
 static void conn_io_queue_complete(conn *c) {
     io_queue_t *q = c->io_queues;
     while (q->type != IO_QUEUE_NONE) {
-        // Reuse the same submit stack. We zero it out first so callbacks can
-        // queue new IO's if necessary.
         if (q->stack_ctx) {
-            void *tmp = q->stack_ctx;
-            q->stack_ctx = NULL;
-            q->complete_cb(q->ctx, tmp);
+            q->complete_cb(q);
         }
         q++;
     }
 }
 
+// called to return a single IO object to the original worker thread.
+void conn_io_queue_return(io_pending_t *io) {
+    io_queue_t *q = io->q;
+    int ret = q->return_cb(io);
+    // An IO may or may not count against the pending responses.
+    if (ret) {
+        q->count--;
+        if (q->count == 0) {
+            conn_worker_readd(io->c);
+        }
+    }
+    return;
+}
+
 conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
@@ -3227,7 +3238,7 @@ static void drive_machine(conn *c) {
                 if (q->count != 0) {
                     assert(q->stack_ctx != NULL);
                     hit = true;
-                    q->submit_cb(q->ctx, q->stack_ctx);
+                    q->submit_cb(q);
                     c->io_queues_submitted++;
                 }
             }
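Note the shift in conn_io_queue_complete() above: the core no longer detaches stack_ctx before invoking the callback, so each module's complete_cb must zero it itself (storage_complete_cb in the storage.c hunk below does exactly that). A sketch of the pattern for a hypothetical module:

    static void example_complete_cb(io_queue_t *q) {
        void *stack = q->stack_ctx;
        // Detach the stack first so callbacks can queue new IO's onto *q.
        q->stack_ctx = NULL;
        // ... walk `stack` and post-process each pending IO ...
        (void)stack;
    }
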
14 changes: 9 additions & 5 deletions memcached.h
@@ -697,8 +697,9 @@ typedef struct conn conn;
 #define IO_QUEUE_NONE 0
 #define IO_QUEUE_EXTSTORE 1
 
-typedef void (*io_queue_stack_cb)(void *ctx, void *stack);
-typedef void (*io_queue_cb)(io_pending_t *pending);
+typedef struct io_queue_s io_queue_t;
+typedef void (*io_queue_stack_cb)(io_queue_t *q);
+typedef int (*io_queue_cb)(io_pending_t *pending);
 // this structure's ownership gets passed between threads:
 // - owned normally by the worker thread.
 // - multiple queues can be submitted at the same time.
@@ -714,15 +715,16 @@ typedef void (*io_queue_cb)(io_pending_t *pending);
 //
 // All of this is to avoid having to hit a mutex owned by the connection
 // thread that gets pinged for each thread (or an equivalent atomic).
-typedef struct {
+struct io_queue_s {
     void *ctx; // untouched ptr for specific context
     void *stack_ctx; // module-specific context to be batch-submitted
     io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
     io_queue_stack_cb complete_cb;
+    io_queue_cb return_cb; // called on worker thread.
     io_queue_cb finalize_cb; // called back on the worker thread.
     int type;
     int count; // ios to process before returning. only accessed by queue processor once submitted
-} io_queue_t;
+};
 
 struct _io_pending_t {
     io_queue_t *q;
@@ -859,8 +861,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
                                     uint64_t *cas, const uint32_t hv,
                                     item **it_ret);
 enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb);
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
 io_queue_t *conn_io_queue_get(conn *c, int type);
+void conn_io_queue_return(io_pending_t *io);
 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
     enum network_transport transport, struct event_base *base, void *ssl);

@@ -888,6 +891,7 @@ extern int daemonize(int nochdir, int noclose);
 void memcached_thread_init(int nthreads, void *arg);
 void redispatch_conn(conn *c);
 void timeout_conn(conn *c);
+void return_io_pending(conn *c, io_pending_t *io);
 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
     enum network_transport transport, void *ssl);
 void sidethread_conn_close(conn *c);
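With the widened prototype, registering a queue supplies the return callback as the sixth argument. A sketch using the extstore callbacks from this tree (the thread.c hunk below passes NULL there, since extstore does not yet use per-IO returns):

    conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
                      storage_submit_cb,    // submit a full stack of IO's
                      storage_complete_cb,  // batch hook on the worker
                      NULL,                 // ret_cb: unused by extstore for now
                      storage_finalize_cb); // per-IO cleanup on the worker
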
11 changes: 7 additions & 4 deletions storage.c
@@ -366,9 +366,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
     return 0;
 }
 
-void storage_submit_cb(void *ctx, void *stack) {
+void storage_submit_cb(io_queue_t *q) {
     // Don't need to do anything special for extstore.
-    extstore_submit(ctx, stack);
+    extstore_submit(q->ctx, q->stack_ctx);
 }
 
 static void recache_or_free(io_pending_t *pending) {
@@ -446,12 +446,14 @@ static void recache_or_free(io_pending_t *pending) {
 // TODO: stubbed with a reminder: should be able to move most of the extstore
 // callback code into this code instead, executing on worker thread instead of
 // IO thread.
-void storage_complete_cb(void *ctx, void *stack_ctx) {
+void storage_complete_cb(io_queue_t *q) {
+    // need to reset the stack for next use.
+    q->stack_ctx = NULL;
     return;
 }
 
 // Called after responses have been transmitted. Need to free up related data.
-void storage_finalize_cb(io_pending_t *pending) {
+int storage_finalize_cb(io_pending_t *pending) {
     recache_or_free(pending);
     io_pending_storage_t *p = (io_pending_storage_t *)pending;
     obj_io *io = &p->io_ctx;
@@ -461,6 +463,7 @@ void storage_finalize_cb(io_pending_t *pending) {
         io->iov = NULL;
     }
     // don't need to free the main context, since it's embedded.
+    return 0; // return code ignored.
 }
 
 /*
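storage_finalize_cb() now returns an int only to satisfy the shared io_queue_cb type; its value is ignored. For a return_cb, though, the value decides whether the IO counts against q->count (see conn_io_queue_return() above). A hypothetical return-capable callback for contrast:

    static int example_return_cb(io_pending_t *pending) {
        // A hypothetical IO needing another round trip could be
        // re-submitted here and return 0, leaving q->count untouched.
        return 1; // done: decrement q->count; conn re-added at zero.
    }
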
6 changes: 3 additions & 3 deletions storage.h
@@ -18,9 +18,9 @@ bool storage_validate_item(void *e, item *it);
 int storage_get_item(conn *c, item *it, mc_resp *resp);
 
 // callbacks for the IO queue subsystem.
-void storage_submit_cb(void *ctx, void *stack);
-void storage_complete_cb(void *ctx, void *stack);
-void storage_finalize_cb(io_pending_t *pending);
+void storage_submit_cb(io_queue_t *q);
+void storage_complete_cb(io_queue_t *q);
+int storage_finalize_cb(io_pending_t *pending);
 
 // Thread functions.
 int start_storage_write_thread(void *arg);
32 changes: 30 additions & 2 deletions thread.c
@@ -35,6 +35,7 @@ enum conn_queue_item_modes {
     queue_timeout,    /* socket sfd timed out */
     queue_redispatch, /* return conn from side thread */
     queue_stop,       /* exit thread */
+    queue_return_io,  /* returning a pending IO object immediately */
 };
 typedef struct conn_queue_item CQ_ITEM;
 struct conn_queue_item {
@@ -46,6 +47,7 @@ struct conn_queue_item {
     enum conn_queue_item_modes mode;
     conn *c;
     void *ssl;
+    io_pending_t *io; // IO when used for deferred IO handling.
     STAILQ_ENTRY(conn_queue_item) i_next;
 };

@@ -321,6 +323,12 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) {
     cache_free(cq->cache, item);
 }
 
+// TODO: Skip notify if queue wasn't empty?
+// - Requires cq_push() returning a "was empty" flag
+// - Requires event handling loop to pop the entire queue and work from that
+//   instead of the ev_count work there now.
+// In testing this does result in a large performance uptick, but unclear how
+// much that will transfer from a synthetic benchmark.
 static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
     cq_push(t->ev_queue, item);
 #ifdef HAVE_EVENTFD
@@ -558,10 +566,10 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
 #ifdef EXTSTORE
             if (c->thread->storage) {
                 conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
-                        storage_submit_cb, storage_complete_cb, storage_finalize_cb);
+                        storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
             }
 #endif
-            conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
+            conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
 
 #ifdef TLS
             if (settings.ssl_enabled && c->ssl != NULL) {
@@ -587,6 +595,10 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
             /* asked to stop */
             event_base_loopexit(me->base, NULL);
             break;
+        case queue_return_io:
+            /* getting an individual IO object back */
+            conn_io_queue_return(item->io);
+            break;
     }
 
     cqi_free(me->ev_queue, item);
@@ -714,6 +726,22 @@ void timeout_conn(conn *c) {
     notify_worker_fd(c->thread, c->sfd, queue_timeout);
 }
 
+void return_io_pending(conn *c, io_pending_t *io) {
+    CQ_ITEM *item = cqi_new(c->thread->ev_queue);
+    if (item == NULL) {
+        // TODO: how can we avoid this?
+        // In the main case I just loop, since a malloc failure here for a
+        // tiny object that's generally in a fixed size queue is going to
+        // implode shortly.
+        return;
+    }
+
+    item->mode = queue_return_io;
+    item->io = io;
+
+    notify_worker(c->thread, item);
+}
+
 /* This misses the allow_new_conns flag :( */
 void sidethread_conn_close(conn *c) {
     if (settings.verbose > 1)
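On the side-thread end the whole mechanism reduces to one call per finished IO (a sketch, not code from this commit; `io` stands for whichever io_pending_t the IO thread just completed):

    // IO/side thread, after completing a single object:
    return_io_pending(io->c, io);
    // notify_worker() pushes a queue_return_io CQ_ITEM; the owning
    // worker's event loop pops it and runs conn_io_queue_return(io).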
