Skip to content

Commit

Permalink
Add "stats connections" to dump all connections
Browse files Browse the repository at this point in the history
Change-Id: Ife8978790e9d5c50362afcf8c9edc5802117364f
  • Loading branch information
trondn committed Feb 1, 2012
1 parent fd28ee4 commit 3797ae0
Showing 1 changed file with 243 additions and 26 deletions.
269 changes: 243 additions & 26 deletions daemon/memcached.c
Expand Up @@ -402,11 +402,6 @@ void safe_close(SOCKET sfd) {
}
}

/*
* Free list management for connections.
*/
cache_t *conn_cache; /* suffix cache */

/**
* Reset all of the dynamic buffers used by a connection back to their
* default sizes. The strategy for resizing the buffers is to allocate a
Expand Down Expand Up @@ -496,17 +491,14 @@ static bool conn_reset_buffersize(conn *c) {
* all members and allocate the transfer buffers.
*
* @param buffer The memory allocated by the object cache
* @param unused1 not used
* @param unused2 not used
* @return 0 on success, 1 if we failed to allocate memory
*/
static int conn_constructor(void *buffer, void *unused1, int unused2) {
(void)unused1; (void)unused2;

conn *c = buffer;
static int conn_constructor(conn *c) {
memset(c, 0, sizeof(*c));
MEMCACHED_CONN_CREATE(c);

c->state = conn_closing;
c->sfd = INVALID_SOCKET;
if (!conn_reset_buffersize(c)) {
free(c->rbuf);
free(c->wbuf);
Expand All @@ -531,11 +523,8 @@ static int conn_constructor(void *buffer, void *unused1, int unused2) {
* Destructor for all connection objects. Release all allocated resources.
*
* @param buffer The memory allocated by the objec cache
* @param unused not used
*/
static void conn_destructor(void *buffer, void *unused) {
(void)unused;
conn *c = buffer;
static void conn_destructor(conn *c) {
free(c->rbuf);
free(c->wbuf);
free(c->ilist);
Expand All @@ -548,11 +537,239 @@ static void conn_destructor(void *buffer, void *unused) {
STATS_UNLOCK();
}

/*
* Free list management for connections.
*/
struct connections {
conn* free;
conn** all;
pthread_mutex_t mutex;
int next;
} connections = {
.mutex = PTHREAD_MUTEX_INITIALIZER
};

static void initialize_connections(void)
{
connections.all = calloc(settings.maxconns, sizeof(conn*));
if (connections.all == NULL) {
settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
"Failed to allocate memory for connections");
exit(EX_OSERR);
}

int preallocate = settings.maxconns / 2;
if (preallocate < 1000) {
preallocate = settings.maxconns;
}

for (connections.next = 0; connections.next < preallocate; ++connections.next) {
connections.all[connections.next] = malloc(sizeof(conn));
if (conn_constructor(connections.all[connections.next]) != 0) {
settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
"Failed to allocate memory for connections");
exit(EX_OSERR);
}
connections.all[connections.next]->next = connections.free;
connections.free = connections.all[connections.next];
}
}

static conn *allocate_connection(void) {
conn *ret;

pthread_mutex_lock(&connections.mutex);
ret = connections.free;
if (ret != NULL) {
connections.free = connections.free->next;
ret->next = NULL;
}
pthread_mutex_unlock(&connections.mutex);

if (ret == NULL) {
ret = malloc(sizeof(conn));
if (ret == NULL) {
settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
"Failed to allocate memory for connection");
return NULL;
}

if (conn_constructor(ret) != 0) {
conn_destructor(ret);
free(ret);
settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
"Failed to allocate memory for connection");
return NULL;
}

pthread_mutex_lock(&connections.mutex);
if (connections.next == settings.maxconns) {
free(ret);
ret = NULL;
} else {
connections.all[connections.next++] = ret;
}
pthread_mutex_unlock(&connections.mutex);
}

return ret;
}

static void release_connection(conn *c) {
c->sfd = INVALID_SOCKET;
pthread_mutex_lock(&connections.mutex);
c->next = connections.free;
connections.free = c;
pthread_mutex_unlock(&connections.mutex);
}

static const char *substate_text(enum bin_substates state) {
switch (state) {
case bin_no_state: return "bin_no_state";
case bin_reading_set_header: return "bin_reading_set_header";
case bin_reading_cas_header: return "bin_reading_cas_header";
case bin_read_set_value: return "bin_read_set_value";
case bin_reading_get_key: return "bin_reading_get_key";
case bin_reading_stat: return "bin_reading_stat";
case bin_reading_del_header: return "bin_reading_del_header";
case bin_reading_incr_header: return "bin_reading_incr_header";
case bin_read_flush_exptime: return "bin_reading_flush_exptime";
case bin_reading_sasl_auth: return "bin_reading_sasl_auth";
case bin_reading_sasl_auth_data: return "bin_reading_sasl_auth_data";
case bin_reading_packet: return "bin_reading_packet";
default:
return "illegal";
}
}

static const char *protocol_text(enum protocol protocol) {
switch (protocol) {
case ascii_prot: return "ascii";
case binary_prot: return "binary";
case negotiating_prot: return "negotiating";
default:
return "illegal";
}
}

static const char *transport_text(enum network_transport transport) {
switch (transport) {
case local_transport: return "unix sockets";
case tcp_transport: return "TCP";
case udp_transport: return "UDP";
default:
return "illegal";
}
}

static void add_connection_stats(ADD_STAT add_stats, conn *d, conn *c) {
append_stat("conn", add_stats, d, "%p", c);
if (c->sfd == INVALID_SOCKET) {
append_stat("socket", add_stats, d, "disconnected");
} else {
append_stat("socket", add_stats, d, "%lu", (long)c->sfd);
append_stat("protocol", add_stats, d, "%s", protocol_text(c->protocol));
append_stat("transport", add_stats, d, "%s",
transport_text(c->transport));
append_stat("nevents", add_stats, d, "%u", c->nevents);
if (c->sasl_conn != NULL) {
append_stat("sasl_conn", add_stats, d, "%p", c->sasl_conn);
}
append_stat("state", add_stats, d, "%s", state_text(c->state));
if (c->protocol == binary_prot) {
append_stat("substate", add_stats, d, "%s",
substate_text(c->substate));
}
append_stat("registered_in_libevent", add_stats, d, "%d",
(int)c->registered_in_libevent);
append_stat("ev_flags", add_stats, d, "%x", c->ev_flags);
append_stat("which", add_stats, d, "%x", c->which);
append_stat("rbuf", add_stats, d, "%p", c->rbuf);
append_stat("rcurr", add_stats, d, "%p", c->rcurr);
append_stat("rsize", add_stats, d, "%u", c->rsize);
append_stat("rbytes", add_stats, d, "%u", c->rbytes);
append_stat("wbuf", add_stats, d, "%p", c->wbuf);
append_stat("wcurr", add_stats, d, "%p", c->wcurr);
append_stat("wsize", add_stats, d, "%u", c->wsize);
append_stat("wbytes", add_stats, d, "%u", c->wbytes);
append_stat("write_and_go", add_stats, d, "%p", c->write_and_go);
append_stat("write_and_free", add_stats, d, "%p", c->write_and_free);
append_stat("ritem", add_stats, d, "%p", c->ritem);
append_stat("rlbytes", add_stats, d, "%u", c->rlbytes);
append_stat("item", add_stats, d, "%p", c->item);
append_stat("store_op", add_stats, d, "%u", c->store_op);
append_stat("sbytes", add_stats, d, "%u", c->sbytes);
append_stat("iov", add_stats, d, "%p", c->iov);
append_stat("iovsize", add_stats, d, "%u", c->iovsize);
append_stat("iovused", add_stats, d, "%u", c->iovused);
append_stat("msglist", add_stats, d, "%p", c->msglist);
append_stat("msgsize", add_stats, d, "%u", c->msgsize);
append_stat("msgused", add_stats, d, "%u", c->msgused);
append_stat("msgcurr", add_stats, d, "%u", c->msgcurr);
append_stat("msgbytes", add_stats, d, "%u", c->msgbytes);
append_stat("ilist", add_stats, d, "%p", c->ilist);
append_stat("isize", add_stats, d, "%u", c->isize);
append_stat("icurr", add_stats, d, "%p", c->icurr);
append_stat("ileft", add_stats, d, "%u", c->ileft);
append_stat("suffixlist", add_stats, d, "%p", c->suffixlist);
append_stat("suffixsize", add_stats, d, "%u", c->suffixsize);
append_stat("suffixcurr", add_stats, d, "%p", c->suffixcurr);
append_stat("suffixleft", add_stats, d, "%u", c->suffixleft);

if (c->transport == udp_transport) {
// @todo we should dump the packet header
append_stat("request_id", add_stats, d, "%u", c->request_id);
append_stat("hdrbuf", add_stats, d, "%p", c->hdrbuf);
append_stat("hdrsize", add_stats, d, "%d", c->hdrsize);
}

append_stat("noreply", add_stats, d, "%d", c->noreply);
append_stat("refcount", add_stats, d, "%u", (int)c->refcount);
append_stat("dynamic_buffer.buffer", add_stats, d, "%p",
c->dynamic_buffer.buffer);
append_stat("dynamic_buffer.size", add_stats, d, "%zu",
c->dynamic_buffer.size);
append_stat("dynamic_buffer.offset", add_stats, d, "%zu",
c->dynamic_buffer.offset);
append_stat("engine_storage", add_stats, d, "%p", c->engine_storage);
if (c->protocol == ascii_prot) {
append_stat("ascii_cmd", add_stats, d, "%p", c->ascii_cmd);
} else if (c->protocol == binary_prot) {
// @todo we should decode the binary header
append_stat("cas", add_stats, d, "%"PRIu64, c->cas);
append_stat("cmd", add_stats, d, "%u", c->cmd);
append_stat("opaque", add_stats, d, "%u", c->opaque);
append_stat("keylen", add_stats, d, "%u", c->keylen);
}
append_stat("list_state", add_stats, d, "%u", c->list_state);
append_stat("next", add_stats, d, "%p", c->next);
append_stat("thread", add_stats, d, "%p", c->thread);
append_stat("aiostat", add_stats, d, "%u", c->aiostat);
append_stat("ewouldblock", add_stats, d, "%u", c->ewouldblock);
append_stat("tap_iterator", add_stats, d, "%p", c->tap_iterator);
}
}

/**
* Do a full stats of all of the connections.
* Do _NOT_ try to follow _ANY_ of the pointers in the conn structure
* because we read all of the values _DIRTY_. We preallocated the array
* of all of the connection pointers during startup, so we _KNOW_ that
* we can iterate through all of them. All of the conn structs will
* only appear in the connections.all array when we've allocated them,
* and we don't release them so it's safe to look at them.
*/
static void connection_stats(ADD_STAT add_stats, conn *c) {
for (int ii = 0; ii < settings.maxconns && connections.all[ii]; ++ii) {
add_connection_stats(add_stats, c, connections.all[ii]);
}
}

conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base, struct timeval *timeout) {
conn *c = cache_alloc(conn_cache);
conn *c = allocate_connection();
if (c == NULL) {
return NULL;
}
Expand All @@ -567,7 +784,7 @@ conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,
c->rbuf = mem;
} else {
assert(c->thread == NULL);
cache_free(conn_cache, c);
release_connection(c);
return NULL;
}
}
Expand Down Expand Up @@ -638,7 +855,7 @@ conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,

