From c35e1c818faaa1bb3d24db9ff00a45e106499dff Mon Sep 17 00:00:00 2001 From: jfclere Date: Sun, 22 Jan 2012 09:50:52 +0000 Subject: [PATCH] Fix MODCLUSTER-273 and clean create_worker(). --- native/mod_proxy_cluster/mod_proxy_cluster.c | 241 +++++++++++-------- 1 file changed, 141 insertions(+), 100 deletions(-) diff --git a/native/mod_proxy_cluster/mod_proxy_cluster.c b/native/mod_proxy_cluster/mod_proxy_cluster.c index 4719a87ab..76fb67a47 100644 --- a/native/mod_proxy_cluster/mod_proxy_cluster.c +++ b/native/mod_proxy_cluster/mod_proxy_cluster.c @@ -231,60 +231,60 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan #if APR_HAS_THREADS int mpm_threads; #endif - proxy_worker **worker; + proxy_worker *worker; /* build the name (scheme and port) when needed */ url = apr_pstrcat(pool, node->mess.Type, "://", node->mess.Host, ":", node->mess.Port, NULL); - *worker = ap_proxy_get_worker(pool, conf, url); - if ((*worker) == NULL) { + worker = ap_proxy_get_worker(pool, conf, url); + if (worker == NULL) { /* creates it */ proxy_cluster_helper *helper; - const char *err = ap_proxy_add_worker(worker, conf->pool, conf, url); + const char *err = ap_proxy_add_worker(&worker, conf->pool, conf, url); if (err) { ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server, "Created: worker for %s failed: %s", url, err); return APR_EGENERAL; } - (*worker)->opaque = apr_pcalloc(conf->pool, sizeof(proxy_cluster_helper)); - if (!(*worker)->opaque) + worker->opaque = apr_pcalloc(conf->pool, sizeof(proxy_cluster_helper)); + if (!worker->opaque) return APR_EGENERAL; - helper = (*worker)->opaque; + helper = worker->opaque; helper->count_active = 0; ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "Created: worker for %s", url); - } else if ((*worker)->id == 0) { + } else if (worker->id == 0) { /* We are going to reuse a removed one */ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "Created: reusing worker for %s", url); - if ((*worker)->cp->pool == NULL) { - init_conn_pool(conf->pool, *worker); + if (worker->cp->pool == NULL) { + init_conn_pool(conf->pool, worker); } reuse = 1; } else { /* Check if the shared memory goes to the right place */ char *pptr = (char *) node; pptr = pptr + node->offset; - if ((*worker)->id == node->mess.id && (*worker)->s == (proxy_worker_stat *) pptr) { + if (worker->id == node->mess.id && worker->s == (proxy_worker_stat *) pptr) { /* the share memory may have been removed and recreated */ - if (!(*worker)->s->status) { - (*worker)->s->status = PROXY_WORKER_INITIALIZED; - strncpy((*worker)->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ); - (*worker)->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0'; + if (!worker->s->status) { + worker->s->status = PROXY_WORKER_INITIALIZED; + strncpy(worker->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ); + worker->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0'; /* XXX: We need that information from TC */ - (*worker)->s->redirect[0] = '\0'; - (*worker)->s->lbstatus = 0; - (*worker)->s->lbfactor = -1; /* prevent using the node using status message */ + worker->s->redirect[0] = '\0'; + worker->s->lbstatus = 0; + worker->s->lbfactor = -1; /* prevent using the node using status message */ } return APR_SUCCESS; /* Done Already existing */ } ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "Created: can't reuse worker as it for %s cleaning...", url); - if ((*worker)->cp->pool) { + if (worker->cp->pool) { /* destroy and create a new one */ - apr_pool_destroy((*worker)->cp->pool); - (*worker)->cp->pool = NULL; - init_conn_pool(conf->pool, *worker); + apr_pool_destroy(worker->cp->pool); + worker->cp->pool = NULL; + init_conn_pool(conf->pool, worker); } reuse = 1; } @@ -292,80 +292,80 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan /* Get the shared memory for this worker */ ptr = (char *) node; ptr = ptr + node->offset; - (*worker)->s = (proxy_worker_stat *) ptr; + worker->s = (proxy_worker_stat *) ptr; - (*worker)->id = node->mess.id; - (*worker)->route = apr_pstrdup(conf->pool, node->mess.JVMRoute); - (*worker)->redirect = apr_pstrdup(conf->pool, ""); - (*worker)->smax = node->mess.smax; - (*worker)->ttl = node->mess.ttl; + worker->id = node->mess.id; + worker->route = apr_pstrdup(conf->pool, node->mess.JVMRoute); + worker->redirect = apr_pstrdup(conf->pool, ""); + worker->smax = node->mess.smax; + worker->ttl = node->mess.ttl; if (node->mess.timeout) { - (*worker)->timeout_set = 1; - (*worker)->timeout = node->mess.timeout; + worker->timeout_set = 1; + worker->timeout = node->mess.timeout; } - (*worker)->flush_packets = node->mess.flushpackets; - (*worker)->flush_wait = node->mess.flushwait; + worker->flush_packets = node->mess.flushpackets; + worker->flush_wait = node->mess.flushwait; #if AP_MODULE_MAGIC_AT_LEAST(20051115,4) - (*worker)->ping_timeout = node->mess.ping; - (*worker)->ping_timeout_set = 1; - (*worker)->acquire_set = 1; + worker->ping_timeout = node->mess.ping; + worker->ping_timeout_set = 1; + worker->acquire_set = 1; #else - helperping = (*worker)->opaque; + helperping = worker->opaque; helperping->ping_timeout = node->mess.ping; helperping->ping_timeout_set = 1; #endif #if AP_MODULE_MAGIC_AT_LEAST(20051115,16) /* For MODCLUSTER-217 */ - (*worker)->conn_timeout_set = 1; - (*worker)->conn_timeout = node->mess.ping; + worker->conn_timeout_set = 1; + worker->conn_timeout = node->mess.ping; #endif - (*worker)->keepalive = 1; - (*worker)->keepalive_set = 1; - (*worker)->is_address_reusable = 1; - (*worker)->acquire = apr_time_make(0, 2 * 1000); /* 2 ms */ - (*worker)->retry = apr_time_from_sec(PROXY_WORKER_DEFAULT_RETRY); + worker->keepalive = 1; + worker->keepalive_set = 1; + worker->is_address_reusable = 1; + worker->acquire = apr_time_make(0, 2 * 1000); /* 2 ms */ + worker->retry = apr_time_from_sec(PROXY_WORKER_DEFAULT_RETRY); /* from ap_proxy_initialize_worker() */ #if APR_HAS_THREADS ap_mpm_query(AP_MPMQ_MAX_THREADS, &mpm_threads); if (mpm_threads > 1) { /* Set hard max to no more then mpm_threads */ - if ((*worker)->hmax == 0 || (*worker)->hmax > mpm_threads) { - (*worker)->hmax = mpm_threads; + if (worker->hmax == 0 || worker->hmax > mpm_threads) { + worker->hmax = mpm_threads; } - if ((*worker)->smax == -1 || (*worker)->smax > (*worker)->hmax) { - (*worker)->smax = (*worker)->hmax; + if (worker->smax == -1 || worker->smax > worker->hmax) { + worker->smax = worker->hmax; } /* Set min to be lower then smax */ - if ((*worker)->min > (*worker)->smax) { - (*worker)->min = (*worker)->smax; + if (worker->min > worker->smax) { + worker->min = worker->smax; } } else { /* This will supress the apr_reslist creation */ - (*worker)->min = (*worker)->smax = (*worker)->hmax = 0; + worker->min = worker->smax = worker->hmax = 0; } - if ((*worker)->hmax) { - rv = apr_reslist_create(&((*worker)->cp->res), - (*worker)->min, (*worker)->smax, - (*worker)->hmax, (*worker)->ttl, + if (worker->hmax) { + rv = apr_reslist_create(&(worker->cp->res), + worker->min, worker->smax, + worker->hmax, worker->ttl, connection_constructor, connection_destructor, - (*worker), (*worker)->cp->pool); + worker, worker->cp->pool); - apr_pool_cleanup_register((*worker)->cp->pool, (void *)(*worker), + apr_pool_cleanup_register(worker->cp->pool, (void *)worker, conn_pool_cleanup, apr_pool_cleanup_null); ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "proxy: initialized worker %d in child %" APR_PID_T_FMT " for (%s) min=%d max=%d smax=%d", - (*worker)->id, getpid(), (*worker)->hostname, (*worker)->min, - (*worker)->hmax, (*worker)->smax); + worker->id, getpid(), worker->hostname, worker->min, + worker->hmax, worker->smax); #if (APR_MAJOR_VERSION > 0) /* Set the acquire timeout */ - if (rv == APR_SUCCESS && (*worker)->acquire_set) { - apr_reslist_timeout_set((*worker)->cp->res, (*worker)->acquire); + if (rv == APR_SUCCESS && worker->acquire_set) { + apr_reslist_timeout_set(worker->cp->res, worker->acquire); } #endif } @@ -373,24 +373,24 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan #endif { - rv = connection_constructor((void **)&((*worker)->cp->conn), (*worker), (*worker)->cp->pool); + rv = connection_constructor((void **)&(worker->cp->conn), worker, worker->cp->pool); ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, "proxy: initialized single connection worker %d in child %" APR_PID_T_FMT " for (%s)", - (*worker)->id, getpid(), (*worker)->hostname); + worker->id, getpid(), worker->hostname); } /* end from ap_proxy_initialize_worker() */ /* * The Shared datastatus may already contains a valid information */ - if (!(*worker)->s->status) { - (*worker)->s->status = PROXY_WORKER_INITIALIZED; - strncpy((*worker)->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ); - (*worker)->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0'; + if (!worker->s->status) { + worker->s->status = PROXY_WORKER_INITIALIZED; + strncpy(worker->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ); + worker->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0'; /* XXX: We need that information from TC */ - (*worker)->s->redirect[0] = '\0'; - (*worker)->s->lbstatus = 0; - (*worker)->s->lbfactor = -1; /* prevent using the node using status message */ + worker->s->redirect[0] = '\0'; + worker->s->lbstatus = 0; + worker->s->lbfactor = -1; /* prevent using the node using status message */ } if (!reuse) { @@ -400,7 +400,7 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan */ proxy_worker *runtime; runtime = apr_array_push(balancer->workers); - memcpy(runtime, (*worker), sizeof(proxy_worker)); + memcpy(runtime, worker, sizeof(proxy_worker)); } else { /* Update the corresponding balancer worker information */ proxy_worker *runtime; @@ -410,16 +410,36 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan for (i = 0; i < balancer->workers->nelts; i++, runtime++) { if (runtime->name) { if (strcmp(url, runtime->name) == 0) { - memcpy(runtime, (*worker), sizeof(proxy_worker)); + memcpy(runtime, worker, sizeof(proxy_worker)); } } } } ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server, - "Created: worker for %s %d (status): %d", url, (*worker)->id, (*worker)->s->status); + "Created: worker for %s %d (status): %d", url, worker->id, worker->s->status); return rv; } +static balancerinfo_t *read_balancer_name(const char *name, apr_pool_t *pool) +{ + int sizebal, i; + int *bal; + sizebal = balancer_storage->get_max_size_balancer(); + if (sizebal == 0) + return NULL; /* Done broken. */ + bal = apr_pcalloc(pool, sizeof(int) * sizebal); + sizebal = balancer_storage->get_ids_used_balancer(bal); + for (i=0; iread_balancer(bal[i], &balan); + /* Something like balancer://cluster1 and cluster1 */ + if (strcmp(balan->balancer, name) == 0) { + return balan; + } + } + return NULL; +} + /** * Add balancer to the proxy_server_conf. * NOTE: pool is the request pool or any temporary pool. Use conf->pool for any data that live longer. @@ -462,35 +482,23 @@ static proxy_balancer *add_balancer_node(nodeinfo_t *node, proxy_server_conf *co if (balancer && balancer->workers->nelts == 0) { /* Logic to copy the shared memory information to the balancer */ - int sizebal, i; - int *bal; - sizebal = balancer_storage->get_max_size_balancer(); - if (sizebal == 0) + balancerinfo_t *balan = read_balancer_name(&balancer->name[11], pool); + if (balan == NULL) return balancer; /* Done broken */ - bal = apr_pcalloc(pool, sizeof(int) * sizebal); - sizebal = balancer_storage->get_ids_used_balancer(bal); - for (i=0; iread_balancer(bal[i], &balan); - /* Something like balancer://cluster1 and cluster1 */ - if (strcmp(balan->balancer, &balancer->name[11]) == 0) { - /* XXX: StickySession, StickySessionRemove not in */ - balancer->sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie, - balan->StickySessionPath); - balancer->sticky_force = 0; - if (balan->StickySession) - balancer->sticky_force += STSESSION; - if (balan->StickySessionForce) - balancer->sticky_force += STSESSFOR; - if (balan->StickySessionRemove) - balancer->sticky_force += STSESSREM; - balancer->timeout = balan->Timeout; - - balancer->max_attempts = balan->Maxattempts; - balancer->max_attempts_set = 1; - break; - } - } + /* XXX: StickySession, StickySessionRemove not in */ + balancer->sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie, + balan->StickySessionPath); + balancer->sticky_force = 0; + if (balan->StickySession) + balancer->sticky_force += STSESSION; + if (balan->StickySessionForce) + balancer->sticky_force += STSESSFOR; + if (balan->StickySessionRemove) + balancer->sticky_force += STSESSREM; + balancer->timeout = balan->Timeout; + + balancer->max_attempts = balan->Maxattempts; + balancer->max_attempts_set = 1; } return balancer; } @@ -518,6 +526,39 @@ static void add_balancers_workers(nodeinfo_t *node, apr_pool_t *pool) } if (!balancer) balancer = add_balancer_node(node, conf, pool, s); + else { + /* We "reuse" the balancer */ + balancerinfo_t *balan = read_balancer_name(&balancer->name[11], pool); + if (balan != NULL) { + char *sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie, + balan->StickySessionPath); + int sticky_force=0; + int changed = 0; + if (balan->StickySession) + sticky_force += STSESSION; + if (balan->StickySessionForce) + sticky_force += STSESSFOR; + if (balan->StickySessionRemove) + sticky_force += STSESSREM; + + if (balancer->sticky_force != sticky_force) { + balancer->sticky_force = sticky_force; + changed = -1; + } + if (strcmp(sticky, balancer->sticky) != 0) { + balancer->sticky = sticky; + changed = -1; + } + balancer->timeout = balan->Timeout; + balancer->max_attempts = balan->Maxattempts; + balancer->max_attempts_set = 1; + if (changed) { + /* log a warning */ + ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, s, + "Balancer %s changed" , &balancer->name[11]); + } + } + } if (balancer) create_worker(conf, balancer, s, node, pool); s = s->next;