Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
114 contributors

Users who have contributed to this file

@dormando @dustin @bradfitz @lindner @BrianAker @kroki @tharanga @steveyen @ingenthr @guikcd @dpaneda @LINKIWI @viraptor @tony2001 @shivnagarajan @NRWB @mlichvar @minkikim89 @evmar @Linkerist @elfchief @drbrain @devnexen @danielschemmel @toofishes @sergiocarlosmorales
10298 lines (9359 sloc) 354 KB
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* memcached - memory caching daemon
*
* https://www.memcached.org/
*
* Copyright 2003 Danga Interactive, Inc. All rights reserved.
*
* Use and distribution licensed under the BSD license. See
* the LICENSE file for full text.
*
* Authors:
* Anatoly Vorobey <mellon@pobox.com>
* Brad Fitzpatrick <brad@danga.com>
*/
#include "memcached.h"
#ifdef EXTSTORE
#include "storage.h"
#endif
#include "authfile.h"
#include "restart.h"
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <sys/param.h>
#include <sys/resource.h>
#include <sys/uio.h>
#include <ctype.h>
#include <stdarg.h>
/* some POSIX systems need the following definition
* to get mlockall flags out of sys/mman.h. */
#ifndef _P1003_1B_VISIBLE
#define _P1003_1B_VISIBLE
#endif
#include <pwd.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <sysexits.h>
#include <stddef.h>
#ifdef HAVE_GETOPT_LONG
#include <getopt.h>
#endif
#ifdef TLS
#include "tls.h"
#endif
#if defined(__FreeBSD__)
#include <sys/sysctl.h>
#endif
/*
* forward declarations
*/
/* Connection state machine driver (defined later in this file). */
static void drive_machine(conn *c);
static int new_socket(struct addrinfo *ai);
/* Plain-socket I/O hooks; conn_new() installs these on a connection when it
 * is not given an SSL object. */
static ssize_t tcp_read(conn *arg, void *buf, size_t count);
static ssize_t tcp_sendmsg(conn *arg, struct msghdr *msg, int flags);
static ssize_t tcp_write(conn *arg, void *buf, size_t count);

/* Outcome of trying to pull more bytes off a connection. */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /** an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /** failed to allocate more memory */
};

/* Per-protocol command readers; conn_new() stores one of these in
 * c->try_read_command depending on transport and protocol. */
static int try_read_command_negotiate(conn *c);
static int try_read_command_udp(conn *c);
static int try_read_command_binary(conn *c);
static int try_read_command_ascii(conn *c);
static int try_read_command_asciiauth(conn *c);

static enum try_read_result try_read_network(conn *c);
static enum try_read_result try_read_udp(conn *c);

static void conn_set_state(conn *c, enum conn_states state);
static int start_conn_timeout_thread();

/* Releases a response object and returns the next one in the chain. */
static mc_resp* resp_finish(conn *c, mc_resp *resp);

/* stats */
static void stats_init(void);
static void server_stats(ADD_STAT add_stats, conn *c);
static void process_stat_settings(ADD_STAT add_stats, void *c);
static void conn_to_str(const conn *c, char *addr, char *svr_addr);

/** Return a datum for stats in binary protocol */
static bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c);

/* defaults */
static void settings_init(void);

/* event handling, network IO */
static void event_handler(const int fd, const short which, void *arg);
static void conn_close(conn *c);
static void conn_init(void);
static bool update_event(conn *c, const int new_flags);
static void complete_nread(conn *c);
static void process_command(conn *c, char *command);
static void write_and_free(conn *c, char *buf, int bytes);
static void write_bin_error(conn *c, protocol_binary_response_status err,
                            const char *errstr, int swallow);
static void write_bin_miss_response(conn *c, char *key, size_t nkey);

#ifdef EXTSTORE
/* External-storage (extstore) read path helpers. */
static void _get_extstore_cb(void *e, obj_io *io, int ret);
static inline int _get_extstore(conn *c, item *it, mc_resp *resp);
#endif
static void conn_free(conn *c);
/** binprot handlers **/
static void process_bin_flush(conn *c, char *extbuf);
static void process_bin_append_prepend(conn *c);
static void process_bin_update(conn *c, char *extbuf);
static void process_bin_get_or_touch(conn *c, char *extbuf);
static void process_bin_delete(conn *c);
static void complete_incr_bin(conn *c, char *extbuf);
static void process_bin_stat(conn *c);
static void process_bin_sasl_auth(conn *c);
/** exported globals **/
struct stats stats;
struct stats_state stats_state;
struct settings settings;
/* When the process was started. stats_init() backdates it slightly, and
 * realtime() uses it as the epoch when converting absolute unix times. */
time_t process_started;
/* Connection table indexed directly by file descriptor; allocated with
 * max_fds slots by conn_init(). */
conn **conns;

struct slab_rebalance slab_rebal;
volatile int slab_rebalance_signal;
#ifdef EXTSTORE
/* hoping this is temporary; I'd prefer to cut globals, but will complete this
 * battle another day.
 */
void *ext_storage = NULL;
#endif
/** file scope variables **/
static conn *listen_conn = NULL;
/* Number of slots in conns[]; computed by conn_init(). */
static int max_fds;
static struct event_base *main_base;

/* Result of one attempt to flush a connection's pending output. */
enum transmit_result {
    TRANSMIT_COMPLETE,   /** All done writing. */
    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
};
/*
 * Default transport hooks for connections without a TLS session.
 *
 * Each hook forwards directly to the matching POSIX call on the connection's
 * socket (c->sfd) and returns that call's result untouched, including -1 with
 * errno set on failure.
 */
ssize_t tcp_read(conn *c, void *buf, size_t count) {
    assert(c != NULL);
    int fd = c->sfd;
    return read(fd, buf, count);
}

ssize_t tcp_sendmsg(conn *c, struct msghdr *msg, int flags) {
    assert(c != NULL);
    int fd = c->sfd;
    return sendmsg(fd, msg, flags);
}

ssize_t tcp_write(conn *c, void *buf, size_t count) {
    assert(c != NULL);
    int fd = c->sfd;
    return write(fd, buf, count);
}
static enum transmit_result transmit(conn *c);

/* This reduces the latency without adding lots of extra wiring to be able to
 * notify the listener thread of when to listen again.
 * Also, the clock timer could be broken out into its own thread and we
 * can block the listener via a condition.
 */
/* Set back to true (under conn_lock) by conn_close(); polled by
 * maxconns_handler() to decide whether to resume accepting. */
static volatile bool allow_new_conns = true;
static bool stop_main_loop = false;
/* Timer event that maxconns_handler() re-arms on main_base while waiting. */
static struct event maxconnsevent;
/*
 * Timer callback that turns accepting back on once connections free up.
 *
 * An fd of -42 or a still-false allow_new_conns means "keep waiting": the
 * timer is re-armed on main_base to fire again in 10ms. Otherwise the timer
 * is removed and accept_new_conns(true) is called.
 */
static void maxconns_handler(const int fd, const short which, void *arg) {
    const bool keep_polling = (fd == -42) || !allow_new_conns;

    if (!keep_polling) {
        evtimer_del(&maxconnsevent);
        accept_new_conns(true);
        return;
    }

    /* reschedule in 10ms if we need to keep polling */
    struct timeval retry_in = {.tv_sec = 0, .tv_usec = 10000};
    evtimer_set(&maxconnsevent, maxconns_handler, 0);
    event_base_set(main_base, &maxconnsevent);
    evtimer_add(&maxconnsevent, &retry_in);
}
/* Largest exptime interpreted as a delta from "now": 30 days in seconds.
 * realtime() treats anything larger as an absolute unix timestamp. */
#define REALTIME_MAXDELTA (60*60*24*30)

/* Negative exptimes can underflow and end up immortal. realtime() will
 * immediately expire values that are greater than REALTIME_MAXDELTA, but less
 * than process_started, so let's aim for that.
 *
 * The argument and the whole expansion are parenthesized so the macro can be
 * used safely inside larger expressions (e.g. `2 * EXPTIME_TO_POSITIVE_TIME(x)`
 * or `EXPTIME_TO_POSITIVE_TIME(a - b)`). The argument is evaluated twice, so
 * do not pass expressions with side effects. */
#define EXPTIME_TO_POSITIVE_TIME(exptime) \
    (((exptime) < 0) ? (REALTIME_MAXDELTA + 1) : (exptime))
/*
 * Converts a client-supplied exptime into server-relative rel_time_t.
 *
 *  - 0 means "never expire" and is returned unchanged.
 *  - Values up to REALTIME_MAXDELTA (30 days) are deltas from now, so they
 *    are added to current_time.
 *  - Larger values are absolute unix times, rebased on process_started.
 *    Anything at or before process_started maps to 1 (expire as soon as
 *    possible). Without this, the subtraction would underflow to a far-future
 *    value, making already-expired items effectively immortal. 0 cannot be
 *    used here because it means "never expire".
 */
static rel_time_t realtime(const time_t exptime) {
    if (exptime == 0)
        return 0;

    if (exptime <= REALTIME_MAXDELTA)
        return (rel_time_t)(exptime + current_time);

    if (exptime <= process_started)
        return (rel_time_t)1;

    return (rel_time_t)(exptime - process_started);
}
/*
 * Zeroes the global stats/stats_state, marks the server as accepting
 * connections, records the (backdated) start time and initializes
 * per-prefix stats using the configured delimiter.
 */
static void stats_init(void) {
    memset(&stats, 0, sizeof stats);
    memset(&stats_state, 0, sizeof stats_state);

    /* assuming we start in this state. */
    stats_state.accepting_conns = true;

    /* Backdate the start time by ITEM_UPDATE_INTERVAL + 2 seconds so that
     * time(0) - process_started is never zero. Values such as
     * settings.oldest_live act both as booleans and as times, and would
     * otherwise read as "false" in boolean context. */
    process_started = time(0) - ITEM_UPDATE_INTERVAL - 2;

    stats_prefix_init(settings.prefix_delimiter);
}
/*
 * Resets statistics: the global counters and prefix stats are cleared while
 * holding STATS_LOCK, then per-thread and per-item stats are reset.
 * stats_state is not touched here.
 */
static void stats_reset(void) {
    STATS_LOCK();
    memset(&stats, 0, sizeof stats);
    stats_prefix_clear();
    STATS_UNLOCK();

    threadlocal_stats_reset();
    item_stats_reset();
}
/*
 * Fills the global `settings` with built-in defaults.
 * NOTE(review): presumably called once at startup before command-line
 * options override individual fields; confirm against main().
 * Assignment order matters for derived values (slab_chunk_size_max is
 * computed from slab_page_size).
 */
static void settings_init(void) {
    settings.use_cas = true;
    settings.access = 0700;
    settings.port = 11211;
    settings.udpport = 0;
#ifdef TLS
    settings.ssl_enabled = false;
    settings.ssl_ctx = NULL;
    settings.ssl_chain_cert = NULL;
    settings.ssl_key = NULL;
    settings.ssl_verify_mode = SSL_VERIFY_NONE;
    settings.ssl_keyformat = SSL_FILETYPE_PEM;
    settings.ssl_ciphers = NULL;
    settings.ssl_ca_cert = NULL;
    /* Reads the global clock; whatever current_time holds at this point. */
    settings.ssl_last_cert_refresh_time = current_time;
    settings.ssl_wbuf_size = 16 * 1024; // default is 16KB (SSL max frame size is 17KB)
    settings.ssl_session_cache = false;
#endif
    /* By default this string should be NULL for getaddrinfo() */
    settings.inter = NULL;
    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
    settings.verbose = 0;
    settings.oldest_live = 0;
    settings.oldest_cas = 0;          /* supplements accuracy of oldest_live */
    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
    settings.socketpath = NULL;       /* by default, not using a unix socket */
    settings.auth_file = NULL;        /* by default, not using ASCII authentication tokens */
    settings.factor = 1.25;
    settings.chunk_size = 48;         /* space for a modest key and value */
    settings.num_threads = 4;         /* N workers */
    settings.num_threads_per_udp = 0;
    settings.prefix_delimiter = ':';
    settings.detail_enabled = 0;
    settings.reqs_per_event = 20;
    settings.backlog = 1024;
    /* New connections auto-detect ASCII vs binary (see conn_new()). */
    settings.binding_protocol = negotiating_prot;
    settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
    settings.slab_page_size = 1024 * 1024; /* chunks are split from 1MB pages. */
    /* Derived from slab_page_size above, so it must be assigned after it. */
    settings.slab_chunk_size_max = settings.slab_page_size / 2;
    settings.sasl = false;
    settings.maxconns_fast = true;
    settings.lru_crawler = false;
    settings.lru_crawler_sleep = 100;
    settings.lru_crawler_tocrawl = 0;
    settings.lru_maintainer_thread = false;
    settings.lru_segmented = true;
    settings.hot_lru_pct = 20;
    settings.warm_lru_pct = 40;
    settings.hot_max_factor = 0.2;
    settings.warm_max_factor = 2.0;
    settings.temp_lru = false;
    settings.temporary_ttl = 61;
    /* 0 disables idle kicking: start_conn_timeout_thread() returns early. */
    settings.idle_timeout = 0; /* disabled */
    settings.hashpower_init = 0;
    settings.slab_reassign = true;
    settings.slab_automove = 1;
    settings.slab_automove_ratio = 0.8;
    settings.slab_automove_window = 30;
    settings.shutdown_command = false;
    settings.tail_repair_time = TAIL_REPAIR_TIME_DEFAULT;
    settings.flush_enabled = true;
    settings.dump_enabled = true;
    settings.crawls_persleep = 1000;
    settings.logger_watcher_buf_size = LOGGER_WATCHER_BUF_SIZE;
    settings.logger_buf_size = LOGGER_BUF_SIZE;
    settings.drop_privileges = false;
    settings.watch_enabled = true;
    settings.resp_obj_mem_limit = 0;
    settings.read_buf_mem_limit = 0;
#ifdef MEMCACHED_DEBUG
    settings.relaxed_privileges = false;
#endif
}
/* Defined elsewhere; conn_close() takes it while setting allow_new_conns. */
extern pthread_mutex_t conn_lock;

/* Connection timeout thread bits */
static pthread_t conn_timeout_tid;
/* Loop condition of conn_timeout_thread(); cleared by stop_conn_timeout_thread(). */
static int do_run_conn_timeout_thread;

/* Number of fds the timeout thread scans between short sleeps. */
#define CONNS_PER_SLICE 100
/* Notify-pipe message: one command byte ('t') followed by the int fd. */
#define TIMEOUT_MSG_SIZE (1 + sizeof(int))
/*
 * Body of the idle-connection timeout thread.
 *
 * Repeatedly walks every slot of conns[]. Before each group of
 * CONNS_PER_SLICE slots it sleeps `timeslice` microseconds. When
 * max_fds >= CONNS_PER_SLICE this spreads one full walk over roughly one
 * second.
 *
 * For each TCP connection that is waiting for a command (conn_new_cmd or
 * conn_read) and whose last command is older than settings.idle_timeout, it
 * writes a TIMEOUT_MSG_SIZE message ('t' + fd) to the owning worker's notify
 * pipe. The worker is presumably responsible for the actual close (see
 * conn_close_idle()).
 *
 * After each pass it sleeps until the earliest moment another connection
 * could time out (at least one second). The loop ends once
 * do_run_conn_timeout_thread is cleared.
 *
 * @param arg unused (pthread start-routine signature)
 * @return always NULL
 */
static void *conn_timeout_thread(void *arg) {
    int i;
    conn *c;
    char buf[TIMEOUT_MSG_SIZE];
    rel_time_t oldest_last_cmd;
    int sleep_time;
    int sleep_slice = max_fds / CONNS_PER_SLICE;
    if (sleep_slice == 0)
        sleep_slice = CONNS_PER_SLICE;

    useconds_t timeslice = 1000000 / sleep_slice;

    (void)arg;

    while(do_run_conn_timeout_thread) {
        if (settings.verbose > 2)
            fprintf(stderr, "idle timeout thread at top of connection list\n");

        oldest_last_cmd = current_time;

        for (i = 0; i < max_fds; i++) {
            if ((i % CONNS_PER_SLICE) == 0) {
                if (settings.verbose > 2)
                    fprintf(stderr, "idle timeout thread sleeping for %uus\n",
                        (unsigned int)timeslice);
                usleep(timeslice);
            }

            if (!conns[i])
                continue;

            c = conns[i];

            if (!IS_TCP(c->transport))
                continue;

            /* Only connections idle between commands may be kicked. */
            if (c->state != conn_new_cmd && c->state != conn_read)
                continue;

            if ((current_time - c->last_cmd_time) > settings.idle_timeout) {
                buf[0] = 't';
                memcpy(&buf[1], &i, sizeof(int));
                if (write(c->thread->notify_send_fd, buf, TIMEOUT_MSG_SIZE)
                    != TIMEOUT_MSG_SIZE)
                    perror("Failed to write timeout to notify pipe");
            } else {
                if (c->last_cmd_time < oldest_last_cmd)
                    oldest_last_cmd = c->last_cmd_time;
            }
        }

        /* This is the soonest we could have another connection time out */
        sleep_time = settings.idle_timeout - (current_time - oldest_last_cmd) + 1;
        if (sleep_time <= 0)
            sleep_time = 1;

        if (settings.verbose > 2)
            fprintf(stderr,
                    "idle timeout thread finished pass, sleeping for %ds\n",
                    sleep_time);
        /* Sleep in whole seconds. The previous usleep(sleep_time * 1000000)
         * overflowed useconds_t for idle timeouts above ~4294s, and POSIX
         * does not require usleep() to accept values >= 1000000. */
        sleep((unsigned int) sleep_time);
    }

    return NULL;
}
/*
 * Starts the idle-connection timeout thread if settings.idle_timeout is
 * non-zero.
 *
 * @return 0 when the thread was created; -1 when idle timeouts are disabled
 *         or pthread_create() failed (an error is printed to stderr).
 */
