Skip to content

Commit

Permalink
bug 1429 - stop retrying after too many ascii-to-binary retries
Browse files Browse the repository at this point in the history
There are two cases of retry tracking added here, so code
needed fixing in 2 different places...

- for simple, single-shot mutation commands
- for scatter-gather GETKQ's and NOOP translation case

Also, fixed a case when the multiget was NULL, introduced
in a previous performance enhancement fix.

Change-Id: I043f12f9733624b225196c29b144f5b1e02e5852
Reviewed-on: http://review.northscale.com:8080/700
Tested-by: Steve Yen <steve.yen@gmail.com>
Reviewed-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
steveyen committed Jun 19, 2010
1 parent 4642096 commit 18cb892
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 33 deletions.
36 changes: 26 additions & 10 deletions cproxy.c
Expand Up @@ -629,6 +629,7 @@ downstream *cproxy_reserve_downstream(proxy_td *ptd) {
d->upstream_conn = NULL; d->upstream_conn = NULL;
d->upstream_suffix = NULL; d->upstream_suffix = NULL;
d->upstream_retry = 0; d->upstream_retry = 0;
d->upstream_retries = 0;
d->downstream_used = 0; d->downstream_used = 0;
d->downstream_used_start = 0; d->downstream_used_start = 0;
d->merger = NULL; d->merger = NULL;
Expand Down Expand Up @@ -667,19 +668,33 @@ bool cproxy_release_downstream(downstream *d, bool force) {
// //
if (!force && if (!force &&
d->upstream_retry > 0) { d->upstream_retry > 0) {
if (settings.verbose > 2) {
fprintf(stderr, "%d: release_downstream, instead retrying %d\n",
d->upstream_conn->sfd, d->upstream_retry);
}

d->upstream_retry = 0; d->upstream_retry = 0;
d->upstream_retries++;


if (cproxy_forward(d) == true) { // But, we can stop retrying if we've tried each server twice.
return true; //
} else { int max_retries = mcs_server_count(&d->mst) * 2;
d->ptd->stats.stats.tot_downstream_propagate_failed++;


propagate_error(d); if (d->upstream_retries <= max_retries) {
if (settings.verbose > 2) {
fprintf(stderr, "%d: release_downstream, instead retrying %d, %d <= %d\n",
d->upstream_conn->sfd,
d->upstream_retry, d->upstream_retries, max_retries);
}

if (cproxy_forward(d) == true) {
return true;
} else {
d->ptd->stats.stats.tot_downstream_propagate_failed++;

propagate_error(d);
}
} else {
if (settings.verbose > 2) {
fprintf(stderr, "%d: release_downstream, skipping retry %d, %d > %d\n",
d->upstream_conn->sfd,
d->upstream_retry, d->upstream_retries, max_retries);
}
} }
} }


Expand Down Expand Up @@ -750,6 +765,7 @@ bool cproxy_release_downstream(downstream *d, bool force) {
d->upstream_conn = NULL; d->upstream_conn = NULL;
d->upstream_suffix = NULL; // No free(), expecting a static string. d->upstream_suffix = NULL; // No free(), expecting a static string.
d->upstream_retry = 0; d->upstream_retry = 0;
d->upstream_retries = 0;
d->downstream_used = 0; d->downstream_used = 0;
d->downstream_used_start = 0; d->downstream_used_start = 0;
d->multiget = NULL; d->multiget = NULL;
Expand Down
1 change: 1 addition & 0 deletions cproxy.h
Expand Up @@ -383,6 +383,7 @@ struct downstream {
// the retry, we'll reuse the same multiget // the retry, we'll reuse the same multiget
// de-duplication tracking table to avoid // de-duplication tracking table to avoid
// asking for successful keys again. // asking for successful keys again.
int upstream_retries; // Count number of upstream_retry attempts.


genhash_t *multiget; // Keyed by string. genhash_t *multiget; // Keyed by string.
genhash_t *merger; // Keyed by string, for merging replies like STATS. genhash_t *merger; // Keyed by string, for merging replies like STATS.
Expand Down
74 changes: 51 additions & 23 deletions cproxy_protocol_a2b.c
Expand Up @@ -670,18 +670,36 @@ void a2b_process_downstream_response(conn *c) {
mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c), mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c),
vbucket); vbucket);


conn_set_state(c, conn_pause); // As long as the upstream is still open and we haven't
// retried too many times already.
//
int max_retries = (mcs_server_count(&d->mst) * 2);


assert(uc->thread); if (uc != NULL &&
assert(uc->thread->work_queue); uc->cmd_retries < max_retries) {
uc->cmd_retries++;


// TODO: Add a stats counter here for this case. conn_set_state(c, conn_pause);
//
// Using work_send() so the call stack unwinds back to libevent.
//
work_send(uc->thread->work_queue, upstream_retry, uc->extra, uc);


return; assert(uc->thread);
assert(uc->thread->work_queue);

// TODO: Add a stats counter here for this case.
//
// Using work_send() so the call stack unwinds back to libevent.
//
work_send(uc->thread->work_queue, upstream_retry, uc->extra, uc);

return;
} else {
if (settings.verbose > 2) {
fprintf(stderr,
"%d: cproxy_process_a2b_downstream_response not-my-vbucket, "
"cmd: %x skipping retry %d >= %d\n",
c->sfd, header->response.opcode, uc->cmd_retries,
max_retries);
}
}
} else { } else {
// TODO: Add a stats counter here for this case. // TODO: Add a stats counter here for this case.
// //
Expand All @@ -697,7 +715,6 @@ void a2b_process_downstream_response(conn *c) {
return; return;
} }


