Skip to content

Commit

Permalink
proxy: hacky method of supporting noreply/quiet
Browse files Browse the repository at this point in the history
avoids sending the response to the client, in most cases. works by
stripping the noreply status from the request before sending it along,
so the proxy itself knows when to move the request forward.

has sharp edges:
 - only looking at the request object that's actually sent to the
   backend, instead of the request object that created the coroutine.
 - overriding tokens in lua to re-set the noreply mode would break the
   protocol.

So this change helps us validate the feature but solidifying it requires
moving it to the "edges" of processing; before the coroutine and after
any command assembly (or within the command assembly).
  • Loading branch information
dormando committed Mar 1, 2022
1 parent 2a903f0 commit fa745db
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 6 deletions.
53 changes: 53 additions & 0 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,36 @@ static void proxy_out_errstring(mc_resp *resp, const char *str) {
return;
}

// NOTE: See notes in mcp_queue_io; the secondary problem with setting the
// noreply mode from the response object is that the proxy can return strings
// manually, so we have no way to obey what the original request wanted in
// that case.
static void _set_noreply_mode(mc_resp *resp, mcp_resp_t *r) {
switch (r->mode) {
case RESP_MODE_NORMAL:
break;
case RESP_MODE_NOREPLY:
// ascii noreply only threw egregious errors to client
if (r->status == MCMC_OK) {
resp->skip = true;
}
break;
case RESP_MODE_METAQUIET:
if (r->resp.code == MCMC_CODE_MISS) {
resp->skip = true;
} else if (r->cmd[1] != 'g' && r->resp.code == MCMC_CODE_OK) {
// FIXME (v2): mcmc's parser needs to help us out a bit more
// here.
// This is a broken case in the protocol though; quiet mode
// ignores HD for mutations but not get.
resp->skip = true;
}
break;
default:
assert(1 == 0);
}
}

