/
mod_proxy_cluster.c
2954 lines (2693 loc) · 104 KB
/
mod_proxy_cluster.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* mod_cluster
*
* Copyright(c) 2008 Red Hat Middleware, LLC,
* and individual contributors as indicated by the @authors tag.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library in the file COPYING.LIB;
* if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA
*
* @author Jean-Frederic Clere
* @version $Revision$
*/
#include "apr_strings.h"
#include "apr_version.h"
#include "httpd.h"
#include "http_config.h"
#include "http_log.h"
#include "http_main.h"
#include "http_request.h"
#include "http_protocol.h"
#include "http_core.h"
#include "ap_mpm.h"
#include "mod_proxy.h"
#include "mod_proxy_cluster.h"
#include "slotmem.h"
#include "node.h"
#include "host.h"
#include "context.h"
#include "balancer.h"
#include "sessionid.h"
#include "domain.h"
#if APR_HAVE_UNISTD_H
/* for getpid() */
#include <unistd.h>
#endif
/* define HAVE_CLUSTER_EX_DEBUG to have extented debug in mod_cluster */
#define HAVE_CLUSTER_EX_DEBUG 0
/*
 * Per-worker private data hung off worker->opaque.
 * Before httpd 20051115.4 the core proxy_worker had no ping_timeout
 * fields, so they are carried here instead.
 */
struct proxy_cluster_helper {
int count_active; /* currently active request using the worker */
#if AP_MODULE_MAGIC_AT_LEAST(20051115,4)
#else
apr_interval_time_t ping_timeout; /* CPING/CPONG timeout (pre-20051115.4 cores only) */
char ping_timeout_set; /* non-zero once ping_timeout is valid */
#endif
};
typedef struct proxy_cluster_helper proxy_cluster_helper;
/* Storage providers looked up from the companion mod_manager module */
static struct node_storage_method *node_storage = NULL;
static struct host_storage_method *host_storage = NULL;
static struct context_storage_method *context_storage = NULL;
static struct balancer_storage_method *balancer_storage = NULL;
static struct sessionid_storage_method *sessionid_storage = NULL;
static struct domain_storage_method *domain_storage = NULL;
/* Serializes worker/balancer table updates (see update_workers_node) */
static apr_thread_mutex_t *lock = NULL;
static server_rec *main_server = NULL;
#define CREAT_ALL 0 /* create balancers/workers in all VirtualHost */
#define CREAT_NONE 1 /* don't create balancers (but add workers) */
#define CREAT_ROOT 2 /* Only create balancers/workers in the main server */
static int creat_bal = CREAT_ROOT;
static int use_alias = 0; /* 1 : Compare Alias with server_name */
static apr_time_t lbstatus_recalc_time = apr_time_from_sec(5); /* recalculate the lbstatus based on the number of requests in this time interval */
static apr_time_t wait_for_remove = apr_time_from_sec(10); /* wait at least this long before removing a removed node */
#define TIMESESSIONID 300 /* after 5 minutes the sessionid has probably timed out */
#define TIMEDOMAIN 300 /* after 5 minutes the domain entry has probably timed out */
/* Local (per-request) snapshot of the shared-memory context table */
struct proxy_context_table
{
int sizecontext; /* number of valid entries in contexts/context_info */
int* contexts; /* ids of the contexts */
contextinfo_t* context_info; /* copy of the context records */
};
typedef struct proxy_context_table proxy_context_table;
/* Local (per-request) snapshot of the shared-memory virtual-host table */
struct proxy_vhost_table
{
int sizevhost; /* number of valid entries in vhosts/vhost_info */
int* vhosts; /* ids of the vhosts */
hostinfo_t* vhost_info; /* copy of the host records */
};
typedef struct proxy_vhost_table proxy_vhost_table;
/*
 * reslist constructor: allocate a fresh proxy_conn_rec for a worker's
 * connection pool.
 * XXX: Should use the proxy_util one.
 * @param resource (out) receives the new proxy_conn_rec.
 * @param params the proxy_worker that owns the connection.
 * @param pool pool the connection record is allocated from.
 * @return APR_SUCCESS (always).
 */
static apr_status_t connection_constructor(void **resource, void *params,
apr_pool_t *pool)
{
apr_pool_t *ctx;
apr_pool_t *scpool;
proxy_conn_rec *conn;
proxy_worker *worker = (proxy_worker *)params;
/*
 * Create the subpool for each connection
 * This keeps the memory consumption constant
 * when disconnecting from backend.
 */
apr_pool_create(&ctx, pool);
conn = apr_pcalloc(pool, sizeof(proxy_conn_rec));
/*
 * Create another subpool that manages the data for the
 * socket and the connection member of the proxy_conn_rec struct as we
 * destroy this data more frequently than other data in the proxy_conn_rec
 * struct like hostname and addr (at least in the case where we have
 * keepalive connections that timed out).
 */
#if AP_MODULE_MAGIC_AT_LEAST(20051115,13)
apr_pool_create(&scpool, ctx);
apr_pool_tag(scpool, "proxy_conn_scpool");
conn->scpool = scpool;
#endif
conn->pool = ctx;
conn->worker = worker;
#if APR_HAS_THREADS
conn->inreslist = 1; /* the connection starts life inside the reslist */
#endif
*resource = conn;
return APR_SUCCESS;
}
#if APR_HAS_THREADS /* only needed when threads are used */
/*
 * reslist destructor: release the per-connection pool.
 * XXX: Should use the proxy_util one.
 * @param resource the proxy_conn_rec to destroy.
 * @param params unused (worker).
 * @param pool unused.
 */