static int start_conn_timeout_thread(void) {
    int ret;

    if (settings.idle_timeout == 0)
        return -1;

    do_run_conn_timeout_thread = 1;
    if ((ret = pthread_create(&conn_timeout_tid, NULL,
        conn_timeout_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create idle connection timeout thread: %s\n",
            strerror(ret));
        /* No thread exists: clear the run flag so stop_conn_timeout_thread()
         * does not pthread_join() an uninitialized conn_timeout_tid. */
        do_run_conn_timeout_thread = 0;
        return -1;
    }

    return 0;
}
/*
 * Asks the idle-connection timeout thread to exit and waits for it.
 *
 * The thread only re-checks its run flag at the top of its loop, so the join
 * may block for up to one of its sleep intervals.
 *
 * @return -1 if the thread is not marked as running, 0 after joining it.
 */
int stop_conn_timeout_thread(void) {
    if (do_run_conn_timeout_thread == 0) {
        return -1;
    }

    do_run_conn_timeout_thread = 0;
    pthread_join(conn_timeout_tid, NULL);
    return 0;
}
/*
* read buffer cache helper functions
*/
/*
 * Gives an empty TCP read buffer back: malloc()ed buffers are freed and the
 * rbuf_malloced flag cleared, cache buffers go back to the thread's
 * rbuf_cache. Does nothing when there is no buffer, when unread bytes
 * remain, or for UDP connections.
 */
static void rbuf_release(conn *c) {
    if (c->rbuf == NULL || c->rbytes != 0 || IS_UDP(c->transport))
        return;

    if (!c->rbuf_malloced) {
        do_cache_free(c->thread->rbuf_cache, c->rbuf);
    } else {
        free(c->rbuf);
        c->rbuf_malloced = false;
    }

    c->rbuf = NULL;
    c->rcurr = NULL;
    c->rsize = 0;
}
/*
 * Ensures the connection has a read buffer, taking one of READ_BUFFER_SIZE
 * bytes from the thread's rbuf_cache if needed.
 *
 * @return true if a buffer is (now) present; false if the cache allocation
 *         failed, in which case the thread's read_buf_oom counter is bumped.
 */
static bool rbuf_alloc(conn *c) {
    if (c->rbuf != NULL)
        return true;

    c->rbuf = do_cache_alloc(c->thread->rbuf_cache);
    if (c->rbuf == NULL) {
        THR_STATS_LOCK(c);
        c->thread->stats.read_buf_oom++;
        THR_STATS_UNLOCK(c);
        return false;
    }

    c->rcurr = c->rbuf;
    c->rsize = READ_BUFFER_SIZE;
    return true;
}
// Just for handling huge ASCII multigets.
// The previous system was essentially the same; realloc'ing until big enough,
// then realloc'ing back down after the request finished.
//
// Moves the connection's unread input (c->rbytes bytes starting at c->rcurr)
// from its cache-owned read buffer into a newly malloc()ed buffer twice the
// current size. Afterwards c->rcurr == c->rbuf and rbuf_malloced is set.
// Returns false, leaving the connection untouched, if malloc() fails.
static bool rbuf_switch_to_malloc(conn *c) {
    // Might as well start with x2 and work from there.
    size_t size = c->rsize * 2;
    char *tmp = malloc(size);
    if (!tmp)
        return false;

    // c->rcurr points into the old buffer, so the pending bytes must be
    // copied out *before* that buffer is handed back to the cache.
    memcpy(tmp, c->rcurr, c->rbytes);
    do_cache_free(c->thread->rbuf_cache, c->rbuf);

    c->rcurr = c->rbuf = tmp;
    c->rsize = size;
    c->rbuf_malloced = true;
    return true;
}
/*
 * Initializes the connections array. We don't actually allocate connection
 * structures until they're needed, so as to avoid wasting memory when the
 * maximum connection count is much higher than the actual number of
 * connections.
 *
 * This does end up wasting a few pointers' worth of memory for FDs that are
 * used for things other than connections, but that's worth it in exchange for
 * being able to directly index the conns array by FD.
 *
 * Sets max_fds and allocates conns[max_fds]; exits the process on failure.
 */
static void conn_init(void) {
    /* We're unlikely to see an FD much higher than maxconns. */
    /* dup() returns the lowest unused descriptor, which approximates how many
     * FDs are already open at startup. It is closed again below. */
    int next_fd = dup(1);
    if (next_fd < 0) {
        /* perror() appends ": <strerror>\n" itself; no trailing newline here. */
        perror("Failed to duplicate file descriptor");
        exit(1);
    }
    int headroom = 10;      /* account for extra unexpected open FDs */
    struct rlimit rl;

    max_fds = settings.maxconns + headroom + next_fd;

    /* But if possible, get the actual highest FD we can possibly ever see. */
    if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
        max_fds = rl.rlim_max;
    } else {
        fprintf(stderr, "Failed to query maximum file descriptor; "
                        "falling back to maxconns\n");
    }

    close(next_fd);

    if ((conns = calloc(max_fds, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
        /* This is unrecoverable so bail out early. */
        exit(1);
    }
}
/*
 * Human-readable name of a protocol setting, for log/verbose output.
 * Returns "unknown" for any value not covered below.
 */
static const char *prot_text(enum protocol prot) {
    switch (prot) {
    case ascii_prot:
        return "ascii";
    case binary_prot:
        return "binary";
    case negotiating_prot:
        return "auto-negotiate";
    }
    return "unknown";
}
/*
 * Closes c if idle timeouts are enabled and it has been idle for longer than
 * settings.idle_timeout.
 *
 * The idle condition is re-evaluated here rather than taken on trust from
 * the caller. A connection that is not between commands (conn_new_cmd /
 * conn_read) is left open. Otherwise the thread's idle_kicks counter is
 * incremented and the connection is moved to conn_closing and driven
 * through the state machine.
 */
void conn_close_idle(conn *c) {
    if (settings.idle_timeout > 0 &&
        (current_time - c->last_cmd_time) > settings.idle_timeout) {
        if (c->state != conn_new_cmd && c->state != conn_read) {
            if (settings.verbose > 1)
                fprintf(stderr,
                    "fd %d wants to timeout, but isn't in read state\n", c->sfd);
            return;
        }

        if (settings.verbose > 1)
            fprintf(stderr, "Closing idle fd %d\n", c->sfd);

        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.idle_kicks++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        conn_set_state(c, conn_closing);
        drive_machine(c);
    }
}
/* bring conn back from a sidethread. could have had its event base moved. */
/*
 * Re-registers c's persistent read event on its worker thread's event base
 * (c->thread->base) and resumes processing:
 *  - if the side thread left the conn in conn_closing, drive_machine() is
 *    run immediately to finish closing it;
 *  - otherwise the state is set directly to conn_new_cmd (bypassing
 *    conn_set_state()) and, under EXTSTORE, if IO objects are attached
 *    (c->io_wraplist) the conn is switched to conn_mwrite and driven.
 * NOTE(review): an event_add() failure is only logged (see the TODO).
 */
void conn_worker_readd(conn *c) {
    c->ev_flags = EV_READ | EV_PERSIST;
    event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c);
    event_base_set(c->thread->base, &c->event);

    // TODO: call conn_cleanup/fail/etc
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
    }

    // side thread wanted us to close immediately.
    if (c->state == conn_closing) {
        drive_machine(c);
        return;
    }
    c->state = conn_new_cmd;

#ifdef EXTSTORE
    // If we had IO objects, process
    if (c->io_wraplist) {
        //assert(c->io_wrapleft == 0); // assert no more to process
        conn_set_state(c, conn_mwrite);
        drive_machine(c);
    }
#endif
}
/*
 * Creates, or re-initializes, the connection object for file descriptor sfd.
 *
 * Connection structs are kept in conns[] indexed by fd. An existing struct
 * for sfd is reused. Otherwise a new one is calloc()ed and, when
 * read_buffer_size is non-zero, given a malloc()ed read buffer of that size.
 *
 * All per-request state is then reset. The I/O hooks (plain TCP, or TLS when
 * `ssl` is non-NULL) and the protocol-specific command reader are selected,
 * and the libevent event is registered on `base` with event_flags.
 *
 * @return the connection, or NULL if an allocation or event_add() fails.
 * NOTE(review): on the event_add() failure path the struct stays in conns[]
 * and curr_conns is not incremented; the caller presumably closes sfd.
 */
conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base, void *ssl) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];

    if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate connection object\n");
            return NULL;
        }
        MEMCACHED_CONN_CREATE(c);
        /* Record the fd immediately. conn_free() below clears
         * conns[c->sfd]; with calloc()'s zeroed sfd it would otherwise wipe
         * conns[0] instead of the (still NULL) conns[sfd] slot. */
        c->sfd = sfd;
        c->read = NULL;
        c->sendmsg = NULL;
        c->write = NULL;
        c->rbuf = NULL;

        c->rsize = read_buffer_size;

        // UDP connections use a persistent static buffer.
        if (c->rsize) {
            c->rbuf = (char *)malloc((size_t)c->rsize);
        }

        if (c->rsize && c->rbuf == NULL) {
            conn_free(c);
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate buffers for connection\n");
            return NULL;
        }

        STATS_LOCK();
        stats_state.conn_structs++;
        STATS_UNLOCK();

        conns[sfd] = c;
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    /* unix socket mode doesn't need this, so zeroed out.  but why
     * is this done for every command?  presumably for UDP
     * mode.  */
    if (!settings.socketpath) {
        c->request_addr_size = sizeof(c->request_addr);
    } else {
        c->request_addr_size = 0;
    }

    if (transport == tcp_transport && init_state == conn_new_cmd) {
        if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
                        &c->request_addr_size)) {
            perror("getpeername");
            memset(&c->request_addr, 0, sizeof(c->request_addr));
        }
    }

    if (settings.verbose > 1) {
        if (init_state == conn_listening) {
            fprintf(stderr, "<%d server listening (%s)\n", sfd,
                prot_text(c->protocol));
        } else if (IS_UDP(transport)) {
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        } else if (c->protocol == negotiating_prot) {
            fprintf(stderr, "<%d new auto-negotiating client connection\n",
                    sfd);
        } else if (c->protocol == ascii_prot) {
            fprintf(stderr, "<%d new ascii client connection.\n", sfd);
        } else if (c->protocol == binary_prot) {
            fprintf(stderr, "<%d new binary client connection.\n", sfd);
        } else {
            fprintf(stderr, "<%d new unknown (%d) client connection\n",
                sfd, c->protocol);
            assert(false);
        }
    }

#ifdef TLS
    c->ssl = NULL;
    c->ssl_wbuf = NULL;
    c->ssl_enabled = false;
#endif
    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = 0;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->rbuf_malloced = false;
    c->sasl_started = false;
    c->set_stale = false;
    c->mset_res = false;
    c->close_after_write = false;
    c->last_cmd_time = current_time; /* initialize for idle kicker */
#ifdef EXTSTORE
    c->io_wraplist = NULL;
    c->io_wrapleft = 0;
#endif

    c->item = 0;

    c->noreply = false;

#ifdef TLS
    if (ssl) {
        c->ssl = (SSL*)ssl;
        c->read = ssl_read;
        c->sendmsg = ssl_sendmsg;
        c->write = ssl_write;
        c->ssl_enabled = true;
        SSL_set_info_callback(c->ssl, ssl_callback);
    } else
#else
    // This must be NULL if TLS is not enabled.
    assert(ssl == NULL);
#endif
    {
        c->read = tcp_read;
        c->sendmsg = tcp_sendmsg;
        c->write = tcp_write;
    }

    if (IS_UDP(transport)) {
        c->try_read_command = try_read_command_udp;
    } else {
        switch (c->protocol) {
            case ascii_prot:
                if (settings.auth_file == NULL) {
                    c->authenticated = true;
                    c->try_read_command = try_read_command_ascii;
                } else {
                    c->authenticated = false;
                    c->try_read_command = try_read_command_asciiauth;
                }
                break;
            case binary_prot:
                // binprot handles its own authentication via SASL parsing.
                c->authenticated = false;
                c->try_read_command = try_read_command_binary;
                break;
            case negotiating_prot:
                c->try_read_command = try_read_command_negotiate;
                break;
        }
    }

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats_state.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}
#ifdef EXTSTORE
/*
 * Disposes of one extstore IO wrapper once its response is finished.
 * wrap->io.buf holds the item read from external storage and wrap->hdr_it
 * the in-memory header item. Exactly one of these paths runs:
 *  - wrap->active (request never dispatched): the read buffer is freed,
 *    the header left alone, io_wrapleft decremented, get_aborted_extstore++.
 *  - wrap->miss: the header is unlinked, the buffer freed,
 *    miss_from_extstore++ (and badcrc_from_extstore++ on a CRC failure).
 *  - otherwise, if settings.ext_recache_rate is set: if the item lock can be
 *    taken without waiting, and the header is linked, fetched, active,
 *    touched within ITEM_UPDATE_INTERVAL and this is every Nth candidate,
 *    the fetched item replaces the header in memory ("recache") and the
 *    stored copy is deleted from storage.
 * In every remaining case the buffer is freed. Finally the wrapper is reset
 * and the header reference held by the wrapper is dropped.
 */
static void recache_or_free(conn *c, io_wrap *wrap) {
    item *it;
    it = (item *)wrap->io.buf;
    bool do_free = true;
    if (wrap->active) {
        // If request never dispatched, free the read buffer but leave the
        // item header alone.
        do_free = false;
        // NOTE(review): size comes from the header item, presumably because
        // the unread buffer's own header fields are not filled in yet.
        size_t ntotal = ITEM_ntotal(wrap->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));
        c->io_wrapleft--;
        assert(c->io_wrapleft >= 0);
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.get_aborted_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (wrap->miss) {
        // If request was ultimately a miss, unlink the header.
        do_free = false;
        size_t ntotal = ITEM_ntotal(wrap->hdr_it);
        item_unlink(wrap->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.miss_from_extstore++;
        if (wrap->badcrc)
            c->thread->stats.badcrc_from_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (settings.ext_recache_rate) {
        // hashvalue is cuddled during store
        uint32_t hv = (uint32_t)it->time;
        // opt to throw away rather than wait on a lock.
        void *hold_lock = item_trylock(hv);
        if (hold_lock != NULL) {
            item *h_it = wrap->hdr_it;
            uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
            // Item must be recently hit at least twice to recache.
            if (((h_it->it_flags & flags) == flags) &&
                    h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
                    c->recache_counter++ % settings.ext_recache_rate == 0) {
                do_free = false;
                // In case it's been updated.
                it->exptime = h_it->exptime;
                it->it_flags &= ~ITEM_LINKED;
                it->refcount = 0;
                it->h_next = NULL; // might not be necessary.
                STORAGE_delete(c->thread->storage, h_it);
                item_replace(h_it, it, hv);
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.recache_from_extstore++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
            }
        }
        if (hold_lock)
            item_trylock_unlock(hold_lock);
    }
    if (do_free)
        slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));

    wrap->io.buf = NULL; // sanity.
    wrap->io.next = NULL;
    wrap->next = NULL;
    wrap->active = false;

    // TODO: reuse lock and/or hv.
    item_remove(wrap->hdr_it);
}
#endif
/*
 * Drops every item/response reference a connection holds:
 *  - the in-progress c->item, if any;
 *  - (EXTSTORE) every pending io_wrap, via recache_or_free(), plus its
 *    malloc()ed iovec list and its io_cache slot;
 *  - every queued response object, via resp_finish(), which also maintains
 *    c->resp_head / c->resp.
 * If a response already marked free is found in the chain, an error is
 * printed and the rest of the chain is deliberately leaked (the list heads
 * are cleared) rather than risking an infinite loop or double free.
 */
static void conn_release_items(conn *c) {
    assert(c != NULL);

    if (c->item) {
        item_remove(c->item);
        c->item = 0;
    }

#ifdef EXTSTORE
    if (c->io_wraplist) {
        io_wrap *tmp = c->io_wraplist;
        while (tmp) {
            io_wrap *next = tmp->next;
            recache_or_free(c, tmp);
            // malloc'ed iovec list used for chunked extstore fetches.
            if (tmp->io.iov) {
                free(tmp->io.iov);
                tmp->io.iov = NULL;
            }
            do_cache_free(c->thread->io_cache, tmp); // lockless
            tmp = next;
        }
        c->io_wraplist = NULL;
    }
#endif

    // Cull any unsent responses.
    if (c->resp_head) {
        mc_resp *resp = c->resp_head;
        // r_f() handles the chain maintenance.
        while (resp) {
            // temporary by default. hide behind a debug flag in the future:
            // double free detection. Transmit loops can drop out early, but
            // here we could infinite loop.
            if (resp->free) {
                fprintf(stderr, "ERROR: double free detected during conn_release_items(): [%d] [%s]\n",
                        c->sfd, c->protocol == binary_prot ? "binary" : "ascii");
                // Since this is a critical failure, just leak the memory.
                // If these errors are seen, an abort() can be used instead.
                c->resp_head = NULL;
                c->resp = NULL;
                break;
            }
            resp = resp_finish(c, resp);
        }
    }
}
/*
 * Releases per-request resources of a connection: all held items and
 * responses, and the SASL context if one was created (which is only
 * expected when SASL is enabled). UDP connections are then put back into
 * conn_read.
 */
static void conn_cleanup(conn *c) {
    assert(c != NULL);

    conn_release_items(c);

    if (c->sasl_conn != NULL) {
        assert(settings.sasl);
        sasl_dispose(&c->sasl_conn);
        c->sasl_conn = NULL;
    }

    if (IS_UDP(c->transport))
        conn_set_state(c, conn_read);
}
/*
 * Frees a connection struct and clears its slot in conns[].
 *
 * NULL is accepted and ignored. c->rbuf is released with free(). Under TLS,
 * the ssl_wbuf pointer is only cleared; the buffer it points at is not freed
 * here.
 */