assert(d->multiget != NULL);
assert(uc->cmd_start != NULL); assert(uc->cmd_start != NULL);
assert(header->response.opaque != 0); assert(header->response.opaque != 0);


Expand All @@ -719,26 +736,37 @@ void a2b_process_downstream_response(conn *c) {
mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c), mcs_server_invalid_vbucket(&d->mst, downstream_conn_index(d, c),
vbucket); vbucket);


// Update the de-duplication map, removing the key, so that
// we'll reattempt another request for the key during the
// retry.
//
multiget_entry *entry = genhash_find(d->multiget, key_buf);

if (settings.verbose > 2) { if (settings.verbose > 2) {
fprintf(stderr, fprintf(stderr,
"<%d cproxy_process_a2b_downstream_response not-my-vbucket, " "<%d cproxy_process_a2b_downstream_response not-my-vbucket, "
"cmd: %x get/getk '%s' %d retry: %d, entry: %d, vbucket %d\n", "cmd: %x get/getk '%s' %d retry: %d, vbucket %d\n",
c->sfd, header->response.opcode, key_buf, key_len, c->sfd, header->response.opcode, key_buf, key_len,
d->upstream_retry + 1, entry != NULL, vbucket); d->upstream_retry + 1, vbucket);
} }


genhash_delete(d->multiget, key_buf); // Update the de-duplication map, removing the key, so that
// we'll reattempt another request for the key during the
// retry.
//
if (d->multiget != NULL) {
multiget_entry *entry = genhash_find(d->multiget, key_buf);

if (settings.verbose > 2) {
fprintf(stderr,
"<%d cproxy_process_a2b_downstream_response not-my-vbucket, "
"cmd: %x get/getk '%s' %d retry: %d, entry: %d, vbucket %d "
"deleting multiget entry\n",
c->sfd, header->response.opcode, key_buf, key_len,
d->upstream_retry + 1, entry != NULL, vbucket);
}

genhash_delete(d->multiget, key_buf);


while (entry != NULL) { while (entry != NULL) {
multiget_entry *curr = entry; multiget_entry *curr = entry;
entry = entry->next; entry = entry->next;
free(curr); free(curr);
}
} }


// Signal that we need to retry, where this counter is // Signal that we need to retry, where this counter is
Expand Down

0 comments on commit 18cb892

Please sign in to comment.