static apr_status_t connection_destructor(void *resource, void *params,
apr_pool_t *pool)
{
proxy_conn_rec *conn = (proxy_conn_rec *)resource;
/* Destroy the pool only if not called from reslist_destroy
 * (conn_pool_cleanup NULLs cp->pool before the reslist itself goes). */
if (conn->worker->cp->pool) {
apr_pool_destroy(conn->pool);
}
return APR_SUCCESS;
}
#endif
/* XXX: Should use the proxy_util one. */
#if APR_HAS_THREADS
/*
 * Pool cleanup for a worker's connection pool: clearing cp->pool signals
 * connection_destructor that the reslist teardown will free everything,
 * so individual connection pools must not be destroyed twice.
 */
static apr_status_t conn_pool_cleanup(void *theworker)
{
proxy_worker *worker = (proxy_worker *)theworker;
if (worker->cp->res) {
worker->cp->pool = NULL;
}
return APR_SUCCESS;
}
#endif
/* XXX: Should use the proxy_util one. */
static void init_conn_pool(apr_pool_t *p, proxy_worker *worker)
{
    proxy_conn_pool *cp;
    apr_pool_t *subpool;

    /*
     * Sub-pool used for connection recycling: backend connections can be
     * torn down by clearing it without disturbing the worker's own
     * allocations. Once added, a worker is never removed, only disabled.
     */
    apr_pool_create(&subpool, p);
    apr_pool_tag(subpool, "proxy_worker_cp");

    /* The pool descriptor lives as long as the worker itself, so it is
     * allocated from the worker's pool rather than the recycling sub-pool. */
    cp = (proxy_conn_pool *)apr_pcalloc(p, sizeof(proxy_conn_pool));
    cp->pool = subpool;
    worker->cp = cp;
}
/**
* Add a node to the worker conf
* XXX: Contains code of ap_proxy_initialize_worker (proxy_util.c)
* XXX: If something goes wrong the worker can't be used and we leak memory... in a pool
* NOTE: pool is the request pool or any temporary pool. Use conf->pool for any data that live longer.
* @param node the pointer to the node structure
* @param conf a proxy_server_conf.
* @param balancer the balancer to update.
* @param pool a temporary pool.
* @server the server rec for logging purposes.
*
*/
static apr_status_t create_worker(proxy_server_conf *conf, proxy_balancer *balancer,
server_rec *server,
nodeinfo_t *node, apr_pool_t *pool)
{
char *url;
char *ptr;
int reuse = 0; /* set when we recycle an existing worker slot instead of pushing a new one */
apr_status_t rv = APR_SUCCESS;
#if AP_MODULE_MAGIC_AT_LEAST(20051115,4)
#else
proxy_cluster_helper *helperping;
#endif
#if APR_HAS_THREADS
int mpm_threads;
#endif
proxy_worker *worker;
/* build the name (scheme and port) when needed */
url = apr_pstrcat(pool, node->mess.Type, "://", node->mess.Host, ":", node->mess.Port, NULL);
worker = ap_proxy_get_worker(pool, conf, url);
if (worker == NULL) {
/* No worker for this URL yet: create one (long-lived, so use conf->pool) */
proxy_cluster_helper *helper;
const char *err = ap_proxy_add_worker(&worker, conf->pool, conf, url);
if (err) {
ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
"Created: worker for %s failed: %s", url, err);
return APR_EGENERAL;
}
worker->opaque = apr_pcalloc(conf->pool, sizeof(proxy_cluster_helper));
if (!worker->opaque)
return APR_EGENERAL;
helper = worker->opaque;
helper->count_active = 0;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: worker for %s", url);
} else if (worker->id == 0) {
/* We are going to reuse a removed one (id == 0 marks it removed, see remove_workers_node) */
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: reusing worker for %s", url);
if (worker->cp->pool == NULL) {
/* the connection pool was destroyed on removal; rebuild it */
init_conn_pool(conf->pool, worker);
}
reuse = 1;
} else {
/* Check if the shared memory goes to the right place */
char *pptr = (char *) node;
pptr = pptr + node->offset;
if (worker->id == node->mess.id && worker->s == (proxy_worker_stat *) pptr) {
/* the share memory may have been removed and recreated */
if (!worker->s->status) {
worker->s->status = PROXY_WORKER_INITIALIZED;
/* NOTE(review): route[] is assumed to hold PROXY_WORKER_MAX_ROUTE_SIZ+1
 * bytes so the explicit NUL below is in-bounds — verify in mod_proxy.h */
strncpy(worker->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ);
worker->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0';
/* XXX: We need that information from TC */
worker->s->redirect[0] = '\0';
worker->s->lbstatus = 0;
worker->s->lbfactor = -1; /* prevent using the node using status message */
}
return APR_SUCCESS; /* Done Already existing */
}
/* id or shared area mismatch: the worker points at stale shared memory */
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: can't reuse worker as it for %s cleaning...", url);
if (worker->cp->pool) {
/* destroy and create a new one */
apr_pool_destroy(worker->cp->pool);
worker->cp->pool = NULL;
init_conn_pool(conf->pool, worker);
}
reuse = 1;
}
/* Get the shared memory for this worker */
ptr = (char *) node;
ptr = ptr + node->offset;
worker->s = (proxy_worker_stat *) ptr;
worker->id = node->mess.id;
worker->route = apr_pstrdup(conf->pool, node->mess.JVMRoute);
worker->redirect = apr_pstrdup(conf->pool, "");
worker->smax = node->mess.smax;
worker->ttl = node->mess.ttl;
if (node->mess.timeout) {
worker->timeout_set = 1;
worker->timeout = node->mess.timeout;
}
worker->flush_packets = node->mess.flushpackets;
worker->flush_wait = node->mess.flushwait;
#if AP_MODULE_MAGIC_AT_LEAST(20051115,4)
worker->ping_timeout = node->mess.ping;
worker->ping_timeout_set = 1;
worker->acquire_set = 1;
#else
/* older cores: keep the ping timeout in our helper instead */
helperping = worker->opaque;
helperping->ping_timeout = node->mess.ping;
helperping->ping_timeout_set = 1;
#endif
#if AP_MODULE_MAGIC_AT_LEAST(20051115,16)
/* For MODCLUSTER-217 */
worker->conn_timeout_set = 1;
worker->conn_timeout = node->mess.ping;
#endif
worker->keepalive = 1;
worker->keepalive_set = 1;
worker->is_address_reusable = 1;
worker->acquire = apr_time_make(0, 2 * 1000); /* 2 ms */
worker->retry = apr_time_from_sec(PROXY_WORKER_DEFAULT_RETRY);
/* from ap_proxy_initialize_worker() */
#if APR_HAS_THREADS
ap_mpm_query(AP_MPMQ_MAX_THREADS, &mpm_threads);
if (mpm_threads > 1) {
/* Set hard max to no more then mpm_threads */
if (worker->hmax == 0 || worker->hmax > mpm_threads) {
worker->hmax = mpm_threads;
}
if (worker->smax == -1 || worker->smax > worker->hmax) {
worker->smax = worker->hmax;
}
/* Set min to be lower then smax */
if (worker->min > worker->smax) {
worker->min = worker->smax;
}
}
else {
/* This will supress the apr_reslist creation */
worker->min = worker->smax = worker->hmax = 0;
}
if (worker->hmax) {
rv = apr_reslist_create(&(worker->cp->res),
worker->min, worker->smax,
worker->hmax, worker->ttl,
connection_constructor, connection_destructor,
worker, worker->cp->pool);
apr_pool_cleanup_register(worker->cp->pool, (void *)worker,
conn_pool_cleanup,
apr_pool_cleanup_null);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"proxy: initialized worker %d in child %" APR_PID_T_FMT " for (%s) min=%d max=%d smax=%d",
worker->id, getpid(), worker->hostname, worker->min,
worker->hmax, worker->smax);
#if (APR_MAJOR_VERSION > 0)
/* Set the acquire timeout */
if (rv == APR_SUCCESS && worker->acquire_set) {
apr_reslist_timeout_set(worker->cp->res, worker->acquire);
}
#endif
}
else
#endif
{
/* non-threaded (or hmax==0): a single shared connection, no reslist */
rv = connection_constructor((void **)&(worker->cp->conn), worker, worker->cp->pool);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"proxy: initialized single connection worker %d in child %" APR_PID_T_FMT " for (%s)",
worker->id, getpid(), worker->hostname);
}
/* end from ap_proxy_initialize_worker() */
/*
 * The Shared datastatus may already contains a valid information
 */