void conn_free(conn *c) {
    if (c == NULL)
        return;

    assert(c->sfd >= 0 && c->sfd < max_fds);

    MEMCACHED_CONN_DESTROY(c);
    conns[c->sfd] = NULL;

    free(c->rbuf); /* free(NULL) is a no-op */
#ifdef TLS
    c->ssl_wbuf = NULL;
#endif
    free(c);
}
/*
 * Tears down a connection, in this order:
 *  1. removes its libevent event;
 *  2. releases items/responses/SASL state (conn_cleanup);
 *  3. if owned by a worker thread, force-releases the read buffer;
 *  4. marks it conn_closed, shuts down and frees any TLS session;
 *  5. closes the socket;
 *  6. sets allow_new_conns (under conn_lock) and decrements curr_conns.
 * The conn struct itself is NOT freed; it stays in conns[] for reuse by
 * conn_new().
 */
static void conn_close(conn *c) {
    assert(c != NULL);

    /* delete the event and close the socket; the conn struct is kept */
    event_del(&c->event);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closed.\n", c->sfd);

    conn_cleanup(c);

    // force release of read buffer.
    if (c->thread) {
        c->rbytes = 0;
        rbuf_release(c);
    }

    MEMCACHED_CONN_RELEASE(c->sfd);
    conn_set_state(c, conn_closed);
#ifdef TLS
    if (c->ssl) {
        SSL_shutdown(c->ssl);
        SSL_free(c->ssl);
    }
#endif
    close(c->sfd);
    // A slot freed up; let maxconns_handler() resume accepting.
    pthread_mutex_lock(&conn_lock);
    allow_new_conns = true;
    pthread_mutex_unlock(&conn_lock);

    STATS_LOCK();
    stats_state.curr_conns--;
    STATS_UNLOCK();

    return;
}
// Since some connections might be off on side threads and some are managed as
// listeners we need to walk through them all from a central point.
// Must be called with all worker threads hung or in the process of closing.
void conn_close_all(void) {
int i;
for (i = 0; i < max_fds; i++) {
if (conns[i] && conns[i]->state != conn_closed) {
conn_close(conns[i]);
}
}
}
/**
 * Convert a state name to a human readable form.
 *
 * The table is static, so it is built once rather than on every call, and
 * lookups are bounds-checked. A state outside the table yields "unknown"
 * instead of reading past the array.
 */
static const char *state_text(enum conn_states state) {
    static const char *const statenames[] = { "conn_listening",
                                              "conn_new_cmd",
                                              "conn_waiting",
                                              "conn_read",
                                              "conn_parse_cmd",
                                              "conn_write",
                                              "conn_nread",
                                              "conn_swallow",
                                              "conn_closing",
                                              "conn_mwrite",
                                              "conn_closed",
                                              "conn_watch" };
    const size_t nstates = sizeof(statenames) / sizeof(statenames[0]);

    if ((size_t)state >= nstates)
        return "unknown";
    return statenames[state];
}
/*
 * Moves a connection to a new state-machine state. This is the hook point
 * for side effects tied to transitions.
 *
 * Re-entering the current state is a no-op. Otherwise the transition is
 * logged at verbose > 2, and entering conn_write / conn_mwrite fires the
 * MEMCACHED_PROCESS_COMMAND_END probe with the current response buffer.
 */
static void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    assert(state >= conn_listening && state < conn_max_state);

    if (state == c->state)
        return;

    if (settings.verbose > 2) {
        fprintf(stderr, "%d: going from %s to %s\n",
                c->sfd, state_text(c->state),
                state_text(state));
    }

    if (state == conn_write || state == conn_mwrite) {
        MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->resp->wbuf, c->resp->wbytes);
    }

    c->state = state;
}
/*
* response object helper functions
*/
/*
 * Returns a response object to an empty state so it can be refilled.
 * Drops its item reference and frees its write_and_free buffer (if any),
 * then zeroes all send bookkeeping and clears the skip flag.
 */
static void resp_reset(mc_resp *resp) {
    if (resp->item != NULL) {
        item_remove(resp->item);
        resp->item = NULL;
    }

    if (resp->write_and_free != NULL) {
        free(resp->write_and_free);
        resp->write_and_free = NULL;
    }

    resp->skip = false;
    resp->chunked_total = 0;
    resp->chunked_data_iov = 0;
    resp->iovcnt = 0;
    resp->tosend = 0;
    resp->wbytes = 0;
}
/*
 * Appends (buf, len) as the next iovec of the response and adds len to the
 * total bytes to send. The buffer is referenced, not copied, so it must
 * outlive the send. At most MC_RESP_IOVCOUNT entries are allowed (asserted).
 */
static void resp_add_iov(mc_resp *resp, const void *buf, int len) {
    assert(resp->iovcnt < MC_RESP_IOVCOUNT);

    const int idx = resp->iovcnt;
    resp->iov[idx].iov_len = len;
    resp->iov[idx].iov_base = (void *)buf;
    resp->iovcnt = idx + 1;

    resp->tosend += len;
}
// Notes that an IOV should be handled as a chunked item header.
// TODO: I'm hoping this isn't a permanent abstraction while I learn what the
// API should be.
//
// Records the index this iov will occupy and its length in the chunked_*
// fields, then appends it like any other iov.
static void resp_add_chunked_iov(mc_resp *resp, const void *buf, int len) {
    // Must capture the slot before resp_add_iov() advances iovcnt.
    resp->chunked_total = len;
    resp->chunked_data_iov = resp->iovcnt;
    resp_add_iov(resp, buf, len);
}
/*
 * Allocates a zeroed response object from the thread's resp_cache and
 * appends it to the connection's response chain, making it c->resp. For UDP,
 * the request id and peer address are copied into it so the reply can be
 * sent later.
 *
 * @return false (and bumps response_obj_oom) if the cache allocation fails.
 */
static bool resp_start(conn *c) {
    mc_resp *resp = do_cache_alloc(c->thread->resp_cache);
    if (resp == NULL) {
        THR_STATS_LOCK(c);
        c->thread->stats.response_obj_oom++;
        THR_STATS_UNLOCK(c);
        return false;
    }
    // FIXME: make wbuf indirect or use offsetof to zero up until wbuf
    memset(resp, 0, sizeof(*resp));

    if (c->resp_head == NULL)
        c->resp_head = resp;
    if (c->resp != NULL)
        c->resp->next = resp;
    c->resp = resp;

    if (IS_UDP(c->transport)) {
        // need to hold on to some data for async responses.
        resp->request_id = c->request_id;
        resp->request_addr = c->request_addr;
        resp->request_addr_size = c->request_addr_size;
    }

    return true;
}
// returns next response in chain.
//
// Releases a response object: drops its item reference, frees its
// write_and_free buffer, detaches it from the connection's head/tail
// pointers, marks it free and returns it to the thread's resp_cache.
static mc_resp* resp_finish(conn *c, mc_resp *resp) {
    mc_resp *const successor = resp->next;

    if (resp->item != NULL) {
        // TODO: cache hash value in resp obj?
        item_remove(resp->item);
        resp->item = NULL;
    }
    if (resp->write_and_free != NULL)
        free(resp->write_and_free);

    if (resp == c->resp_head)
        c->resp_head = successor;
    if (resp == c->resp)
        c->resp = NULL;

    resp->free = true;
    do_cache_free(c->thread->resp_cache, resp);
    return successor;
}
// Tells whether the connection has more than one queued response object.
// Requires a non-NULL c->resp_head.
static bool resp_has_stack(conn *c) {
    const mc_resp *head = c->resp_head;
    return head->next != NULL;
}
/*
 * Replaces the current response (c->resp) with `str` followed by "\r\n" and
 * moves the connection to conn_new_cmd.
 *
 * With c->noreply set, the response is only marked skip and nothing is sent.
 * Strings that would not fit in WRITE_BUFFER_SIZE (with CRLF) are replaced
 * by "SERVER_ERROR output line too long".
 */
static void out_string(conn *c, const char *str) {
    size_t len;
    /* Check c before dereferencing it; the assert was previously placed
     * after c->resp had already been read. */
    assert(c != NULL);
    mc_resp *resp = c->resp;

    // if response was original filled with something, but we're now writing
    // out an error or similar, have to reset the object first.
    // TODO: since this is often redundant with allocation, how many callers
    // are actually requiring it be reset? Can we fast test by just looking at
    // tosend and reset if nonzero?
    resp_reset(resp);

    if (c->noreply) {
        // TODO: just invalidate the response since nothing's been attempted
        // to send yet?
        resp->skip = true;
        if (settings.verbose > 1)
            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
        conn_set_state(c, conn_new_cmd);
        return;
    }

    if (settings.verbose > 1)
        fprintf(stderr, ">%d %s\n", c->sfd, str);

    // Fill response object with static string.

    len = strlen(str);
    if ((len + 2) > WRITE_BUFFER_SIZE) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(resp->wbuf, str, len);
    memcpy(resp->wbuf + len, "\r\n", 2);
    resp_add_iov(resp, resp->wbuf, len + 2);

    conn_set_state(c, conn_new_cmd);
    return;
}
// For metaget-style ASCII commands: emits `str` like out_string(), but first
// clears c->noreply so protocol-level errors are always sent to the client.
static void out_errstring(conn *c, const char *str) {
    c->noreply = false; // errors must never be suppressed
    out_string(c, str);
}
/*
 * Reports an out-of-memory condition in the connection's protocol.
 * ASCII clients get `ascii_error` verbatim via out_string(). Binary clients
 * get PROTOCOL_BINARY_RESPONSE_ENOMEM with the message, minus a leading
 * "SERVER_ERROR " if present.
 */
static void out_of_memory(conn *c, char *ascii_error) {
    static const char error_prefix[] = "SERVER_ERROR ";
    static const int error_prefix_len = sizeof(error_prefix) - 1;

    if (c->protocol != binary_prot) {
        out_string(c, ascii_error);
        return;
    }

    /* Strip off the generic error prefix; it's irrelevant in binary */
    if (strncmp(ascii_error, error_prefix, error_prefix_len) == 0)
        ascii_error += error_prefix_len;
    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, ascii_error, 0);
}
/*
 * we get here after reading the value in set/add/replace commands. The command
 * has been stored in c->cmd, and the item is ready in c->item.
 *
 * Counts the set in per-slab-class stats and verifies that the value ends
 * with "\r\n". For chunked items the last two bytes may straddle two chunks.
 *
 * Bad data yields "CLIENT_ERROR bad data chunk" (always sent in metaset
 * mode). Otherwise the item is stored and the result reported:
 *  - classic commands: STORED / EXISTS / NOT_FOUND / NOT_STORED;
 *  - metaset (c->mset_res): the first 3 bytes of the response prepared
 *    during mset parsing are overwritten with OK/EX/NF/NS.
 * Finally the mset/stale flags are cleared and the c->item reference dropped.
 */
static void complete_nread_ascii(conn *c) {
    assert(c != NULL);

    item *it = c->item;
    int comm = c->cmd;
    enum store_item_type ret;
    bool is_valid = false;

    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    if ((it->it_flags & ITEM_CHUNKED) == 0) {
        if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
            is_valid = true;
        }
    } else {
        char buf[2];
        /* should point to the final item chunk */
        item_chunk *ch = (item_chunk *) c->ritem;
        assert(ch->used != 0);
        /* :( We need to look at the last two bytes. This could span two
         * chunks.
         */
        if (ch->used > 1) {
            buf[0] = ch->data[ch->used - 2];
            buf[1] = ch->data[ch->used - 1];
        } else {
            assert(ch->prev);
            assert(ch->used == 1);
            buf[0] = ch->prev->data[ch->prev->used - 1];
            buf[1] = ch->data[ch->used - 1];
        }
        if (strncmp(buf, "\r\n", 2) == 0) {
            is_valid = true;
        } else {
            /* NOTE(review): the terminator bytes come from the client, so
             * this looks reachable with malformed input; in assert-enabled
             * builds it would abort the server instead of returning
             * CLIENT_ERROR. Consider removing. */
            assert(1 == 0);
        }
    }

    if (!is_valid) {
        // metaset mode always returns errors.
        if (c->mset_res) {
            c->noreply = false;
        }
        out_string(c, "CLIENT_ERROR bad data chunk");
    } else {
      ret = store_item(it, comm, c);

#ifdef ENABLE_DTRACE
      uint64_t cas = ITEM_get_cas(it);
      switch (c->cmd) {
      case NREAD_ADD:
          MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
                                (ret == 1) ? it->nbytes : -1, cas);
          break;
      case NREAD_REPLACE:
          MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
                                    (ret == 1) ? it->nbytes : -1, cas);
          break;
      case NREAD_APPEND:
          MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
                                   (ret == 1) ? it->nbytes : -1, cas);
          break;
      case NREAD_PREPEND:
          MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
                                    (ret == 1) ? it->nbytes : -1, cas);
          break;
      case NREAD_SET:
          MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
                                (ret == 1) ? it->nbytes : -1, cas);
          break;
      case NREAD_CAS:
          MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes,
                                cas);
          break;
      }
#endif

      if (c->mset_res) {
          // Replace the status code in the response.
          // Rest was prepared during mset parsing.
          mc_resp *resp = c->resp;
          conn_set_state(c, conn_new_cmd);
          switch (ret) {
          case STORED:
              memcpy(resp->wbuf, "OK ", 3);
              // Only place noreply is used for meta cmds is a nominal response.
              if (c->noreply) {
                  resp->skip = true;
              }
              break;
          case EXISTS:
              memcpy(resp->wbuf, "EX ", 3);
              break;
          case NOT_FOUND:
              memcpy(resp->wbuf, "NF ", 3);
              break;
          case NOT_STORED:
              memcpy(resp->wbuf, "NS ", 3);
              break;
          default:
              c->noreply = false;
              out_string(c, "SERVER_ERROR Unhandled storage type.");
          }
      } else {
          switch (ret) {
          case STORED:
              out_string(c, "STORED");
              break;
          case EXISTS:
              out_string(c, "EXISTS");
              break;
          case NOT_FOUND:
              out_string(c, "NOT_FOUND");
              break;
          case NOT_STORED:
              out_string(c, "NOT_STORED");
              break;
          default:
              out_string(c, "SERVER_ERROR Unhandled storage type.");
          }
      }

    }

    c->set_stale = false; /* force flag to be off just in case */
    c->mset_res = false;
    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}
/**
 * Return a pointer to the key of the binary request being processed.
 *
 * The key occupies the request's keylen bytes immediately before c->rcurr.
 * It is not NUL-terminated.
 */
static char* binary_get_key(conn *c) {
    const size_t key_bytes = c->binary_header.request.keylen;
    return c->rcurr - key_bytes;
}
/*
 * Reset the connection's current response (c->resp), write a binary
 * response header into its wbuf, and queue that header as the first iov.
 *
 *   err      - status code in host byte order (converted with htons here)
 *   hdr_len  - length of the extras section
 *   key_len  - length of the key section
 *   body_len - total body length (extras + key + value)
 *
 * The opcode is echoed from the request header. Opaque and CAS come from
 * c->opaque and c->cas. When verbose > 1, the header bytes are dumped to
 * stderr, four per line.
 */
static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
    protocol_binary_response_header* header;
    mc_resp *resp;

    /* Check before the first dereference of c; asserting afterwards is useless. */
    assert(c);
    resp = c->resp;

    resp_reset(resp);

    header = (protocol_binary_response_header *)resp->wbuf;

    header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
    header->response.opcode = c->binary_header.request.opcode;
    header->response.keylen = (uint16_t)htons(key_len);

    header->response.extlen = (uint8_t)hdr_len;
    header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
    header->response.status = (uint16_t)htons(err);

    header->response.bodylen = htonl(body_len);
    header->response.opaque = c->opaque;
    header->response.cas = htonll(c->cas);

    if (settings.verbose > 1) {
        size_t ii;
        fprintf(stderr, ">%d Writing bin response:", c->sfd);
        for (ii = 0; ii < sizeof(header->bytes); ++ii) {
            if (ii % 4 == 0) {
                fprintf(stderr, "\n>%d  ", c->sfd);
            }
            fprintf(stderr, " 0x%02x", header->bytes[ii]);
        }
        fprintf(stderr, "\n");
    }

    resp->wbytes = sizeof(header->response);
    resp_add_iov(resp, resp->wbuf, resp->wbytes);
}
/*
 * Default human-readable text for a binary status code. Used when the caller
 * of write_bin_error() supplies no message.
 *
 * An unrecognized code trips an assertion in debug builds. Otherwise it is
 * reported on stderr, tagged with sfd, and "UNHANDLED ERROR" is returned.
 */
static const char *bin_error_default_text(protocol_binary_response_status err,
                                          int sfd) {
    switch (err) {
    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
        return "Out of memory";
    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
        return "Unknown command";
    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
        return "Not found";
    case PROTOCOL_BINARY_RESPONSE_EINVAL:
        return "Invalid arguments";
    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
        return "Data exists for key.";
    case PROTOCOL_BINARY_RESPONSE_E2BIG:
        return "Too large.";
    case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
        return "Non-numeric server-side value for incr or decr";
    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
        return "Not stored.";
    case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
        return "Auth failure.";
    default:
        assert(false);
        fprintf(stderr, ">%d UNHANDLED ERROR: %d\n", sfd, err);
        return "UNHANDLED ERROR";
    }
}

/**
 * Queue a binary error response with status err.
 *
 * errstr is used as the response body when non-NULL. Otherwise a generic
 * description of the status code is sent.
 *
 * If swallow > 0, that many request-body bytes are discarded next: c->sbytes
 * is set and the connection enters conn_swallow. Otherwise it enters
 * conn_mwrite.
 */