// this resumes every yielded coroutine (and re-resumes if necessary).
// called from the worker thread after responses have been pulled from the
// network.
Expand All @@ -492,6 +522,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, -1, "mcp.response");
LOGGER_LOG(NULL, LOG_PROXYCMDS, LOGGER_PROXY_RAW, NULL, r->start, r->cmd, r->resp.type, r->resp.code);
_set_noreply_mode(resp, r);
if (r->buf) {
// response set from C.
// FIXME (v2): write_and_free() ? it's a bit wrong for here.
Expand Down Expand Up @@ -760,6 +791,28 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
r->buf = NULL;
r->blen = 0;
r->start = rq->start; // need to inherit the original start time.
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't
// strictly correct; we should inherit based on the request that spawned
// the coroutine but the structure doesn't allow that yet.
// Should also be able to settle this exact mode from the parser so we
// don't have to re-branch here.
if (rq->pr.noreply) {
if (rq->pr.cmd_type == CMD_TYPE_META) {
r->mode = RESP_MODE_METAQUIET;
for (int x = 2; x < rq->pr.ntokens; x++) {
if (rq->request[rq->pr.tokens[x]] == 'q') {
rq->request[rq->pr.tokens[x]] = ' ';
}
}
} else {
r->mode = RESP_MODE_NOREPLY;
rq->request[rq->pr.reqlen - 3] = 'Y';
}
} else {
r->mode = RESP_MODE_NORMAL;
}

int x;
int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
for (x = 0; x < end; x++) {
Expand Down
13 changes: 10 additions & 3 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ enum proxy_defines {
enum proxy_cmd_types {
CMD_TYPE_GENERIC = 0,
CMD_TYPE_GET, // get/gets/gat/gats
CMD_TYPE_UPDATE, // add/set/cas/prepend/append/replace
CMD_TYPE_META, // m*'s.
};

Expand Down Expand Up @@ -273,6 +272,7 @@ struct mcp_parser_s {
uint32_t klen; // length of key.
uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
bool has_space; // a space was found after the last byte parsed.
bool noreply; // if quiet/noreply mode is set.
union {
struct mcp_parser_meta_s meta;
} t;
Expand Down Expand Up @@ -355,15 +355,22 @@ struct proxy_event_thread_s {
struct proxy_tunables tunables; // periodically copied from main ctx
};

enum mcp_resp_mode {
RESP_MODE_NORMAL = 0,
RESP_MODE_NOREPLY,
RESP_MODE_METAQUIET
};

#define RESP_CMD_MAX 8
typedef struct {
mcmc_resp_t resp;
struct timeval start; // start time inherited from paired request
char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
int status; // status code from mcmc_read()
char *buf; // response line + potentially value.
size_t blen; // total size of the value to read.
int status; // status code from mcmc_read()
int bread; // amount of bytes read into value so far.
char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
enum mcp_resp_mode mode; // reply mode (for noreply fixing)
} mcp_resp_t;

// re-cast an io_pending_t into this more descriptive structure.
Expand Down
21 changes: 21 additions & 0 deletions proxy_await.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,27 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
memset(r, 0, sizeof(mcp_resp_t));
r->start = rq->start;
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't
// strictly correct; we should inherit based on the request that spawned
// the coroutine but the structure doesn't allow that yet.
// Should also be able to settle this exact mode from the parser so we
// don't have to re-branch here.
if (rq->pr.noreply) {
if (rq->pr.cmd_type == CMD_TYPE_META) {
r->mode = RESP_MODE_METAQUIET;
for (int x = 2; x < rq->pr.ntokens; x++) {
if (rq->request[rq->pr.tokens[x]] == 'q') {
rq->request[rq->pr.tokens[x]] = ' ';
}
}
} else {
r->mode = RESP_MODE_NOREPLY;
rq->request[rq->pr.reqlen - 3] = 'Y';
}
} else {
r->mode = RESP_MODE_NORMAL;
}

int x;
int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
Expand Down
29 changes: 26 additions & 3 deletions proxy_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ static int _process_tokenize(mcp_parser_t *pr, const size_t max) {
}
endloop:

// endcap token so we can quickly find the length of any token by looking
// at the next one.
pr->tokens[curtoken] = len;
pr->ntokens = curtoken;
P_DEBUG("%s: cur_tokens: %d\n", __func__, curtoken);

Expand Down Expand Up @@ -124,7 +127,7 @@ static int _process_request_metaflags(mcp_parser_t *pr, int token) {
return -1;
}
P_DEBUG("%s: setting meta flag: %d\n", __func__, *cur - 65);
pr->t.meta.flags |= 1 << (*cur - 65);
pr->t.meta.flags |= (uint64_t)1 << (*cur - 65);
state = 1;
}
break;
Expand All @@ -138,6 +141,13 @@ static int _process_request_metaflags(mcp_parser_t *pr, int token) {
}
}

// not too great hack for noreply detection: this can be flattened out
// once a few other contexts are fixed and we detect the noreply from the
// coroutine start instead.
if (pr->t.meta.flags & ((uint64_t)1 << 48)) {
pr->noreply = true;
}

return 0;
}

Expand Down Expand Up @@ -198,6 +208,18 @@ static int _process_request_gat(mcp_parser_t *pr) {
return 0;
}

#define NOREPLYSTR "noreply"
#define NOREPLYLEN sizeof(NOREPLYSTR)-1
// given a tokenized parser for a normal ASCII command, checks for noreply
// mode.
static int _process_request_noreply(mcp_parser_t *pr) {
if (pr->tokens[pr->ntokens] - pr->tokens[pr->ntokens-1] >= NOREPLYLEN
&& strncmp(NOREPLYSTR, pr->request + pr->tokens[pr->ntokens-1], NOREPLYLEN) == 0) {
pr->noreply = true;
}
return 0;
}

// we need t find the bytes supplied immediately so we can read the request
// from the client properly.
// set <key> <flags> <exptime> <bytes> [noreply]\r\n
Expand Down Expand Up @@ -226,7 +248,7 @@ static int _process_request_storage(mcp_parser_t *pr, size_t max) {

pr->vlen = vlen;

return 0;
return _process_request_noreply(pr);
}

// common request with key: <cmd> <key> <args>
Expand All @@ -235,7 +257,7 @@ static int _process_request_simple(mcp_parser_t *pr, const size_t max) {
pr->keytoken = 1; // second token is usually the key... stupid GAT.

_process_request_key(pr);
return 0;
return _process_request_noreply(pr);
}

// TODO: return code ENUM with error types.
Expand Down Expand Up @@ -278,6 +300,7 @@ int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen) {
break;
case 2:
if (cm[0] == 'm') {
type = CMD_TYPE_META;
switch (cm[1]) {
case 'g':
cmd = CMD_MG;
Expand Down

0 comments on commit fa745db

Please sign in to comment.