if (!worker->s->status) {
worker->s->status = PROXY_WORKER_INITIALIZED;
strncpy(worker->s->route, node->mess.JVMRoute, PROXY_WORKER_MAX_ROUTE_SIZ);
worker->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0';
/* XXX: We need that information from TC */
worker->s->redirect[0] = '\0';
worker->s->lbstatus = 0;
worker->s->lbfactor = -1; /* prevent using the node using status message */
}
if (!reuse) {
/*
 * Create the corresponding balancer worker information
 * copying for proxy_util.c ap_proxy_add_worker_to_balancer
 */
proxy_worker *runtime;
runtime = apr_array_push(balancer->workers);
memcpy(runtime, worker, sizeof(proxy_worker));
} else {
/* Update the corresponding balancer worker information */
proxy_worker *runtime;
int i;
runtime = (proxy_worker *)balancer->workers->elts;
for (i = 0; i < balancer->workers->nelts; i++, runtime++) {
if (runtime->name) {
if (strcmp(url, runtime->name) == 0) {
memcpy(runtime, worker, sizeof(proxy_worker));
}
}
}
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: worker for %s %d (status): %d", url, worker->id, worker->s->status);
return rv;
}
/*
 * Look up a balancer record in shared memory by its (bare) name.
 * @param name balancer name without the "balancer://" prefix.
 * @param pool temporary pool for the id array.
 * @return the shared balancerinfo_t, or NULL when not found / storage broken.
 */