static void write_bin_error(conn *c, protocol_binary_response_status err,
                            const char *errstr, int swallow) {
    const char *text = (errstr != NULL) ? errstr
                                        : bin_error_default_text(err, c->sfd);
    size_t text_len;

    if (settings.verbose > 1) {
        fprintf(stderr, ">%d Writing an error: %s\n", c->sfd, text);
    }

    text_len = strlen(text);
    add_bin_header(c, err, 0, 0, text_len);
    if (text_len > 0) {
        resp_add_iov(c->resp, text, text_len);
    }

    if (swallow <= 0) {
        conn_set_state(c, conn_mwrite);
        return;
    }
    c->sbytes = swallow;
    conn_set_state(c, conn_swallow);
}
/*
 * Queue a successful (status 0) binary response.
 *
 *   d      - hlen + dlen bytes of extras/value to send after the header
 *   hlen   - extras length
 *   keylen - key length
 *   dlen   - body length; the first dlen bytes at d are sent
 *
 * Nothing is queued when the request was quiet (c->noreply), except for
 * GET/GETK. The connection always moves to conn_new_cmd.
 */
static void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) {
    const bool must_reply = !c->noreply ||
                            c->cmd == PROTOCOL_BINARY_CMD_GET ||
                            c->cmd == PROTOCOL_BINARY_CMD_GETK;

    if (must_reply) {
        add_bin_header(c, 0, hlen, keylen, dlen);
        if (dlen > 0) {
            resp_add_iov(c->resp, d, dlen);
        }
    }

    conn_set_state(c, conn_new_cmd);
}
/*
 * Execute a binary INCREMENT/DECREMENT.
 *
 * extbuf holds the 20-byte extras (delta, initial, expiration) in network
 * byte order. They are converted to host order in place.
 *
 * Outcomes of add_delta():
 *  - OK: reply with the new 64-bit value. When add_delta() produced a CAS,
 *    it is used for the response.
 *  - NON_NUMERIC: DELTA_BADVAL.
 *  - EOM: out-of-memory error.
 *  - DELTA_ITEM_NOT_FOUND: if expiration is not 0xffffffff, create the item
 *    with the `initial` value (replying with it on success); otherwise count
 *    a miss and reply KEY_ENOENT.
 *  - DELTA_ITEM_CAS_MISMATCH: KEY_EEXISTS.
 */
static void complete_incr_bin(conn *c, char *extbuf) {
    item *it;
    char *key;
    size_t nkey;
    /* Weird magic in add_delta forces me to pad here */
    char tmpbuf[INCR_MAX_STORAGE_LEN];
    uint64_t cas = 0;
    protocol_binary_response_incr* rsp;
    protocol_binary_request_incr* req = (void *)extbuf;

    /* check before the first dereference of c */
    assert(c != NULL);
    /* the response body is assembled directly in the response write buffer */
    rsp = (protocol_binary_response_incr*)c->resp->wbuf;

    /* fix byteorder in the request */
    req->message.body.delta = ntohll(req->message.body.delta);
    req->message.body.initial = ntohll(req->message.body.initial);
    req->message.body.expiration = ntohl(req->message.body.expiration);
    key = binary_get_key(c);
    nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        size_t i;
        fprintf(stderr, "incr ");

        for (i = 0; i < nkey; i++) {
            fprintf(stderr, "%c", key[i]);
        }
        /* All three fields are unsigned; print them with matching
         * conversion specifiers. */
        fprintf(stderr, " %llu, %llu, %u\n",
                (unsigned long long)req->message.body.delta,
                (unsigned long long)req->message.body.initial,
                (unsigned int)req->message.body.expiration);
    }

    if (c->binary_header.request.cas != 0) {
        cas = c->binary_header.request.cas;
    }
    switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
                     req->message.body.delta, tmpbuf,
                     &cas)) {
    case OK:
        rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
        if (cas) {
            c->cas = cas;
        }
        write_bin_response(c, &rsp->message.body, 0, 0,
                           sizeof(rsp->message.body.value));
        break;
    case NON_NUMERIC:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, NULL, 0);
        break;
    case EOM:
        out_of_memory(c, "SERVER_ERROR Out of memory incrementing value");
        break;
    case DELTA_ITEM_NOT_FOUND:
        /* 0xffffffff means "do not create the item on a miss" */
        if (req->message.body.expiration != 0xffffffff) {
            /* Save some room for the response */
            rsp->message.body.value = htonll(req->message.body.initial);

            snprintf(tmpbuf, INCR_MAX_STORAGE_LEN, "%llu",
                (unsigned long long)req->message.body.initial);
            int res = strlen(tmpbuf);
            it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
                            res + 2);

            if (it != NULL) {
                memcpy(ITEM_data(it), tmpbuf, res);
                memcpy(ITEM_data(it) + res, "\r\n", 2);

                /* compare explicitly: store_item() returns an enum result */
                if (store_item(it, NREAD_ADD, c) == STORED) {
                    c->cas = ITEM_get_cas(it);
                    write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
                } else {
                    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED,
                                    NULL, 0);
                }
                item_remove(it);         /* release our reference */
            } else {
                out_of_memory(c,
                        "SERVER_ERROR Out of memory allocating new item");
            }
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
                c->thread->stats.incr_misses++;
            } else {
                c->thread->stats.decr_misses++;
            }
            pthread_mutex_unlock(&c->thread->stats.mutex);

            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0);
        }
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0);
        break;
    }
}
/*
 * Finish a binary SET/ADD/REPLACE/APPEND/PREPEND/CAS once the value has been
 * read into c->item.
 *
 * The binary protocol does not transmit the "\r\n" trailer, so this writes
 * it, then stores the item and queues the response.
 *
 * Result mapping:
 *   STORED     -> success response with no body
 *   EXISTS     -> KEY_EEXISTS
 *   NOT_FOUND  -> KEY_ENOENT
 *   NOT_STORED / TOO_LARGE / NO_MEMORY -> KEY_EEXISTS for ADD, KEY_ENOENT for
 *                 REPLACE, NOT_STORED for everything else
 *
 * Always releases the c->item reference and clears c->item.
 */
static void complete_update_bin(conn *c) {
    /* Both initial values are placeholders: eno is assigned on every path
     * that uses it, and ret is overwritten by store_item(). */
    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
    enum store_item_type ret = NOT_STORED;
    assert(c != NULL);

    item *it = c->item;
    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    /* We don't actually receive the trailing two characters in the bin
     * protocol, so we're going to just set them here */
    if ((it->it_flags & ITEM_CHUNKED) == 0) {
        *(ITEM_data(it) + it->nbytes - 2) = '\r';
        *(ITEM_data(it) + it->nbytes - 1) = '\n';
    } else {
        assert(c->ritem);
        item_chunk *ch = (item_chunk *) c->ritem;
        /* If the chunk c->ritem points at is already full, the trailer goes
         * into the following chunk. The code relies on ch->next existing
         * and having at least 2 free bytes (checked by the assert below). */
        if (ch->size == ch->used)
            ch = ch->next;
        assert(ch->size - ch->used >= 2);
        ch->data[ch->used] = '\r';
        ch->data[ch->used + 1] = '\n';
        ch->used += 2;
    }

    ret = store_item(it, c->cmd, c);

#ifdef ENABLE_DTRACE
    /* NOTE: unlike the ASCII path, this switch has no probe for NREAD_CAS. */
    uint64_t cas = ITEM_get_cas(it);
    switch (c->cmd) {
    case NREAD_ADD:
        MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
                              (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_REPLACE:
        MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
                                  (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_APPEND:
        MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
                                 (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_PREPEND:
        MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
                                 (ret == STORED) ? it->nbytes : -1, cas);
        break;
    case NREAD_SET:
        MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
                              (ret == STORED) ? it->nbytes : -1, cas);
        break;
    }
#endif

    switch (ret) {
    case STORED:
        /* Stored */
        write_bin_response(c, NULL, 0, 0, 0);
        break;
    case EXISTS:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0);
        break;
    case NOT_FOUND:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0);
        break;
    case NOT_STORED:
    case TOO_LARGE:
    case NO_MEMORY:
        /* Pick the status code by command: a failed ADD is reported as
         * "exists", a failed REPLACE as "not found". */
        if (c->cmd == NREAD_ADD) {
            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
        } else if(c->cmd == NREAD_REPLACE) {
            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
        } else {
            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
        }
        write_bin_error(c, eno, NULL, 0);
    }

    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}
/*
 * Queue a KEY_ENOENT reply for a binary get/touch miss.
 *
 * With a non-empty key (GETK/GATK), the key is copied into the response
 * buffer right after the header and sent as the body. The connection then
 * moves to conn_new_cmd.
 *
 * With an empty key, the generic "Not found" error is sent via
 * write_bin_error().
 */
static void write_bin_miss_response(conn *c, char *key, size_t nkey) {
    if (nkey == 0) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
                        NULL, 0);
        return;
    }

    add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
                   0, nkey, nkey);

    /* stage the key in wbuf just past the header, then reference it */
    char *key_copy = c->resp->wbuf + sizeof(protocol_binary_response_header);
    memcpy(key_copy, key, nkey);
    resp_add_iov(c->resp, key_copy, nkey);

    conn_set_state(c, conn_new_cmd);
}
/*
 * Handle binary GET, GETK, TOUCH, GAT and GATK; c->cmd selects which.
 *
 * - TOUCH/GAT/GATK read a 32-bit expiration from the extras in extbuf and
 *   call item_touch(). Plain gets call item_get() with DO_UPDATE.
 * - GETK/GATK echo the key in the response. TOUCH omits the value.
 * - On a hit, the response is: header + flags extras [+ key] [+ value
 *   without its trailing "\r\n"]. The item reference is handed to
 *   c->resp->item, except in the EXTSTORE header case noted below.
 * - On a miss, or an EXTSTORE fetch failure, miss statistics are bumped.
 *   Unless c->noreply is set, a KEY_ENOENT reply is queued, carrying the key
 *   for GETK/GATK.
 */
static void process_bin_get_or_touch(conn *c, char *extbuf) {
    item *it;

    /* the response is assembled directly in the response write buffer */
    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->resp->wbuf;
    char* key = binary_get_key(c);
    size_t nkey = c->binary_header.request.keylen;
    int should_touch = (c->cmd == PROTOCOL_BINARY_CMD_TOUCH ||
                        c->cmd == PROTOCOL_BINARY_CMD_GAT ||
                        c->cmd == PROTOCOL_BINARY_CMD_GATK);
    int should_return_key = (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
                             c->cmd == PROTOCOL_BINARY_CMD_GATK);
    int should_return_value = (c->cmd != PROTOCOL_BINARY_CMD_TOUCH);
    /* set on a miss, or when the value could not be attached (EXTSTORE) */
    bool failed = false;

    if (settings.verbose > 1) {
        fprintf(stderr, "<%d %s ", c->sfd, should_touch ? "TOUCH" : "GET");
        /* The empty if discards fwrite's result, presumably to quiet an
         * unused-result warning. */
        if (fwrite(key, 1, nkey, stderr)) {}
        fputc('\n', stderr);
    }

    if (should_touch) {
        protocol_binary_request_touch *t = (void *)extbuf;
        time_t exptime = ntohl(t->message.body.expiration);

        it = item_touch(key, nkey, realtime(exptime), c);
    } else {
        it = item_get(key, nkey, c, DO_UPDATE);
    }

    if (it) {
        /* the length has two unnecessary bytes ("\r\n") */
        uint16_t keylen = 0;
        uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);

        pthread_mutex_lock(&c->thread->stats.mutex);
        if (should_touch) {
            c->thread->stats.touch_cmds++;
            c->thread->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
        } else {
            c->thread->stats.get_cmds++;
            c->thread->stats.lru_hits[it->slabs_clsid]++;
        }
        pthread_mutex_unlock(&c->thread->stats.mutex);

        if (should_touch) {
            MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey,
                                    it->nbytes, ITEM_get_cas(it));
        } else {
            MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                  it->nbytes, ITEM_get_cas(it));
        }

        /* TOUCH sends no value; GETK/GATK add the key to the body. */
        if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) {
            bodylen -= it->nbytes - 2;
        } else if (should_return_key) {
            bodylen += nkey;
            keylen = nkey;
        }

        add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
        rsp->message.header.response.cas = htonll(ITEM_get_cas(it));

        // add the flags
        FLAGS_CONV(it, rsp->message.body.flags);
        rsp->message.body.flags = htonl(rsp->message.body.flags);
        resp_add_iov(c->resp, &rsp->message.body, sizeof(rsp->message.body));

        if (should_return_key) {
            resp_add_iov(c->resp, ITEM_key(it), nkey);
        }

        if (should_return_value) {
            /* Add the data minus the CRLF */
#ifdef EXTSTORE
            if (it->it_flags & ITEM_HDR) {
                if (_get_extstore(c, it, c->resp) != 0) {
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.get_oom_extstore++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);

                    failed = true;
                }
            } else if ((it->it_flags & ITEM_CHUNKED) == 0) {
                resp_add_iov(c->resp, ITEM_data(it), it->nbytes - 2);
            } else {
                // Allow transmit handler to find the item and expand iov's
                resp_add_chunked_iov(c->resp, it, it->nbytes - 2);
            }
#else
            if ((it->it_flags & ITEM_CHUNKED) == 0) {
                resp_add_iov(c->resp, ITEM_data(it), it->nbytes - 2);
            } else {
                resp_add_chunked_iov(c->resp, it, it->nbytes - 2);
            }
#endif
        }

        if (!failed) {
            conn_set_state(c, conn_new_cmd);
            /* Remember this command so we can garbage collect it later */
#ifdef EXTSTORE
            if ((it->it_flags & ITEM_HDR) != 0 && should_return_value) {
                // Only have extstore clean if header and returning value.
                c->resp->item = NULL;
            } else {
                c->resp->item = it;
            }
#else
            c->resp->item = it;
#endif
        } else {
            item_remove(it);
        }
    } else {
        failed = true;
    }

    if (failed) {
        pthread_mutex_lock(&c->thread->stats.mutex);
        if (should_touch) {
            c->thread->stats.touch_cmds++;
            c->thread->stats.touch_misses++;
        } else {
            c->thread->stats.get_cmds++;
            c->thread->stats.get_misses++;
        }
        pthread_mutex_unlock(&c->thread->stats.mutex);

        if (should_touch) {
            MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0);
        } else {
            MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
        }

        /* quiet variants send nothing on a miss */
        if (c->noreply) {
            conn_set_state(c, conn_new_cmd);
        } else {
            if (should_return_key) {
                write_bin_miss_response(c, key, nkey);
            } else {
                write_bin_miss_response(c, NULL, 0);
            }
        }
    }

    /* NOTE(review): after an EXTSTORE fetch failure `it` is still non-NULL
     * (though already released), so this records a hit even though the
     * thread stats above counted a miss. Confirm this is intended. */
    if (settings.detail_enabled) {
        stats_prefix_record_get(key, nkey, NULL != it);
    }
}
/*
 * Append one binary STAT response packet at c->stats.offset: a header whose
 * key length is klen and whose body length is klen + vlen, then the key,
 * then the value.
 *
 * The value is only copied when a key is present. The caller must already
 * have grown the buffer, which append_stats() does via grow_stats_buf().
 */
static void append_bin_stats(const char *key, const uint16_t klen,
                             const char *val, const uint32_t vlen,
                             conn *c) {
    const uint32_t body_len = klen + vlen;
    char *dst = c->stats.buffer + c->stats.offset;
    protocol_binary_response_header hdr = {
        .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
        .response.opcode = PROTOCOL_BINARY_CMD_STAT,
        .response.keylen = (uint16_t)htons(klen),
        .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
        .response.bodylen = htonl(body_len),
        .response.opaque = c->opaque
    };

    memcpy(dst, hdr.bytes, sizeof(hdr.response));
    dst += sizeof(hdr.response);

    if (klen > 0) {
        memcpy(dst, key, klen);
        if (vlen > 0) {
            memcpy(dst + klen, val, vlen);
        }
    }

    c->stats.offset += sizeof(hdr.response) + body_len;
}
/*
 * Append one ASCII stats line at c->stats.offset and advance the offset by
 * snprintf()'s return value. The line is one of:
 *
 *   "STAT <key> <val>\r\n"  when a value is present,
 *   "STAT <key>\r\n"        when only a key is present,
 *   "END\r\n"               when both are empty (the terminator).
 *
 * key and val are printed with %s, so they must be NUL-terminated.
 */
static void append_ascii_stats(const char *key, const uint16_t klen,
                               const char *val, const uint32_t vlen,
                               conn *c) {
    char *out = c->stats.buffer + c->stats.offset;
    int space_left = c->stats.size - c->stats.offset;
    int limit = space_left - 1;
    uint32_t written;

    if (vlen != 0) {
        written = snprintf(out, limit, "STAT %s %s\r\n", key, val);
    } else if (klen != 0) {
        written = snprintf(out, limit, "STAT %s\r\n", key);
    } else {
        written = snprintf(out, limit, "END\r\n");
    }

    c->stats.offset += written;
}
/*
 * Ensure the connection's stats buffer has at least `needed` free bytes past
 * c->stats.offset, doubling its size as required.
 *
 * With no buffer yet, a fresh one is allocated. It starts at 1024 bytes and
 * all of it counts as free, since the offset is reset to 0; it is only
 * doubled if `needed` exceeds that.
 *
 * Returns false and bumps stats.malloc_fails on allocation failure. The
 * existing buffer and its size are left untouched in that case.
 */
static bool grow_stats_buf(conn *c, size_t needed) {
    size_t nsize = c->stats.size;
    size_t available = nsize - c->stats.offset;

    /* Special case: No buffer -- need to allocate fresh */
    if (c->stats.buffer == NULL) {
        nsize = 1024;
        c->stats.size = c->stats.offset = 0;
        /* the whole initial allocation is free space */
        available = nsize;
    }

    while (needed > available) {
        assert(nsize > 0);
        nsize = nsize << 1;
        available = nsize - c->stats.offset;
    }

    if (nsize == c->stats.size) {
        return true;
    }

    /* realloc(NULL, n) allocates, covering the fresh-buffer case */
    char *ptr = realloc(c->stats.buffer, nsize);
    if (ptr == NULL) {
        STATS_LOCK();
        stats.malloc_fails++;
        STATS_UNLOCK();
        return false;
    }

    c->stats.buffer = ptr;
    c->stats.size = nsize;
    return true;
}
/*
 * Stats callback: append one key/value pair (or the terminator, when both
 * are empty) to the stats buffer of the connection passed as `cookie`.
 *
 * The buffer is grown first. The entry is written in binary or ASCII form
 * depending on the connection's protocol. A value without a key is ignored,
 * and so is the whole call if the buffer cannot be grown.
 */
