Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

upstream_retry when multi-GET not-my-vbucket err

Change-Id: Iebbae35b0509cc16cbaf4610a3e433b60b71b94b

In this implementation of handling not-my-vbucket errors during
ascii-to-binary proxying, we use the binary header's opaque field.

moxi uses the opaque field to store either a vbucket id or a key
index, depending on the command opcode.  The key index is for a char
array, and is used to retrieve the key for a given command.  Given the
key, moxi can then re-retrieve the vbucket id.

Given the vbucket id, finally, during a not-my-vbucket error, moxi can
then call the incorrect_master() API of libvbucket.
  • Loading branch information...
commit 8f7f015a4d0392834778654c2b9441f432a09db0 1 parent c5516f4
@steveyen steveyen authored
View
71 cproxy.c
@@ -28,6 +28,8 @@ conn *conn_list_remove(conn *head, conn **tail,
bool is_compatible_request(conn *existing, conn *candidate);
+void propagate_error(downstream *d);
+
int init_mcs_st(mcs_st *mst, char *config);
// Function tables.
@@ -363,6 +365,7 @@ void cproxy_on_close_upstream_conn(conn *c) {
c, &found);
if (d->upstream_conn == NULL) {
d->upstream_suffix = NULL;
+ d->upstream_retry = 0;
// Don't need to do anything else, as we'll now just
// read and drop any remaining inflight downstream replies.
@@ -474,6 +477,7 @@ void cproxy_on_close_downstream_conn(conn *c) {
//
if (d->upstream_suffix == NULL) {
d->upstream_suffix = "SERVER_ERROR proxy downstream closed\r\n";
+ d->upstream_retry = 0;
}
// We sometimes see that drive_machine/transmit will not see
@@ -503,6 +507,7 @@ void cproxy_on_close_downstream_conn(conn *c) {
d->upstream_conn->cmd_retries++;
uc_retry = d->upstream_conn;
d->upstream_suffix = NULL;
+ d->upstream_retry = 0;
}
}
}
@@ -612,18 +617,18 @@ downstream *cproxy_reserve_downstream(proxy_td *ptd) {
assert(d->upstream_conn == NULL);
assert(d->upstream_suffix == NULL);
+ assert(d->upstream_retry == 0);
assert(d->downstream_used == 0);
assert(d->downstream_used_start == 0);
- assert(d->multiget == NULL);
assert(d->merger == NULL);
assert(d->timeout_tv.tv_sec == 0);
assert(d->timeout_tv.tv_usec == 0);
d->upstream_conn = NULL;
d->upstream_suffix = NULL;
+ d->upstream_retry = 0;
d->downstream_used = 0;
d->downstream_used_start = 0;
- d->multiget = NULL;
d->merger = NULL;
d->timeout_tv.tv_sec = 0;
d->timeout_tv.tv_usec = 0;
@@ -654,6 +659,24 @@ bool cproxy_release_downstream(downstream *d, bool force) {
fprintf(stderr, "release_downstream\n");
}
+ if (!force &&
+ d->upstream_retry > 0) {
+ if (settings.verbose > 2) {
+ fprintf(stderr, "%d: release_downstream, instead retrying %d\n",
+ d->upstream_conn->sfd, d->upstream_retry);
+ }
+
+ d->upstream_retry = 0;
+
+ if (d->propagate(d) == true) {
+ return true;
+ } else {
+ d->ptd->stats.stats.tot_downstream_propagate_failed++;
+
+ propagate_error(d);
+ }
+ }
+
d->ptd->stats.stats.tot_downstream_released++;
// Delink upstream conns.
@@ -720,6 +743,7 @@ bool cproxy_release_downstream(downstream *d, bool force) {
d->upstream_conn = NULL;
d->upstream_suffix = NULL; // No free(), expecting a static string.
+ d->upstream_retry = 0;
d->downstream_used = 0;
d->downstream_used_start = 0;
d->multiget = NULL;
@@ -1209,21 +1233,7 @@ void cproxy_assign_downstream(proxy_td *ptd) {
break;
}
- while (d->upstream_conn != NULL) {
- conn *uc = d->upstream_conn;
-
- if (settings.verbose > 1) {
- fprintf(stderr,
- "ERROR: %d could not forward upstream to downstream\n",
- uc->sfd);
- }
-
- upstream_error(uc);
-
- conn *curr = d->upstream_conn;
- d->upstream_conn = d->upstream_conn->next;
- curr->next = NULL;
- }
+ propagate_error(d);
cproxy_release_downstream(d, false);
}
@@ -1234,6 +1244,26 @@ void cproxy_assign_downstream(proxy_td *ptd) {
}
}
+void propagate_error(downstream *d) {
+ assert(d != NULL);
+
+ while (d->upstream_conn != NULL) {
+ conn *uc = d->upstream_conn;
+
+ if (settings.verbose > 1) {
+ fprintf(stderr,
+ "ERROR: %d could not forward upstream to downstream\n",
+ uc->sfd);
+ }
+
+ upstream_error(uc);
+
+ conn *curr = d->upstream_conn;
+ d->upstream_conn = d->upstream_conn->next;
+ curr->next = NULL;
+ }
+}
+
void upstream_error(conn *uc) {
assert(uc);
assert(uc->state == conn_pause);
@@ -1300,6 +1330,7 @@ bool cproxy_dettach_if_noreply(downstream *d, conn *uc) {
uc->noreply = false;
d->upstream_conn = NULL;
d->upstream_suffix = NULL;
+ d->upstream_retry = 0;
cproxy_reset_upstream(uc);
@@ -1764,6 +1795,11 @@ downstream *downstream_list_remove(downstream *head, downstream *d) {
* TODO: Handle binary upstream protocol.
*/
bool is_compatible_request(conn *existing, conn *candidate) {
+ // The not-my-vbucket error handling requires us to not
+ // squash ascii multi-GET requests, due to reusing the
+ // multiget-deduplication machinery during retries and
+ // to simplify the later codepaths.
+ /*
assert(existing);
assert(IS_ASCII(existing->protocol));
assert(IS_PROXY(existing->protocol));
@@ -1790,6 +1826,7 @@ bool is_compatible_request(conn *existing, conn *candidate) {
return true;
}
}
+ */
return false;
}
View
8 cproxy.h
@@ -392,6 +392,12 @@ struct downstream {
int downstream_used_start;
conn *upstream_conn; // Non-NULL when downstream is reserved.
char *upstream_suffix; // Last bit to write when downstreams are done.
+ int upstream_retry; // Will be >0 if we should retry the entire
+ // command again when all downstreams are done.
+ // Used in not-my-vbucket error case. During
+ // the retry, we'll reuse the same multiget
+ // de-duplication tracking table to avoid
+ // asking for successful keys again.
genhash_t *multiget; // Keyed by string.
genhash_t *merger; // Keyed by string, for merging replies like STATS.
@@ -576,7 +582,7 @@ struct multiget_entry {
bool multiget_ascii_downstream(
downstream *d, conn *uc,
int (*emit_start)(conn *c, char *cmd, int cmd_len),
- int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket),
+ int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket, int key_index),
int (*emit_end)(conn *c),
mcache *front_cache);
View
38 cproxy_multiget.c
@@ -25,7 +25,6 @@ void multiget_foreach_free(const void *key,
&ptd->stats.stats_cmd[STATS_CMD_TYPE_REGULAR][STATS_CMD_GET_KEY];
int length = 0;
-
multiget_entry *entry = (multiget_entry*)value;
while (entry != NULL) {
@@ -74,12 +73,11 @@ void multiget_remove_upstream(const void *key,
bool multiget_ascii_downstream(downstream *d, conn *uc,
int (*emit_start)(conn *c, char *cmd, int cmd_len),
- int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket),
+ int (*emit_skey)(conn *c, char *skey, int skey_len, int vbucket, int key_index),
int (*emit_end)(conn *c),
mcache *front_cache) {
assert(d != NULL);
assert(d->downstream_conns != NULL);
- assert(d->multiget == NULL);
assert(uc != NULL);
assert(uc->noreply == false);
@@ -103,13 +101,17 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
}
}
- if (uc->next != NULL) {
- // More than one upstream conn, so we need a hashtable
- // to track keys for de-deplication.
- //
+ // Always have a de-duplication map, due to not-my-vbucket error
+ // handling where any retry attempts should avoid retrying already
+ // successfully attempted keys.
+ //
+ // Previously, we used to only have a map when there was more than
+ // one upstream conn.
+ //
+ if (d->multiget == NULL) {
d->multiget = genhash_init(128, skeyhash_ops);
if (settings.verbose > 1) {
- fprintf(stderr, "cproxy multiget hash table new\n");
+ fprintf(stderr, "%d: cproxy multiget hash table new\n", uc->sfd);
}
}
@@ -139,8 +141,8 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
int cas_emit = (command[3] == 's');
if (settings.verbose > 1) {
- fprintf(stderr, "forward multiget %s (%d %d)\n",
- command, cmd_len, uc_num);
+ fprintf(stderr, "%d: forward multiget %s (%d %d)\n",
+ uc_cur->sfd, command, cmd_len, uc_num);
}
while (space != NULL) {
@@ -278,11 +280,22 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
// See if we've already requested this key via
// the multiget hash table, in order to
- // de-deplicate repeated keys.
+ // de-duplicate repeated keys.
//
bool first_request = true;
if (d->multiget != NULL) {
+ if (settings.verbose > 2) {
+ char key_buf[KEY_MAX_LENGTH + 10];
+ assert(key_len <= KEY_MAX_LENGTH);
+ memcpy(key_buf, key, key_len);
+ key_buf[key_len] = '\0';
+
+ fprintf(stderr,
+ "<%d multiget_ascii_downstream '%s' %d %d %d\n",
+ c->sfd, key_buf, vbucket, (int) (key - command), key_len);
+ }
+
// TODO: Use Trond's allocator here.
//
multiget_entry *entry =
@@ -318,7 +331,7 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
// Provide the preceding space as optimization
// for ascii-to-ascii configuration.
//
- emit_skey(c, key - 1, key_len + 1, vbucket);
+ emit_skey(c, key - 1, key_len + 1, vbucket, key - command);
} else {
ptd->stats.stats.tot_multiget_keys_dedupe++;
@@ -383,6 +396,7 @@ bool multiget_ascii_downstream(downstream *d, conn *uc,
if (cproxy_dettach_if_noreply(d, uc) == false) {
d->upstream_suffix = "END\r\n";
+ d->upstream_retry = 0;
cproxy_start_downstream_timeout(d, NULL);
}
View
2  cproxy_protocol.c
@@ -382,6 +382,7 @@ bool cproxy_optimize_set_ascii(downstream *d, conn *uc,
key, key_len, false)) {
d->upstream_conn = NULL;
d->upstream_suffix = NULL;
+ d->upstream_retry = 0;
out_string(uc, "STORED");
@@ -422,6 +423,7 @@ void cproxy_optimize_to_self(downstream *d, conn *uc,
d->upstream_conn = NULL;
d->upstream_suffix = NULL;
+ d->upstream_retry = 0;
cproxy_release_downstream(d, false);
}
View
5 cproxy_protocol_a2a.c
@@ -16,7 +16,7 @@
#define MAX_TOKENS 8
int a2a_multiget_start(conn *c, char *cmd, int cmd_len);
-int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket);
+int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index);
int a2a_multiget_end(conn *c);
void cproxy_init_a2a() {
@@ -384,7 +384,7 @@ int a2a_multiget_start(conn *c, char *cmd, int cmd_len) {
/* An skey is a space prefixed key string.
*/
-int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket) {
+int a2a_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index) {
return add_iov(c, skey, skey_len);
}
@@ -450,6 +450,7 @@ bool cproxy_broadcast_a2a_downstream(downstream *d,
if (cproxy_dettach_if_noreply(d, uc) == false) {
d->upstream_suffix = suffix;
+ d->upstream_retry = 0;
cproxy_start_downstream_timeout(d, NULL);
} else {
View
125 cproxy_protocol_a2b.c
@@ -172,7 +172,7 @@ bool a2b_fill_request_token(struct A2BSpec *spec,
void a2b_process_downstream_response(conn *c);
int a2b_multiget_start(conn *c, char *cmd, int cmd_len);
-int a2b_multiget_skey(conn *c, char *skey, int skey_len, int vbucket);
+int a2b_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index);
int a2b_multiget_end(conn *c);
void cproxy_init_a2b() {
@@ -647,25 +647,118 @@ void a2b_process_downstream_response(conn *c) {
// Handle not-my-vbucket error response.
//
if (status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
- // For non-GET commands, enqueue a retry after informing the vbucket map.
- //
+ if (settings.verbose > 2) {
+ fprintf(stderr,
+ "<%d cproxy_process_a2b_downstream_response not-my-vbucket, "
+ "cmd: %x %d\n",
+ c->sfd, header->response.opcode, uc != NULL);
+ }
+
if (c->cmd != PROTOCOL_BINARY_CMD_GET &&
c->cmd != PROTOCOL_BINARY_CMD_GETK) {
- conn_set_state(c, conn_pause);
+ // For non-GET commands, enqueue a retry after informing
+ // the vbucket map.
+ //
+ if (uc == NULL) {
+ // If the client went away, though, don't retry.
+ //
+ conn_set_state(c, conn_pause);
+ return;
+ }
int vbucket = ntohl(header->response.opaque);
- mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c), vbucket);
+ if (settings.verbose > 2) {
+ fprintf(stderr,
+ "<%d cproxy_process_a2b_downstream_response not-my-vbucket, "
+ "cmd: %x not get/getk, vbucket: %d\n",
+ c->sfd, header->response.opcode, vbucket);
+ }
+
+ mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c),
+ vbucket);
+
+ conn_set_state(c, conn_pause);
assert(uc->thread);
assert(uc->thread->work_queue);
+ // TODO: Add a stats counter here for this case.
+ //
// Using work_send() so the call stack unwinds back to libevent.
//
work_send(uc->thread->work_queue, upstream_retry, uc->extra, uc);
- }
- return;
+ return;
+ } else {
+ // TODO: Add a stats counter here for this case.
+ //
+ // Handle ascii multi-GET commands by awaiting all NOOP's from
+ // downstream servers, eating the NOOP's, and retrying with
+ // the same multiget de-duplication map, which might be partially
+ // filled in already.
+ //
+ if (uc == NULL) {
+ // If the client went away, though, don't retry.
+ //
+ conn_set_state(c, conn_new_cmd);
+ return;
+ }
+
+ assert(d->multiget != NULL);
+ assert(uc->cmd_start != NULL);
+ assert(header->response.opaque != 0);
+
+ int key_index = ntohl(header->response.opaque);
+ char *key = uc->cmd_start + key_index;
+ int key_len = skey_len(key);
+
+ // The key is not NULL or space terminated.
+ //
+ char key_buf[KEY_MAX_LENGTH + 10];
+ assert(key_len <= KEY_MAX_LENGTH);
+ memcpy(key_buf, key, key_len);
+ key_buf[key_len] = '\0';
+
+ int vbucket = -1;
+
+ mcs_key_hash(&d->mst, key_buf, key_len, &vbucket);
+
+ mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c),
+ vbucket);
+
+ // Update the de-duplication map, removing the key, so that
+ // we'll reattempt another request for the key during the
+ // retry.
+ //
+ multiget_entry *entry = genhash_find(d->multiget, key_buf);
+
+ if (settings.verbose > 2) {
+ fprintf(stderr,
+ "<%d cproxy_process_a2b_downstream_response not-my-vbucket, "
+ "cmd: %x get/getk '%s' %d retry: %d, entry: %d, vbucket %d\n",
+ c->sfd, header->response.opcode, key_buf, key_len,
+ d->upstream_retry + 1, entry != NULL, vbucket);
+ }
+
+ genhash_delete(d->multiget, key_buf);
+
+ while (entry != NULL) {
+ multiget_entry *curr = entry;
+ entry = entry->next;
+ free(curr);
+ }
+
+ // Signal that we need to retry, where this counter is
+ // later checked after all NOOP's from downstreams are
+ // received.
+ //
+ d->upstream_retry++;
+
+ conn_set_state(c, conn_new_cmd);
+
+ return;
+ }
}
switch (c->cmd) {
@@ -955,7 +1048,6 @@ bool cproxy_forward_a2b_simple_downstream(downstream *d,
assert(uc != NULL);
assert(uc->item == NULL);
assert(uc->cmd_curr != -1);
- assert(d->multiget == NULL);
assert(d->merger == NULL);
// Handles get and gets.
@@ -1161,6 +1253,7 @@ bool cproxy_forward_a2b_simple_downstream(downstream *d,
if (d->upstream_suffix == NULL) {
d->upstream_suffix = "SERVER_ERROR a2b event oom\r\n";
+ d->upstream_retry = 0;
}
}
} else {
@@ -1173,6 +1266,7 @@ bool cproxy_forward_a2b_simple_downstream(downstream *d,
if (d->upstream_suffix == NULL) {
d->upstream_suffix = "CLIENT_ERROR a2b parse request\r\n";
+ d->upstream_retry = 0;
}
}
@@ -1193,7 +1287,7 @@ int a2b_multiget_start(conn *c, char *cmd, int cmd_len) {
/* An skey is a space prefixed key string.
*/
-int a2b_multiget_skey(conn *c, char *skey, int skey_len, int vbucket) {
+int a2b_multiget_skey(conn *c, char *skey, int skey_len, int vbucket, int key_index) {
char *key = skey + 1;
int key_len = skey_len - 1;
@@ -1210,9 +1304,21 @@ int a2b_multiget_skey(conn *c, char *skey, int skey_len, int vbucket) {
req->message.header.request.keylen = htons((uint16_t) key_len);
req->message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
req->message.header.request.bodylen = htonl(key_len);
+ req->message.header.request.opaque = htonl(key_index);
if (vbucket >= 0) {
req->message.header.request.reserved = htons(vbucket);
+
+ if (settings.verbose > 2) {
+ char key_buf[KEY_MAX_LENGTH + 10];
+ assert(key_len <= KEY_MAX_LENGTH);
+ memcpy(key_buf, key, key_len);
+ key_buf[key_len] = '\0';
+
+ fprintf(stderr,
+ "<%d a2b_multiget_skey '%s' %d %d\n",
+ c->sfd, key_buf, vbucket, key_index);
+ }
}
if (add_iov(c, ITEM_data(it), sizeof(req->bytes)) == 0 &&
@@ -1316,6 +1422,7 @@ bool cproxy_broadcast_a2b_downstream(downstream *d,
if (cproxy_dettach_if_noreply(d, uc) == false) {
d->upstream_suffix = suffix;
+ d->upstream_retry = 0;
cproxy_start_downstream_timeout(d, NULL);
} else {
View
2  cproxy_stats.c
@@ -303,8 +303,6 @@ int count_dot_pair(char *x, int xlen, char *y, int ylen) {
int xdot = count_dot(x, xlen);
int ydot = count_dot(y, ylen);
- assert(xdot == ydot);
-
return (xdot > ydot ? xdot : ydot);
}
Please sign in to comment.
Something went wrong with that request. Please try again.