static balancerinfo_t *read_balancer_name(const char *name, apr_pool_t *pool)
{
    int sizebal, i;
    int *bal;
    sizebal = balancer_storage->get_max_size_balancer();
    if (sizebal == 0)
        return NULL; /* Done broken. */
    bal = apr_pcalloc(pool, sizeof(int) * sizebal);
    sizebal = balancer_storage->get_ids_used_balancer(bal);
    for (i = 0; i < sizebal; i++) {
        balancerinfo_t *balan = NULL;
        /* The read may fail (slot freed concurrently); the original ignored
         * the status and could strcmp() through an uninitialized pointer. */
        if (balancer_storage->read_balancer(bal[i], &balan) != APR_SUCCESS || balan == NULL)
            continue;
        /* Something like balancer://cluster1 and cluster1 */
        if (strcmp(balan->balancer, name) == 0) {
            return balan;
        }
    }
    return NULL;
}
/**
* Add balancer to the proxy_server_conf.
* NOTE: pool is the request pool or any temporary pool. Use conf->pool for any data that live longer.
* @param node the pointer to the node structure (contains the balancer information).
* @param conf a proxy_server_conf.
* @param balancer the balancer to update or NULL to create it.
* @param name the name of the balancer.
* @param pool a temporary pool.
* @server the server rec for logging purposes.
*
*/
static proxy_balancer *add_balancer_node(nodeinfo_t *node, proxy_server_conf *conf,
apr_pool_t *pool, server_rec *server)
{
proxy_balancer *balancer = NULL;
char *name = apr_pstrcat(pool, "balancer://", node->mess.balancer, NULL);
balancer = ap_proxy_get_balancer(pool, conf, name);
if (!balancer) {
ap_log_error(APLOG_MARK, APLOG_DEBUG|APLOG_NOERRNO, 0, server,
"add_balancer_node: Create balancer %s", name);
/* new balancer: push into conf->balancers and fill the basics */
balancer = apr_array_push(conf->balancers);
memset(balancer, 0, sizeof(proxy_balancer));
balancer->name = apr_pstrdup(conf->pool, name);
balancer->lbmethod = ap_lookup_provider(PROXY_LBMETHOD, "byrequests", "0");
balancer->workers = apr_array_make(conf->pool, 5, sizeof(proxy_worker));
/* XXX Is this a right place to create mutex */
#if APR_HAS_THREADS
if (apr_thread_mutex_create(&(balancer->mutex),
APR_THREAD_MUTEX_DEFAULT, conf->pool) != APR_SUCCESS) {
/* XXX: Do we need to log something here */
ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
"add_balancer_node: Can't create lock for balancer");
}
#endif
} else {
ap_log_error(APLOG_MARK, APLOG_DEBUG|APLOG_NOERRNO, 0, server,
"add_balancer_node: Using balancer %s", name);
}
if (balancer && balancer->workers->nelts == 0) {
/* Logic to copy the shared memory information to the balancer.
 * &name[11] skips the "balancer://" prefix (11 chars). */
balancerinfo_t *balan = read_balancer_name(&balancer->name[11], pool);
if (balan == NULL)
return balancer; /* Done broken */
/* XXX: StickySession, StickySessionRemove not in */
balancer->sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie,
balan->StickySessionPath);
/* encode the three sticky flags as a bitmask in sticky_force */
balancer->sticky_force = 0;
if (balan->StickySession)
balancer->sticky_force += STSESSION;
if (balan->StickySessionForce)
balancer->sticky_force += STSESSFOR;
if (balan->StickySessionRemove)
balancer->sticky_force += STSESSREM;
balancer->timeout = balan->Timeout;
balancer->max_attempts = balan->Maxattempts;
balancer->max_attempts_set = 1;
}
return balancer;
}
/*
* Adds the balancers and the workers to the VirtualHosts corresponding to node
* Note that the calling routine should lock before calling us.
* @param node the node information to add.
* @param pool temporary pool to use for temporary buffer.
*/
static void add_balancers_workers(nodeinfo_t *node, apr_pool_t *pool)
{
server_rec *s = main_server;
char *name = apr_pstrcat(pool, "balancer://", node->mess.balancer, NULL);
/* walk every (virtual) server and create/update the balancer + worker there */
while (s) {
void *sconf = s->module_config;
proxy_server_conf *conf = (proxy_server_conf *)ap_get_module_config(sconf, &proxy_module);
proxy_balancer *balancer = ap_proxy_get_balancer(pool, conf, name);
/* honor the CreateBalancers policy: skip hosts where we must not create */
if (!balancer && (creat_bal == CREAT_NONE ||
(creat_bal == CREAT_ROOT && s!=main_server))) {
s = s->next;
continue;
}
if (!balancer)
balancer = add_balancer_node(node, conf, pool, s);
else {
/* We "reuse" the balancer: refresh its settings from shared memory.
 * &name[11] skips the "balancer://" prefix. */
balancerinfo_t *balan = read_balancer_name(&balancer->name[11], pool);
if (balan != NULL) {
char *sticky = apr_psprintf(conf->pool, "%s|%s", balan->StickySessionCookie,
balan->StickySessionPath);
int sticky_force=0;
int changed = 0;
if (balan->StickySession)
sticky_force += STSESSION;
if (balan->StickySessionForce)
sticky_force += STSESSFOR;
if (balan->StickySessionRemove)
sticky_force += STSESSREM;
if (balancer->sticky_force != sticky_force) {
balancer->sticky_force = sticky_force;
changed = -1;
}
if (strcmp(sticky, balancer->sticky) != 0) {
balancer->sticky = sticky;
changed = -1;
}
balancer->timeout = balan->Timeout;
balancer->max_attempts = balan->Maxattempts;
balancer->max_attempts_set = 1;
if (changed) {
/* log a warning */
ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, s,
"Balancer %s changed" , &balancer->name[11]);
}
}
}
if (balancer)
create_worker(conf, balancer, s, node, pool);
s = s->next;
}
}
/*
 * Remove a node from the worker conf.
 * @param node the node whose worker should be removed.
 * @param conf the proxy_server_conf holding the workers/balancers arrays.
 * @param pool temporary pool.
 * @param server the server rec, for logging.
 * @return 0 when done (or nothing to do), 1 when connections are still in
 *         use and the caller should retry later.
 */