static void append_stats(const char *key, const uint16_t klen,
                  const char *val, const uint32_t vlen,
                  const void *cookie)
{
    conn *c = (conn*)cookie;
    const bool binary = (c->protocol == binary_prot);
    size_t needed;

    /* value without a key is invalid */
    if (klen == 0 && vlen > 0) {
        return;
    }

    if (binary) {
        needed = vlen + klen + sizeof(protocol_binary_response_header);
    } else {
        /* "STAT", two separators, CRLF and the terminating NUL */
        needed = vlen + klen + 10;
    }

    if (!grow_stats_buf(c, needed)) {
        return;
    }

    if (binary) {
        append_bin_stats(key, klen, val, vlen, c);
    } else {
        append_ascii_stats(key, klen, val, vlen, c);
    }

    assert(c->stats.offset <= c->stats.size);
}
/*
 * Return true when the first strlen(prefix) bytes of the key equal prefix.
 * The key is not NUL-terminated and is nkey bytes long; no byte beyond
 * nkey is ever read.
 */
static bool bin_stat_key_matches(const char *key, size_t nkey,
                                 const char *prefix) {
    const size_t plen = strlen(prefix);
    return nkey >= plen && memcmp(key, prefix, plen) == 0;
}

/*
 * Handle a binary STAT request. The request key selects the subcommand:
 *
 *   (empty)            all general statistics
 *   "reset"            reset counters
 *   "settings"         current settings
 *   "detail dump"      per-prefix detail dump
 *   "detail on"/"off"  toggle per-prefix detail collection
 *   anything else      passed to get_stats(). An unknown group gets
 *                      KEY_ENOENT; a known one is sent as-is, since
 *                      get_stats() appends its own terminator.
 *
 * For the first group of subcommands a terminating empty STAT packet is
 * appended and the buffer is handed to write_and_free().
 *
 * The key is not NUL-terminated. Subcommands are therefore matched with a
 * length check against nkey, so bytes after the key in the read buffer can
 * never influence the match.
 */
static void process_bin_stat(conn *c) {
    char *subcommand = binary_get_key(c);
    size_t nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        size_t ii;
        fprintf(stderr, "<%d STATS ", c->sfd);
        for (ii = 0; ii < nkey; ++ii) {
            fprintf(stderr, "%c", subcommand[ii]);
        }
        fprintf(stderr, "\n");
    }

    if (nkey == 0) {
        /* request all statistics */
        server_stats(&append_stats, c);
        (void)get_stats(NULL, 0, &append_stats, c);
    } else if (bin_stat_key_matches(subcommand, nkey, "reset")) {
        stats_reset();
    } else if (bin_stat_key_matches(subcommand, nkey, "settings")) {
        process_stat_settings(&append_stats, c);
    } else if (bin_stat_key_matches(subcommand, nkey, "detail")) {
        char *subcmd_pos = subcommand + 6;
        size_t subcmd_len = nkey - 6;
        if (bin_stat_key_matches(subcmd_pos, subcmd_len, " dump")) {
            int len;
            char *dump_buf = stats_prefix_dump(&len);
            if (dump_buf == NULL || len <= 0) {
                out_of_memory(c, "SERVER_ERROR Out of memory generating stats");
                free(dump_buf); /* free(NULL) is a no-op */
                return;
            }
            append_stats("detailed", strlen("detailed"), dump_buf, len, c);
            free(dump_buf);
        } else if (bin_stat_key_matches(subcmd_pos, subcmd_len, " on")) {
            settings.detail_enabled = 1;
        } else if (bin_stat_key_matches(subcmd_pos, subcmd_len, " off")) {
            settings.detail_enabled = 0;
        } else {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0);
            return;
        }
    } else {
        if (get_stats(subcommand, nkey, &append_stats, c)) {
            if (c->stats.buffer == NULL) {
                out_of_memory(c, "SERVER_ERROR Out of memory generating stats");
            } else {
                write_and_free(c, c->stats.buffer, c->stats.offset);
                c->stats.buffer = NULL;
            }
        } else {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0);
        }

        return;
    }

    /* Append termination package and start the transfer */
    append_stats(NULL, 0, NULL, 0, c);
    if (c->stats.buffer == NULL) {
        out_of_memory(c, "SERVER_ERROR Out of memory preparing to send stats");
    } else {
        write_and_free(c, c->stats.buffer, c->stats.offset);
        c->stats.buffer = NULL;
    }
}
/*
 * Respond to a malformed binary request.
 *
 * Queues an EINVAL error and marks the connection to be closed once the
 * response has been written. When verbose, the offending opcode and the
 * socket are logged.
 */
static void handle_binary_protocol_error(conn *c) {
    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, 0);
    c->close_after_write = true;

    if (settings.verbose) {
        const unsigned int opcode = c->binary_header.request.opcode;
        fprintf(stderr, "Protocol error (opcode %02x), close connection %d\n",
                opcode, c->sfd);
    }
}
/*
 * Prepare per-connection SASL state. Does nothing unless SASL is enabled.
 *
 * The connection is marked unauthenticated. If it has no SASL context yet,
 * one is created for the "memcached" service; my_sasl_hostname is passed as
 * the server name when it is non-empty. If creation fails, c->sasl_conn is
 * left NULL, and the failure is logged when verbose.
 */
static void init_sasl_conn(conn *c) {
    int rc;

    assert(c);
    /* should something else be returned? */
    if (!settings.sasl) {
        return;
    }

    c->authenticated = false;

    /* reuse an existing context */
    if (c->sasl_conn != NULL) {
        return;
    }

    rc = sasl_server_new("memcached",
                         NULL,
                         my_sasl_hostname[0] ? my_sasl_hostname : NULL,
                         NULL, NULL,
                         NULL, 0, &c->sasl_conn);
    if (rc == SASL_OK) {
        return;
    }

    if (settings.verbose) {
        fprintf(stderr, "Failed to initialize SASL conn.\n");
    }
    c->sasl_conn = NULL;
}
/*
 * Handle SASL_LIST_MECHS: reply with the available mechanisms as a single
 * space-separated string.
 *
 * When SASL is disabled, replies UNKNOWN_COMMAND and swallows the request
 * body. If the mechanisms cannot be listed, replies AUTH_ERROR.
 */
static void bin_list_sasl_mechs(conn *c) {
    const char *mechs = NULL;
    unsigned int mechs_len = 0;
    int rc;

    // Guard against a disabled SASL.
    if (!settings.sasl) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL,
                        c->binary_header.request.bodylen
                        - c->binary_header.request.keylen);
        return;
    }

    init_sasl_conn(c);
    rc = sasl_listmech(c->sasl_conn, NULL,
                       "",   /* no prefix */
                       " ",  /* separator between mechanisms */
                       "",   /* no suffix */
                       &mechs, &mechs_len,
                       NULL);

    if (rc == SASL_OK) {
        write_bin_response(c, (char*)mechs, 0, 0, mechs_len);
        return;
    }

    /* Perhaps there's a better error for this... */
    if (settings.verbose) {
        fprintf(stderr, "Failed to list SASL mechanisms.\n");
    }
    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0);
}
/*
 * Begin a binary SASL_AUTH/SASL_STEP request.
 *
 * Validates the mechanism-name length, then allocates a scratch item keyed
 * by the mechanism name to hold the request body. The connection switches
 * to conn_nread with substate bin_reading_sasl_auth_data so the body is read
 * into that item; completion is presumably routed to
 * process_bin_complete_sasl_auth().
 *
 * When SASL is disabled, replies UNKNOWN_COMMAND and swallows the body.
 */
static void process_bin_sasl_auth(conn *c) {
    // Guard for handling disabled SASL on the server.
    if (!settings.sasl) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL,
                        c->binary_header.request.bodylen
                        - c->binary_header.request.keylen);
        return;
    }

    assert(c->binary_header.request.extlen == 0);

    /* key = mechanism name, value = challenge/response data */
    int nkey = c->binary_header.request.keylen;
    int vlen = c->binary_header.request.bodylen - nkey;

    if (nkey > MAX_SASL_MECH_LEN) {
        /* write_bin_error already enters conn_swallow when vlen > 0; the
         * explicit call also forces conn_swallow when vlen == 0. */
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen);
        conn_set_state(c, conn_swallow);
        return;
    }

    char *key = binary_get_key(c);
    assert(key);

    /* +2 leaves room for the "\r\n" an item value normally carries */
    item *it = item_alloc(key, nkey, 0, 0, vlen+2);

    /* Can't use a chunked item for SASL authentication. */
    if (it == 0 || (it->it_flags & ITEM_CHUNKED)) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, NULL, vlen);
        conn_set_state(c, conn_swallow);
        if (it) {
            /* NOTE(review): this uses do_item_remove(), while the rest of
             * this file releases items with item_remove(). The item was just
             * allocated and never stored; confirm skipping the locked
             * variant is intended. */
            do_item_remove(it);
        }
        return;
    }

    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = vlen;
    conn_set_state(c, conn_nread);
    c->substate = bin_reading_sasl_auth_data;
}
/*
 * Complete a SASL_AUTH or SASL_STEP request after its body has been read
 * into c->item. The item key holds the mechanism name and the value holds
 * the client's data.
 *
 * SASL_AUTH calls sasl_server_start() and records whether a negotiation is
 * now in progress (c->sasl_started). SASL_STEP calls sasl_server_step() only
 * if a negotiation was started; otherwise `result` stays -1.
 *
 * Replies:
 *   SASL_OK       -> marks the connection authenticated, sends "Authenticated"
 *   SASL_CONTINUE -> sends AUTH_CONTINUE with the server data, flushed at once
 *   anything else -> sends AUTH_ERROR and counts an auth error
 *
 * NOTE(review): c->item is not released here, including on the early
 * returns; presumably the caller frees it. Verify.
 */
static void process_bin_complete_sasl_auth(conn *c) {
    assert(settings.sasl);
    const char *out = NULL;
    unsigned int outlen = 0;

    assert(c->item);
    init_sasl_conn(c);

    int nkey = c->binary_header.request.keylen;
    int vlen = c->binary_header.request.bodylen - nkey;

    /* the header's key length must not exceed what was stored in the item */
    if (nkey > ((item*) c->item)->nkey) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen);
        conn_set_state(c, conn_swallow);
        return;
    }

    /* NUL-terminated copy of the mechanism name; nkey is bounded by the
     * item's key length checked above */
    char mech[nkey+1];
    memcpy(mech, ITEM_key((item*)c->item), nkey);
    mech[nkey] = 0x00;

    if (settings.verbose)
        fprintf(stderr, "mech:  ``%s'' with %d bytes of data\n", mech, vlen);

    /* not NUL-terminated; passed to SASL together with vlen */
    const char *challenge = vlen == 0 ? NULL : ITEM_data((item*) c->item);

    if (vlen > ((item*) c->item)->nbytes) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen);
        conn_set_state(c, conn_swallow);
        return;
    }

    int result=-1;

    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_SASL_AUTH:
        result = sasl_server_start(c->sasl_conn, mech,
                                   challenge, vlen,
                                   &out, &outlen);
        c->sasl_started = (result == SASL_OK || result == SASL_CONTINUE);
        break;
    case PROTOCOL_BINARY_CMD_SASL_STEP:
        if (!c->sasl_started) {
            if (settings.verbose) {
                fprintf(stderr, "%d: SASL_STEP called but sasl_server_start "
                        "not called for this connection!\n", c->sfd);
            }
            break;
        }
        result = sasl_server_step(c->sasl_conn,
                                  challenge, vlen,
                                  &out, &outlen);
        break;
    default:
        assert(false); /* CMD should be one of the above */
        /* This code is pretty much impossible, but makes the compiler
           happier */
        /* NOTE(review): challenge may be NULL and is not NUL-terminated,
         * so printing it with %s is unsafe if this branch is ever hit. */
        if (settings.verbose) {
            fprintf(stderr, "Unhandled command %d with challenge %s\n",
                    c->cmd, challenge);
        }
        break;
    }

    if (settings.verbose) {
        fprintf(stderr, "sasl result code:  %d\n", result);
    }

    switch(result) {
    case SASL_OK:
        c->authenticated = true;
        write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated"));
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.auth_cmds++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
        break;
    case SASL_CONTINUE:
        add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
        if (outlen > 0) {
            resp_add_iov(c->resp, out, outlen);
        }
        // Immediately flush our write.
        conn_set_state(c, conn_mwrite);
        break;
    default:
        if (settings.verbose)
            fprintf(stderr, "Unknown sasl response:  %d\n", result);
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0);
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.auth_cmds++;
        c->thread->stats.auth_errors++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    }
}
/*
 * Decide whether the current binary command may run on this connection.
 *
 * SASL_LIST_MECHS, SASL_AUTH, SASL_STEP and VERSION are always allowed.
 * Every other command requires c->authenticated. The function expects SASL
 * to be enabled (asserted). With verbose > 1 the decision is logged.
 */
static bool authenticated(conn *c) {
    assert(settings.sasl);

    const bool always_allowed = c->cmd == PROTOCOL_BINARY_CMD_SASL_LIST_MECHS ||
                                c->cmd == PROTOCOL_BINARY_CMD_SASL_AUTH ||
                                c->cmd == PROTOCOL_BINARY_CMD_SASL_STEP ||
                                c->cmd == PROTOCOL_BINARY_CMD_VERSION;
    const bool rv = always_allowed || c->authenticated;

    if (settings.verbose > 1) {
        fprintf(stderr, "authenticated() in cmd 0x%02x is %s\n",
                c->cmd, rv ? "true" : "false");
    }

    return rv;
}
/*
 * Route a fully received binary request header (plus extras/key in extbuf
 * and the read buffer) to its handler.
 *
 * Steps:
 *  1. Reject headers whose key or key+extras exceed the body length, and
 *     requests that fail the SASL check. Both reply with an error and close
 *     the connection after the write.
 *  2. Reject keys longer than KEY_MAX_LENGTH as a protocol error.
 *  3. Translate quiet ("...Q") opcodes to their normal counterpart. For
 *     those, c->noreply stays true; every other opcode clears it.
 *  4. Check each command's extlen/keylen/bodylen and call its handler.
 *     Failed checks go to handle_binary_protocol_error(), which replies
 *     EINVAL and closes. Unknown opcodes reply UNKNOWN_COMMAND and swallow
 *     the body.
 */
