diff --git a/cproxy.c b/cproxy.c index b4e05dee..dc2a547c 100644 --- a/cproxy.c +++ b/cproxy.c @@ -57,6 +57,8 @@ void format_host_ident(char *buf, int buf_len, mcs_server_st *msst, enum protocol host_protocol); +int delink_from_downstream_conns(conn *c); + // Function tables. // conn_funcs cproxy_listen_funcs = { @@ -90,7 +92,7 @@ conn_funcs cproxy_upstream_funcs = { conn_funcs cproxy_downstream_funcs = { .conn_init = cproxy_init_downstream_conn, .conn_close = cproxy_on_close_downstream_conn, - .conn_connect = NULL, + .conn_connect = cproxy_on_connect_downstream_conn, .conn_process_ascii_command = cproxy_process_downstream_ascii, .conn_process_binary_command = cproxy_process_downstream_binary, .conn_complete_nread_ascii = cproxy_process_downstream_ascii_nread, @@ -101,8 +103,6 @@ conn_funcs cproxy_downstream_funcs = { .conn_binary_command_magic = PROTOCOL_BINARY_RES }; -static bool cproxy_forward(downstream *d); - /* Main function to create a proxy struct. */ proxy *cproxy_create(proxy_main *main, @@ -508,27 +508,10 @@ void cproxy_on_close_upstream_conn(conn *c) { c, NULL); } -void cproxy_on_close_downstream_conn(conn *c) { - assert(c != NULL); - assert(c->sfd >= 0); - assert(c->state == conn_closing); - - if (settings.verbose > 2) { - moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd); - } - +int delink_from_downstream_conns(conn *c) { downstream *d = c->extra; - - // Might have been set to NULL during cproxy_free_downstream(). - // Or, when a downstream conn is in the thread-based free pool, it - // is not associated with any particular downstream. - // if (d == NULL) { - // TODO: See if we need to remove the downstream conn from the - // thread-based free pool. This shouldn't happen, but we - // should then figure out how to put an assert() here. - // - return; + return -1; } c->extra = NULL; @@ -556,6 +539,34 @@ void cproxy_on_close_downstream_conn(conn *c) { } } + return k; +} + +void cproxy_on_close_downstream_conn(conn *c) { + assert(c != NULL); + assert(c->sfd >= 0); + assert(c->state == conn_closing); + + if (settings.verbose > 2) { + moxi_log_write("<%d cproxy_on_close_downstream_conn\n", c->sfd); + } + + downstream *d = c->extra; + + // Might have been set to NULL during cproxy_free_downstream(). + // Or, when a downstream conn is in the thread-based free pool, it + // is not associated with any particular downstream. + // + if (d == NULL) { + // TODO: See if we need to remove the downstream conn from the + // thread-based free pool. This shouldn't happen, but we + // should then figure out how to put an assert() here. + // + return; + } + + int k = delink_from_downstream_conns(c); + proxy_td *ptd = d->ptd; assert(ptd); @@ -1238,7 +1249,7 @@ conn *cproxy_connect_downstream_conn(downstream *d, int err = -1; int fd = mcs_connect(mcs_server_st_hostname(msst), - mcs_server_st_port(msst), &err, true); + mcs_server_st_port(msst), &err, false); if (fd != -1) { conn *c = conn_new(fd, conn_pause, 0, DATA_BUFFER_SIZE, @@ -1571,7 +1582,7 @@ void propagate_error(downstream *d) { } } -static bool cproxy_forward(downstream *d) { +bool cproxy_forward(downstream *d) { assert(d != NULL); assert(d->ptd != NULL); assert(d->upstream_conn != NULL); @@ -2573,11 +2584,14 @@ void cproxy_upstream_state_change(conn *c, enum conn_states next_state) { // ------------------------------------------------- void cproxy_on_connect_downstream_conn(conn *c) { - // TODO: Need to revisit this callback. - // + int k; + assert(c != NULL); assert(c->host_ident); + downstream *d = c->extra; + assert(d != NULL); + if (settings.verbose > 2) { moxi_log_write("%d: cproxy_on_connect_downstream_conn for %s", c->sfd, c->host_ident); @@ -2597,6 +2611,10 @@ void cproxy_on_connect_downstream_conn(conn *c) { /* Check if the connection completed */ if (getsockopt(c->sfd, SOL_SOCKET, SO_ERROR, (void *) &error, &errsz) == -1) { + if (settings.verbose) { + moxi_log_write("%d: connect error: %s, %s", + c->sfd, c->host_ident, strerror(error)); + } goto cleanup; } @@ -2618,7 +2636,8 @@ void cproxy_on_connect_downstream_conn(conn *c) { conn_set_state(c, conn_pause); - zstored_release_downstream_conn(c, false); + cproxy_forward(d); + return; cleanup: @@ -2626,7 +2645,15 @@ void cproxy_on_connect_downstream_conn(conn *c) { // TODO: d->stats.conn_failures++; // TODO: d->error_count++; + k = delink_from_downstream_conns(c); + if (k >= 0) { + assert(d->downstream_conns[k] == NULL); + + d->downstream_conns[k] = NULL_CONN; + } + conn_set_state(c, conn_closing); + update_event(c, 0); } diff --git a/cproxy.h b/cproxy.h index 8a90927d..3790ef35 100644 --- a/cproxy.h +++ b/cproxy.h @@ -504,6 +504,8 @@ void cproxy_reset_upstream(conn *uc); bool cproxy_update_event_write(downstream *d, conn *c); +bool cproxy_forward(downstream *d); + void upstream_error(conn *uc); void upstream_retry(void *data0, void *data1);