static int remove_workers_node(nodeinfo_t *node, proxy_server_conf *conf, apr_pool_t *pool, server_rec *server)
{
    /* All declarations first: the original mixed declarations and statements,
     * which breaks C89 builds (-Wdeclaration-after-statement). */
    int i;
    char *pptr = (char *) node;
#if APU_MAJOR_VERSION > 1 || (APU_MAJOR_VERSION == 1 && APU_MINOR_VERSION >= 3)
#else
    proxy_cluster_helper *helper; /* only used on the pre-reslist-count path */
#endif
    proxy_worker *worker = (proxy_worker *)conf->workers->elts;

    pptr = pptr + node->offset;

    /* Find the worker matching both the node id and its shared-memory area.
     * Here we loop through our workers, no need to check that worker->s is OK. */
    for (i = 0; i < conf->workers->nelts; i++) {
        if (worker->id == node->mess.id && worker->s == (proxy_worker_stat *) pptr)
            break;
        worker++;
    }
    if (i == conf->workers->nelts) {
        /* XXX: Another process may use it, can't do: node_storage->remove_node(node); */
        return 0; /* Done */
    }

    /* prevent other threads using it */
    worker->s->status |= PROXY_WORKER_IN_ERROR;

    /* Count connections still checked out of the pool */
    i = 0;
#if APU_MAJOR_VERSION > 1 || (APU_MAJOR_VERSION == 1 && APU_MINOR_VERSION >= 3)
    if (worker->cp->res)
        i = apr_reslist_acquired_count(worker->cp->res);
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
                 "remove_workers_node (reslist) %d %s", i, node->mess.JVMRoute);
#else
    helper = worker->opaque;
    if (helper) {
        i = helper->count_active;
    }
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
                 "remove_workers_node (helper) %d %s", i, node->mess.JVMRoute);