static void dispatch_bin_command(conn *c, char *extbuf) {
    int protocol_error = 0;

    uint8_t extlen = c->binary_header.request.extlen;
    uint16_t keylen = c->binary_header.request.keylen;
    uint32_t bodylen = c->binary_header.request.bodylen;

    if (keylen > bodylen || keylen + extlen > bodylen) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, 0);
        c->close_after_write = true;
        return;
    }

    if (settings.sasl && !authenticated(c)) {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0);
        c->close_after_write = true;
        return;
    }

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
    /* assume a quiet command; the default case below clears this */
    c->noreply = true;

    /* binprot supports 16bit keys, but internals are still 8bit */
    if (keylen > KEY_MAX_LENGTH) {
        handle_binary_protocol_error(c);
        return;
    }

    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_SETQ:
        c->cmd = PROTOCOL_BINARY_CMD_SET;
        break;
    case PROTOCOL_BINARY_CMD_ADDQ:
        c->cmd = PROTOCOL_BINARY_CMD_ADD;
        break;
    case PROTOCOL_BINARY_CMD_REPLACEQ:
        c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
        break;
    case PROTOCOL_BINARY_CMD_DELETEQ:
        c->cmd = PROTOCOL_BINARY_CMD_DELETE;
        break;
    case PROTOCOL_BINARY_CMD_INCREMENTQ:
        c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
        break;
    case PROTOCOL_BINARY_CMD_DECREMENTQ:
        c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
        break;
    case PROTOCOL_BINARY_CMD_QUITQ:
        c->cmd = PROTOCOL_BINARY_CMD_QUIT;
        break;
    case PROTOCOL_BINARY_CMD_FLUSHQ:
        c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
        break;
    case PROTOCOL_BINARY_CMD_APPENDQ:
        c->cmd = PROTOCOL_BINARY_CMD_APPEND;
        break;
    case PROTOCOL_BINARY_CMD_PREPENDQ:
        c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
        break;
    case PROTOCOL_BINARY_CMD_GETQ:
        c->cmd = PROTOCOL_BINARY_CMD_GET;
        break;
    case PROTOCOL_BINARY_CMD_GETKQ:
        c->cmd = PROTOCOL_BINARY_CMD_GETK;
        break;
    case PROTOCOL_BINARY_CMD_GATQ:
        c->cmd = PROTOCOL_BINARY_CMD_GAT;
        break;
    case PROTOCOL_BINARY_CMD_GATKQ:
        c->cmd = PROTOCOL_BINARY_CMD_GATK;
        break;
    default:
        /* not a quiet opcode: always reply */
        c->noreply = false;
    }

    /* Per-command validation and dispatch. The GETQ/GETKQ/GATQ/GATKQ labels
     * below cannot match, because the switch above has already rewritten
     * them. */
    switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_VERSION:
            if (extlen == 0 && keylen == 0 && bodylen == 0) {
                write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_FLUSH:
            /* optional 4-byte expiration extras */
            if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
                process_bin_flush(c, extbuf);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_NOOP:
            if (extlen == 0 && keylen == 0 && bodylen == 0) {
                write_bin_response(c, NULL, 0, 0, 0);
                // NOOP forces pipeline flush.
                conn_set_state(c, conn_mwrite);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_REPLACE:
            /* 8 bytes of extras: flags + expiration */
            if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
                process_bin_update(c, extbuf);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GET:   /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETK:
            if (extlen == 0 && bodylen == keylen && keylen > 0) {
                process_bin_get_or_touch(c, extbuf);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_DELETE:
            if (keylen > 0 && extlen == 0 && bodylen == keylen) {
                process_bin_delete(c);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_INCREMENT:
        case PROTOCOL_BINARY_CMD_DECREMENT:
            /* 20 bytes of extras: delta, initial, expiration */
            if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
                complete_incr_bin(c, extbuf);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_APPEND:
        case PROTOCOL_BINARY_CMD_PREPEND:
            /* the rest of the body is the value to append/prepend */
            if (keylen > 0 && extlen == 0) {
                process_bin_append_prepend(c);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_STAT:
            if (extlen == 0) {
                process_bin_stat(c);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_QUIT:
            if (keylen == 0 && extlen == 0 && bodylen == 0) {
                write_bin_response(c, NULL, 0, 0, 0);
                conn_set_state(c, conn_mwrite);
                c->close_after_write = true;
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
            if (extlen == 0 && keylen == 0 && bodylen == 0) {
                bin_list_sasl_mechs(c);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_SASL_AUTH:
        case PROTOCOL_BINARY_CMD_SASL_STEP:
            if (extlen == 0 && keylen != 0) {
                process_bin_sasl_auth(c);
            } else {
                protocol_error = 1;
            }
            break;
        case PROTOCOL_BINARY_CMD_TOUCH:
        case PROTOCOL_BINARY_CMD_GAT:
        case PROTOCOL_BINARY_CMD_GATQ:
        case PROTOCOL_BINARY_CMD_GATK:
        case PROTOCOL_BINARY_CMD_GATKQ:
            /* 4 bytes of extras: expiration.
             * NOTE(review): bodylen is not required to equal keylen + extlen
             * here, unlike GET; confirm extra body bytes are handled. */
            if (extlen == 4 && keylen != 0) {
                process_bin_get_or_touch(c, extbuf);
            } else {
                protocol_error = 1;
            }
            break;
        default:
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL,
                            bodylen);
    }

    if (protocol_error)
        handle_binary_protocol_error(c);
}
/*
 * Begin a binary SET/ADD/REPLACE.
 *
 * The extras in extbuf (flags, expiration) are converted to host order in
 * place. An item is allocated for the value plus "\r\n", and c->cmd is
 * translated to the matching NREAD_* code (NREAD_CAS when the request
 * carries a CAS). The connection then switches to conn_nread so the value
 * is read straight into the item.
 *
 * If the item cannot be allocated:
 *  - an E2BIG (value too large) or ENOMEM error is queued;
 *  - the failure is logged;
 *  - the value is swallowed;
 *  - for SET, any existing item under the key is also unlinked, so stale
 *    data does not survive a failed overwrite.
 */
static void process_bin_update(conn *c, char *extbuf) {
    char *key;
    int nkey;
    int vlen;
    item *it;
    protocol_binary_request_set* req = (void *)extbuf;

    assert(c != NULL);

    key = binary_get_key(c);
    nkey = c->binary_header.request.keylen;

    /* fix byteorder in the request */
    req->message.body.flags = ntohl(req->message.body.flags);
    req->message.body.expiration = ntohl(req->message.body.expiration);

    vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);

    if (settings.verbose > 1) {
        int ii;
        if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
            fprintf(stderr, "<%d ADD ", c->sfd);
        } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
            fprintf(stderr, "<%d SET ", c->sfd);
        } else {
            fprintf(stderr, "<%d REPLACE ", c->sfd);
        }
        for (ii = 0; ii < nkey; ++ii) {
            fprintf(stderr, "%c", key[ii]);
        }

        fprintf(stderr, " Value len is %d", vlen);
        fprintf(stderr, "\n");
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    it = item_alloc(key, nkey, req->message.body.flags,
            realtime(req->message.body.expiration), vlen+2);

    if (it == 0) {
        enum store_item_type status;
        if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, NULL, vlen);
            status = TOO_LARGE;
        } else {
            out_of_memory(c, "SERVER_ERROR Out of memory allocating item");
            /* This error generating method eats the swallow value. Add here. */
            c->sbytes = vlen;
            status = NO_MEMORY;
        }
        /* FIXME: losing c->cmd since it's translated below. refactor? */
        /* `it` is NULL on this path, so there is no slab class to report.
         * Log class 0; evaluating ITEM_clsid(it) here would dereference
         * NULL whenever the log arguments are evaluated. */
        LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE,
                NULL, status, 0, key, nkey, req->message.body.expiration,
                0, c->sfd);

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
            it = item_get(key, nkey, c, DONT_UPDATE);
            if (it) {
                item_unlink(it);
                STORAGE_delete(c->thread->storage, it);
                item_remove(it);
            }
        }

        /* swallow the data line */
        conn_set_state(c, conn_swallow);
        return;
    }

    ITEM_set_cas(it, c->binary_header.request.cas);

    switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_ADD:
            c->cmd = NREAD_ADD;
            break;
        case PROTOCOL_BINARY_CMD_SET:
            c->cmd = NREAD_SET;
            break;
        case PROTOCOL_BINARY_CMD_REPLACE:
            c->cmd = NREAD_REPLACE;
            break;
        default:
            assert(0);
    }

    /* a non-zero CAS in the request turns the store into a CAS operation */
    if (ITEM_get_cas(it) != 0) {
        c->cmd = NREAD_CAS;
    }

    c->item = it;
#ifdef NEED_ALIGN
    if (it->it_flags & ITEM_CHUNKED) {
        c->ritem = ITEM_schunk(it);
    } else {
        c->ritem = ITEM_data(it);
    }
#else
    c->ritem = ITEM_data(it);
#endif
    c->rlbytes = vlen;
    conn_set_state(c, conn_nread);
    c->substate = bin_read_set_value;
}
/*
 * Begin a binary APPEND/PREPEND.
 *
 * Allocates an item sized for the incoming value plus "\r\n" and translates
 * c->cmd to NREAD_APPEND or NREAD_PREPEND. The connection then switches to
 * conn_nread so the value is read into the item.
 *
 * If the item cannot be allocated, an E2BIG (value too large) or ENOMEM
 * error is queued and the value is swallowed.
 */
static void process_bin_append_prepend(conn *c) {
    assert(c != NULL);

    char *key = binary_get_key(c);
    int nkey = c->binary_header.request.keylen;
    /* no extras for these commands: the rest of the body is the value */
    int vlen = c->binary_header.request.bodylen - nkey;

    if (settings.verbose > 1) {
        fprintf(stderr, "Value len is %d\n", vlen);
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    item *it = item_alloc(key, nkey, 0, 0, vlen+2);
    if (it == NULL) {
        if (item_size_ok(nkey, 0, vlen + 2)) {
            out_of_memory(c, "SERVER_ERROR Out of memory allocating item");
            /* OOM calls eat the swallow value. Add here. */
            c->sbytes = vlen;
        } else {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, NULL, vlen);
        }
        /* swallow the data line */
        conn_set_state(c, conn_swallow);
        return;
    }

    ITEM_set_cas(it, c->binary_header.request.cas);

    if (c->cmd == PROTOCOL_BINARY_CMD_APPEND) {
        c->cmd = NREAD_APPEND;
    } else if (c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
        c->cmd = NREAD_PREPEND;
    } else {
        assert(0);
    }

    c->item = it;
#ifdef NEED_ALIGN
    if (it->it_flags & ITEM_CHUNKED) {
        c->ritem = ITEM_schunk(it);
    } else {
        c->ritem = ITEM_data(it);
    }
#else
    c->ritem = ITEM_data(it);
#endif
    c->rlbytes = vlen;
    conn_set_state(c, conn_nread);
    c->substate = bin_read_set_value;
}
/*
 * Binary FLUSH request.
 *
 * Counts the request in the per-thread flush_cmds stat, then refuses it
 * with AUTH_ERROR if flushing is disabled. Otherwise it computes a cutoff
 * and stores it in settings.oldest_live. When the request's extras are
 * exactly the size of the flush body, they carry an expiration: a positive
 * value makes the cutoff realtime(exptime); otherwise the cutoff is
 * current_time. With CAS enabled, oldest_cas is also snapshotted from
 * get_cas_id() when the cutoff is not in the future.
 *
 * NOTE(review): how oldest_live / oldest_cas are consulted when items are
 * read is not visible here; confirm against the item lookup code.
 */
static void process_bin_flush(conn *c, char *extbuf) {
    time_t exptime = 0;
    protocol_binary_request_flush* req = (void *)extbuf;
    rel_time_t new_oldest = 0;

    /* Count before the permission check so refused flushes are still
     * recorded, as the comment below promises. */
    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.flush_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    if (!settings.flush_enabled) {
        // flush_all is not allowed but we log it on stats
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0);
        return;
    }

    /* Extras are optional; only trust them when the size matches exactly. */
    if (c->binary_header.request.extlen == sizeof(req->message.body)) {
        exptime = ntohl(req->message.body.expiration);
    }

    if (exptime > 0) {
        new_oldest = realtime(exptime);
    } else {
        new_oldest = current_time;
    }

    if (settings.use_cas) {
        settings.oldest_live = new_oldest - 1;
        if (settings.oldest_live <= current_time)
            settings.oldest_cas = get_cas_id();
    } else {
        settings.oldest_live = new_oldest;
    }

    write_bin_response(c, NULL, 0, 0, 0);
}
/*
 * Binary DELETE request.
 *
 * Looks up the request key via item_get_locked(), which also yields the
 * hash value whose item lock is released with item_unlock() at the end,
 * on both the hit and miss paths.
 *  - Hit, and the request CAS is 0 or equals the item's CAS: unlink the
 *    item, drop it from external storage, count a delete hit, reply OK.
 *  - Hit with a different CAS: reply KEY_EEXISTS.
 *  - Miss: reply KEY_ENOENT and count a delete miss.
 */
static void process_bin_delete(conn *c) {
    item *it;
    uint32_t hv;
    char *key;
    size_t nkey;

    /* Validate c before anything dereferences it. */
    assert(c != NULL);

    key = binary_get_key(c);
    nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        fprintf(stderr, "Deleting ");
        /* The key is length-delimited (it may not be NUL-terminated), so
         * emit exactly nkey bytes. */
        fwrite(key, 1, nkey, stderr);
        fprintf(stderr, "\n");
    }

    if (settings.detail_enabled) {
        stats_prefix_record_delete(key, nkey);
    }

    it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv);
    if (it) {
        uint64_t cas = c->binary_header.request.cas;
        if (cas == 0 || cas == ITEM_get_cas(it)) {
            MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
            do_item_unlink(it, hv);
            STORAGE_delete(c->thread->storage, it);
            write_bin_response(c, NULL, 0, 0, 0);
        } else {
            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0);
        }
        do_item_remove(it); /* release our reference */
    } else {
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0);
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.delete_misses++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    }
    item_unlock(hv);
}
/*
 * Finish a binary-protocol command after its body has been read, based on
 * the substate chosen when the read was set up:
 *  - bin_read_set_value: hand off to complete_update_bin();
 *  - bin_reading_sasl_auth_data: complete SASL auth, then drop any item
 *    reference still held by the connection.
 * Any other substate is a programming error.
 */
static void complete_nread_binary(conn *c) {
    assert(c != NULL);
    assert(c->cmd >= 0);

    if (c->substate == bin_read_set_value) {
        complete_update_bin(c);
        return;
    }

    if (c->substate == bin_reading_sasl_auth_data) {
        process_bin_complete_sasl_auth(c);
        if (c->item != NULL) {
            do_item_remove(c->item);
            c->item = NULL;
        }
        return;
    }

    fprintf(stderr, "Not handling substate %d\n", c->substate);
    assert(0);
}
/*
 * Reset per-command state so the connection can take its next command.
 *
 * Clears c->cmd and the binary substate, and releases any item still
 * attached to the connection. It then picks the next state: parse the next
 * command if input is already buffered, else write out queued responses,
 * else wait for more input.
 */
static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;

    item *leftover = c->item;
    if (leftover != NULL) {
        // TODO: Any other way to get here?
        // SASL auth was mistakenly using it. Nothing else should?
        item_remove(leftover);
        c->item = NULL;
    }

    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
        return;
    }
    conn_set_state(c, c->resp_head ? conn_mwrite : conn_waiting);
}
/*
 * Dispatch completion of a value read to the handler for the connection's
 * protocol (ASCII or binary). Other protocols are not expected here.
 */
static void complete_nread(conn *c) {
    assert(c != NULL);
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot);

    switch (c->protocol) {
    case ascii_prot:
        complete_nread_ascii(c);
        break;
    case binary_prot:
        complete_nread_binary(c);
        break;
    default:
        /* Excluded by the assert above; a no-op in release builds. */
        break;
    }
}
/*
 * Copy `len` bytes of s_it's value onto the end of the data already in the
 * chunked item d_it.
 *
 * Destination must always be chunked.
 * This should be part of item.c.
 *
 * Copying starts at the first destination chunk that still has free space,
 * so consecutive calls concatenate. _store_item_copy_data() relies on this
 * to build append/prepend values. When a destination chunk fills up, a
 * follow-on chunk is requested with do_item_alloc_chunk().
 *
 * The source may be chunked (walked chunk by chunk) or contiguous (read
 * from ITEM_data()).
 *
 * Returns 0 on success, or -1 if a destination chunk could not be
 * allocated. In that case d_it is left partially filled.
 */
static int _store_item_copy_chunks(item *d_it, item *s_it, const int len) {
    item_chunk *dch = (item_chunk *) ITEM_schunk(d_it);
    /* Advance dch until we find free space */
    while (dch->size == dch->used) {
        if (dch->next) {
            dch = dch->next;
        } else {
            break;
        }
    }

    if (s_it->it_flags & ITEM_CHUNKED) {
        int remain = len;
        item_chunk *sch = (item_chunk *) ITEM_schunk(s_it);
        /* Bytes of the current source chunk already consumed. */
        int copied = 0;
        /* Fills dch's to capacity, not straight copy sch in case data is
         * being added or removed (ie append/prepend)
         */
        while (sch && dch && remain) {
            assert(dch->used <= dch->size);
            /* Copy the smaller of: free space in dch, unread bytes in sch,
             * and bytes still requested. */
            int todo = (dch->size - dch->used < sch->used - copied)
                ? dch->size - dch->used : sch->used - copied;
            if (remain < todo)
                todo = remain;
            memcpy(dch->data + dch->used, sch->data + copied, todo);
            dch->used += todo;
            copied += todo;
            remain -= todo;
            assert(dch->used <= dch->size);
            if (dch->size == dch->used) {
                /* Destination chunk full: chain a new one sized by the
                 * remaining byte count. */
                item_chunk *tch = do_item_alloc_chunk(dch, remain);
                if (tch) {
                    dch = tch;
                } else {
                    return -1;
                }
            }
            assert(copied <= sch->used);
            if (copied == sch->used) {
                /* Source chunk exhausted: move to the next one. */
                copied = 0;
                sch = sch->next;
            }
        }
        /* assert that the destination had enough space for the source */
        assert(remain == 0);
    } else {
        /* Bytes of the contiguous source copied so far. */
        int done = 0;
        /* Fill dch's via a non-chunked item. */
        while (len > done && dch) {
            int todo = (dch->size - dch->used < len - done)
                ? dch->size - dch->used : len - done;
            //assert(dch->size - dch->used != 0);
            memcpy(dch->data + dch->used, ITEM_data(s_it) + done, todo);
            done += todo;
            dch->used += todo;
            assert(dch->used <= dch->size);
            if (dch->size == dch->used) {
                item_chunk *tch = do_item_alloc_chunk(dch, len - done);
                if (tch) {
                    dch = tch;
                } else {
                    return -1;
                }
            }
        }
        assert(len == done);
    }
    return 0;
}
/*
 * Fill new_it with the concatenation used by append/prepend:
 *   NREAD_APPEND:             old_it's value, then add_it's value;
 *   anything else (PREPEND):  add_it's value, then old_it's value.
 *
 * Every value ends in a 2-byte CRLF. Only the second value's CRLF is kept,
 * so the first value contributes nbytes - 2 bytes.
 *
 * Returns 0 on success, -1 if copying into a chunked new_it failed.
 */
