Skip to content

Commit

Permalink
proxy: ustats internal refactor
Browse files Browse the repository at this point in the history
If a ustat changes name for the same index, its value is reset to 0.

Cleans up some confusing code around how ustats are handled; the global
ctx and worker threads now use different structures with different
intent, instead of different sides of the same struct.

Speeds up `stats proxy` dumping by compacting stat names into a linear
buffer, and unrolls some snprintf calls into direct memory copies plus a
faster itoa.

Also reorders the entries in the global ctx so the most frequently
accessed ones are clustered toward the front of the struct.
  • Loading branch information
dormando committed Jun 21, 2024
1 parent 5f69c6d commit ddc7398
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 45 deletions.
38 changes: 30 additions & 8 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, void *c) {
buffer_memory_limit = ctx->buffer_memory_limit;

// prepare aggregated counters.
struct proxy_user_stats *us = &ctx->user_stats;
uint64_t counters[us->num_stats];
struct proxy_user_stats_entry *us = ctx->user_stats;
int stats_num = ctx->user_stats_num;
uint64_t counters[stats_num];
memset(counters, 0, sizeof(counters));

// TODO (v3): more globals to remove and/or change API method.
Expand All @@ -200,8 +201,8 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, void *c) {
}
istats.vm_gc_runs += is->vm_gc_runs;
istats.vm_memory_kb += is->vm_memory_kb;
if (tus && tus->num_stats >= us->num_stats) {
for (int i = 0; i < us->num_stats; i++) {
if (tus && tus->num_stats >= stats_num) {
for (int i = 0; i < stats_num; i++) {
counters[i] += tus->counters[i];
}
}
Expand All @@ -212,10 +213,31 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, void *c) {
}

// return all of the user generated stats
for (int x = 0; x < us->num_stats; x++) {
if (us->names[x] && us->names[x][0]) {
snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
if (ctx->user_stats_namebuf) {
char vbuf[INCR_MAX_STORAGE_LEN];
char *e = NULL; // ptr into vbuf
const char *pfx = "user_";
const size_t pfxlen = strlen(pfx);
for (int x = 0; x < stats_num; x++) {
if (us[x].cname) {
char *name = ctx->user_stats_namebuf + us[x].cname;
size_t nlen = strlen(name);
if (nlen > STAT_KEY_LEN-6) {
// impossible, but for paranoia.
nlen = STAT_KEY_LEN-6;
}
// avoiding an snprintf call for some performance ("user_%s")
memcpy(key_str, pfx, pfxlen);
memcpy(key_str+pfxlen, name, nlen);
key_str[pfxlen+nlen] = '\0';

// APPEND_STAT() does another snprintf and then calls our
// add_stats argument. Let's skip that extra snprintf with
// some unrolling.
e = itoa_u64(counters[x], vbuf);
*(e+1) = '\0';
add_stats(key_str, pfxlen+nlen, vbuf, e-vbuf, c);
}
}
}

Expand Down
24 changes: 16 additions & 8 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,16 @@ struct proxy_int_stats {
};

struct proxy_user_stats {
size_t num_stats; // number of stats, for sizing various arrays
char **names; // not needed for worker threads
int num_stats; // number of stats, for sizing various arrays
uint64_t *counters; // array of counters.
};

// One slot per user stat index in the global proxy ctx (ctx->user_stats).
// Holds only naming/reset metadata.
struct proxy_user_stats_entry {
// Pending heap-allocated (strdup'ed) name set by mcplib_add_stat() when a
// stat is new or renamed. proxy_config_stats_prep() copies it into the
// compact name buffer, frees it and sets it back to NULL.
char *name;
// Offset of this stat's name inside ctx->user_stats_namebuf. Offset 0 is
// reserved and means "no name", so the slot is skipped when dumping stats.
unsigned int cname; // offset into compact name buffer
// Set when the name at this index changed; worker threads zero the counter
// while loading the config, and the flag is cleared once the reload has
// been pushed to each worker.
bool reset; // counter must reset this cycle
};

struct proxy_global_stats {
uint64_t config_reloads;
uint64_t config_reload_fails;
Expand Down Expand Up @@ -226,14 +231,21 @@ struct proxy_tunables {
typedef STAILQ_HEAD(globalobj_head_s, mcp_globalobj_s) globalobj_head_t;
typedef struct {
lua_State *proxy_state; // main configuration vm
lua_State *proxy_sharedvm; // sub VM for short-lock global events/data
void *proxy_code;
proxy_event_thread_t *proxy_io_thread;
uint64_t active_req_limit; // max total in-flight requests
uint64_t buffer_memory_limit; // max bytes for send/receive buffers.
#ifdef PROXY_TLS
void *tls_ctx;
#endif
int user_stats_num; // highest seen stat index
struct proxy_user_stats_entry *user_stats;
char *user_stats_namebuf; // compact linear buffer for stat names
struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
struct proxy_global_stats global_stats;
// less frequently used entries down here.
void *proxy_code;
lua_State *proxy_sharedvm; // sub VM for short-lock global events/data
pthread_mutex_t stats_lock; // used for rare global counters
pthread_mutex_t config_lock;
pthread_cond_t config_cond;
pthread_t config_tid;
Expand All @@ -253,10 +265,6 @@ typedef struct {
bool loading; // bool indicating an active config load.
bool memprofile; // indicate if we want to profile lua memory.
uint8_t memprofile_thread_counter;
struct proxy_global_stats global_stats;
struct proxy_user_stats user_stats;
struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
pthread_mutex_t stats_lock; // used for rare global counters
} proxy_ctx_t;

#define PROXY_GET_THR_CTX(L) ((*(LIBEVENT_THREAD **)lua_getextraspace(L))->proxy_ctx)
Expand Down
101 changes: 93 additions & 8 deletions proxy_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,74 @@ static void *_proxy_manager_thread(void *arg) {
return NULL;
}

// TODO: only run routine if something changed.
// Compacts all of the names for proxy user stats into a linear buffer,
// which can save considerable CPU when emitting a large number of stats. It
// also saves some total memory by having one linear buffer instead of many
// potentially small aligned allocations.
//
// For each entry, a pending heap name (e->name) takes priority over the name
// already stored in the previous compact buffer (e->cname). Pending names
// are freed once they have been copied. Blank or missing names end up with
// cname == 0, which is reserved to mean "no name".
//
// The whole routine runs under the ctx stats lock. If the new buffer cannot
// be allocated, the ctx is left untouched (old buffer, offsets and pending
// names all stay valid) and compaction is attempted again on the next call.
static void proxy_config_stats_prep(proxy_ctx_t *ctx) {
    STAT_L(ctx);
    // read the shared pointers only once the lock is held.
    char *oldnamebuf = ctx->user_stats_namebuf;
    struct proxy_user_stats_entry *entries = ctx->user_stats;

    // start one byte into the cname buffer so we can do faster checks on
    // whether a name exists or not: offset 0 never refers to a real name.
    size_t namelen = 1;

    // find size of new compact name buffer. blank pending names are counted
    // too, which only over-allocates by a byte each.
    for (int x = 0; x < ctx->user_stats_num; x++) {
        if (entries[x].name) {
            namelen += strlen(entries[x].name) + 1; // null byte
        } else if (entries[x].cname) {
            namelen += strlen(oldnamebuf + entries[x].cname) + 1;
        }
    }

    char *namebuf = calloc(1, namelen);
    if (namebuf == NULL) {
        // out of memory: keep serving the old names rather than crashing.
        STAT_UL(ctx);
        return;
    }

    // copy names into the compact buffer
    char *p = namebuf + 1;
    for (int x = 0; x < ctx->user_stats_num; x++) {
        struct proxy_user_stats_entry *e = &entries[x];
        const char *newname = NULL;

        if (e->name) {
            // skip blank names.
            if (e->name[0]) {
                newname = e->name;
            }
        } else if (e->cname) {
            // else re-copy from old buffer
            newname = oldnamebuf + e->cname;
        }

        if (newname) {
            size_t nlen = strlen(newname);
            // set the buffer offset for this name
            e->cname = (unsigned int)(p - namebuf);
            // copy in the name along with its null byte
            memcpy(p, newname, nlen + 1);
            p += nlen + 1;
        } else {
            // name is blank or doesn't exist, ensure we skip it.
            e->cname = 0;
        }

        // now get rid of the pending name buffer, if there was one.
        free(e->name);
        e->name = NULL;
    }

    ctx->user_stats_namebuf = namebuf;
    free(oldnamebuf);
    STAT_UL(ctx);
}

static void proxy_config_reload(proxy_ctx_t *ctx) {
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
STAT_INCR(ctx, config_reloads, 1);
Expand All @@ -159,6 +227,8 @@ static void proxy_config_reload(proxy_ctx_t *ctx) {
return;
}

proxy_config_stats_prep(ctx);

// TODO (v2): create a temporary VM to test-load the worker code into.
// failing to load partway through the worker VM reloads can be
// critically bad if we're not careful about references.
Expand Down Expand Up @@ -186,6 +256,14 @@ static void proxy_config_reload(proxy_ctx_t *ctx) {
}
}

// Need to clear the reset flag for the stats system after pushing the new
// config to each worker.
STAT_L(ctx);
for (int x = 0; x < ctx->user_stats_num; x++) {
ctx->user_stats[x].reset = false;
}
STAT_UL(ctx);

lua_pop(ctx->proxy_state, 1); // drop config_pools return value
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
}
Expand Down Expand Up @@ -609,9 +687,10 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {

// update user stats
STAT_L(ctx);
struct proxy_user_stats *us = &ctx->user_stats;
struct proxy_user_stats_entry *us = ctx->user_stats;
int stats_num = ctx->user_stats_num;
struct proxy_user_stats *tus = NULL;
if (us->num_stats != 0) {
if (stats_num != 0) {
pthread_mutex_lock(&thr->stats.mutex);
if (thr->proxy_user_stats == NULL) {
tus = calloc(1, sizeof(struct proxy_user_stats));
Expand All @@ -622,18 +701,24 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {

// originally this was a realloc routine but it felt fragile.
// that might still be a better idea; still need to zero out the end.
uint64_t *counters = calloc(us->num_stats, sizeof(uint64_t));
uint64_t *counters = calloc(stats_num, sizeof(uint64_t));

// note that num_stats can _only_ grow in size.
// we also only care about counters on the worker threads.
if (tus->counters) {
assert(tus->num_stats <= us->num_stats);
memcpy(counters, tus->counters, tus->num_stats * sizeof(uint64_t));
// pull in old counters, if the names didn't change.
for (int x = 0; x < tus->num_stats; x++) {
if (us[x].reset) {
counters[x] = 0;
} else {
counters[x] = tus->counters[x];
}
}
assert(tus->num_stats <= stats_num);
free(tus->counters);
}

tus->counters = counters;
tus->num_stats = us->num_stats;
tus->num_stats = stats_num;

pthread_mutex_unlock(&thr->stats.mutex);
}
// also grab the concurrent request limit
Expand Down
45 changes: 28 additions & 17 deletions proxy_ustats.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,42 @@ int mcplib_add_stat(lua_State *L) {
}

STAT_L(ctx);
struct proxy_user_stats *us = &ctx->user_stats;
int stats_num = ctx->user_stats_num;
struct proxy_user_stats_entry *entries = ctx->user_stats;

// if num_stats is 0 we need to init sizes.
// TODO (v2): malloc fail checking. (should be rare/impossible)
if (us->num_stats < idx) {
// don't allocate counters memory for the global ctx.
char **nnames = calloc(idx, sizeof(char *));
if (us->names != NULL) {
for (int x = 0; x < us->num_stats; x++) {
nnames[x] = us->names[x];
}
free(us->names);
if (stats_num < idx) {
struct proxy_user_stats_entry *nentries = calloc(idx, sizeof(*entries));
// funny realloc; start with zeroed memory and copy in original.
if (entries) {
memcpy(nentries, entries, sizeof(*entries) * stats_num);
free(entries);
}
us->names = nnames;
us->num_stats = idx;
ctx->user_stats = nentries;
ctx->user_stats_num = idx;
entries = nentries;
}

idx--; // real slot start as 0.
// if slot has string in it, free first
if (us->names[idx] != NULL) {
free(us->names[idx]);
if (entries[idx].name != NULL) {
// If name changed, we have to reset the counter in the slot.
// Also only free/strdup the string if it's changed.
if (strcmp(name, entries[idx].name) != 0) {
entries[idx].reset = true;
free(entries[idx].name);
entries[idx].name = strdup(name);
}
// else the stat name didn't change, so don't do anything.
} else if (entries[idx].cname) {
char *oldname = ctx->user_stats_namebuf + entries[idx].cname;
if (strcmp(name, oldname) != 0) {
entries[idx].reset = true;
entries[idx].name = strdup(name);
}
} else {
entries[idx].name = strdup(name);
}
// strdup name into string slot
// TODO (v2): malloc failure.
us->names[idx] = strdup(name);
STAT_UL(ctx);

return 0;
Expand Down
10 changes: 6 additions & 4 deletions t/proxyustats.t
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,15 @@ subtest 'ustats incr/decr and perseverance over reload' => sub {
$p_srv->reload();
wait_reload($watcher);

# subtract 2 at idx 2.
print $ps "mg -2\r\n";
is(scalar <$ps>, "HD\r\n", "mg -2 hit");
# add 2 at idx 2.
print $ps "mg 2\r\n";
is(scalar <$ps>, "HD\r\n", "mg 2 hit");

$stats = mem_stats($ps, 'proxy');
# carried over since name did not change
is($stats->{user_a1}, 1, "user_a1 is 1");
is($stats->{user_b2}, 2, "user_b2 is 2");
# index 2 changed names, so should be reset.
is($stats->{user_b2}, 2, "user_b2 is 2 instead of 4");
is($stats->{user_b3}, 0, "user_b3 is 0");
is($stats->{user_b4}, 0, "user_b4 is 0");
};
Expand Down

0 comments on commit ddc7398

Please sign in to comment.