#endif
    if (i == 0) {
        /* No connection in use: clean the worker */
        proxy_balancer *balancer;
        char *name = apr_pstrcat(pool, "balancer://", node->mess.balancer, NULL);

        /* mark the worker removed in the apr_array of the balancer */
        balancer = (proxy_balancer *)conf->balancers->elts;
        for (i = 0; i < conf->balancers->nelts; i++, balancer++) {
            if (strcmp(balancer->name, name) == 0) {
                int j;
                proxy_worker *searched = (proxy_worker *)balancer->workers->elts;
                for (j = 0; j < balancer->workers->nelts; j++, searched++) {
                    if (searched->id == worker->id) {
                        searched->id = 0; /* mark it removed */
                    }
                }
            }
        }
        /* Clear the connection pool (close the sockets) */
        if (worker->cp->pool) {
            apr_pool_destroy(worker->cp->pool);
            worker->cp->pool = NULL;
        }
        /* XXX: Shouldn't we remove the mutex too (worker->mutex) */
        worker->id = 0; /* mark it removed */
        return (0);
    } else {
        /* connections in flight: remember when we last tried */
        node->mess.lastcleantry = apr_time_now();
        return (1); /* We should retry later */
    }
}
/*
* Create/Remove workers corresponding to updated nodes.
* NOTE: It is called from proxy_cluster_watchdog_func and other locations
* It shouldn't call worker_nodes_are_updated() because there may be several VirtualHosts.
*/
static void update_workers_node(proxy_server_conf *conf, apr_pool_t *pool, server_rec *server, int check)
{
    int *ids;
    int count;
    int idx;
    apr_time_t last_update;

    /* Serialize with other updaters before touching the worker tables. */
    apr_thread_mutex_lock(lock);

    /* When check is set, ask the node storage whether anything changed since
     * our previous pass; a forced pass pretends everything changed.
     * nodes_need_update will return 1 if last_updated is zero: first time we
     * are called. */
    last_update = check ? node_storage->worker_nodes_need_update(main_server, pool) : 1;
    if (last_update == 0) {
        apr_thread_mutex_unlock(lock);
        return;
    }

    /* read the ident of the nodes */
    count = node_storage->get_max_size_node();
    if (count == 0) {
        apr_thread_mutex_unlock(lock);
        return;
    }
    ids = apr_pcalloc(pool, sizeof(int) * count);
    count = node_storage->get_ids_used_node(ids);

    /* XXX: How to skip the balancer that aren't controled by mod_manager */
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
                 "update_workers_node starting");

    /* Only process the nodes that have been updated since our last update */
    for (idx = 0; idx < count; idx++) {
        nodeinfo_t *ou;
        if (node_storage->read_node(ids[idx], &ou) != APR_SUCCESS)
            continue;
        if (ou->updatetime >= last_update && ou->mess.remove == 0) {
            /* The node has changed */
            add_balancers_workers(ou, pool);
        }
    }

    apr_thread_mutex_unlock(lock);
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
                 "update_workers_node done");
}
/* the worker corresponding to the id, note that we need to compare the shared memory pointer too */
static proxy_worker *get_worker_from_id(proxy_server_conf *conf, int id, proxy_worker_stat *stat)
{
    /* Scan the configured workers for one matching both the id and the
     * shared-memory pointer: ids alone can be stale after a shared-memory
     * recreate, so both must agree. */
    proxy_worker *candidates = (proxy_worker *)conf->workers->elts;
    int idx;

    for (idx = 0; idx < conf->workers->nelts; idx++) {
        proxy_worker *w = &candidates[idx];
        if (w->id == id && w->s == stat)
            return w;
    }
    return NULL;
}
/*
 * Do a ping/pong (AJP CPING/CPONG) to the node.
 * XXX: ajp_handle_cping_cpong should come from a provider as
 * it is already in modules/proxy/ajp_utils.c
 * @param sock connected socket to the backend.
 * @param r request used for logging.
 * @param timeout how long to wait for the CPONG reply.
 * @return APR_SUCCESS when a valid CPONG was received, an error status otherwise.
 *         The socket's original timeout is restored before returning (except
 *         on early send/timeout-get failures, where it was never changed).
 */
static apr_status_t ajp_handle_cping_cpong(apr_socket_t *sock,
                                           request_rec *r,
                                           apr_interval_time_t timeout)
{
    char buf[5];
    apr_size_t written = 5;
    apr_interval_time_t org;
    apr_status_t status;
    apr_status_t rv;

    /* built the cping message: 0x12 0x34 magic, payload length 1, type 10 (CPING) */
    buf[0] = 0x12;
    buf[1] = 0x34;
    buf[2] = (apr_byte_t) 0;
    buf[3] = (apr_byte_t) 1;
    buf[4] = (unsigned char)10;
    status = apr_socket_send(sock, buf, &written);
    if (status != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_ERR, status, r->server,
                     "ajp_cping_cpong(): send failed");
        return status;
    }
    /* remember the current socket timeout so we can restore it */
    status = apr_socket_timeout_get(sock, &org);
    if (status != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_ERR, status, r->server,
                     "ajp_cping_cpong(): apr_socket_timeout_get failed");
        return status;
    }
    status = apr_socket_timeout_set(sock, timeout);
    if (status != APR_SUCCESS) {
        /* The original ignored this failure and would then recv with an
         * unknown timeout; bail out (and restore) instead. */
        ap_log_error(APLOG_MARK, APLOG_ERR, status, r->server,
                     "ajp_cping_cpong(): apr_socket_timeout_set failed");
        goto cleanup;
    }
    written = 5;
    status = apr_socket_recv(sock, buf, &written);
    if (status != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
                     "ajp_cping_cpong: apr_socket_recv failed");
        goto cleanup;
    }
    /* expect 0x41 0x42 ("AB") magic, length 1, type 9 (CPONG) */
    if (buf[0] != 0x41 || buf[1] != 0x42 || buf[2] != 0 || buf[3] != 1 || buf[4] != (unsigned char)9) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
                     "ajp_cping_cpong: awaited CPONG, received %02x %02x %02x %02x %02x",
                     buf[0] & 0xFF,
                     buf[1] & 0xFF,
                     buf[2] & 0xFF,
                     buf[3] & 0xFF,
                     buf[4] & 0xFF);
        status = APR_EGENERAL;
    }