static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add_it) {
    const bool appending = (comm == NREAD_APPEND);
    item *head = appending ? old_it : add_it;
    item *tail = appending ? add_it : old_it;

    if (new_it->it_flags & ITEM_CHUNKED) {
        /* Chunk copies append to whatever is already in new_it. */
        if (_store_item_copy_chunks(new_it, head, head->nbytes - 2) == -1) {
            return -1;
        }
        if (_store_item_copy_chunks(new_it, tail, tail->nbytes) == -1) {
            return -1;
        }
        return 0;
    }

    /* Contiguous destination: copy head whole, then lay tail (with its own
     * CRLF) over head's trailing CRLF. */
    memcpy(ITEM_data(new_it), ITEM_data(head), head->nbytes);
    memcpy(ITEM_data(new_it) + head->nbytes - 2 /* CRLF */, ITEM_data(tail), tail->nbytes);
    return 0;
}
/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. Protected by the item lock.
 *
 * it:   the newly read item. A non-zero CAS on it means the client asked
 *       for a CAS comparison.
 * comm: NREAD_ADD, NREAD_SET, NREAD_REPLACE, NREAD_APPEND, NREAD_PREPEND
 *       or NREAD_CAS.
 * c:    requesting connection. Used for per-thread stats, the set_stale
 *       flag, the storage handle and logging. On success c->cas is set to
 *       the CAS of the item that was stored.
 * hv:   hash of the key, passed to the lookup/link/replace calls. The
 *       caller is expected to already hold the item lock for it.
 *
 * Returns the state of storage:
 *   STORED, or
 *   NOT_STORED (the default: add on an existing key; replace/append/
 *     prepend on a missing key; allocation or copy failure), or
 *   EXISTS (CAS mismatch), or
 *   NOT_FOUND (CAS against a missing key).
 *
 * The caller's reference to `it` is not consumed. For append/prepend, a
 * newly allocated combined item is stored in place of `it`.
 */
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
    char *key = ITEM_key(it);
    // Fetch the current item for this key (if any); this takes a reference
    // that is released by do_item_remove(old_it) below.
    item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE);
    enum store_item_type stored = NOT_STORED;
    enum cas_result { CAS_NONE, CAS_MATCH, CAS_BADVAL, CAS_STALE, CAS_MISS };
    item *new_it = NULL;
    uint32_t flags;
    /* Do the CAS test up front so we can apply to all store modes */
    enum cas_result cas_res = CAS_NONE;
    bool do_store = false;
    if (old_it != NULL) {
        // Most of the CAS work requires something to compare to.
        uint64_t it_cas = ITEM_get_cas(it);
        uint64_t old_cas = ITEM_get_cas(old_it);
        if (it_cas == 0) {
            cas_res = CAS_NONE;
        } else if (it_cas == old_cas) {
            cas_res = CAS_MATCH;
        } else if (c->set_stale && it_cas < old_cas) {
            // Only honoured when the connection allows stale sets and the
            // client's CAS is older than the stored one.
            cas_res = CAS_STALE;
        } else {
            cas_res = CAS_BADVAL;
        }
        switch (comm) {
            case NREAD_ADD:
                /* add only adds a nonexistent item, but promote to head of LRU */
                do_item_update(old_it);
                break;
            case NREAD_CAS:
                if (cas_res == CAS_MATCH) {
                    // cas validates
                    // it and old_it may belong to different classes.
                    // I'm updating the stats for the one that's getting pushed out
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    do_store = true;
                } else if (cas_res == CAS_STALE) {
                    // if we're allowed to set a stale value, CAS must be lower than
                    // the current item's CAS.
                    // This replaces the value, but should preserve TTL, and stale
                    // item marker bit + token sent if exists.
                    it->exptime = old_it->exptime;
                    it->it_flags |= ITEM_STALE;
                    if (old_it->it_flags & ITEM_TOKEN_SENT) {
                        it->it_flags |= ITEM_TOKEN_SENT;
                    }
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    do_store = true;
                } else {
                    // NONE or BADVAL are the same for CAS cmd
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    if (settings.verbose > 1) {
                        fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
                                (unsigned long long)ITEM_get_cas(old_it),
                                (unsigned long long)ITEM_get_cas(it));
                    }
                    stored = EXISTS;
                }
                break;
            case NREAD_APPEND:
            case NREAD_PREPEND:
                // A CAS on append/prepend is optional, but if given it must match.
                if (cas_res != CAS_NONE && cas_res != CAS_MATCH) {
                    stored = EXISTS;
                    break;
                }
#ifdef EXTSTORE
                if ((old_it->it_flags & ITEM_HDR) != 0) {
                    /* block append/prepend from working with extstore-d items.
                     * leave response code to NOT_STORED default */
                    break;
                }
#endif
                /* we have it and old_it here - alloc memory to hold both */
                // The combined item keeps old_it's client flags and expiry.
                FLAGS_CONV(old_it, flags);
                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
                // OOM trying to copy.
                if (new_it == NULL)
                    break;
                /* copy data from it and old_it to new_it */
                if (_store_item_copy_data(comm, old_it, new_it, it) == -1) {
                    // failed data copy
                    break;
                } else {
                    // refcount of new_it is 1 here. will end up 2 after link.
                    // it's original ref is managed outside of this function
                    it = new_it;
                    do_store = true;
                }
                break;
            case NREAD_REPLACE:
            case NREAD_SET:
                do_store = true;
                break;
        }
        if (do_store) {
            // Drop any external-storage copy of the old value, then swap the
            // new item into old_it's place.
            STORAGE_delete(c->thread->storage, old_it);
            item_replace(old_it, it, hv);
            stored = STORED;
        }
        do_item_remove(old_it); /* release our reference */
        if (new_it != NULL) {
            // append/prepend end up with an extra reference for new_it.
            // (Also frees new_it when it was allocated but not stored.)
            do_item_remove(new_it);
        }
    } else {
        /* No pre-existing item to replace or compare to. */
        if (ITEM_get_cas(it) != 0) {
            /* Asked for a CAS match but nothing to compare it to. */
            cas_res = CAS_MISS;
        }
        switch (comm) {
            case NREAD_ADD:
            case NREAD_SET:
                do_store = true;
                break;
            case NREAD_CAS:
                // LRU expired
                stored = NOT_FOUND;
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.cas_misses++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                break;
            case NREAD_REPLACE:
            case NREAD_APPEND:
            case NREAD_PREPEND:
                /* Requires an existing item. */
                break;
        }
        if (do_store) {
            do_item_link(it, hv);
            stored = STORED;
        }
    }
    // For append/prepend, `it` now refers to the combined item, so the
    // reported CAS and the log line describe what was actually stored.
    if (stored == STORED) {
        c->cas = ITEM_get_cas(it);
    }
    LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL,
            stored, comm, ITEM_key(it), it->nkey, it->exptime, ITEM_clsid(it), c->sfd);
    return stored;
}
/*
 * One token produced by tokenize_command(): a pointer into the command
 * buffer (which the tokenizer modifies in place) plus the token's length.
 */
typedef struct token_s {
    char *value;
    size_t length;
} token_t;

/* Fixed token positions within a tokenized command line. */
#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

/* Token array size; presumably what callers pass as max_tokens. */
#define MAX_TOKENS 24

/*
 * Token-count guards for command handlers. Each expands to one statement.
 * When `ntokens` is out of range, it replies "ERROR" via out_string(c, ...)
 * and returns from the enclosing function. It may therefore only be used
 * where a `conn *c` is in scope and the function returns void. Arguments
 * are parenthesized so expressions can be passed safely.
 *
 *   WANT_TOKENS(n, min, max): require min <= n <= max; -1 disables a bound.
 *   WANT_TOKENS_OR(n, a, b):  require n == a or n == b.
 *   WANT_TOKENS_MIN(n, min):  require n >= min.
 */
#define WANT_TOKENS(ntokens, min, max) \
    do { \
        if (((min) != -1 && (ntokens) < (min)) || ((max) != -1 && (ntokens) > (max))) { \
            out_string(c, "ERROR"); \
            return; \
        } \
    } while (0)

#define WANT_TOKENS_OR(ntokens, a, b) \
    do { \
        if ((ntokens) != (a) && (ntokens) != (b)) { \
            out_string(c, "ERROR"); \
            return; \
        } \
    } while (0)

#define WANT_TOKENS_MIN(ntokens, min) \
    do { \
        if ((ntokens) < (min)) { \
            out_string(c, "ERROR"); \
            return; \
        } \
    } while (0)

/*
 * Tokenize the command string in place and fill `tokens` with a pointer to
 * the start of each token and its length.
 *
 * Only the space character (' ') separates tokens. Each space that ends a
 * token is overwritten with '\0'. Leading spaces and runs of spaces
 * produce no empty tokens.
 *
 * At most max_tokens - 1 real tokens are produced. The last entry written
 * is always a terminal token with length zero. Its value is NULL when the
 * whole string was consumed. When parsing stopped early at the token limit,
 * it points at the first unprocessed character.
 *
 * Requirements: `tokens` has room for max_tokens entries, and
 * max_tokens > 1.
 * Returns the number of entries written, including the terminal token.
 *
 * Usage example:
 *
 *   token_t tokens[MAX_TOKENS];
 *   size_t ntokens = tokenize_command(command, tokens, MAX_TOKENS);
 *   for (size_t ix = 0; tokens[ix].length != 0; ix++) {
 *       ...
 *   }
 *   if (tokens[ntokens - 1].value != NULL) {
 *       // more input remains after the token limit was reached
 *   }
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    size_t len;
    size_t i;

    /* Validate before strlen() touches command. */
    assert(command != NULL && tokens != NULL && max_tokens > 1);
    len = strlen(command);

    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
                if (ntokens == max_tokens - 1) {
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;
        }
        e++;
    }

    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value = *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
/*
 * Queue `buf` (`bytes` long) as the connection's current response and move
 * the connection to conn_new_cmd.
 *
 * The buffer is stored in resp->write_and_free, presumably so it is freed
 * once written (see that field's handling). A NULL `buf` is treated as an
 * allocation failure and reported with an out-of-memory error instead.
 * Used for stats output.
 */
static void write_and_free(conn *c, char *buf, int bytes) {
    if (buf == NULL) {
        out_of_memory(c, "SERVER_ERROR out of memory writing stats");
        return;
    }

    mc_resp *r = c->resp;
    r->write_and_free = buf;
    resp_add_iov(r, buf, bytes);
    conn_set_state(c, conn_new_cmd);
}
/*
 * Set c->noreply if the last real token of the command is "noreply".
 *
 * The last real token is the one just before the terminal token written by
 * tokenize_command(). `ntokens` is the tokenizer's count, including that
 * terminal token. With fewer than two tokens there is no candidate, and
 * indexing ntokens - 2 would go out of bounds, so the flag is returned
 * unchanged.
 *
 * Returns the resulting c->noreply. An already-set flag is never cleared
 * here.
 */
static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
{
    size_t noreply_index;

    /*
      NOTE: this function is not the first place where we are going to
      send the reply. We could send it instead from process_command()
      if the request line has wrong number of tokens. However parsing
      malformed line for "noreply" option is not reliable anyway, so
      it can't be helped.
    */
    if (ntokens < 2) {
        return c->noreply;
    }
    noreply_index = ntokens - 2;

    if (tokens[noreply_index].value
        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
        c->noreply = true;
    }
    return c->noreply;
}
/*
 * Format one statistic value with printf-style `fmt` and pass
 * (name, value) to add_stats.
 *
 * The value is formatted into a STAT_VAL_LEN buffer with a size limit of
 * STAT_VAL_LEN - 1, so at most STAT_VAL_LEN - 2 characters are kept.
 * vsnprintf() returns the untruncated length (or a negative value on
 * error). The length given to add_stats is therefore clamped to what was
 * actually written, so add_stats cannot read past val_str. A formatting
 * error yields an empty value.
 */
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
                 const char *fmt, ...) {
    char val_str[STAT_VAL_LEN];
    const size_t cap = sizeof(val_str) - 1;
    int vlen;
    va_list ap;

    assert(name);
    assert(add_stats);
    assert(c);
    assert(fmt);

    va_start(ap, fmt);
    vlen = vsnprintf(val_str, cap, fmt, ap);
    va_end(ap);

    if (vlen < 0) {
        /* Encoding error: report an empty value, not a negative length. */
        val_str[0] = '\0';
        vlen = 0;
    } else if ((size_t)vlen >= cap) {
        /* Truncated: vsnprintf wrote cap - 1 characters plus the NUL. */
        vlen = (int)(cap - 1);
    }

    add_stats(name, strlen(name), val_str, vlen, c);
}
/*
 * Handle the subcommand of "stats detail":
 *   "on"   - set settings.detail_enabled and reply OK;
 *   "off"  - clear it and reply OK;
 *   "dump" - send the buffer from stats_prefix_dump() via write_and_free().
 * Anything else gets a CLIENT_ERROR usage reply.
 */
inline static void process_stats_detail(conn *c, const char *command) {
    assert(c != NULL);

    if (strcmp(command, "on") == 0) {
        settings.detail_enabled = 1;
        out_string(c, "OK");
        return;
    }

    if (strcmp(command, "off") == 0) {
        settings.detail_enabled = 0;
        out_string(c, "OK");
        return;
    }

    if (strcmp(command, "dump") == 0) {
        int dump_len;
        char *dump = stats_prefix_dump(&dump_len);
        write_and_free(c, dump, dump_len);
        return;
    }

    out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
}
/*
 * Return server specific stats only.
 *
 * Per-thread counters are first aggregated into thread_stats/slab_stats.
 * Global counters in `stats`/`stats_state` are read under STATS_LOCK().
 * Optional sections depend on build flags (WIN32, EXTSTORE, TLS) and on
 * runtime settings.
 *
 * Every 64-bit counter is cast to unsigned long long and printed with
 * "%llu". That is the only portable pairing: uint64_t is `unsigned long` on
 * LP64, and lru_crawler_starts was previously passed to a plain "%u".
 */
static void server_stats(ADD_STAT add_stats, conn *c) {
    pid_t pid = getpid();
    rel_time_t now = current_time;

    struct thread_stats thread_stats;
    threadlocal_stats_aggregate(&thread_stats);
    struct slab_stats slab_stats;
    slab_stats_aggregate(&thread_stats, &slab_stats);
#ifdef EXTSTORE
    struct extstore_stats st;
#endif
#ifndef WIN32
    struct rusage usage;
    getrusage(RUSAGE_SELF, &usage);
#endif /* !WIN32 */

    STATS_LOCK();

    /* Cast matches the unsigned %lu conversion (pid_t is signed). */
    APPEND_STAT("pid", "%lu", (unsigned long)pid);
    APPEND_STAT("uptime", "%u", now - ITEM_UPDATE_INTERVAL);
    APPEND_STAT("time", "%ld", now + (long)process_started);
    APPEND_STAT("version", "%s", VERSION);
    APPEND_STAT("libevent", "%s", event_get_version());
    APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));

#ifndef WIN32
    append_stat("rusage_user", add_stats, c, "%ld.%06ld",
                (long)usage.ru_utime.tv_sec,
                (long)usage.ru_utime.tv_usec);
    append_stat("rusage_system", add_stats, c, "%ld.%06ld",
                (long)usage.ru_stime.tv_sec,
                (long)usage.ru_stime.tv_usec);
#endif /* !WIN32 */

    APPEND_STAT("max_connections", "%d", settings.maxconns);
    APPEND_STAT("curr_connections", "%llu", (unsigned long long)stats_state.curr_conns - 1);
    APPEND_STAT("total_connections", "%llu", (unsigned long long)stats.total_conns);
    if (settings.maxconns_fast) {
        APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats.rejected_conns);
    }
    APPEND_STAT("connection_structures", "%u", stats_state.conn_structs);
    APPEND_STAT("response_obj_bytes", "%llu", (unsigned long long)thread_stats.response_obj_bytes);
    APPEND_STAT("response_obj_total", "%llu", (unsigned long long)thread_stats.response_obj_total);
    APPEND_STAT("response_obj_free", "%llu", (unsigned long long)thread_stats.response_obj_free);
    APPEND_STAT("response_obj_oom", "%llu", (unsigned long long)thread_stats.response_obj_oom);
    APPEND_STAT("read_buf_bytes", "%llu", (unsigned long long)thread_stats.read_buf_bytes);
    APPEND_STAT("read_buf_bytes_free", "%llu", (unsigned long long)thread_stats.read_buf_bytes_free);
    APPEND_STAT("read_buf_oom", "%llu", (unsigned long long)thread_stats.read_buf_oom);
    APPEND_STAT("reserved_fds", "%u", stats_state.reserved_fds);
    APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
    APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
    APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
    APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds);
    APPEND_STAT("cmd_meta", "%llu", (unsigned long long)thread_stats.meta_cmds);
    APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
    APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
    APPEND_STAT("get_expired", "%llu", (unsigned long long)thread_stats.get_expired);
    APPEND_STAT("get_flushed", "%llu", (unsigned long long)thread_stats.get_flushed);
#ifdef EXTSTORE
    if (c->thread->storage) {
        APPEND_STAT("get_extstore", "%llu", (unsigned long long)thread_stats.get_extstore);
        APPEND_STAT("get_aborted_extstore", "%llu", (unsigned long long)thread_stats.get_aborted_extstore);
        APPEND_STAT("get_oom_extstore", "%llu", (unsigned long long)thread_stats.get_oom_extstore);
        APPEND_STAT("recache_from_extstore", "%llu", (unsigned long long)thread_stats.recache_from_extstore);
        APPEND_STAT("miss_from_extstore", "%llu", (unsigned long long)thread_stats.miss_from_extstore);
        APPEND_STAT("badcrc_from_extstore", "%llu", (unsigned long long)thread_stats.badcrc_from_extstore);
    }
#endif
    APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
    APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
    APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
    APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
    APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
    APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
    APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
    APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
    APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
    APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits);
    APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses);
    APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds);
    APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors);
    if (settings.idle_timeout) {
        APPEND_STAT("idle_kicks", "%llu", (unsigned long long)thread_stats.idle_kicks);
    }
    APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
    APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
    APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
    APPEND_STAT("accepting_conns", "%u", stats_state.accepting_conns);
    APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
    APPEND_STAT("time_in_listen_disabled_us", "%llu", (unsigned long long)stats.time_in_listen_disabled_us);
    APPEND_STAT("threads", "%d", settings.num_threads);
    APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
    APPEND_STAT("hash_power_level", "%u", stats_state.hash_power_level);
    APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats_state.hash_bytes);
    APPEND_STAT("hash_is_expanding", "%u", stats_state.hash_is_expanding);
    if (settings.slab_reassign) {
        APPEND_STAT("slab_reassign_rescues", "%llu", (unsigned long long)stats.slab_reassign_rescues);
        APPEND_STAT("slab_reassign_chunk_rescues", "%llu", (unsigned long long)stats.slab_reassign_chunk_rescues);
        APPEND_STAT("slab_reassign_evictions_nomem", "%llu", (unsigned long long)stats.slab_reassign_evictions_nomem);
        APPEND_STAT("slab_reassign_inline_reclaim", "%llu", (unsigned long long)stats.slab_reassign_inline_reclaim);
        APPEND_STAT("slab_reassign_busy_items", "%llu", (unsigned long long)stats.slab_reassign_busy_items);
        APPEND_STAT("slab_reassign_busy_deletes", "%llu", (unsigned long long)stats.slab_reassign_busy_deletes);
        APPEND_STAT("slab_reassign_running", "%u", stats_state.slab_reassign_running);
        APPEND_STAT("slabs_moved", "%llu", (unsigned long long)stats.slabs_moved);
    }
    if (settings.lru_crawler) {
        APPEND_STAT("lru_crawler_running", "%u", stats_state.lru_crawler_running);
        APPEND_STAT("lru_crawler_starts", "%llu", (unsigned long long)stats.lru_crawler_starts);
    }
    if (settings.lru_maintainer_thread) {
        APPEND_STAT("lru_maintainer_juggles", "%llu", (unsigned long long)stats.lru_maintainer_juggles);
    }
    APPEND_STAT("malloc_fails", "%llu",
                (unsigned long long)stats.malloc_fails);
    APPEND_STAT("log_worker_dropped", "%llu", (unsigned long long)stats.log_worker_dropped);
    APPEND_STAT("log_worker_written", "%llu", (unsigned long long)stats.log_worker_written);
    APPEND_STAT("log_watcher_skipped", "%llu", (unsigned long long)stats.log_watcher_skipped);
    APPEND_STAT("log_watcher_sent", "%llu", (unsigned long long)stats.log_watcher_sent);
    STATS_UNLOCK();
