Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

MB-2724 - configurable auth_timeout with auth timeout blacklisting

auth_timeout (in milliseconds; 0 means no timeout) is now configurable
on the command line and can be combined with the zstored downstream
conn blacklisting flags.  For example:

    cycle=200,
    connect_max_errors=10,
    connect_retry_interval=30000,
    auth_timeout=100,
    ...

A new tot_auth_timeout stat also counts how often an auth timeout
error occurs.

Change-Id: Idfb64d627ff0b5b470332efb640b498382d9e5bf
Reviewed-on: http://review.membase.org/3628
Tested-by: Sean Lynch <seanl@literati.org>
Reviewed-by: Sean Lynch <seanl@literati.org>
  • Loading branch information...
commit 646bff3d4e1f0ff2e78c2c6adffcfd04f95eae72 1 parent 4dd1038
Steve Yen steveyen authored seanlynch committed
8 agent_stats.c
@@ -470,6 +470,9 @@ static void proxy_stats_dump_behavior(ADD_STAT add_stats,
470 470 APPEND_PREFIX_STAT("wait_queue_timeout", "%ld", // In millisecs.
471 471 (b->wait_queue_timeout.tv_sec * 1000 +
472 472 b->wait_queue_timeout.tv_usec / 1000));
  473 + APPEND_PREFIX_STAT("auth_timeout", "%ld", // In millisecs.
  474 + (b->auth_timeout.tv_sec * 1000 +
  475 + b->auth_timeout.tv_usec / 1000));
473 476 APPEND_PREFIX_STAT("time_stats", "%d", b->time_stats);
474 477 APPEND_PREFIX_STAT("connect_max_errors", "%d", b->connect_max_errors);
475 478 APPEND_PREFIX_STAT("connect_retry_interval", "%d", b->connect_retry_interval);
@@ -590,6 +593,8 @@ static void proxy_stats_dump_pstd_stats(ADD_STAT add_stats,
590 593 "%llu", (long long unsigned int) pstats->tot_downstream_timeout);
591 594 APPEND_PREFIX_STAT("tot_wait_queue_timeout",
592 595 "%llu", (long long unsigned int) pstats->tot_wait_queue_timeout);
  596 + APPEND_PREFIX_STAT("tot_auth_timeout",
  597 + "%llu", (long long unsigned int) pstats->tot_auth_timeout);
593 598 APPEND_PREFIX_STAT("tot_assign_downstream",
594 599 "%llu", (long long unsigned int) pstats->tot_assign_downstream);
595 600 APPEND_PREFIX_STAT("tot_assign_upstream",
@@ -1204,6 +1209,7 @@ static void add_proxy_stats(proxy_stats *agg, proxy_stats *x) {
1204 1209 x->tot_downstream_close_on_upstream_close;
1205 1210 agg->tot_downstream_timeout += x->tot_downstream_timeout;
1206 1211 agg->tot_wait_queue_timeout += x->tot_wait_queue_timeout;
  1212 + agg->tot_auth_timeout += x->tot_auth_timeout;
1207 1213 agg->tot_assign_downstream += x->tot_assign_downstream;
1208 1214 agg->tot_assign_upstream += x->tot_assign_upstream;
1209 1215 agg->tot_assign_recursion += x->tot_assign_recursion;
@@ -1457,6 +1463,8 @@ void map_pstd_foreach_emit(const void *k,
1457 1463 pstd->stats.tot_downstream_timeout);
1458 1464 more_stat("tot_wait_queue_timeout",
1459 1465 pstd->stats.tot_wait_queue_timeout);
  1466 + more_stat("tot_auth_timeout",
  1467 + pstd->stats.tot_auth_timeout);
1460 1468 more_stat("tot_assign_downstream",
1461 1469 pstd->stats.tot_assign_downstream);
1462 1470 more_stat("tot_assign_upstream",
119 cproxy.c
@@ -1472,28 +1472,39 @@ bool downstream_connect_init(downstream *d, mcs_server_st *msst,
1472 1472 host_ident = mcs_server_st_ident(msst, IS_ASCII(c->protocol));
1473 1473 }
1474 1474
1475   - zstored_error_count(c->thread, host_ident, false);
1476   -
1477 1475 if (c->cmd_start_time != 0 &&
1478 1476 d->ptd->behavior_pool.base.time_stats) {
1479 1477 downstream_connect_time_sample(&d->ptd->stats,
1480 1478 usec_now() - c->cmd_start_time);
1481 1479 }
1482 1480
1483   - if (cproxy_auth_downstream(msst, behavior, c->sfd)) {
  1481 + int rv;
  1482 +
  1483 + rv = cproxy_auth_downstream(msst, behavior, c->sfd);
  1484 + if (rv == 0) {
1484 1485 d->ptd->stats.stats.tot_downstream_auth++;
1485 1486
1486   - if (cproxy_bucket_downstream(msst, behavior, c->sfd)) {
  1487 + rv = cproxy_bucket_downstream(msst, behavior, c->sfd);
  1488 + if (rv == 0) {
1487 1489 d->ptd->stats.stats.tot_downstream_bucket++;
1488 1490
  1491 + zstored_error_count(c->thread, host_ident, false);
  1492 +
1489 1493 return true;
1490 1494 } else {
1491 1495 d->ptd->stats.stats.tot_downstream_bucket_failed++;
1492 1496 }
1493 1497 } else {
1494 1498 d->ptd->stats.stats.tot_downstream_auth_failed++;
  1499 + if (rv == 1) {
  1500 + d->ptd->stats.stats.tot_auth_timeout++;
  1501 + }
1495 1502 }
1496 1503
  1504 + // Treat a auth/bucket error as a blacklistable error.
  1505 + //
  1506 + zstored_error_count(c->thread, host_ident, true);
  1507 +
1497 1508 return false;
1498 1509 }
1499 1510
@@ -2535,9 +2546,11 @@ bool cproxy_start_downstream_timeout(downstream *d, conn *c) {
2535 2546 return (evtimer_add(&d->timeout_event, &d->timeout_tv) == 0);
2536 2547 }
2537 2548
2538   -bool cproxy_auth_downstream(mcs_server_st *server,
2539   - proxy_behavior *behavior,
2540   - int fd) {
  2549 +// Return 0 on success, -1 on general failure, 1 on timeout failure.
  2550 +//
  2551 +int cproxy_auth_downstream(mcs_server_st *server,
  2552 + proxy_behavior *behavior,
  2553 + int fd) {
2541 2554 assert(server);
2542 2555 assert(behavior);
2543 2556 assert(fd != -1);
@@ -2545,7 +2558,7 @@ bool cproxy_auth_downstream(mcs_server_st *server,
2545 2558 char buf[3000];
2546 2559
2547 2560 if (!IS_BINARY(behavior->downstream_protocol)) {
2548   - return true;
  2561 + return 0;
2549 2562 }
2550 2563
2551 2564 const char *usr = mcs_server_st_usr(server) != NULL ?
@@ -2557,7 +2570,7 @@ bool cproxy_auth_downstream(mcs_server_st *server,
2557 2570 int pwd_len = strlen(pwd);
2558 2571
2559 2572 if (usr_len <= 0) {
2560   - return true;
  2573 + return 0;
2561 2574 }
2562 2575
2563 2576 if (settings.verbose > 2) {
@@ -2572,7 +2585,7 @@ bool cproxy_auth_downstream(mcs_server_st *server,
2572 2585 moxi_log_write("auth failure args\n");
2573 2586 }
2574 2587
2575   - return false; // Probably misconfigured.
  2588 + return -1; // Probably misconfigured.
2576 2589 }
2577 2590
2578 2591 // The key should look like "PLAIN", or the sasl mech string.
@@ -2604,19 +2617,19 @@ bool cproxy_auth_downstream(mcs_server_st *server,
2604 2617 usr, buf_len);
2605 2618 }
2606 2619
2607   - return false;
  2620 + return -1;
2608 2621 }
2609 2622
2610 2623 protocol_binary_response_header res = { .bytes = {0} };
2611 2624
2612 2625 struct timeval *timeout = NULL;
2613   - if (behavior->downstream_timeout.tv_sec != 0 ||
2614   - behavior->downstream_timeout.tv_usec != 0) {
2615   - timeout = &behavior->downstream_timeout;
  2626 + if (behavior->auth_timeout.tv_sec != 0 ||
  2627 + behavior->auth_timeout.tv_usec != 0) {
  2628 + timeout = &behavior->auth_timeout;
2616 2629 }
2617 2630
2618   - if (mcs_io_read(fd, &res.bytes,
2619   - sizeof(res.bytes), timeout) == MCS_SUCCESS &&
  2631 + mcs_return mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
  2632 + if (mr == MCS_SUCCESS &&
2620 2633 res.response.magic == PROTOCOL_BINARY_RES) {
2621 2634 res.response.status = ntohs(res.response.status);
2622 2635 res.response.keylen = ntohs(res.response.keylen);
@@ -2627,13 +2640,19 @@ bool cproxy_auth_downstream(mcs_server_st *server,
2627 2640 int len = res.response.bodylen;
2628 2641 while (len > 0) {
2629 2642 int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
2630   - if (mcs_io_read(fd, buf, amt, timeout) != MCS_SUCCESS) {
  2643 +
  2644 + mr = mcs_io_read(fd, buf, amt, timeout);
  2645 + if (mr != MCS_SUCCESS) {
2631 2646 if (settings.verbose > 1) {
2632   - moxi_log_write("auth could not read response body (%d)\n",
2633   - usr, amt);
  2647 + moxi_log_write("auth could not read response body (%d) %d\n",
  2648 + usr, amt, mr);
  2649 + }
  2650 +
  2651 + if (mr == MCS_TIMEOUT) {
  2652 + return 1;
2634 2653 }
2635 2654
2636   - return false;
  2655 + return -1;
2637 2656 }
2638 2657
2639 2658 len -= amt;
@@ -2649,7 +2668,7 @@ bool cproxy_auth_downstream(mcs_server_st *server,
2649 2668 moxi_log_write("auth_downstream success for %s\n", usr);
2650 2669 }
2651 2670
2652   - return true;
  2671 + return 0;
2653 2672 }
2654 2673
2655 2674 if (settings.verbose > 1) {
@@ -2658,29 +2677,35 @@ bool cproxy_auth_downstream(mcs_server_st *server,
2658 2677 }
2659 2678 } else {
2660 2679 if (settings.verbose > 1) {
2661   - moxi_log_write("auth_downstream response error for %s\n",
2662   - usr);
  2680 + moxi_log_write("auth_downstream response error for %s, %d\n",
  2681 + usr, mr);
2663 2682 }
2664 2683 }
2665 2684
2666   - return false;
  2685 + if (mr == MCS_TIMEOUT) {
  2686 + return 1;
  2687 + }
  2688 +
  2689 + return -1;
2667 2690 }
2668 2691
2669   -bool cproxy_bucket_downstream(mcs_server_st *server,
2670   - proxy_behavior *behavior,
2671   - int fd) {
  2692 +// Return 0 on success, -1 on general failure, 1 on timeout failure.
  2693 +//
  2694 +int cproxy_bucket_downstream(mcs_server_st *server,
  2695 + proxy_behavior *behavior,
  2696 + int fd) {
2672 2697 assert(server);
2673 2698 assert(behavior);
2674 2699 assert(IS_PROXY(behavior->downstream_protocol));
2675 2700 assert(fd != -1);
2676 2701
2677 2702 if (!IS_BINARY(behavior->downstream_protocol)) {
2678   - return true;
  2703 + return 0;
2679 2704 }
2680 2705
2681 2706 int bucket_len = strlen(behavior->bucket);
2682 2707 if (bucket_len <= 0) {
2683   - return true; // When no bucket.
  2708 + return 0; // When no bucket.
2684 2709 }
2685 2710
2686 2711 protocol_binary_request_header req = { .bytes = {0} };
@@ -2701,19 +2726,19 @@ bool cproxy_bucket_downstream(mcs_server_st *server,
2701 2726 bucket_len);
2702 2727 }
2703 2728
2704   - return false;
  2729 + return -1;
2705 2730 }
2706 2731
2707 2732 protocol_binary_response_header res = { .bytes = {0} };
2708 2733
2709 2734 struct timeval *timeout = NULL;
2710   - if (behavior->downstream_timeout.tv_sec != 0 ||
2711   - behavior->downstream_timeout.tv_usec != 0) {
2712   - timeout = &behavior->downstream_timeout;
  2735 + if (behavior->auth_timeout.tv_sec != 0 ||
  2736 + behavior->auth_timeout.tv_usec != 0) {
  2737 + timeout = &behavior->auth_timeout;
2713 2738 }
2714 2739
2715   - if (mcs_io_read(fd, &res.bytes,
2716   - sizeof(res.bytes), timeout) == MCS_SUCCESS &&
  2740 + mcs_return mr = mcs_io_read(fd, &res.bytes, sizeof(res.bytes), timeout);
  2741 + if (mr == MCS_SUCCESS &&
2717 2742 res.response.magic == PROTOCOL_BINARY_RES) {
2718 2743 res.response.status = ntohs(res.response.status);
2719 2744 res.response.keylen = ntohs(res.response.keylen);
@@ -2726,9 +2751,16 @@ bool cproxy_bucket_downstream(mcs_server_st *server,
2726 2751 int len = res.response.bodylen;
2727 2752 while (len > 0) {
2728 2753 int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
2729   - if (mcs_io_read(fd, buf, amt, timeout) != MCS_SUCCESS) {
2730   - return false;
  2754 +
  2755 + mr = mcs_io_read(fd, buf, amt, timeout);
  2756 + if (mr != MCS_SUCCESS) {
  2757 + if (mr == MCS_TIMEOUT) {
  2758 + return 1;
  2759 + }
  2760 +
  2761 + return -1;
2731 2762 }
  2763 +
2732 2764 len -= amt;
2733 2765 }
2734 2766
@@ -2743,7 +2775,7 @@ bool cproxy_bucket_downstream(mcs_server_st *server,
2743 2775 behavior->bucket);
2744 2776 }
2745 2777
2746   - return true;
  2778 + return 0;
2747 2779 }
2748 2780
2749 2781 if (settings.verbose > 1) {
@@ -2753,7 +2785,11 @@ bool cproxy_bucket_downstream(mcs_server_st *server,
2753 2785 }
2754 2786 }
2755 2787
2756   - return false;
  2788 + if (mr == MCS_TIMEOUT) {
  2789 + return 1;
  2790 + }
  2791 +
  2792 + return -1;
2757 2793 }
2758 2794
2759 2795 int cproxy_max_retries(downstream *d) {
@@ -2987,8 +3023,9 @@ void zstored_error_count(LIBEVENT_THREAD *thread,
2987 3023 // rather than be released back to the thread->conn_hash,
2988 3024 // so update the dc_acquired here.
2989 3025 //
2990   - assert(conns->dc_acquired > 0);
2991   - conns->dc_acquired--;
  3026 + if (conns->dc_acquired > 0) {
  3027 + conns->dc_acquired--;
  3028 + }
2992 3029
2993 3030 // When zero downstream conns are available, wake up all
2994 3031 // waiting downstreams so they can proceed (possibly by
8 cproxy.h
@@ -115,6 +115,7 @@ struct proxy_behavior {
115 115 enum protocol downstream_protocol; // SL: Favored downstream protocol.
116 116 struct timeval downstream_timeout; // SL: Fields of 0 mean no timeout.
117 117 struct timeval wait_queue_timeout; // PL: Fields of 0 mean no timeout.
  118 + struct timeval auth_timeout; // PL: Fields of 0 mean no timeout.
118 119 bool time_stats; // IL: Capture timing stats.
119 120
120 121 uint32_t connect_max_errors; // IL: Pause when too many connect() errs.
@@ -284,6 +285,7 @@ struct proxy_stats {
284 285 uint64_t tot_downstream_close_on_upstream_close;
285 286 uint64_t tot_downstream_timeout;
286 287 uint64_t tot_wait_queue_timeout;
  288 + uint64_t tot_auth_timeout;
287 289 uint64_t tot_assign_downstream;
288 290 uint64_t tot_assign_upstream;
289 291 uint64_t tot_assign_recursion;
@@ -513,10 +515,10 @@ proxy *cproxy_find_proxy_by_auth(proxy_main *m,
513 515 const char *usr,
514 516 const char *pwd);
515 517
516   -bool cproxy_auth_downstream(mcs_server_st *server,
  518 +int cproxy_auth_downstream(mcs_server_st *server,
  519 + proxy_behavior *behavior, int fd);
  520 +int cproxy_bucket_downstream(mcs_server_st *server,
517 521 proxy_behavior *behavior, int fd);
518   -bool cproxy_bucket_downstream(mcs_server_st *server,
519   - proxy_behavior *behavior, int fd);
520 522
521 523 void cproxy_pause_upstream_for_downstream(proxy_td *ptd, conn *upstream);
522 524 conn *cproxy_find_downstream_conn(downstream *d, char *key, int key_length,
11 cproxy_config.c
@@ -55,6 +55,10 @@ proxy_behavior behavior_default_g = {
55 55 .tv_sec = 0,
56 56 .tv_usec = 0
57 57 },
  58 + .auth_timeout = {
  59 + .tv_sec = 0,
  60 + .tv_usec = 0
  61 + },
58 62 .time_stats = false,
59 63 .connect_max_errors = 0, // In zstored, 10.
60 64 .connect_retry_interval = 0, // In zstored, 30000.
@@ -645,6 +649,10 @@ void cproxy_parse_behavior_key_val(char *key,
645 649 int ms = strtol(val, NULL, 10);
646 650 behavior->wait_queue_timeout.tv_sec = floor(ms / 1000.0);
647 651 behavior->wait_queue_timeout.tv_usec = (ms % 1000) * 1000;
  652 + } else if (wordeq(key, "auth_timeout")) {
  653 + int ms = strtol(val, NULL, 10);
  654 + behavior->auth_timeout.tv_sec = floor(ms / 1000.0);
  655 + behavior->auth_timeout.tv_usec = (ms % 1000) * 1000;
648 656 } else if (wordeq(key, "time_stats")) {
649 657 behavior->time_stats = strtol(val, NULL, 10);
650 658 } else if (wordeq(key, "connect_max_errors")) {
@@ -808,6 +816,9 @@ void cproxy_dump_behavior_ex(proxy_behavior *b, char *prefix, int level,
808 816 vdump("wait_queue_timeout", "%ld", // In millisecs.
809 817 (b->wait_queue_timeout.tv_sec * 1000 +
810 818 b->wait_queue_timeout.tv_usec / 1000));
  819 + vdump("auth_timeout", "%ld", // In millisecs.
  820 + (b->auth_timeout.tv_sec * 1000 +
  821 + b->auth_timeout.tv_usec / 1000));
811 822 vdump("time_stats", "%d", b->time_stats);
812 823 vdump("connect_max_errors", "%u", b->connect_max_errors);
813 824 vdump("connect_retry_interval", "%u", b->connect_retry_interval);
2  mcs.c
@@ -599,7 +599,7 @@ mcs_return mcs_io_read(int fd, void *dta, size_t size, struct timeval *timeout)
599 599 if (select(fd + 1, &readfds, NULL, NULL, timeout) != 1) {
600 600 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
601 601
602   - return MCS_FAILURE;
  602 + return MCS_TIMEOUT;
603 603 }
604 604 }
605 605
1  mcs.h
@@ -22,6 +22,7 @@
22 22 typedef enum {
23 23 MCS_SUCCESS = 0,
24 24 MCS_FAILURE,
  25 + MCS_TIMEOUT,
25 26 MCS_MAXIMUM_RETURN /* Always add new error code before */
26 27 } mcs_return;
27 28

0 comments on commit 646bff3

Please sign in to comment.
Something went wrong with that request. Please try again.