cleanup:
    /* restore the caller's socket timeout */
    rv = apr_socket_timeout_set(sock, org);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, r->server,
                     "ajp_cping_cpong: apr_socket_timeout_set failed");
        return rv;
    }
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
                 "ajp_cping_cpong: Done");
    return status;
}
/*
 * Acquire a backend connection for the worker, connect with the node's ping
 * timeout temporarily applied, and (for AJP only) exchange a CPING/CPONG.
 * @param r dummy/real request used for pools and logging.
 * @param worker the worker to probe.
 * @param url the worker URL (may be rewritten by determine_connection).
 * @param conf the proxy configuration.
 * @param ping requested ping timeout (unused directly here; the worker's
 *        stored ping_timeout is read instead).
 * @param workertimeout value to restore into worker->timeout on old cores.
 * @return OK/APR_SUCCESS when the backend is reachable, an error otherwise.
 */
static apr_status_t proxy_cluster_try_pingpong(request_rec *r, proxy_worker *worker,
char *url, proxy_server_conf *conf,
apr_interval_time_t ping, apr_interval_time_t workertimeout)
{
apr_status_t status;
apr_interval_time_t timeout;
apr_interval_time_t savetimeout;
char savetimeout_set;
#if AP_MODULE_MAGIC_AT_LEAST(20051115,4)
#else
proxy_cluster_helper *helperping;
#endif
proxy_conn_rec *backend = NULL;
char server_portstr[32];
char *locurl = url;
apr_uri_t *uri;
/* create space for state information */
status = ap_proxy_acquire_connection(worker->scheme, &backend, worker, r->server);
if (status != OK) {
if (backend) {
backend->close_on_recycle = 1;
ap_proxy_release_connection(worker->scheme, backend, r->server);
}
return status;
}
/* Step One: Determine Who To Connect To */
uri = apr_palloc(r->pool, sizeof(*uri)); /* We don't use it anyway */
status = ap_proxy_determine_connection(r->pool, r, conf, worker, backend,
uri, &locurl, worker->hostname, worker->port,
server_portstr,
sizeof(server_portstr));
if (status != OK) {
ap_proxy_release_connection(worker->scheme, backend, r->server);
return status;
}
/* Set the timeout: Note that the default timeout logic in the proxy_util.c is:
 * 1 - worker->timeout (if timeout_set timeout=n in the worker)
 * 2 - conf->timeout (if timeout_set ProxyTimeout 300)
 * 3 - s->timeout (TimeOut 300).
 * We hack it... Via 1
 * Since 20051115.16 (2.2.9) there is a conn_timeout and conn_timeout_set.
 * Changing the worker->timeout is a bad idea (we have to restore the value from the shared memory).
 */
#if AP_MODULE_MAGIC_AT_LEAST(20051115,4)
timeout = worker->ping_timeout;
#else
/* older cores keep the ping timeout in our helper */
helperping = worker->opaque;
timeout = helperping->ping_timeout;
#endif
if (timeout <= 0)
timeout = apr_time_from_sec(10); /* 10 seconds */
#if AP_MODULE_MAGIC_AT_LEAST(20051115,16)
/* use conn_timeout, saving the previous value only when we overwrite it */
if (!worker->conn_timeout_set) {
savetimeout_set = worker->conn_timeout_set;
savetimeout = worker->conn_timeout;
worker->conn_timeout = timeout;
worker->conn_timeout_set = 1;
} else {
savetimeout_set = 0;
savetimeout = 0;
}
#else
/* XXX: side effects the worker may be used in another socket */
savetimeout_set = worker->timeout_set;
savetimeout = worker->timeout;
worker->timeout_set = 1;
worker->timeout = timeout;
#endif
/* Step Two: Make the Connection */
status = ap_proxy_connect_backend(worker->scheme, backend, worker, r->server);
#if AP_MODULE_MAGIC_AT_LEAST(20051115,16)
if (!savetimeout_set) {
worker->conn_timeout = savetimeout;
worker->conn_timeout_set = savetimeout_set;
}
#else
/* Restore the information from the node information */
if (workertimeout) {
worker->timeout = workertimeout;
worker->timeout_set = 1;
} else {
worker->timeout_set = 0;
worker->timeout = 0;
}
#endif
if (status != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
"proxy_cluster_try_pingpong: can't connect to backend");
ap_proxy_release_connection(worker->scheme, backend, r->server);
return status;
}
/* XXX: For the moment we support only AJP */
if (strcasecmp(worker->scheme, "AJP") == 0) {
status = ajp_handle_cping_cpong(backend->sock, r, timeout);
if (status != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
"proxy_cluster_try_pingpong: cping_cpong failed");
backend->close++; /* don't recycle a socket in an unknown state */
}
}
ap_proxy_release_connection(worker->scheme, backend, r->server);
return status;
}
/*
* update the lbfactor of each node if needed,
*/
static void update_workers_lbstatus(proxy_server_conf *conf, apr_pool_t *pool, server_rec *server)
{
int *id, size, i;
apr_time_t now;
now = apr_time_now();
/* read the ident of the nodes */
size = node_storage->get_max_size_node();
if (size == 0)
return;
id = apr_pcalloc(pool, sizeof(int) * size);
size = node_storage->get_ids_used_node(id);
/* update lbstatus if needed */
for (i=0; i<size; i++) {
nodeinfo_t *ou;
if (node_storage->read_node(id[i], &ou) != APR_SUCCESS)
continue;
if (ou->mess.remove)
continue;
if (ou->mess.updatetimelb < (now - lbstatus_recalc_time)) {
/* The lbstatus needs to be updated */
int elected, oldelected;
proxy_worker_stat *stat;
char *ptr = (char *) ou;
ptr = ptr + ou->offset;
stat = (proxy_worker_stat *) ptr;
elected = stat->elected;
oldelected = ou->mess.oldelected;
ou->mess.updatetimelb = now;
ou->mess.oldelected = elected;
/* requests per interval scaled by lbfactor becomes the new lbstatus */
if (stat->lbfactor > 0)
stat->lbstatus = ((elected - oldelected) * 1000) / stat->lbfactor;
if (elected == oldelected) {
/* lbstatus_recalc_time without changes: test for broken nodes */
/* first get the worker, create a dummy request and do a ping */
char sport[7]; /* ":" + up to 5 port digits + NUL */
char *url;
apr_status_t rv;
apr_pool_t *rrp;
request_rec *rnew;
proxy_worker *worker = get_worker_from_id(conf, id[i], stat);
if (worker == NULL)
continue; /* skip it */
apr_snprintf(sport, sizeof(sport), ":%d", worker->port);
url = apr_pstrcat(pool, worker->scheme, "://", worker->hostname, sport, "/", NULL);
/* build a minimal fake request_rec, just enough for the ping path */
apr_pool_create(&rrp, pool);
apr_pool_tag(rrp, "subrequest");
rnew = apr_pcalloc(rrp, sizeof(request_rec));
rnew->pool = rrp;
/* we need only those ones */
rnew->server = server;
rnew->connection = apr_pcalloc(rrp, sizeof(conn_rec));
rnew->per_dir_config = server->lookup_defaults;
rnew->notes = apr_table_make(rnew->pool, 1);
rnew->method = "PING";
rnew->uri = "/";
rnew->headers_in = apr_table_make(rnew->pool, 1);
rv = proxy_cluster_try_pingpong(rnew, worker, url, conf, ou->mess.ping, ou->mess.timeout);
if (rv != APR_SUCCESS) {
/* We can't reach the node */
worker->s->status |= PROXY_WORKER_IN_ERROR;
ou->mess.num_failure_idle++;
if (ou->mess.num_failure_idle > 60) {
/* Failing for 5 minutes: time to mark it removed */
ou->mess.remove = 1;
ou->updatetime = now;
}
} else
ou->mess.num_failure_idle = 0;
} else
ou->mess.num_failure_idle = 0;
}
}
}
/*
* remove the sessionids that have timeout
*/
static void remove_timeout_sessionid(proxy_server_conf *conf, apr_pool_t *pool, server_rec *server)
{
int *id, size, i;
apr_time_t now;
/* NOTE(review): 'now' holds seconds (apr_time_sec of the current time)
 * although declared apr_time_t (microseconds); ou->updatetime is
 * presumably stored in seconds by the writer too — verify against the
 * sessionid storage code before changing. */
now = apr_time_sec(apr_time_now());
/* read the ident of the sessionid */
size = sessionid_storage->get_max_size_sessionid();
if (size == 0)
return;
id = apr_pcalloc(pool, sizeof(int) * size);
size = sessionid_storage->get_ids_used_sessionid(id);
/* remove the sessionids not touched within TIMESESSIONID seconds */
for (i=0; i<size; i++) {
sessionidinfo_t *ou;
if (sessionid_storage->read_sessionid(id[i], &ou) != APR_SUCCESS)
continue;
if (ou->updatetime < (now - TIMESESSIONID)) {
/* Remove it */
sessionid_storage->remove_sessionid(ou);
}
}
}
/*
* remove the domain that have timeout
*/