
Fix MODCLUSTER-273 and clean create_worker().
jfclere committed Jan 22, 2012
1 parent a6f2501 commit c35e1c8
Showing 1 changed file with 141 additions and 100 deletions.
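This commit bundles two changes. The MODCLUSTER-273 fix teaches add_balancers_workers() to refresh an already-existing balancer's sticky-session settings from shared memory, routing the lookup through a new read_balancer_name() helper that add_balancer_node() now shares. The create_worker() cleanup is mechanical: the function declared its local as proxy_worker **worker and dereferenced it as (*worker)->field throughout, storing through a pointer that nothing in the visible hunk initializes; the commit flattens it to a plain proxy_worker *, taking &worker only where ap_proxy_add_worker() needs an out-parameter. A minimal before/after sketch of that pattern (illustrative only, not the committed code):

    /* Before: double indirection; '*worker = ...' writes through a
     * proxy_worker ** that the visible hunk never initializes. */
    proxy_worker **worker;
    *worker = ap_proxy_get_worker(pool, conf, url);
    if ((*worker) == NULL) {
        const char *err = ap_proxy_add_worker(worker, conf->pool, conf, url);
    }

    /* After: a plain pointer; '&worker' appears only where
     * ap_proxy_add_worker() genuinely needs an out-parameter. */
    proxy_worker *worker = ap_proxy_get_worker(pool, conf, url);
    if (worker == NULL) {
        const char *err = ap_proxy_add_worker(&worker, conf->pool, conf, url);
    }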
native/mod_proxy_cluster/mod_proxy_cluster.c
@@ -231,166 +231,166 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan
#if APR_HAS_THREADS
int mpm_threads;
#endif
- proxy_worker **worker;
+ proxy_worker *worker;
/* build the name (scheme and port) when needed */
url = apr_pstrcat(pool, node->mess.Type, "://", node->mess.Host, ":", node->mess.Port, NULL);

- *worker = ap_proxy_get_worker(pool, conf, url);
- if ((*worker) == NULL) {
+ worker = ap_proxy_get_worker(pool, conf, url);
+ if (worker == NULL) {

/* creates it */
proxy_cluster_helper *helper;
- const char *err = ap_proxy_add_worker(worker, conf->pool, conf, url);
+ const char *err = ap_proxy_add_worker(&worker, conf->pool, conf, url);
if (err) {
ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
"Created: worker for %s failed: %s", url, err);
return APR_EGENERAL;
}
- (*worker)->opaque = apr_pcalloc(conf->pool, sizeof(proxy_cluster_helper));
- if (!(*worker)->opaque)
+ worker->opaque = apr_pcalloc(conf->pool, sizeof(proxy_cluster_helper));
+ if (!worker->opaque)
return APR_EGENERAL;
- helper = (*worker)->opaque;
+ helper = worker->opaque;
helper->count_active = 0;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: worker for %s", url);
- } else if ((*worker)->id == 0) {
+ } else if (worker->id == 0) {
/* We are going to reuse a removed one */
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: reusing worker for %s", url);
- if ((*worker)->cp->pool == NULL) {
- init_conn_pool(conf->pool, *worker);
+ if (worker->cp->pool == NULL) {
+ init_conn_pool(conf->pool, worker);
}
reuse = 1;
} else {
/* Check if the shared memory goes to the right place */
char *pptr = (char *) node;
pptr = pptr + node->offset;
- if ((*worker)->id == node->mess.id && (*worker)->s == (proxy_worker_stat *) pptr) {
+ if (worker->id == node->mess.id && worker->s == (proxy_worker_stat *) pptr) {
/* the share memory may have been removed and recreated */
- if (!(*worker)->s->status) {
- (*worker)->s->status = PROXY_WORKER_INITIALIZED;
- strncpy((*worker)->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ);
- (*worker)->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0';
+ if (!worker->s->status) {
+ worker->s->status = PROXY_WORKER_INITIALIZED;
+ strncpy(worker->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ);
+ worker->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0';
/* XXX: We need that information from TC */
- (*worker)->s->redirect[0] = '\0';
- (*worker)->s->lbstatus = 0;
- (*worker)->s->lbfactor = -1; /* prevent using the node using status message */
+ worker->s->redirect[0] = '\0';
+ worker->s->lbstatus = 0;
+ worker->s->lbfactor = -1; /* prevent using the node using status message */
}
return APR_SUCCESS; /* Done Already existing */
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: can't reuse worker as it for %s cleaning...", url);
- if ((*worker)->cp->pool) {
+ if (worker->cp->pool) {
/* destroy and create a new one */
- apr_pool_destroy((*worker)->cp->pool);
- (*worker)->cp->pool = NULL;
- init_conn_pool(conf->pool, *worker);
+ apr_pool_destroy(worker->cp->pool);
+ worker->cp->pool = NULL;
+ init_conn_pool(conf->pool, worker);
}
reuse = 1;
}

/* Get the shared memory for this worker */
ptr = (char *) node;
ptr = ptr + node->offset;
- (*worker)->s = (proxy_worker_stat *) ptr;
+ worker->s = (proxy_worker_stat *) ptr;

- (*worker)->id = node->mess.id;
- (*worker)->route = apr_pstrdup(conf->pool, node->mess.JVMRoute);
- (*worker)->redirect = apr_pstrdup(conf->pool, "");
- (*worker)->smax = node->mess.smax;
- (*worker)->ttl = node->mess.ttl;
+ worker->id = node->mess.id;
+ worker->route = apr_pstrdup(conf->pool, node->mess.JVMRoute);
+ worker->redirect = apr_pstrdup(conf->pool, "");
+ worker->smax = node->mess.smax;
+ worker->ttl = node->mess.ttl;
if (node->mess.timeout) {
- (*worker)->timeout_set = 1;
- (*worker)->timeout = node->mess.timeout;
+ worker->timeout_set = 1;
+ worker->timeout = node->mess.timeout;
}
- (*worker)->flush_packets = node->mess.flushpackets;
- (*worker)->flush_wait = node->mess.flushwait;
+ worker->flush_packets = node->mess.flushpackets;
+ worker->flush_wait = node->mess.flushwait;
#if AP_MODULE_MAGIC_AT_LEAST(20051115,4)
- (*worker)->ping_timeout = node->mess.ping;
- (*worker)->ping_timeout_set = 1;
- (*worker)->acquire_set = 1;
+ worker->ping_timeout = node->mess.ping;
+ worker->ping_timeout_set = 1;
+ worker->acquire_set = 1;
#else
- helperping = (*worker)->opaque;
+ helperping = worker->opaque;
helperping->ping_timeout = node->mess.ping;
helperping->ping_timeout_set = 1;
#endif
#if AP_MODULE_MAGIC_AT_LEAST(20051115,16)
/* For MODCLUSTER-217 */
- (*worker)->conn_timeout_set = 1;
- (*worker)->conn_timeout = node->mess.ping;
+ worker->conn_timeout_set = 1;
+ worker->conn_timeout = node->mess.ping;
#endif
- (*worker)->keepalive = 1;
- (*worker)->keepalive_set = 1;
- (*worker)->is_address_reusable = 1;
- (*worker)->acquire = apr_time_make(0, 2 * 1000); /* 2 ms */
- (*worker)->retry = apr_time_from_sec(PROXY_WORKER_DEFAULT_RETRY);
+ worker->keepalive = 1;
+ worker->keepalive_set = 1;
+ worker->is_address_reusable = 1;
+ worker->acquire = apr_time_make(0, 2 * 1000); /* 2 ms */
+ worker->retry = apr_time_from_sec(PROXY_WORKER_DEFAULT_RETRY);

/* from ap_proxy_initialize_worker() */
#if APR_HAS_THREADS
ap_mpm_query(AP_MPMQ_MAX_THREADS, &mpm_threads);
if (mpm_threads > 1) {
/* Set hard max to no more then mpm_threads */
- if ((*worker)->hmax == 0 || (*worker)->hmax > mpm_threads) {
- (*worker)->hmax = mpm_threads;
+ if (worker->hmax == 0 || worker->hmax > mpm_threads) {
+ worker->hmax = mpm_threads;
}
- if ((*worker)->smax == -1 || (*worker)->smax > (*worker)->hmax) {
- (*worker)->smax = (*worker)->hmax;
+ if (worker->smax == -1 || worker->smax > worker->hmax) {
+ worker->smax = worker->hmax;
}
/* Set min to be lower then smax */
- if ((*worker)->min > (*worker)->smax) {
- (*worker)->min = (*worker)->smax;
+ if (worker->min > worker->smax) {
+ worker->min = worker->smax;
}
}
else {
/* This will supress the apr_reslist creation */
- (*worker)->min = (*worker)->smax = (*worker)->hmax = 0;
+ worker->min = worker->smax = worker->hmax = 0;
}

- if ((*worker)->hmax) {
- rv = apr_reslist_create(&((*worker)->cp->res),
- (*worker)->min, (*worker)->smax,
- (*worker)->hmax, (*worker)->ttl,
+ if (worker->hmax) {
+ rv = apr_reslist_create(&(worker->cp->res),
+ worker->min, worker->smax,
+ worker->hmax, worker->ttl,
connection_constructor, connection_destructor,
- (*worker), (*worker)->cp->pool);
+ worker, worker->cp->pool);

- apr_pool_cleanup_register((*worker)->cp->pool, (void *)(*worker),
+ apr_pool_cleanup_register(worker->cp->pool, (void *)worker,
conn_pool_cleanup,
apr_pool_cleanup_null);

ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"proxy: initialized worker %d in child %" APR_PID_T_FMT " for (%s) min=%d max=%d smax=%d",
- (*worker)->id, getpid(), (*worker)->hostname, (*worker)->min,
- (*worker)->hmax, (*worker)->smax);
+ worker->id, getpid(), worker->hostname, worker->min,
+ worker->hmax, worker->smax);

#if (APR_MAJOR_VERSION > 0)
/* Set the acquire timeout */
- if (rv == APR_SUCCESS && (*worker)->acquire_set) {
- apr_reslist_timeout_set((*worker)->cp->res, (*worker)->acquire);
+ if (rv == APR_SUCCESS && worker->acquire_set) {
+ apr_reslist_timeout_set(worker->cp->res, worker->acquire);
}
#endif
}
else
#endif
{

- rv = connection_constructor((void **)&((*worker)->cp->conn), (*worker), (*worker)->cp->pool);
+ rv = connection_constructor((void **)&(worker->cp->conn), worker, worker->cp->pool);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"proxy: initialized single connection worker %d in child %" APR_PID_T_FMT " for (%s)",
- (*worker)->id, getpid(), (*worker)->hostname);
+ worker->id, getpid(), worker->hostname);
}
/* end from ap_proxy_initialize_worker() */

/*
* The Shared datastatus may already contains a valid information
*/
- if (!(*worker)->s->status) {
- (*worker)->s->status = PROXY_WORKER_INITIALIZED;
- strncpy((*worker)->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ);
- (*worker)->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0';
+ if (!worker->s->status) {
+ worker->s->status = PROXY_WORKER_INITIALIZED;
+ strncpy(worker->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ);
+ worker->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0';
/* XXX: We need that information from TC */
- (*worker)->s->redirect[0] = '\0';
- (*worker)->s->lbstatus = 0;
- (*worker)->s->lbfactor = -1; /* prevent using the node using status message */
+ worker->s->redirect[0] = '\0';
+ worker->s->lbstatus = 0;
+ worker->s->lbfactor = -1; /* prevent using the node using status message */
}

if (!reuse) {
@@ -400,7 +400,7 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan
*/
proxy_worker *runtime;
runtime = apr_array_push(balancer->workers);
- memcpy(runtime, (*worker), sizeof(proxy_worker));
+ memcpy(runtime, worker, sizeof(proxy_worker));
} else {
/* Update the corresponding balancer worker information */
proxy_worker *runtime;
@@ -410,16 +410,36 @@ static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balan
for (i = 0; i < balancer->workers->nelts; i++, runtime++) {
if (runtime->name) {
if (strcmp(url, runtime->name) == 0) {
- memcpy(runtime, (*worker), sizeof(proxy_worker));
+ memcpy(runtime, worker, sizeof(proxy_worker));
}
}
}
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: worker for %s %d (status): %d", url, (*worker)->id, (*worker)->s->status);
"Created: worker for %s %d (status): %d", url, worker->id, worker->s->status);
return rv;
}

+ static balancerinfo_t *read_balancer_name(const char *name, apr_pool_t *pool)
+ {
+ int sizebal, i;
+ int *bal;
+ sizebal = balancer_storage->get_max_size_balancer();
+ if (sizebal == 0)
+ return NULL; /* Done broken. */
+ bal = apr_pcalloc(pool, sizeof(int) * sizebal);
+ sizebal = balancer_storage->get_ids_used_balancer(bal);
+ for (i=0; i<sizebal; i++) {
+ balancerinfo_t *balan;
+ balancer_storage->read_balancer(bal[i], &balan);
+ /* Something like balancer://cluster1 and cluster1 */
+ if (strcmp(balan->balancer, name) == 0) {
+ return balan;
+ }
+ }
+ return NULL;
+ }

/**
* Add balancer to the proxy_server_conf.
* NOTE: pool is the request pool or any temporary pool. Use conf->pool for any data that live longer.
@@ -462,35 +482,23 @@ static proxy_balancer *add_balancer_node(nodeinfo_t *node, proxy_server_conf *co

if (balancer && balancer->workers->nelts == 0) {
/* Logic to copy the shared memory information to the balancer */
- int sizebal, i;
- int *bal;
- sizebal = balancer_storage->get_max_size_balancer();
- if (sizebal == 0)
+ balancerinfo_t *balan = read_balancer_name(&balancer->name[11], pool);
+ if (balan == NULL)
return balancer; /* Done broken */
- bal = apr_pcalloc(pool, sizeof(int) * sizebal);
- sizebal = balancer_storage->get_ids_used_balancer(bal);
- for (i=0; i<sizebal; i++) {
- balancerinfo_t *balan;
- balancer_storage->read_balancer(bal[i], &balan);
- /* Something like balancer://cluster1 and cluster1 */
- if (strcmp(balan->balancer, &balancer->name[11]) == 0) {
- /* XXX: StickySession, StickySessionRemove not in */
- balancer->sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie,
- balan->StickySessionPath);
- balancer->sticky_force = 0;
- if (balan->StickySession)
- balancer->sticky_force += STSESSION;
- if (balan->StickySessionForce)
- balancer->sticky_force += STSESSFOR;
- if (balan->StickySessionRemove)
- balancer->sticky_force += STSESSREM;
- balancer->timeout = balan->Timeout;
-
- balancer->max_attempts = balan->Maxattempts;
- balancer->max_attempts_set = 1;
- break;
- }
- }
+ /* XXX: StickySession, StickySessionRemove not in */
+ balancer->sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie,
+ balan->StickySessionPath);
+ balancer->sticky_force = 0;
+ if (balan->StickySession)
+ balancer->sticky_force += STSESSION;
+ if (balan->StickySessionForce)
+ balancer->sticky_force += STSESSFOR;
+ if (balan->StickySessionRemove)
+ balancer->sticky_force += STSESSREM;
+ balancer->timeout = balan->Timeout;
+
+ balancer->max_attempts = balan->Maxattempts;
+ balancer->max_attempts_set = 1;
}
return balancer;
}
@@ -518,6 +526,39 @@ static void add_balancers_workers(nodeinfo_t *node, apr_pool_t *pool)
}
if (!balancer)
balancer = add_balancer_node(node, conf, pool, s);
+ else {
+ /* We "reuse" the balancer */
+ balancerinfo_t *balan = read_balancer_name(&balancer->name[11], pool);
+ if (balan != NULL) {
+ char *sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie,
+ balan->StickySessionPath);
+ int sticky_force=0;
+ int changed = 0;
+ if (balan->StickySession)
+ sticky_force += STSESSION;
+ if (balan->StickySessionForce)
+ sticky_force += STSESSFOR;
+ if (balan->StickySessionRemove)
+ sticky_force += STSESSREM;
+
+ if (balancer->sticky_force != sticky_force) {
+ balancer->sticky_force = sticky_force;
+ changed = -1;
+ }
+ if (strcmp(sticky, balancer->sticky) != 0) {
+ balancer->sticky = sticky;
+ changed = -1;
+ }
+ balancer->timeout = balan->Timeout;
+ balancer->max_attempts = balan->Maxattempts;
+ balancer->max_attempts_set = 1;
+ if (changed) {
+ /* log a warning */
+ ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, s,
+ "Balancer %s changed" , &balancer->name[11]);
+ }
+ }
+ }
if (balancer)
create_worker(conf, balancer, s, node, pool);
s = s->next;
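For orientation, both call sites in the diff resolve the shared-memory balancer record the same way; a minimal usage sketch follows (the helper and field names come from the diff above, the surrounding scaffolding is illustrative only):

    /* balancer->name has the form "balancer://cluster1"; skipping the
     * 11-character "balancer://" prefix gives the bare name that
     * read_balancer_name() compares against each shared-memory record. */
    balancerinfo_t *balan = read_balancer_name(&balancer->name[11], pool);
    if (balan == NULL)
        return balancer; /* no shared-memory record to copy settings from */
    /* ...then copy StickySession*, Timeout and Maxattempts into the balancer. */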