#ifdef EXTSTORE
    if (c->thread->storage) {
        STATS_LOCK();
        APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.extstore_compact_lost);
        APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stats.extstore_compact_rescues);
        APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stats.extstore_compact_skipped);
        STATS_UNLOCK();
        extstore_get_stats(c->thread->storage, &st);
        APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs);
        APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions);
        APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims);
        APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free);
        APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used);
        APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted);
        APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read);
        APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written);
        APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used);
        APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted);
        APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written);
        APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read);
        APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
        APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
        APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size));
        APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue));
    }
#endif
#ifdef TLS
    if (settings.ssl_enabled) {
        if (settings.ssl_session_cache) {
            APPEND_STAT("ssl_new_sessions", "%llu", (unsigned long long)stats.ssl_new_sessions);
        }
        APPEND_STAT("ssl_handshake_errors", "%llu", (unsigned long long)stats.ssl_handshake_errors);
        APPEND_STAT("time_since_server_cert_refresh", "%u", now - settings.ssl_last_cert_refresh_time);
    }
#endif
}
/*
 * Emit the current runtime settings through add_stats, one stat per
 * setting. Optional groups depend on build flags (HAVE_DROP_PRIVILEGES,
 * EXTSTORE, TLS).
 *
 * String settings that may be unset are reported as "NULL". Passing a NULL
 * pointer to a %s conversion is undefined behavior: glibc prints "(null)",
 * other C libraries may crash. ssl_chain_cert and ssl_key now get the same
 * guard as ssl_ciphers and ssl_ca_cert.
 */
static void process_stat_settings(ADD_STAT add_stats, void *c) {
    assert(add_stats);
    APPEND_STAT("maxbytes", "%llu", (unsigned long long)settings.maxbytes);
    APPEND_STAT("maxconns", "%d", settings.maxconns);
    APPEND_STAT("tcpport", "%d", settings.port);
    APPEND_STAT("udpport", "%d", settings.udpport);
    APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
    APPEND_STAT("verbosity", "%d", settings.verbose);
    APPEND_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
    APPEND_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
    APPEND_STAT("domain_socket", "%s",
                settings.socketpath ? settings.socketpath : "NULL");
    APPEND_STAT("umask", "%o", settings.access);
    APPEND_STAT("growth_factor", "%.2f", settings.factor);
    APPEND_STAT("chunk_size", "%d", settings.chunk_size);
    APPEND_STAT("num_threads", "%d", settings.num_threads);
    APPEND_STAT("num_threads_per_udp", "%d", settings.num_threads_per_udp);
    APPEND_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
    APPEND_STAT("detail_enabled", "%s",
                settings.detail_enabled ? "yes" : "no");
    APPEND_STAT("reqs_per_event", "%d", settings.reqs_per_event);
    APPEND_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
    APPEND_STAT("tcp_backlog", "%d", settings.backlog);
    APPEND_STAT("binding_protocol", "%s",
                prot_text(settings.binding_protocol));
    APPEND_STAT("auth_enabled_sasl", "%s", settings.sasl ? "yes" : "no");
    APPEND_STAT("auth_enabled_ascii", "%s", settings.auth_file ? settings.auth_file : "no");
    APPEND_STAT("item_size_max", "%d", settings.item_size_max);
    APPEND_STAT("maxconns_fast", "%s", settings.maxconns_fast ? "yes" : "no");
    APPEND_STAT("hashpower_init", "%d", settings.hashpower_init);
    APPEND_STAT("slab_reassign", "%s", settings.slab_reassign ? "yes" : "no");
    APPEND_STAT("slab_automove", "%d", settings.slab_automove);
    APPEND_STAT("slab_automove_ratio", "%.2f", settings.slab_automove_ratio);
    APPEND_STAT("slab_automove_window", "%u", settings.slab_automove_window);
    APPEND_STAT("slab_chunk_max", "%d", settings.slab_chunk_size_max);
    APPEND_STAT("lru_crawler", "%s", settings.lru_crawler ? "yes" : "no");
    APPEND_STAT("lru_crawler_sleep", "%d", settings.lru_crawler_sleep);
    APPEND_STAT("lru_crawler_tocrawl", "%lu", (unsigned long)settings.lru_crawler_tocrawl);
    APPEND_STAT("tail_repair_time", "%d", settings.tail_repair_time);
    APPEND_STAT("flush_enabled", "%s", settings.flush_enabled ? "yes" : "no");
    APPEND_STAT("dump_enabled", "%s", settings.dump_enabled ? "yes" : "no");
    APPEND_STAT("hash_algorithm", "%s", settings.hash_algorithm);
    APPEND_STAT("lru_maintainer_thread", "%s", settings.lru_maintainer_thread ? "yes" : "no");
    APPEND_STAT("lru_segmented", "%s", settings.lru_segmented ? "yes" : "no");
    APPEND_STAT("hot_lru_pct", "%d", settings.hot_lru_pct);
    APPEND_STAT("warm_lru_pct", "%d", settings.warm_lru_pct);
    APPEND_STAT("hot_max_factor", "%.2f", settings.hot_max_factor);
    APPEND_STAT("warm_max_factor", "%.2f", settings.warm_max_factor);
    APPEND_STAT("temp_lru", "%s", settings.temp_lru ? "yes" : "no");
    APPEND_STAT("temporary_ttl", "%u", settings.temporary_ttl);
    APPEND_STAT("idle_timeout", "%d", settings.idle_timeout);
    APPEND_STAT("watcher_logbuf_size", "%u", settings.logger_watcher_buf_size);
    APPEND_STAT("worker_logbuf_size", "%u", settings.logger_buf_size);
    APPEND_STAT("resp_obj_mem_limit", "%u", settings.resp_obj_mem_limit);
    APPEND_STAT("read_buf_mem_limit", "%u", settings.read_buf_mem_limit);
    APPEND_STAT("track_sizes", "%s", item_stats_sizes_status() ? "yes" : "no");
    APPEND_STAT("inline_ascii_response", "%s", "no"); // setting is dead, cannot be yes.
#ifdef HAVE_DROP_PRIVILEGES
    APPEND_STAT("drop_privileges", "%s", settings.drop_privileges ? "yes" : "no");
#endif
#ifdef EXTSTORE
    APPEND_STAT("ext_item_size", "%u", settings.ext_item_size);
    APPEND_STAT("ext_item_age", "%u", settings.ext_item_age);
    APPEND_STAT("ext_low_ttl", "%u", settings.ext_low_ttl);
    APPEND_STAT("ext_recache_rate", "%u", settings.ext_recache_rate);
    APPEND_STAT("ext_wbuf_size", "%u", settings.ext_wbuf_size);
    APPEND_STAT("ext_compact_under", "%u", settings.ext_compact_under);
    APPEND_STAT("ext_drop_under", "%u", settings.ext_drop_under);
    APPEND_STAT("ext_max_frag", "%.2f", settings.ext_max_frag);
    APPEND_STAT("slab_automove_freeratio", "%.3f", settings.slab_automove_freeratio);
    APPEND_STAT("ext_drop_unread", "%s", settings.ext_drop_unread ? "yes" : "no");
#endif
#ifdef TLS
    APPEND_STAT("ssl_enabled", "%s", settings.ssl_enabled ? "yes" : "no");
    APPEND_STAT("ssl_chain_cert", "%s", settings.ssl_chain_cert ? settings.ssl_chain_cert : "NULL");
    APPEND_STAT("ssl_key", "%s", settings.ssl_key ? settings.ssl_key : "NULL");
    APPEND_STAT("ssl_verify_mode", "%d", settings.ssl_verify_mode);
    APPEND_STAT("ssl_keyformat", "%d", settings.ssl_keyformat);
    APPEND_STAT("ssl_ciphers", "%s", settings.ssl_ciphers ? settings.ssl_ciphers : "NULL");
    APPEND_STAT("ssl_ca_cert", "%s", settings.ssl_ca_cert ? settings.ssl_ca_cert : "NULL");
    APPEND_STAT("ssl_wbuf_size", "%u", settings.ssl_wbuf_size);
    APPEND_STAT("ssl_session_cache", "%s", settings.ssl_session_cache ? "yes" : "no");
#endif
}
/*
 * Compare a length-delimited string with a NUL-terminated one.
 *
 * nz holds nzlength bytes and need not be NUL-terminated; z is a C string.
 * Returns 0 when both have the same length and identical bytes, else -1.
 */
static int nz_strcmp(int nzlength, const char *nz, const char *z) {
    size_t zlen = strlen(z);

    if (nzlength < 0 || (size_t)nzlength != zlen) {
        return -1;
    }
    return memcmp(nz, z, zlen) == 0 ? 0 : -1;
}
/*
 * Emit engine statistics of the requested type through add_stats.
 *
 * stat_type == NULL selects the general item/byte totals. Otherwise the
 * first nkey bytes of stat_type name one of: "items", "slabs", "sizes",
 * "sizes_enable", "sizes_disable".
 *
 * Returns false when add_stats is NULL or the stat type is unknown, true
 * otherwise.
 */
static bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c) {
    if (add_stats == NULL) {
        return false;
    }

    if (stat_type == NULL) {
        /* prepare general statistics for the engine */
        STATS_LOCK();
        APPEND_STAT("bytes", "%llu", (unsigned long long)stats_state.curr_bytes);
        APPEND_STAT("curr_items", "%llu", (unsigned long long)stats_state.curr_items);
        APPEND_STAT("total_items", "%llu", (unsigned long long)stats.total_items);
        STATS_UNLOCK();
        APPEND_STAT("slab_global_page_pool", "%u", global_page_pool_size(NULL));
        item_stats_totals(add_stats, c);
        return true;
    }

    if (nz_strcmp(nkey, stat_type, "items") == 0) {
        item_stats(add_stats, c);
    } else if (nz_strcmp(nkey, stat_type, "slabs") == 0) {
        slabs_stats(add_stats, c);
    } else if (nz_strcmp(nkey, stat_type, "sizes") == 0) {
        item_stats_sizes(add_stats, c);
    } else if (nz_strcmp(nkey, stat_type, "sizes_enable") == 0) {
        item_stats_sizes_enable(add_stats, c);
    } else if (nz_strcmp(nkey, stat_type, "sizes_disable") == 0) {
        item_stats_sizes_disable(add_stats, c);
    } else {
        return false;
    }
    return true;
}
/*
 * Format a socket address as "<proto>:<address>[:<port>]" into `addr`.
 *
 * af selects how sock_addr is interpreted:
 *   AF_INET  -> "tcp"/"udp",   dotted address, port
 *   AF_INET6 -> "tcp6"/"udp6", "[address]", port
 *   AF_UNIX  -> "unix",        sun_path (no port)
 * tcp vs udp comes from c->transport. Any other family leaves the protocol
 * as "?".
 *
 * If the formatted address is shorter than 2 characters (e.g. an unnamed
 * unix-domain peer, or a failed inet_ntop), it is replaced by "<AF n>".
 *
 * `addr` is written with sprintf() and no bounds check. The caller must
 * size it for MAXPATHLEN plus the protocol prefix and ":port" suffix.
 */
static inline void get_conn_text(const conn *c, const int af,
                                 char* addr, struct sockaddr *sock_addr) {
    char addr_text[MAXPATHLEN];
    addr_text[0] = '\0';
    const char *protoname = "?";
    unsigned short port = 0;
    size_t pathlen = 0;
    switch (af) {
        case AF_INET:
            (void) inet_ntop(af,
                    &((struct sockaddr_in *)sock_addr)->sin_addr,
                    addr_text,
                    sizeof(addr_text) - 1);
            port = ntohs(((struct sockaddr_in *)sock_addr)->sin_port);
            protoname = IS_UDP(c->transport) ? "udp" : "tcp";
            break;
        case AF_INET6:
            /* Bracket the address; the closing ']' is only added when
             * inet_ntop succeeds (otherwise "[" triggers the <AF n> path). */
            addr_text[0] = '[';
            addr_text[1] = '\0';
            if (inet_ntop(af,
                    &((struct sockaddr_in6 *)sock_addr)->sin6_addr,
                    addr_text + 1,
                    sizeof(addr_text) - 2)) {
                strcat(addr_text, "]");
            }
            port = ntohs(((struct sockaddr_in6 *)sock_addr)->sin6_port);
            protoname = IS_UDP(c->transport) ? "udp6" : "tcp6";
            break;
        case AF_UNIX:
            // this strncpy call originally could piss off an address
            // sanitizer; we supplied the size of the dest buf as a limiter,
            // but optimized versions of strncpy could read past the end of
            // *src while looking for a null terminator. Since buf and
            // sun_path here are both on the stack they could even overlap,
            // which is "undefined". In all OSS versions of strncpy I could
            // find this has no effect; it'll still only copy until the first null
            // terminator is found. Thus it's possible to get the OS to
            // examine past the end of sun_path but it's unclear to me if this
            // can cause any actual problem.
            //
            // We need a safe_strncpy util function but I'll punt on figuring
            // that out for now.
            //
            // sun_path need not be NUL-terminated, so copy at most its size
            // (capped to our buffer) and terminate explicitly. This reads up
            // to sizeof(sun_path) bytes from sock_addr, so the caller's
            // buffer must be a full sockaddr_un-sized object.
            pathlen = sizeof(((struct sockaddr_un *)sock_addr)->sun_path);
            if (MAXPATHLEN <= pathlen) {
                pathlen = MAXPATHLEN - 1;
            }
            strncpy(addr_text,
                    ((struct sockaddr_un *)sock_addr)->sun_path,
                    pathlen);
            addr_text[pathlen] = '\0';
            protoname = "unix";
            break;
    }
    if (strlen(addr_text) < 2) {
        /* Most likely this is a connected UNIX-domain client which
         * has no peer socket address, but there's no portable way
         * to tell for sure.
         */
        sprintf(addr_text, "<AF %d>", af);
    }
    // Port 0 (unix sockets, unknown families) is omitted from the output.
    if (port) {
        sprintf(addr, "%s:%s:%u", protoname, addr_text, port);
    } else {
        sprintf(addr, "%s:%s", protoname, addr_text);
    }
}
/*
 * Render a connection's addresses as text (via get_conn_text()).
 *
 * addr receives "<null>" for a NULL connection, "<closed>" for a closed
 * one, and otherwise the peer address. Listening sockets and idle UDP
 * sockets (conn_read) have no peer, so their local listen address is shown
 * instead.
 *
 * svr_addr receives the local address of the socket. It is only written
 * for live connections that are neither listening nor idle UDP.
 *
 * Local addresses are fetched into a zeroed sockaddr_storage, which is large
 * enough for every address family (a sockaddr_in6 is too small for
 * AF_UNIX). Zeroing means a failed getsockname() yields a defined family-0
 * "<AF 0>" string instead of uninitialized stack data.
 */
static void conn_to_str(const conn *c, char *addr, char *svr_addr) {
    if (!c) {
        strcpy(addr, "<null>");
    } else if (c->state == conn_closed) {
        strcpy(addr, "<closed>");
    } else {
        struct sockaddr_storage local_addr;
        struct sockaddr *sock_addr = (void *)&c->request_addr;

        memset(&local_addr, 0, sizeof(local_addr));

        /* For listen ports and idle UDP ports, show listen address */
        if (c->state == conn_listening ||
                (IS_UDP(c->transport) &&
                 c->state == conn_read)) {
            socklen_t local_addr_len = sizeof(local_addr);

            if (getsockname(c->sfd,
                        (struct sockaddr *)&local_addr,
                        &local_addr_len) == 0) {
                sock_addr = (struct sockaddr *)&local_addr;
            }
        }
        get_conn_text(c, sock_addr->sa_family, addr, sock_addr);

        if (c->state != conn_listening && !(IS_UDP(c->transport) &&
                 c->state == conn_read)) {
            struct sockaddr_storage svr_sock_addr;
            socklen_t svr_addr_len = sizeof(svr_sock_addr);

            memset(&svr_sock_addr, 0, sizeof(svr_sock_addr));
            /* On failure the zeroed family renders as "<AF 0>". */
            (void) getsockname(c->sfd, (struct sockaddr *)&svr_sock_addr, &svr_addr_len);
            get_conn_text(c, svr_sock_addr.ss_family, svr_addr, (struct sockaddr *)&svr_sock_addr);
        }
    }
}
static void process_stats_conns(ADD_STAT add_stats, void *c) {
int i;
char key_str[STAT_KEY_LEN];
char val_str[STAT_VAL_LEN];
size_t extras_len = sizeof("unix:") + sizeof("65535");
char addr[MAXPATHLEN + extras_len];
char svr_addr[MAXPATHLEN + extras_len];
int klen = 0, vlen = 0;
assert(add_stats);