Skip to content

Commit

Permalink
cproxy_on_connect_downstream_conn hooked up
Browse files Browse the repository at this point in the history
Downstream connect() is now non-blocking, leveraging on zstored
conn_connecting state in drive_machine().

During hookup of this code, refactored out a
delink_from_downstream_conns() helper func as we want to clear out the
downstream_conns[] array appropriately during an error.

Change-Id: Ie3941d156ae2bc48bf376c51a09c94bbefafe8bc
Reviewed-on: http://review.northscale.com/2262
Tested-by: Steve Yen <steve.yen@gmail.com>
Reviewed-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
steveyen committed Sep 8, 2010
1 parent 8a09afd commit 204bfca
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 27 deletions.
81 changes: 54 additions & 27 deletions cproxy.c
Expand Up @@ -57,6 +57,8 @@ void format_host_ident(char *buf, int buf_len,
mcs_server_st *msst,
enum protocol host_protocol);

int delink_from_downstream_conns(conn *c);

// Function tables.
//
conn_funcs cproxy_listen_funcs = {
Expand Down Expand Up @@ -90,7 +92,7 @@ conn_funcs cproxy_upstream_funcs = {
conn_funcs cproxy_downstream_funcs = {
.conn_init = cproxy_init_downstream_conn,
.conn_close = cproxy_on_close_downstream_conn,
.conn_connect = NULL,
.conn_connect = cproxy_on_connect_downstream_conn,
.conn_process_ascii_command = cproxy_process_downstream_ascii,
.conn_process_binary_command = cproxy_process_downstream_binary,
.conn_complete_nread_ascii = cproxy_process_downstream_ascii_nread,
Expand All @@ -101,8 +103,6 @@ conn_funcs cproxy_downstream_funcs = {
.conn_binary_command_magic = PROTOCOL_BINARY_RES
};

static bool cproxy_forward(downstream *d);

/* Main function to create a proxy struct.
*/
proxy *cproxy_create(proxy_main *main,
Expand Down Expand Up @@ -508,27 +508,10 @@ void cproxy_on_close_upstream_conn(conn *c) {
c, NULL);
}

void cproxy_on_close_downstream_conn(conn *c) {
assert(c != NULL);
assert(c->sfd >= 0);
assert(c->state == conn_closing);

if (settings.verbose > 2) {
moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd);
}

int delink_from_downstream_conns(conn *c) {
downstream *d = c->extra;

// Might have been set to NULL during cproxy_free_downstream().
// Or, when a downstream conn is in the thread-based free pool, it
// is not associated with any particular downstream.
//
if (d == NULL) {
// TODO: See if we need to remove the downstream conn from the
// thread-based free pool. This shouldn't happen, but we
// should then figure out how to put an assert() here.
//
return;
return -1;
}

c->extra = NULL;
Expand Down Expand Up @@ -556,6 +539,34 @@ void cproxy_on_close_downstream_conn(conn *c) {
}
}

return k;
}

void cproxy_on_close_downstream_conn(conn *c) {
assert(c != NULL);
assert(c->sfd >= 0);
assert(c->state == conn_closing);

if (settings.verbose > 2) {
moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd);
}

downstream *d = c->extra;

// Might have been set to NULL during cproxy_free_downstream().
// Or, when a downstream conn is in the thread-based free pool, it
// is not associated with any particular downstream.
//
if (d == NULL) {
// TODO: See if we need to remove the downstream conn from the
// thread-based free pool. This shouldn't happen, but we
// should then figure out how to put an assert() here.
//
return;
}

int k = delink_from_downstream_conns(c);

proxy_td *ptd = d->ptd;
assert(ptd);

Expand Down Expand Up @@ -1238,7 +1249,7 @@ conn *cproxy_connect_downstream_conn(downstream *d,

int err = -1;
int fd = mcs_connect(mcs_server_st_hostname(msst),
mcs_server_st_port(msst), &err, true);
mcs_server_st_port(msst), &err, false);
if (fd != -1) {
conn *c = conn_new(fd, conn_pause, 0,
DATA_BUFFER_SIZE,
Expand Down Expand Up @@ -1571,7 +1582,7 @@ void propagate_error(downstream *d) {
}
}

static bool cproxy_forward(downstream *d) {
bool cproxy_forward(downstream *d) {
assert(d != NULL);
assert(d->ptd != NULL);
assert(d->upstream_conn != NULL);
Expand Down Expand Up @@ -2573,11 +2584,14 @@ void cproxy_upstream_state_change(conn *c, enum conn_states next_state) {
// -------------------------------------------------

void cproxy_on_connect_downstream_conn(conn *c) {
// TODO: Need to revisit this callback.
//
int k;

assert(c != NULL);
assert(c->host_ident);

downstream *d = c->extra;
assert(d != NULL);

if (settings.verbose > 2) {
moxi_log_write("%d: cproxy_on_connect_downstream_conn for %s",
c->sfd, c->host_ident);
Expand All @@ -2597,6 +2611,10 @@ void cproxy_on_connect_downstream_conn(conn *c) {
/* Check if the connection completed */
if (getsockopt(c->sfd, SOL_SOCKET, SO_ERROR, (void *) &error,
&errsz) == -1) {
if (settings.verbose) {
moxi_log_write("%d: connect error: %s, %s",
c->sfd, c->host_ident, strerror(error));
}
goto cleanup;
}

Expand All @@ -2618,15 +2636,24 @@ void cproxy_on_connect_downstream_conn(conn *c) {

conn_set_state(c, conn_pause);

zstored_release_downstream_conn(c, false);
cproxy_forward(d);

return;

cleanup:
// TODO: d->thread->ptd.tot_downstream_connect_failed++;
// TODO: d->stats.conn_failures++;
// TODO: d->error_count++;

k = delink_from_downstream_conns(c);
if (k >= 0) {
assert(d->downstream_conns[k] == NULL);

d->downstream_conns[k] = NULL_CONN;
}

conn_set_state(c, conn_closing);

update_event(c, 0);
}

Expand Down
2 changes: 2 additions & 0 deletions cproxy.h
Expand Up @@ -504,6 +504,8 @@ void cproxy_reset_upstream(conn *uc);

bool cproxy_update_event_write(downstream *d, conn *c);

bool cproxy_forward(downstream *d);

void upstream_error(conn *uc);
void upstream_retry(void *data0, void *data1);

Expand Down

0 comments on commit 204bfca

Please sign in to comment.