if (!register_event(c, timeout)) {
assert(c->thread == NULL);
cache_free(conn_cache, c);
release_connection(c);
return NULL;
}

Expand Down Expand Up @@ -723,7 +940,7 @@ void conn_close(conn *c) {
*/
conn_reset_buffersize(c);
assert(c->thread == NULL);
cache_free(conn_cache, c);
release_connection(c);
}

/*
Expand Down Expand Up @@ -1953,6 +2170,8 @@ static void process_bin_stat(conn *c) {
}
} else if (strncmp(subcommand, "aggregate", 9) == 0) {
server_stats(&append_stats, c, true);
} else if (strncmp(subcommand, "connections", 11) == 0) {
connection_stats(&append_stats, c);
} else {
ret = settings.engine.v1->get_stats(settings.engine.v0, c,
subcommand, nkey,
Expand Down Expand Up @@ -3946,6 +4165,8 @@ static char *process_stat(conn *c, token_t *tokens, const size_t ntokens) {
return NULL;
} else if (strcmp(subcommand, "aggregate") == 0) {
server_stats(&append_stats, c, true);
} else if (strncmp(subcommand, "connections", 11) == 0) {
connection_stats(&append_stats, c);
} else {
/* getting here means that the subcommand is either engine specific or
is invalid. query the engine and see. */
Expand Down Expand Up @@ -7384,6 +7605,9 @@ int main (int argc, char **argv) {
exit(EX_USAGE);
}

/* allocate the connection array */
initialize_connections();

/* lose root privileges if we have them */
if (getuid() == 0 || geteuid() == 0) {
if (username == 0 || *username == '\0') {
Expand Down Expand Up @@ -7463,13 +7687,6 @@ int main (int argc, char **argv) {
/* initialize other stuff */
stats_init();

if (!(conn_cache = cache_create("conn", sizeof(conn), sizeof(void*),
conn_constructor, conn_destructor))) {
settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
"Failed to create connection cache\n");
exit(EXIT_FAILURE);
}

default_independent_stats = new_independent_stats();

#ifndef __WIN32__
Expand Down

0 comments on commit 3797ae0

Please sign in to comment.