Skip to content

Commit

Permalink
some global stats for bg logger
Browse files Browse the repository at this point in the history
four obvious ones. have a handy place to do the summarization when the logger
wakes up for its run, avoiding the locks.
  • Loading branch information
dormando committed Jun 17, 2016
1 parent ec7a2e0 commit 0503b5e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 18 deletions.
59 changes: 42 additions & 17 deletions logger.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ static void logger_link_q(logger *l) {

/* Completes rendering of log line, copies to subscribed watchers */
#define LOGGER_PARSE_SCRATCH 4096
static enum logger_parse_entry_ret logger_parse_entry(logentry *e) {
static enum logger_parse_entry_ret logger_parse_entry(logentry *e, struct logger_stats *ls) {
int total = 0;
int line_size = 0;
int x;
Expand Down Expand Up @@ -140,22 +140,26 @@ static enum logger_parse_entry_ret logger_parse_entry(logentry *e) {
if (w->failed_flush) {
L_DEBUG("LOGGER: Fast skipped for watcher [%d] due to failed_flush\n", w->sfd);
w->skipped++;
ls->watcher_skipped++;
} else if (w->skipped > 0) {
char *skip_scr = NULL;
if ((skip_scr = (char *) bipbuf_request(w->buf, line_size + 128)) != NULL) {
total = snprintf(skip_scr, 128, "[skipped: %llu]\n", (unsigned long long) w->skipped);
if (total >= 128 || total <= 0) {
L_DEBUG("LOGGER: Failed to flatten skipped message into watcher [%d]\n", w->sfd);
w->skipped++;
ls->watcher_skipped++;
} else {
/* These can't fail because bipbuf_request succeeded. */
bipbuf_push(w->buf, total + 1);
bipbuf_offer(w->buf, (unsigned char *) scratch, line_size);
w->skipped = 0;
ls->watcher_sent++;
}
} else {
L_DEBUG("LOGGER: Continuing to fast skip for watcher [%d]\n", w->sfd);
w->skipped++;
ls->watcher_skipped++;
w->failed_flush = true;
}
} else {
Expand All @@ -167,9 +171,12 @@ static enum logger_parse_entry_ret logger_parse_entry(logentry *e) {
L_DEBUG("LOGGER: Watcher had no free space for line of size (%d)\n", line_size);
w->failed_flush = true;
w->skipped++;
ls->watcher_skipped++;
break;
}
}
if (!w->failed_flush)
ls->watcher_sent++;
}
}

Expand Down Expand Up @@ -216,7 +223,7 @@ static void logger_close_watcher(logger_watcher *w) {
/* Reads a particular worker thread's available bipbuf bytes. Parses each log
* entry into the watcher buffers.
*/
static int logger_thread_read(logger *l) {
static int logger_thread_read(logger *l, struct logger_stats *ls) {
unsigned int size;
unsigned int pos = 0;
unsigned char *data;
Expand All @@ -234,7 +241,7 @@ static int logger_thread_read(logger *l) {
while (pos < size && watcher_count > 0) {
enum logger_parse_entry_ret ret;
e = (logentry *) (data + pos);
ret = logger_parse_entry(e);
ret = logger_parse_entry(e, ls);
if (ret != LOGGER_PARSE_ENTRY_OK) {
fprintf(stderr, "LOGGER: Failed to parse log entry\n");
abort();
Expand All @@ -245,6 +252,10 @@ static int logger_thread_read(logger *l) {

pthread_mutex_lock(&l->mutex);
data = bipbuf_poll(l->buf, size);
ls->worker_written += l->written;
ls->worker_dropped += l->dropped;
l->written = 0;
l->dropped = 0;
pthread_mutex_unlock(&l->mutex);
if (data == NULL) {
fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
Expand All @@ -254,18 +265,6 @@ static int logger_thread_read(logger *l) {
return size; /* maybe the count of objects iterated? */
}

/* helper function or #define: iterate over loggers */
/* called with logger_stack_lock held */
static int logger_iterate(void) {
logger *l = NULL;
int count = 0;
for (l = logger_stack_head; l != NULL; l=l->next) {
/* lock logger, call function to manipulate it */
count += logger_thread_read(l);
}
return count;
}

/* Since the event loop code isn't reusable without a refactor, and we have a
* limited number of potential watchers, we run our own poll loop.
* This calls poll() unnecessarily during write flushes, should be possible to
Expand Down Expand Up @@ -372,6 +371,15 @@ static int logger_poll_watchers(int force_poll, int watcher) {
return flushed;
}

static void logger_thread_sum_stats(struct logger_stats *ls) {
STATS_LOCK();
stats.log_worker_dropped += ls->worker_dropped;
stats.log_worker_written += ls->worker_written;
stats.log_watcher_skipped += ls->watcher_skipped;
stats.log_watcher_sent += ls->watcher_sent;
STATS_UNLOCK();
}

#define MAX_LOGGER_SLEEP 100000
#define MIN_LOGGER_SLEEP 0

Expand All @@ -381,13 +389,19 @@ static void *logger_thread(void *arg) {
L_DEBUG("LOGGER: Starting logger thread\n");
while (do_run_logger_thread) {
int found_logs = 0;
logger *l;
struct logger_stats ls;
memset(&ls, 0, sizeof(struct logger_stats));
if (to_sleep)
usleep(to_sleep);

/* Call function to iterate each logger. */
pthread_mutex_lock(&logger_stack_lock);
/* check poll() for current slow watchers */
found_logs = logger_iterate();
for (l = logger_stack_head; l != NULL; l=l->next) {
/* lock logger, call function to manipulate it */
found_logs += logger_thread_read(l, &ls);
}

logger_poll_watchers(1, WATCHER_ALL);
pthread_mutex_unlock(&logger_stack_lock);

Expand All @@ -400,6 +414,7 @@ static void *logger_thread(void *arg) {
if (to_sleep < 50)
to_sleep = MIN_LOGGER_SLEEP;
}
logger_thread_sum_stats(&ls);
}

return NULL;
Expand Down Expand Up @@ -436,6 +451,14 @@ void logger_init(void) {
if (start_logger_thread() != 0) {
abort();
}

/* This can be removed once the global stats initializer is improved */
STATS_LOCK();
stats.log_worker_dropped = 0;
stats.log_worker_written = 0;
stats.log_watcher_skipped = 0;
stats.log_watcher_sent = 0;
STATS_UNLOCK();
/* FIXME: temp hack to always add STDERR watcher */
//logger_add_watcher(NULL, 0);
return;
Expand Down Expand Up @@ -489,6 +512,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons
e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
if (e == NULL) {
pthread_mutex_unlock(&l->mutex);
l->dropped++;
return LOGGER_RET_NOSPACE;
}
e->gid = gid;
Expand Down Expand Up @@ -530,6 +554,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons
pthread_mutex_unlock(&l->mutex);
return LOGGER_RET_ERR;
}
l->written++;
L_DEBUG("LOGGER: Requested %d bytes, wrote %d bytes\n", reqlen, total + 1);

pthread_mutex_unlock(&l->mutex);
Expand Down
10 changes: 9 additions & 1 deletion logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ typedef struct _logger {
struct _logger *prev;
struct _logger *next;
pthread_mutex_t mutex; /* guard for this + *buf */
uint64_t logged; /* entries written to the buffer */
uint64_t written; /* entries written to the buffer */
uint64_t dropped; /* entries dropped */
uint64_t blocked; /* times blocked instead of dropped */
uint16_t fetcher_ratio; /* log one out of every N fetches */
Expand All @@ -90,6 +90,14 @@ typedef struct {
bipbuf_t *buf; /* per-watcher output buffer */
} logger_watcher;


struct logger_stats {
uint64_t worker_dropped;
uint64_t worker_written;
uint64_t watcher_skipped;
uint64_t watcher_sent;
};

extern pthread_key_t logger_key;

/* public functions */
Expand Down
4 changes: 4 additions & 0 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,10 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
}
APPEND_STAT("malloc_fails", "%llu",
(unsigned long long)stats.malloc_fails);
APPEND_STAT("log_worker_dropped", "%llu", (unsigned long long)stats.log_worker_dropped);
APPEND_STAT("log_worker_written", "%llu", (unsigned long long)stats.log_worker_written);
APPEND_STAT("log_watcher_skipped", "%llu", (unsigned long long)stats.log_watcher_skipped);
APPEND_STAT("log_watcher_sent", "%llu", (unsigned long long)stats.log_watcher_sent);
STATS_UNLOCK();
}

Expand Down
4 changes: 4 additions & 0 deletions memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ struct stats {
bool lru_crawler_running; /* crawl in progress */
uint64_t lru_maintainer_juggles; /* number of LRU bg pokes */
uint64_t time_in_listen_disabled_us; /* elapsed time in microseconds while server unable to process new connections */
uint64_t log_worker_dropped; /* logs dropped by worker threads */
uint64_t log_worker_written; /* logs written by worker threads */
uint64_t log_watcher_skipped; /* logs watchers missed */
uint64_t log_watcher_sent; /* logs sent to watcher buffers */
struct timeval maxconns_entered; /* last time maxconns entered */
};

Expand Down

0 comments on commit 0503b5e

Please sign in to comment.