From 7c846a16e8eb2930b35c7484720b8ef59125c277 Mon Sep 17 00:00:00 2001 From: Yao Yue Date: Fri, 29 Sep 2017 16:48:58 -0700 Subject: [PATCH] Twemcache 2.6.3 release --- build.sh | 23 +++++ configure.ac | 8 +- src/Makefile.am | 4 + src/mc.c | 84 ++++++++++++++++- src/mc_ascii.c | 220 +++++++++++++++++++++++++++++++++++++++++++- src/mc_core.c | 7 ++ src/mc_core.h | 10 ++ src/mc_hotkey.c | 196 +++++++++++++++++++++++++++++++++++++++ src/mc_hotkey.h | 30 ++++++ src/mc_items.c | 4 + src/mc_items.h | 6 ++ src/mc_kc_map.c | 85 +++++++++++++++++ src/mc_kc_map.h | 18 ++++ src/mc_key_window.c | 72 +++++++++++++++ src/mc_key_window.h | 15 +++ src/mc_ring_array.c | 159 ++++++++++++++++++++++++++++++++ src/mc_ring_array.h | 18 ++++ src/mc_stats.h | 3 + src/mc_thread.c | 2 +- src/mc_time.c | 22 ++++- src/mc_time.h | 1 + src/mc_util.c | 2 + 22 files changed, 977 insertions(+), 12 deletions(-) create mode 100755 build.sh create mode 100644 src/mc_hotkey.c create mode 100644 src/mc_hotkey.h create mode 100644 src/mc_kc_map.c create mode 100644 src/mc_kc_map.h create mode 100644 src/mc_key_window.c create mode 100644 src/mc_key_window.h create mode 100644 src/mc_ring_array.c create mode 100644 src/mc_ring_array.h diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..5c36181 --- /dev/null +++ b/build.sh @@ -0,0 +1,23 @@ +#!/bin/bash +export CI_ROOT=$(pwd) +rm -rf musl && mkdir musl +rm -rf libevent && mkdir libevent +wget https://artifactory.twitter.biz/libs-releases-local/musl-1.1.16.tar.gz +packer fetch --use-tfe --cluster=smf1 cache-user libevent latest +tar -xzf musl-1.1.16.tar.gz -C musl --strip-components=1 +cd musl +./configure --prefix=$CI_ROOT/musl/tmp --syslibdir=$CI_ROOT/musl/tmp/lib +make +make install +export PATH=$PATH:$CI_ROOT/musl/tmp/bin +cd $CI_ROOT +tar -xzf libevent-2.0.22-stable.tar.gz -C libevent --strip-components=1 +cd libevent +mkdir tmp +./configure --prefix=$CI_ROOT/libevent/tmp CC=musl-gcc --enable-static --disable-shared +make +make install +cd $CI_ROOT +autoreconf -fvi +CFLAGS="-ggdb3 -O2" ./configure --enable-debug=log --with-libevent=$CI_ROOT/libevent/tmp CC=musl-gcc --enable-static +make -j diff --git a/configure.ac b/configure.ac index c930be6..0e06c44 100644 --- a/configure.ac +++ b/configure.ac @@ -29,8 +29,12 @@ AC_LANG([C]) # Checks for OS and set OS variables AC_CANONICAL_HOST case $host_os in - linux*) OS_LINUX=yes ;; - darwin*) OS_DARWIN=yes ;; + linux*) OS_LINUX=yes + AC_DEFINE([OS_LINUX], [1], [Define to 1 if OS has Linux kernel]) + ;; + darwin*) OS_DARWIN=yes + AC_DEFINE([OS_DARWIN], [1], [Define to 1 if OS has Darwin kernel]) + ;; *) AC_MSG_ERROR([Your platform is not currently supported]) ;; esac AM_CONDITIONAL([RDYNAMIC], [test x$OS_LINUX = xyes]) diff --git a/src/Makefile.am b/src/Makefile.am index f9ab62b..26c5d0b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -32,4 +32,8 @@ twemcache_SOURCES = \ mc_queue.h \ mc_cache.c mc_cache.h \ mc_klog.c mc_klog.h \ + mc_hotkey.c mc_hotkey.h \ + mc_ring_array.c mc_ring_array.h \ + mc_key_window.c mc_key_window.h \ + mc_kc_map.c mc_kc_map.h \ mc.c diff --git a/src/mc.c b/src/mc.c index 17fd13e..469fa21 100644 --- a/src/mc.c +++ b/src/mc.c @@ -123,12 +123,17 @@ static struct option long_options[] = { { "disable-cas", no_argument, NULL, 'C' }, /* disable cas */ { "describe-stats", no_argument, NULL, 'D' }, /* print stats description and exit */ { "show-sizes", no_argument, NULL, 'S' }, /* print slab & item struct sizes and exit */ + { "enable-hotkey", no_argument, NULL, 'H' }, /* enable hotkey detection */ { "output", required_argument, NULL, 'o' }, /* output logfile */ { "verbosity", required_argument, NULL, 'v' }, /* log verbosity level */ { "stats-aggr-interval", required_argument, NULL, 'A' }, /* stats aggregation interval in usec */ { "klog-entry", required_argument, NULL, 'x' }, /* command logging entry number */ { "klog-file", required_argument, NULL, 'X' }, /* command logging file */ { "klog-sample-rate", required_argument, NULL, 'y' }, /* command logging sampling rate */ + { "hotkey-redline-qps", required_argument, NULL, 'q' }, /* hotkey signalling begins at this qps */ + { "hotkey-sample-rate", required_argument, NULL, 'Y' }, /* hotkey sampling rate */ + { "hotkey-qps-threshold", required_argument, NULL, 'T' }, /* hotkey frequency signalling threshold */ + { "hotkey-bw-threshold", required_argument, NULL, 'j' }, /* hotkey bandwidth signalling threshold */ { "hash-power", required_argument, NULL, 'e' }, /* fixed sized hash table, as power of 2 */ { "threads", required_argument, NULL, 't' }, /* # of threads */ { "pidfile", required_argument, NULL, 'P' }, /* pid file */ @@ -161,6 +166,7 @@ static char short_options[] = "C" /* disable cas */ "D" /* print stats description and exit */ "S" /* print slab & item struct sizes and exit */ + "H" /* enable hotkey detection */ "o:" /* output logfile */ "v:" /* log verbosity level */ "A:" /* stats aggregation interval in msec */ @@ -171,6 +177,11 @@ static char short_options[] = "x:" /* command logging entry number */ "X:" /* command logging file */ "y:" /* command logging sample rate */ + "q:" /* hotkey signalling begins at this qps */ + "Y:" /* hotkey sampling rate */ + "T:" /* hotkey frequency signalling threshold */ + "j:" /* hotkey bandwidth signalling threshold */ + "B:" /* hotkey min key/val bandwidth signalling */ "R:" /* max request per event */ "c:" /* max simultaneous connections */ "b:" /* tcp backlog queue limit */ @@ -192,13 +203,14 @@ mc_show_usage(void) { log_stderr( "Usage:" CRLF - "twemcache [-?hVCELdkrDS]" CRLF + "twemcache [-?hVCELdkrDSH]" CRLF " [-o output file] [-v verbosity level]" CRLF " [-A stats aggr interval] [-t threads] [-P pid file] [-u user]" CRLF " [-e hash power] [-M eviction strategy]" CRLF " [-x command log entry] [-X command log file] [-y command log sample rate]" CRLF - " [-p port] [-U udp port] [-R max requests] [-c max conns] [-b backlog]" CRLF - " [-l interface] [-s unix path] [-a access mask]" CRLF + " [-q hotkey redline qps] [-Y hotkey sample rate] [-T hotkey qps threshold]" CRLF + " [-B hotkey bandwidth threshold] [-p port] [-U udp port] [-R max requests]" CRLF + " [-c max conns] [-b backlog] [-l interface] [-s unix path] [-a access mask]" CRLF " [-m max memory] [-f factor] [-n min item chunk size] [-I slab size]" CRLF " [-z slab profile]" ""); @@ -216,7 +228,8 @@ mc_show_usage(void) " -D, --describe-stats show version, name and description of each stats" CRLF " metric, and exit" CRLF " -S, --show-sizes show version, item overhead, minimum item size," CRLF - " slab overhead, default slab size, and exit" + " slab overhead, default slab size, and exit" CRLF + " -H, --enable-hotkey enable signalling of hotkey" ""); log_stderr( @@ -251,6 +264,22 @@ mc_show_usage(void) MC_KLOG_SMP_RATE ); + log_stderr( + " -q, --hotkey-redline-qps=N begin signalling if observed qps >= N (default: %d)" CRLF + " -Y, --hotkey-sample-rate=N sample one in every N gets for hotkey detection" CRLF + " (default: %d)" CRLF + " -T, --hotkey-qps-threshold=N signal hotkey if >N of the keys in the window" CRLF + " are the key being sampled (default: %f)" + " -j, --hotkey-bw-threshold=N signal hotkey if the bandwidth of the sampled key >=" CRLF + " N (default: %d)" + "", + HOTKEY_REDLINE_QPS, + HOTKEY_SAMPLE_RATE, + HOTKEY_QPS_THRESHOLD, + HOTKEY_BW_THRESHOLD + ); + + log_stderr( " -R, --max-requests=N set the maximum number of requests per event" CRLF " (default: %d)" CRLF @@ -551,6 +580,12 @@ mc_set_default_options(void) settings.pid_filename = MC_PID_FILE; settings.pid_file = 0; + settings.hotkey_enable = false; + settings.hotkey_redline_qps = HOTKEY_REDLINE_QPS; + settings.hotkey_sample_rate = HOTKEY_SAMPLE_RATE; + settings.hotkey_qps_threshold = HOTKEY_QPS_THRESHOLD; + settings.hotkey_bw_threshold = HOTKEY_BW_THRESHOLD; + memset(settings.profile, 0, sizeof(settings.profile)); settings.profile_last_id = SLABCLASS_MAX_ID; } @@ -560,6 +595,7 @@ mc_get_options(int argc, char **argv) { int c, value, factor; size_t len; + double float_value; bool tcp_specified, udp_specified; tcp_specified = false; @@ -620,6 +656,10 @@ mc_get_options(int argc, char **argv) show_version = 1; break; + case 'H': + settings.hotkey_enable = true; + break; + case 'o': settings.log_filename = optarg; break; @@ -705,6 +745,42 @@ mc_get_options(int argc, char **argv) settings.klog_sampling_rate = value; break; + case 'q': + value = mc_atoi(optarg, strlen(optarg)); + if (value < 0) { + log_stderr("twemcache: option -w requres a non negative number"); + return MC_ERROR; + } + settings.hotkey_redline_qps = value; + break; + + case 'Y': + value = mc_atoi(optarg, strlen(optarg)); + if (value <= 0) { + log_stderr("twemcache: option -Y requires a positive number"); + return MC_ERROR; + } + settings.hotkey_sample_rate = value; + break; + + case 'T': + float_value = strtod(optarg, NULL); + if (float_value <= 0 || float_value > 1) { + log_stderr("twemcache: option -T requires a number between 0 and 1"); + return MC_ERROR; + } + settings.hotkey_qps_threshold = float_value; + break; + + case 'j': + value = mc_atoi(optarg, strlen(optarg)); + if (value <= 0) { + log_stderr("twemcache: option -B requires a positive number"); + return MC_ERROR; + } + settings.hotkey_bw_threshold = value; + break; + case 't': value = mc_atoi(optarg, strlen(optarg)); if (value <= 0) { diff --git a/src/mc_ascii.c b/src/mc_ascii.c index 051f269..7cf58e2 100644 --- a/src/mc_ascii.c +++ b/src/mc_ascii.c @@ -87,6 +87,14 @@ extern struct settings settings; * config klog interval \r\n * config klog sampling reset\r\n * config klog sampling \r\n + * + * COMMAND SUBCOMMAND HK_COMMAND HK_SUBCOMMAND + * config hotkey enable yes\r\n + * config hotkey enable no\r\n + * config hotkey redline \r\n + * config hotkey sample_rate \r\n + * config hotkey qps_threshold \r\n + * config hotkey bw_threshold \r\n */ #define TOKEN_COMMAND 0 @@ -102,6 +110,8 @@ extern struct settings settings; #define TOKEN_AGGR_COMMAND 2 #define TOKEN_EVICT_COMMAND 2 #define TOKEN_MAXBYTES_COMMAND 2 +#define TOKEN_HK_COMMAND 2 +#define TOKEN_HK_SUBCOMMAND 3 #define TOKEN_KLOG_COMMAND 2 #define TOKEN_KLOG_SUBCOMMAND 3 #define TOKEN_MAX 8 @@ -893,11 +903,11 @@ asc_respond_get(struct conn *c, unsigned valid_key_iter, struct item *it, } if (return_cas) { sz = mc_snprintf(suffix, SUFFIX_MAX_LEN, " %"PRIu32" %"PRIu32" %"PRIu64, - it->dataflags, nbyte, item_get_cas(it)); + it->dataflags, nbyte, item_get_cas(it)); ASSERT(sz <= SUFFIX_SIZE + CAS_SUFFIX_SIZE); } else { sz = mc_snprintf(suffix, SUFFIX_MAX_LEN, " %"PRIu32" %"PRIu32, - it->dataflags, nbyte); + it->dataflags, nbyte); ASSERT(sz <= SUFFIX_SIZE); } if (sz < 0) { @@ -1122,6 +1132,14 @@ asc_process_update(struct conn *c, struct token *token, int ntoken) return; } + /** + * If hot key is enabled, we ignore any incoming flag value set by client. + * This flag will be used to send out signals like hotkey, backpressure etc. + */ + if (__atomic_load_n(&settings.hotkey_enable, __ATOMIC_RELAXED)) { + flags = 0; + } + it = item_alloc(id, key, nkey, flags, time_reltime(exptime), vlen); if (it == NULL) { log_warn("server error on c %d for req of type %d because of oom in " @@ -1245,8 +1263,8 @@ asc_process_delta(struct conn *c, struct token *token, int ntoken) break; case DELTA_NON_NUMERIC: - log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " - "non-numeric value", c->sd, c->req_type); + log_warn("client error on c %d for req of type %d and key '%.*s' with " + "non-numeric value", c->sd, c->req_type, nkey, key); rsplen = asc_rsp_client_error(c); klog_write(c->peer, c->req_type, c->req, c->req_len, res, rsplen); @@ -1648,6 +1666,192 @@ asc_process_maxbytes(struct conn *c, struct token *token, int ntoken) asc_rsp_client_error(c); } +static void +asc_process_hk_enable(struct conn *c, struct token *token, int ntoken) +{ + struct token *t; + + t = &token[TOKEN_HK_SUBCOMMAND]; + if (strncmp(t->val, "yes", t->len) == 0) { + if (__atomic_load_n(&hotkey_realloc, __ATOMIC_RELAXED)) { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d " + "hotkey cannot be enabled when reallocating", c->sd, + c->req_type); + asc_rsp_client_error(c); + return; + } + log_debug(LOG_NOTICE, "hotkey enabled at epoch %u", time_now()); + __atomic_store_n(&settings.hotkey_enable, true, __ATOMIC_RELAXED); + } else if (strncmp(t->val, "no", t->len) == 0) { + log_debug(LOG_NOTICE, "hotkey disabled at epoch %u", time_now()); + __atomic_store_n(&settings.hotkey_enable, false, __ATOMIC_RELAXED); + } else { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d " + "with invalid hotkey enable subcommand '%.*s'", c->sd, + c->req_type, t->len, t->val); + asc_rsp_client_error(c); + return; + } + + asc_rsp_ok(c); +} + +static void +asc_process_hk_redline(struct conn *c, struct token *token, int ntoken) +{ + uint32_t option; + + if (__atomic_load_n(&settings.hotkey_enable, __ATOMIC_RELAXED)) { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d " + " redline qps can only be changed with hotkey disabled", + c->sd, c->req_type); + + asc_rsp_client_error(c); + return; + } + + if (!mc_strtoul(token[TOKEN_HK_SUBCOMMAND].val, &option)) { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid option '%.*s'", c->sd, c->req_type, + token[TOKEN_HK_SUBCOMMAND].len, token[TOKEN_HK_SUBCOMMAND].val); + + asc_rsp_client_error(c); + return; + } + + if (option > 0) { + if (hotkey_update_redline(option) != MC_OK) { + log_debug(LOG_ERR, "not enough memory to accommodate new " + "hotkey redline qps setting!"); + asc_rsp_client_error(c); + return; + } + asc_rsp_ok(c); + return; + } + + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid option %"PRId32"", c->sd, c->req_type, option); + + asc_rsp_client_error(c); +} + +static void +asc_process_hk_sample_rate(struct conn *c, struct token *token, int ntoken) +{ + uint32_t option; + + if (__atomic_load_n(&settings.hotkey_enable, __ATOMIC_RELAXED)) { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d " + " sampling rate can only be changed with hotkey disabled", + c->sd, c->req_type); + + asc_rsp_client_error(c); + return; + } + + if (!mc_strtoul(token[TOKEN_HK_SUBCOMMAND].val, &option)) { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid option '%.*s'", c->sd, c->req_type, + token[TOKEN_HK_SUBCOMMAND].len, token[TOKEN_HK_SUBCOMMAND].val); + + asc_rsp_client_error(c); + return; + } + + if (option > 0) { + if (hotkey_update_sample_rate(option) != MC_OK) { + log_debug(LOG_ERR, "not enough memory to accommodate new " + "hotkey sample rate!"); + asc_rsp_client_error(c); + return; + } + asc_rsp_ok(c); + return; + } + + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid option %"PRId32"", c->sd, c->req_type, option); + + asc_rsp_client_error(c); +} + +static void +asc_process_hk_qps_threshold(struct conn *c, struct token *token, int ntoken) +{ + double option; + + option = strtod(token[TOKEN_HK_SUBCOMMAND].val, NULL); + + if (option > 0 && option <= 1) { + hotkey_update_qps_threshold(option); + asc_rsp_ok(c); + return; + } + + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid option %"PRId32"", c->sd, c->req_type, option); + + asc_rsp_client_error(c); +} + +static void +asc_process_hk_bw_threshold(struct conn *c, struct token *token, int ntoken) +{ + uint32_t option; + + if (!mc_strtoul(token[TOKEN_HK_SUBCOMMAND].val, &option)) { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid option '%.*s'", c->sd, c->req_type, + token[TOKEN_HK_SUBCOMMAND].len, token[TOKEN_HK_SUBCOMMAND].val); + + asc_rsp_client_error(c); + return; + } + + hotkey_update_bw_threshold(option); + asc_rsp_ok(c); +} + +static void +asc_process_hotkey(struct conn *c, struct token *token, int ntoken) +{ + struct token *t; + + if (!asc_validate_ntoken(c, ntoken)) { + return; + } + + if (ntoken != 5) { + log_hexdump(LOG_NOTICE, c->req, c->req_len, "client error on c %d for " + "req of type %d with %d invalid tokens", c->sd, + c->req_type, ntoken); + + asc_rsp_client_error(c); + return; + } + + t = &token[TOKEN_HK_COMMAND]; + + if (strncmp(t->val, "enable", t->len) == 0) { + asc_process_hk_enable(c, token, ntoken); + } else if (strncmp(t->val, "redline", t->len) == 0) { + asc_process_hk_redline(c, token, ntoken); + } else if (strncmp(t->val, "sample_rate", t->len) == 0) { + asc_process_hk_sample_rate(c, token, ntoken); + } else if (strncmp(t->val, "qps_threshold", t->len) == 0) { + asc_process_hk_qps_threshold(c, token, ntoken); + } else if (strncmp(t->val, "bw_threshold", t->len) == 0) { + asc_process_hk_bw_threshold(c, token, ntoken); + } else { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid hotkey subcommand '%.*s'", c->sd, c->req_type, + t->len, t->val); + + asc_rsp_client_error(c); + } +} + static void asc_process_config(struct conn *c, struct token *token, int ntoken) { @@ -1661,6 +1865,14 @@ asc_process_config(struct conn *c, struct token *token, int ntoken) asc_process_evict(c, token, ntoken); } else if (strncmp(t->val, "maxbytes", t->len) == 0) { asc_process_maxbytes(c, token, ntoken); + } else if (strncmp(t->val, "hotkey", t->len) == 0) { + asc_process_hotkey(c, token, ntoken); + } else { + log_debug(LOG_NOTICE, "client error on c %d for req of type %d with " + "invalid config subcommand '%.*s'", c->sd, c->req_type, + t->len, t->val); + + asc_rsp_client_error(c); } } diff --git a/src/mc_core.c b/src/mc_core.c index ddb8573..7514426 100644 --- a/src/mc_core.c +++ b/src/mc_core.c @@ -782,6 +782,8 @@ core_drive_machine(struct conn *c) break; } } + + time_update(); } void @@ -1078,6 +1080,11 @@ core_init(void) return status; } + status = hotkey_init(); + if (status != MC_OK) { + return status; + } + time_init(); /* start up worker, dispatcher and aggregator threads */ diff --git a/src/mc_core.h b/src/mc_core.h index 372153b..680db7d 100644 --- a/src/mc_core.h +++ b/src/mc_core.h @@ -223,6 +223,10 @@ typedef enum rsp_type { #include #include #include +#include +#include +#include +#include struct settings { /* options with no argument */ @@ -279,6 +283,12 @@ struct settings { size_t profile[SLABCLASS_MAX_IDS]; /* memory : slab profile */ uint8_t profile_last_id; /* memory : last id in slab profile */ + + bool hotkey_enable; /* hotkey : use hotkey detection? */ + size_t hotkey_redline_qps; /* hotkey : begin signalling at this QPS */ + size_t hotkey_sample_rate; /* hotkey : sampling ratio */ + double hotkey_qps_threshold; /* hotkey : theshold for hotkey signal (fraction) */ + size_t hotkey_bw_threshold; /* hotkey : bandwidth signalling threshold in bytes/s */ }; void core_write_and_free(struct conn *c, char *buf, int bytes); diff --git a/src/mc_hotkey.c b/src/mc_hotkey.c new file mode 100644 index 0000000..46879c6 --- /dev/null +++ b/src/mc_hotkey.c @@ -0,0 +1,196 @@ +#include + +#include + +extern struct settings settings; + +bool hotkey_realloc = false; +static uint64_t hotkey_counter; + +#define HOTKEY_WINDOW_SIZE HOTKEY_REDLINE_QPS * HOTKEY_TIMEFRAME / 1000 / HOTKEY_SAMPLE_RATE + +static size_t hotkey_redline_qps = HOTKEY_REDLINE_QPS; /* above this qps, hotkey signalling kicks in */ +static size_t hotkey_sample_rate = HOTKEY_SAMPLE_RATE; /* sampling rate, one in every hotkey_sample_rate + gets is sampled */ +static size_t hotkey_timeframe = HOTKEY_TIMEFRAME; /* timeframe for window at redline_qps, default + 1000 msec */ +static size_t hotkey_window_size = HOTKEY_WINDOW_SIZE; /* number of samples to keep, calculated as + redline qps * timeframe / sample rate */ +static size_t hotkey_threshold = HOTKEY_QPS_THRESHOLD * HOTKEY_WINDOW_SIZE; /* if key count in the + window >= hotkey_threshold, signal is given */ +static size_t hotkey_bw_threshold = HOTKEY_BW_THRESHOLD; /* if key consumes >= hotkey_bw_threshold + bytes/sec, signal is given */ +static uint64_t hotkey_qps_numerator = + (uint64_t)HOTKEY_WINDOW_SIZE * (uint64_t)HOTKEY_SAMPLE_RATE * 1000000ULL; + /* this figure is precalculated as part of the + calculation for determining realtime qps */ + +rstatus_t +hotkey_init(void) +{ + rstatus_t status; + + hotkey_redline_qps = settings.hotkey_redline_qps; + hotkey_sample_rate = settings.hotkey_sample_rate; + hotkey_window_size = hotkey_redline_qps * hotkey_timeframe / 1000 / hotkey_sample_rate; + hotkey_threshold = (size_t)(settings.hotkey_qps_threshold * hotkey_window_size); + hotkey_bw_threshold = settings.hotkey_bw_threshold; + hotkey_qps_numerator = hotkey_window_size * hotkey_sample_rate * 1000000; + hotkey_counter = 0; + + if ((status = key_window_init(hotkey_window_size)) != MC_OK) { + return status; + } + + if ((status = kc_map_init(hotkey_window_size)) != MC_OK) { + return status; + } + + return MC_OK; +} + +void +hotkey_deinit(void) +{ + key_window_deinit(); + kc_map_deinit(); +} + +/* given key count in window, key/val size, duration in usec, calculate bandwidth */ +static inline size_t +_get_bandwidth(size_t count, size_t size, size_t usec) +{ + return count * size * hotkey_sample_rate * 1000000 / usec; +} + +item_control_flags_t +hotkey_sample(const char *key, size_t klen, size_t vlen) +{ + if (++hotkey_counter % hotkey_sample_rate == 0) { + /* sample this key */ + size_t count; + uint64_t qps, bw; + uint64_t oldest_time, cur_time, time_diff; /* timestamps in usec */ + + /* get current time, push key into window with timestamp, then obtain + the count of that key */ + ASSERT(!key_window_full()); + cur_time = (time_now() * 1000000) + time_now_usec(); + count = key_window_push(key, klen, cur_time); + stats_thread_incr(hotkey_sampled); + + if (key_window_full()) { + /* calculate qps using window size, and the amount of time it took to + fill the window. */ + oldest_time = key_window_pop(); + /* the ternary conditional statement is here to prevent any divide by 0 errs */ + time_diff = (cur_time - oldest_time > 0) ? cur_time - oldest_time : 1; + qps = hotkey_qps_numerator / time_diff; + bw = _get_bandwidth(count, klen + vlen, time_diff); + + log_debug(LOG_DEBUG, "count of key %.*s: %d qps: %d bandwidth: %d", + klen, key, count, qps, bw); + + /* signal QPS hotkey if qps >= redline and key count >= threshold */ + if (qps >= hotkey_redline_qps && count >= __atomic_load_n(&hotkey_threshold, + __ATOMIC_RELAXED)) { + log_debug(LOG_INFO, "frequency hotkey detected: %.*s", klen, key); + stats_thread_incr(hotkey_qps); + return ITEM_HOT_QPS; + } + + /* signal bandwidth hotkey if bw consumption >= threshold */ + if (bw >= __atomic_load_n(&hotkey_bw_threshold, __ATOMIC_RELAXED)) { + log_debug(LOG_INFO, "bandwidth hotkey detected: %.*s", klen, key); + stats_thread_incr(hotkey_bw); + return ITEM_HOT_BW; + } + } + } + + return 0; +} + +static inline rstatus_t +_hotkey_realloc(void) +{ + rstatus_t status; + + hotkey_window_size = hotkey_redline_qps * hotkey_timeframe / 1000 / hotkey_sample_rate; + hotkey_qps_numerator = hotkey_window_size * hotkey_sample_rate * 1000000; + + key_window_deinit(); + if ((status = key_window_init(hotkey_window_size)) != MC_OK) { + return status; + } + + kc_map_deinit(); + if ((status = kc_map_init(hotkey_window_size)) != MC_OK) { + return status; + } + + return MC_OK; +} + +rstatus_t +hotkey_update_redline(size_t redline) +{ + rstatus_t status; + + ASSERT(!settings.hotkey_enable); + + if (hotkey_redline_qps == redline) { + return MC_OK; + } + + __atomic_store_n(&hotkey_realloc, true, __ATOMIC_RELAXED); + hotkey_redline_qps = settings.hotkey_redline_qps = redline; + if ((status = _hotkey_realloc()) != MC_OK) { + return status; + } + __atomic_store_n(&hotkey_realloc, false, __ATOMIC_RELAXED); + + return MC_OK; +} + +rstatus_t +hotkey_update_sample_rate(size_t sample_rate) +{ + rstatus_t status; + + ASSERT(!settings.hotkey_enable); + + if (hotkey_sample_rate == sample_rate) { + return MC_OK; + } + + __atomic_store_n(&hotkey_realloc, true, __ATOMIC_RELAXED); + hotkey_sample_rate = settings.hotkey_sample_rate = sample_rate; + if ((status = _hotkey_realloc()) != MC_OK) { + return status; + } + __atomic_store_n(&hotkey_realloc, false, __ATOMIC_RELAXED); + + return MC_OK; +} + +void +hotkey_update_qps_threshold(double qps_threshold) +{ + if (settings.hotkey_qps_threshold == qps_threshold) { + return; + } + settings.hotkey_qps_threshold = qps_threshold; + __atomic_store_n(&hotkey_threshold, (size_t)(settings.hotkey_qps_threshold * + hotkey_window_size), __ATOMIC_RELAXED); +} + +void +hotkey_update_bw_threshold(size_t bw_threshold) +{ + if (settings.hotkey_bw_threshold == bw_threshold) { + return; + } + settings.hotkey_bw_threshold = bw_threshold; + __atomic_store_n(&hotkey_bw_threshold, bw_threshold, __ATOMIC_RELAXED); +} diff --git a/src/mc_hotkey.h b/src/mc_hotkey.h new file mode 100644 index 0000000..5503f9a --- /dev/null +++ b/src/mc_hotkey.h @@ -0,0 +1,30 @@ +#ifndef _MC_HOTKEY_H_ +#define _MC_HOTKEY_H_ + +#include + +#include + +#define MAX_KEY_LEN 255 + +#define HOTKEY_REDLINE_QPS 80000 /* begin signalling hotkey if observed qps >= HOTKEY_REDLINE_QPS */ +#define HOTKEY_SAMPLE_RATE 100 /* sample one in every HOTKEY_SAMPLE_RATE keys */ +#define HOTKEY_TIMEFRAME 1000 /* in milliseconds. we calculate window size based on timeframe and + sample rate at the redline qps */ +#define HOTKEY_QPS_THRESHOLD 0.01 /* signal hotkey if key is observed taking >= HOTKEY_QPS_THRESHOLD + of the traffic in the window */ +#define HOTKEY_BW_THRESHOLD 200000 /* signal hotkey if key is observed taking >= HOTKEY_BW_THRESHOLD + bytes/second of bandwidth */ + +extern bool hotkey_realloc; + +rstatus_t hotkey_init(void); +void hotkey_deinit(void); +item_control_flags_t hotkey_sample(const char *key, size_t klen, size_t vlen); + +rstatus_t hotkey_update_redline(size_t redline); +rstatus_t hotkey_update_sample_rate(size_t sample_rate); +void hotkey_update_qps_threshold(double qps_threshold); +void hotkey_update_bw_threshold(size_t bw_threshold); + +#endif diff --git a/src/mc_items.c b/src/mc_items.c index 1d164e0..c25aefc 100644 --- a/src/mc_items.c +++ b/src/mc_items.c @@ -669,6 +669,10 @@ item_get(const char *key, size_t nkey) pthread_mutex_lock(&cache_lock); it = _item_get(key, nkey); + if (__atomic_load_n(&settings.hotkey_enable, __ATOMIC_RELAXED) && it != NULL) { + it->dataflags &= ~(ITEM_HOT_QPS | ITEM_HOT_BW); + it->dataflags |= hotkey_sample(key, nkey, it->nbyte); + } pthread_mutex_unlock(&cache_lock); return it; diff --git a/src/mc_items.h b/src/mc_items.h index cf26f05..a4dc3d7 100644 --- a/src/mc_items.h +++ b/src/mc_items.h @@ -35,8 +35,14 @@ typedef enum item_flags { ITEM_CAS = 2, /* item has cas */ ITEM_SLABBED = 4, /* item in free q */ ITEM_RALIGN = 8, /* item data (payload) is right-aligned */ + } item_flags_t; +typedef enum item_control_flags { + ITEM_HOT_QPS = 1, /* item is a hot key based on get QPS */ + ITEM_HOT_BW = 2, /* item is a hot key based on bandwidth consumption */ +} item_control_flags_t; + typedef enum item_set_result { SET_OK } item_set_result_t; diff --git a/src/mc_kc_map.c b/src/mc_kc_map.c new file mode 100644 index 0000000..c02b819 --- /dev/null +++ b/src/mc_kc_map.c @@ -0,0 +1,85 @@ +#include + +#include + +static struct kc_map_entry *table; +static size_t table_size; +static size_t table_nkey; + +static void +kc_map_entry_reset(struct kc_map_entry *kcme) +{ + kcme->klen = 0; + kcme->count = 0; +} + +rstatus_t +kc_map_init(size_t size) +{ + int i; + + /* allocate 2x the number of entries expected */ + table_size = 2 * size; + table = mc_alloc(sizeof(*table) * table_size); + + if (table == NULL) { + log_error("Could not allocate counter table for hotkey - OOM"); + return MC_ENOMEM; + } + + for (i = 0; i < table_size; ++i) { + kc_map_entry_reset(table + i); + } + + table_nkey = 0; + + return MC_OK; +} + +void +kc_map_deinit(void) +{ + mc_free(table); + table_size = 0; + table_nkey = 0; +} + +/* return true if empty entry or if key, klen match kcme */ +static inline bool +kc_map_match(const struct kc_map_entry *kcme, const char *key, size_t klen) +{ + return kcme->count == 0 || (kcme->klen == klen && strncmp(kcme->key, key, klen) == 0); +} + +struct kc_map_entry * +kc_map_incr(const char *key, size_t klen) +{ + size_t entry; + + ASSERT(table_nkey < table_size); + + /* hash, then iterate until we find either a match or empty entry */ + for (entry = hash(key, klen, 0) % table_size; + !kc_map_match(table + entry, key, klen); + entry = (entry + 1) % table_size); + + if ((table + entry)->count == 0) { + memcpy((table + entry)->key, key, klen); + (table + entry)->klen = klen; + (table + entry)->count = 1; + } else { + ++((table + entry)->count); + } + + return table + entry; +} + +void +kc_map_decr(struct kc_map_entry *kcme) +{ + if (kcme->count == 1) { + kc_map_entry_reset(kcme); + } else { + --(kcme->count); + } +} diff --git a/src/mc_kc_map.h b/src/mc_kc_map.h new file mode 100644 index 0000000..563626b --- /dev/null +++ b/src/mc_kc_map.h @@ -0,0 +1,18 @@ +#ifndef _MC_KC_MAP_H_ +#define _MC_KC_MAP_H_ + +#include + +struct kc_map_entry { + char key[MAX_KEY_LEN]; + size_t klen; + size_t count; +}; + +rstatus_t kc_map_init(size_t size); +void kc_map_deinit(void); + +struct kc_map_entry *kc_map_incr(const char *key, size_t klen); +void kc_map_decr(struct kc_map_entry *kcme); + +#endif diff --git a/src/mc_key_window.c b/src/mc_key_window.c new file mode 100644 index 0000000..33084e0 --- /dev/null +++ b/src/mc_key_window.c @@ -0,0 +1,72 @@ +#include + +static size_t key_window_size; +static size_t key_window_max; + +struct ring_array *queue; + +struct kw_entry { + struct kc_map_entry *kcme; + uint64_t timestamp_us; /* timestamp in usec */ +}; + +size_t +key_window_push(const char *key, size_t klen, uint64_t time) +{ + rstatus_t status; + struct kw_entry kwe; + + ASSERT(klen <= MAX_KEY_LEN); + + kwe.timestamp_us = time; + kwe.kcme = kc_map_incr(key, klen); + status = ring_array_push(&kwe, queue); + ++key_window_size; + + ASSERT(status == MC_OK); + (void)status; + + return kwe.kcme->count; +} + +uint64_t +key_window_pop(void) +{ + rstatus_t status; + struct kw_entry kwe; + + status = ring_array_pop(&kwe, queue); + --key_window_size; + + ASSERT(status == MC_OK); + (void)status; + + kc_map_decr(kwe.kcme); + return kwe.timestamp_us; +} + +bool +key_window_full(void) +{ + return key_window_size == key_window_max; +} + +rstatus_t +key_window_init(size_t size) +{ + if ((queue = ring_array_create(sizeof(struct kw_entry), size)) == NULL) { + return MC_ENOMEM; + } + + key_window_size = 0; + key_window_max = size; + return MC_OK; +} + +void +key_window_deinit(void) +{ + ring_array_destroy(&queue); + key_window_size = 0; + key_window_max = 0; +} diff --git a/src/mc_key_window.h b/src/mc_key_window.h new file mode 100644 index 0000000..480eae7 --- /dev/null +++ b/src/mc_key_window.h @@ -0,0 +1,15 @@ +#ifndef _MC_KEY_WINDOW_H_ +#define _MC_KEY_WINDOW_H_ + +#include + +#include + +size_t key_window_push(const char *key, size_t klen, uint64_t time); +uint64_t key_window_pop(void); +bool key_window_full(void); + +rstatus_t key_window_init(size_t size); +void key_window_deinit(void); + +#endif diff --git a/src/mc_ring_array.c b/src/mc_ring_array.c new file mode 100644 index 0000000..0fdb384 --- /dev/null +++ b/src/mc_ring_array.c @@ -0,0 +1,159 @@ +#include + +#include + +struct ring_array { + size_t elem_size; /* element size */ + uint32_t cap; /* total capacity (# items stored + 1) */ + uint32_t rpos; /* read offset */ + uint32_t wpos; /* write offset */ + union { + size_t pad; /* using a size_t member to force alignment at + native word boundary */ + uint8_t data[1]; /* beginning of array */ + }; +}; + +#define RING_ARRAY_HDR_SIZE offsetof(struct ring_array, data) + +/** + * The total number of slots allocated is (cap + 1) + * + * Each ring array should have exactly one reader and exactly one writer, as + * far as threads are concerned (which can be the same). This allows the use of + * atomic instructions to replace locks. + * + * We use an extra slot to differentiate full from empty. + * + * 1) If rpos == wpos, the buffer is empty. + * + * 2) If rpos is behind wpos (see below): + * # of occupied slots: wpos - rpos + * # of vacant slots: rpos + cap - wpos + 1 + * # of writable slots: rpos + cap - wpos + * full if: rpos == 0, wpos == cap + * + * 0 cap + * | | + * v v + * +-+-+-+---------------+-+-+ + * data | | | | ... | | | + * +-+-+-+---------------+-+-+ + * ^ ^ + * | | + * rpos wpos + * + * 3) If rpos is ahead of wpos (see below): + * # of occupied slots: wpos + cap - rpos + 1 + * # of vacant slots: rpos - wpos + * # of writable slots: rpos - wpos - 1 + * full if: rpos == wpos + 1 + * + * 0 cap + * | | + * v v + * +-+-+-+---------------+-+-+ + * data | | | | ... | | | + * +-+-+-+---------------+-+-+ + * ^ ^ + * | | + * wpos rpos + * + */ + +static inline uint32_t +ring_array_nelem(uint32_t rpos, uint32_t wpos, uint32_t cap) +{ + if (rpos <= wpos) { /* condition 1, 2) */ + return wpos - rpos; + } else { /* condition 3 */ + return wpos + (cap - rpos + 1); + } +} + +static inline bool +ring_array_empty(uint32_t rpos, uint32_t wpos) +{ + return rpos == wpos; +} + +static inline bool +ring_array_full(uint32_t rpos, uint32_t wpos, uint32_t cap) +{ + return ring_array_nelem(rpos, wpos, cap) == cap; +} + +rstatus_t +ring_array_push(const void *elem, struct ring_array *arr) +{ + /** + * Take snapshot of rpos, since another thread might be popping. Note: other + * members of arr do not need to be saved because we assume the other thread + * only pops and does not push; in other words, only one thread updates + * either rpos or wpos. + */ + uint32_t new_wpos; + uint32_t rpos = __atomic_load_n(&(arr->rpos), __ATOMIC_RELAXED); + + if (ring_array_full(rpos, arr->wpos, arr->cap)) { + log_debug(LOG_DEBUG, "Could not push to ring array %p; array is full", arr); + return MC_ERROR; + } + + memcpy(arr->data + (arr->elem_size * arr->wpos), elem, arr->elem_size); + + /* update wpos atomically */ + new_wpos = (arr->wpos + 1) % (arr->cap + 1); + __atomic_store_n(&(arr->wpos), new_wpos, __ATOMIC_RELAXED); + + return MC_OK; +} + +rstatus_t +ring_array_pop(void *elem, struct ring_array *arr) +{ + /* take snapshot of wpos, since another thread might be pushing */ + uint32_t new_rpos; + uint32_t wpos = __atomic_load_n(&(arr->wpos), __ATOMIC_RELAXED); + + if (ring_array_empty(arr->rpos, wpos)) { + log_debug(LOG_DEBUG, "Could not pop from ring array %p; array is empty", arr); + return MC_ERROR; + } + + if (elem != NULL) { + memcpy(elem, arr->data + (arr->elem_size * arr->rpos), arr->elem_size); + } + + /* update rpos atomically */ + new_rpos = (arr->rpos + 1) % (arr->cap + 1); + __atomic_store_n(&(arr->rpos), new_rpos, __ATOMIC_RELAXED); + + return MC_OK; +} + +struct ring_array * +ring_array_create(size_t elem_size, uint32_t cap) +{ + struct ring_array *arr; + + arr = mc_alloc(RING_ARRAY_HDR_SIZE + elem_size * (cap + 1)); + + if (arr == NULL) { + log_error("Could not allocate memory for ring array cap %u " + "elem_size %u", cap, elem_size); + return NULL; + } + + arr->elem_size = elem_size; + arr->cap = cap; + arr->rpos = arr->wpos = 0; + return arr; +} + +void +ring_array_destroy(struct ring_array **arr) +{ + mc_free(*arr); + *arr = NULL; +} diff --git a/src/mc_ring_array.h b/src/mc_ring_array.h new file mode 100644 index 0000000..7862139 --- /dev/null +++ b/src/mc_ring_array.h @@ -0,0 +1,18 @@ +#ifndef _MC_RING_ARRAY_H_ +#define _MC_RING_ARRAY_H_ + +#include + +struct ring_array; + +/* push an element into the array */ +rstatus_t ring_array_push(const void *elem, struct ring_array *arr); + +/* pop an element from the array */ +rstatus_t ring_array_pop(void *elem, struct ring_array *arr); + +/* creation/destruction */ +struct ring_array *ring_array_create(size_t elem_size, uint32_t cap); +void ring_array_destroy(struct ring_array **arr); + +#endif diff --git a/src/mc_stats.h b/src/mc_stats.h index ac0b9b6..99da761 100644 --- a/src/mc_stats.h +++ b/src/mc_stats.h @@ -83,6 +83,9 @@ ACTION( klog_logged, STATS_COUNTER, "# commands logged in buffer when klog is turned on") \ ACTION( klog_discarded, STATS_COUNTER, "# commands discarded when klog is turned on") \ ACTION( klog_skipped, STATS_COUNTER, "# commands skipped by sampling when klog is turned on")\ + ACTION( hotkey_sampled, STATS_COUNTER, "# keys sampled for hotkey detection") \ + ACTION( hotkey_qps, STATS_COUNTER, "# times qps based hotkey detected and signal given") \ + ACTION( hotkey_bw, STATS_COUNTER, "# times b/w based hotkey detected and signal given") \ ACTION( accept_eagain, STATS_COUNTER, "# EAGAIN when calling accept()") \ ACTION( accept_eintr, STATS_COUNTER, "# EINTR when calling accept()") \ ACTION( accept_emfile, STATS_COUNTER, "# EMFILE when calling accept()") \ diff --git a/src/mc_thread.c b/src/mc_thread.c index 626c58e..3843edf 100644 --- a/src/mc_thread.c +++ b/src/mc_thread.c @@ -237,7 +237,7 @@ thread_setup(struct thread_worker *t) thread_libevent_process, t); event_base_set(t->base, &t->notify_event); - status = event_add(&t->notify_event, 0); + status = event_add(&t->notify_event, NULL); if (status < 0) { log_error("event add failed: %s", strerror(errno)); return MC_ERROR; diff --git a/src/mc_time.c b/src/mc_time.c index 4e9331c..399ae55 100644 --- a/src/mc_time.c +++ b/src/mc_time.c @@ -67,11 +67,23 @@ static time_t process_started; * So, now actually holds 32-bit seconds since the server start time. */ static volatile rel_time_t now; +static volatile rel_time_t now_usec; void time_update(void) { - int status; + rstatus_t status; + +#if defined OS_LINUX + struct timespec timer; + + status = clock_gettime(CLOCK_REALTIME_COARSE, &timer); + if (status < 0) { + log_error("clock_gettime failed: %s", strerror(errno)); + } + now = (rel_time_t)(timer.tv_sec - process_started); + now_usec = timer.tv_nsec / 1000; +#else struct timeval timer; status = gettimeofday(&timer, NULL); @@ -79,6 +91,8 @@ time_update(void) log_error("gettimeofday failed: %s", strerror(errno)); } now = (rel_time_t) (timer.tv_sec - process_started); + now_usec = timer.tv_usec; +#endif log_debug(LOG_PVERB, "time updated to %u", now); } @@ -89,6 +103,12 @@ time_now(void) return now; } +rel_time_t +time_now_usec(void) +{ + return now_usec; +} + time_t time_now_abs(void) { diff --git a/src/mc_time.h b/src/mc_time.h index 21611be..dc9540a 100644 --- a/src/mc_time.h +++ b/src/mc_time.h @@ -42,6 +42,7 @@ typedef unsigned int rel_time_t; void time_update(void); rel_time_t time_now(void); +rel_time_t time_now_usec(void); time_t time_now_abs(void); time_t time_started(void); rel_time_t time_reltime(time_t exptime); diff --git a/src/mc_util.c b/src/mc_util.c index 829bb5b..8405764 100644 --- a/src/mc_util.c +++ b/src/mc_util.c @@ -33,7 +33,9 @@ #include #include #include +#ifdef MC_BACKTRACE #include +#endif #include #include