Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Add "stats connections" to dump all connections

Change-Id: Ife8978790e9d5c50362afcf8c9edc5802117364f
  • Loading branch information...
commit 8c32bbf8e511421d487807b0f03a780eb70f7437 1 parent f99c147
Trond Norbye authored

Showing 1 changed file with 244 additions and 26 deletions. Show diff stats Hide diff stats

  1. 270  daemon/memcached.c
270  daemon/memcached.c
@@ -401,11 +401,6 @@ void safe_close(SOCKET sfd) {
401 401
     }
402 402
 }
403 403
 
404  
-/*
405  
- * Free list management for connections.
406  
- */
407  
-cache_t *conn_cache;      /* suffix cache */
408  
-
409 404
 /**
410 405
  * Reset all of the dynamic buffers used by a connection back to their
411 406
  * default sizes. The strategy for resizing the buffers is to allocate a
@@ -495,17 +490,14 @@ static bool conn_reset_buffersize(conn *c) {
495 490
  * all members and allocate the transfer buffers.
496 491
  *
497 492
  * @param buffer The memory allocated by the object cache
498  
- * @param unused1 not used
499  
- * @param unused2 not used
500 493
  * @return 0 on success, 1 if we failed to allocate memory
501 494
  */
502  
-static int conn_constructor(void *buffer, void *unused1, int unused2) {
503  
-    (void)unused1; (void)unused2;
504  
-
505  
-    conn *c = buffer;
  495
+static int conn_constructor(conn *c) {
506 496
     memset(c, 0, sizeof(*c));
507 497
     MEMCACHED_CONN_CREATE(c);
508 498
 
  499
+    c->state = conn_closing;
  500
+    c->sfd = INVALID_SOCKET;
509 501
     if (!conn_reset_buffersize(c)) {
510 502
         free(c->rbuf);
511 503
         free(c->wbuf);
@@ -530,11 +522,8 @@ static int conn_constructor(void *buffer, void *unused1, int unused2) {
530 522
  * Destructor for all connection objects. Release all allocated resources.
531 523
  *
532 524
  * @param buffer The memory allocated by the objec cache
533  
- * @param unused not used
534 525
  */
535  
-static void conn_destructor(void *buffer, void *unused) {
536  
-    (void)unused;
537  
-    conn *c = buffer;
  526
+static void conn_destructor(conn *c) {
538 527
     free(c->rbuf);
539 528
     free(c->wbuf);
540 529
     free(c->ilist);
@@ -547,11 +536,240 @@ static void conn_destructor(void *buffer, void *unused) {
547 536
     STATS_UNLOCK();
548 537
 }
549 538
 
  539
+/*
  540
+ * Free list management for connections.
  541
+ */
  542
+struct connections {
  543
+    conn* free;
  544
+    conn** all;
  545
+    pthread_mutex_t mutex;
  546
+    int next;
  547
+} connections = {
  548
+    .mutex = PTHREAD_MUTEX_INITIALIZER
  549
+};
  550
+
  551
+static void initialize_connections(void)
  552
+{
  553
+    connections.all = calloc(settings.maxconns, sizeof(conn*));
  554
+    if (connections.all == NULL) {
  555
+        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
  556
+                                        "Failed to allocate memory for connections");
  557
+        exit(EX_OSERR);
  558
+    }
  559
+
  560
+    int preallocate = settings.maxconns / 2;
  561
+    if (preallocate < 1000) {
  562
+        preallocate = settings.maxconns;
  563
+    }
  564
+
  565
+    for (connections.next = 0; connections.next < preallocate; ++connections.next) {
  566
+        connections.all[connections.next] = malloc(sizeof(conn));
  567
+        if (conn_constructor(connections.all[connections.next]) != 0) {
  568
+            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
  569
+                                            "Failed to allocate memory for connections");
  570
+            exit(EX_OSERR);
  571
+        }
  572
+        connections.all[connections.next]->next = connections.free;
  573
+        connections.free = connections.all[connections.next];
  574
+    }
  575
+}
  576
+
  577
+static conn *allocate_connection(void) {
  578
+    conn *ret;
  579
+
  580
+    pthread_mutex_lock(&connections.mutex);
  581
+    ret = connections.free;
  582
+    if (ret != NULL) {
  583
+        connections.free = connections.free->next;
  584
+        ret->next = NULL;
  585
+    }
  586
+    pthread_mutex_unlock(&connections.mutex);
  587
+
  588
+    if (ret == NULL) {
  589
+        ret = malloc(sizeof(conn));
  590
+        if (ret == NULL) {
  591
+            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
  592
+                                            "Failed to allocate memory for connection");
  593
+            return NULL;
  594
+        }
  595
+
  596
+        if (conn_constructor(ret) != 0) {
  597
+            conn_destructor(ret);
  598
+            free(ret);
  599
+            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
  600
+                                            "Failed to allocate memory for connection");
  601
+            return NULL;
  602
+        }
  603
+
  604
+        pthread_mutex_lock(&connections.mutex);
  605
+        if (connections.next == settings.maxconns) {
  606
+            free(ret);
  607
+            ret = NULL;
  608
+        } else {
  609
+            connections.all[connections.next++] = ret;
  610
+        }
  611
+        pthread_mutex_unlock(&connections.mutex);
  612
+    }
  613
+
  614
+    return ret;
  615
+}
  616
+
  617
+static void release_connection(conn *c) {
  618
+    c->sfd = INVALID_SOCKET;
  619
+    pthread_mutex_lock(&connections.mutex);
  620
+    c->next = connections.free;
  621
+    connections.free = c;
  622
+    pthread_mutex_unlock(&connections.mutex);
  623
+}
  624
+
  625
+static const char *substate_text(enum bin_substates state) {
  626
+    switch (state) {
  627
+    case bin_no_state: return "bin_no_state";
  628
+    case bin_reading_set_header: return "bin_reading_set_header";
  629
+    case bin_reading_cas_header: return "bin_reading_cas_header";
  630
+    case bin_read_set_value: return "bin_read_set_value";
  631
+    case bin_reading_get_key: return "bin_reading_get_key";
  632
+    case bin_reading_stat: return "bin_reading_stat";
  633
+    case bin_reading_del_header: return "bin_reading_del_header";
  634
+    case bin_reading_incr_header: return "bin_reading_incr_header";
  635
+    case bin_read_flush_exptime: return "bin_reading_flush_exptime";
  636
+    case bin_reading_sasl_auth: return "bin_reading_sasl_auth";
  637
+    case bin_reading_sasl_auth_data: return "bin_reading_sasl_auth_data";
  638
+    case bin_reading_packet: return "bin_reading_packet";
  639
+    default:
  640
+        return "illegal";
  641
+    }
  642
+}
  643
+
  644
+static const char *protocol_text(enum protocol protocol) {
  645
+    switch (protocol) {
  646
+    case ascii_prot: return "ascii";
  647
+    case binary_prot: return "binary";
  648
+    case negotiating_prot: return "negotiating";
  649
+    default:
  650
+        return "illegal";
  651
+    }
  652
+}
  653
+
  654
+static const char *transport_text(enum network_transport transport) {
  655
+    switch (transport) {
  656
+    case local_transport: return "unix sockets";
  657
+    case tcp_transport: return "TCP";
  658
+    case udp_transport: return "UDP";
  659
+    default:
  660
+        return "illegal";
  661
+    }
  662
+}
  663
+
  664
+static void add_connection_stats(ADD_STAT add_stats, conn *d, conn *c) {
  665
+    append_stat("conn", add_stats, d, "%p", c);
  666
+    if (c->sfd == INVALID_SOCKET) {
  667
+        append_stat("socket", add_stats, d, "disconnected");
  668
+    } else {
  669
+        append_stat("socket", add_stats, d, "%lu", (long)c->sfd);
  670
+        append_stat("protocol", add_stats, d, "%s", protocol_text(c->protocol));
  671
+        append_stat("transport", add_stats, d, "%s",
  672
+                    transport_text(c->transport));
  673
+        append_stat("nevents", add_stats, d, "%u", c->nevents);
  674
+        if (c->sasl_conn != NULL) {
  675
+            append_stat("sasl_conn", add_stats, d, "%p", c->sasl_conn);
  676
+        }
  677
+        append_stat("state", add_stats, d, "%s", state_text(c->state));
  678
+        if (c->protocol == binary_prot) {
  679
+            append_stat("substate", add_stats, d, "%s",
  680
+                        substate_text(c->substate));
  681
+        }
  682
+        append_stat("registered_in_libevent", add_stats, d, "%d",
  683
+                    (int)c->registered_in_libevent);
  684
+        append_stat("ev_flags", add_stats, d, "%x", c->ev_flags);
  685
+        append_stat("which", add_stats, d, "%x", c->which);
  686
+        append_stat("rbuf", add_stats, d, "%p", c->rbuf);
  687
+        append_stat("rcurr", add_stats, d, "%p", c->rcurr);
  688
+        append_stat("rsize", add_stats, d, "%u", c->rsize);
  689
+        append_stat("rbytes", add_stats, d, "%u", c->rbytes);
  690
+        append_stat("wbuf", add_stats, d, "%p", c->wbuf);
  691
+        append_stat("wcurr", add_stats, d, "%p", c->wcurr);
  692
+        append_stat("wsize", add_stats, d, "%u", c->wsize);
  693
+        append_stat("wbytes", add_stats, d, "%u", c->wbytes);
  694
+        append_stat("write_and_go", add_stats, d, "%p", c->write_and_go);
  695
+        append_stat("write_and_free", add_stats, d, "%p", c->write_and_free);
  696
+        append_stat("ritem", add_stats, d, "%p", c->ritem);
  697
+        append_stat("rlbytes", add_stats, d, "%u", c->rlbytes);
  698
+        append_stat("item", add_stats, d, "%p", c->item);
  699
+        append_stat("store_op", add_stats, d, "%u", c->store_op);
  700
+        append_stat("sbytes", add_stats, d, "%u", c->sbytes);
  701
+        append_stat("iov", add_stats, d, "%p", c->iov);
  702
+        append_stat("iovsize", add_stats, d, "%u", c->iovsize);
  703
+        append_stat("iovused", add_stats, d, "%u", c->iovused);
  704
+        append_stat("msglist", add_stats, d, "%p", c->msglist);
  705
+        append_stat("msgsize", add_stats, d, "%u", c->msgsize);
  706
+        append_stat("msgused", add_stats, d, "%u", c->msgused);
  707
+        append_stat("msgcurr", add_stats, d, "%u", c->msgcurr);
  708
+        append_stat("msgbytes", add_stats, d, "%u", c->msgbytes);
  709
+        append_stat("ilist", add_stats, d, "%p", c->ilist);
  710
+        append_stat("isize", add_stats, d, "%u", c->isize);
  711
+        append_stat("icurr", add_stats, d, "%p", c->icurr);
  712
+        append_stat("ileft", add_stats, d, "%u", c->ileft);
  713
+        append_stat("suffixlist", add_stats, d, "%p", c->suffixlist);
  714
+        append_stat("suffixsize", add_stats, d, "%u", c->suffixsize);
  715
+        append_stat("suffixcurr", add_stats, d, "%p", c->suffixcurr);
  716
+        append_stat("suffixleft", add_stats, d, "%u", c->suffixleft);
  717
+
  718
+        if (c->transport == udp_transport) {
  719
+            // @todo we should dump the packet header
  720
+            append_stat("request_id", add_stats, d, "%u", c->request_id);
  721
+            append_stat("hdrbuf", add_stats, d, "%p", c->hdrbuf);
  722
+            append_stat("hdrsize", add_stats, d, "%d", c->hdrsize);
  723
+        }
  724
+
  725
+        append_stat("noreply", add_stats, d, "%d", c->noreply);
  726
+        append_stat("refcount", add_stats, d, "%u", (int)c->refcount);
  727
+        append_stat("dynamic_buffer.buffer", add_stats, d, "%p",
  728
+                    c->dynamic_buffer.buffer);
  729
+        append_stat("dynamic_buffer.size", add_stats, d, "%zu",
  730
+                    c->dynamic_buffer.size);
  731
+        append_stat("dynamic_buffer.offset", add_stats, d, "%zu",
  732
+                    c->dynamic_buffer.offset);
  733
+        append_stat("engine_storage", add_stats, d, "%p", c->engine_storage);
  734
+        if (c->protocol == ascii_prot) {
  735
+            append_stat("ascii_cmd", add_stats, d, "%p", c->ascii_cmd);
  736
+        } else if (c->protocol == binary_prot) {
  737
+            // @todo we should decode the binary header
  738
+            append_stat("cas", add_stats, d, "%"PRIu64, c->cas);
  739
+            append_stat("cmd", add_stats, d, "%u", c->cmd);
  740
+            append_stat("opaque", add_stats, d, "%u", c->opaque);
  741
+            append_stat("keylen", add_stats, d, "%u", c->keylen);
  742
+        }
  743
+        append_stat("list_state", add_stats, d, "%u", c->list_state);
  744
+        append_stat("next", add_stats, d, "%p", c->next);
  745
+        append_stat("thread", add_stats, d, "%p", c->thread);
  746
+        append_stat("aiostat", add_stats, d, "%u", c->aiostat);
  747
+        append_stat("ewouldblock", add_stats, d, "%u", c->ewouldblock);
  748
+        append_stat("tap_nack_mode", add_stats, d, "%u", c->tap_nack_mode);
  749
+        append_stat("tap_iterator", add_stats, d, "%p", c->tap_iterator);
  750
+    }
  751
+}
  752
+
  753
+/**
  754
+ * Do a full stats of all of the connections.
  755
+ * Do _NOT_ try to follow _ANY_ of the pointers in the conn structure
  756
+ * because we read all of the values _DIRTY_. We preallocated the array
  757
+ * of all of the connection pointers during startup, so we _KNOW_ that
  758
+ * we can iterate through all of them. All of the conn structs will
  759
+ * only appear in the connections.all array when we've allocated them,
  760
+ * and we don't release them so it's safe to look at them.
  761
+ */
  762
+static void connection_stats(ADD_STAT add_stats, conn *c) {
  763
+    for (int ii = 0; ii < settings.maxconns && connections.all[ii]; ++ii) {
  764
+        add_connection_stats(add_stats, c, connections.all[ii]);
  765
+    }
  766
+}
  767
+
550 768
 conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,
551 769
                const int event_flags,
552 770
                const int read_buffer_size, enum network_transport transport,
553 771
                struct event_base *base, struct timeval *timeout) {
554  
-    conn *c = cache_alloc(conn_cache);
  772
+    conn *c = allocate_connection();
555 773
     if (c == NULL) {
556 774
         return NULL;
557 775
     }
@@ -566,7 +784,7 @@ conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,
566 784
             c->rbuf = mem;
567 785
         } else {
568 786
             assert(c->thread == NULL);
569  
-            cache_free(conn_cache, c);
  787
+            release_connection(c);
570 788
             return NULL;
571 789
         }
572 790
     }
@@ -637,7 +855,7 @@ conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,
637 855
 
638 856
     if (!register_event(c, timeout)) {
639 857
         assert(c->thread == NULL);
640  
-        cache_free(conn_cache, c);
  858
+        release_connection(c);
641 859
         return NULL;
642 860
     }
643 861
 
@@ -723,7 +941,7 @@ void conn_close(conn *c) {
723 941
      */
724 942
     conn_reset_buffersize(c);
725 943
     assert(c->thread == NULL);
726  
-    cache_free(conn_cache, c);
  944
+    release_connection(c);
727 945
 }
728 946
 
729 947
 /*
@@ -1945,6 +2163,8 @@ static void process_bin_stat(conn *c) {
1945 2163
             }
1946 2164
         } else if (strncmp(subcommand, "aggregate", 9) == 0) {
1947 2165
             server_stats(&append_stats, c, true);
  2166
+        } else if (strncmp(subcommand, "connections", 11) == 0) {
  2167
+            connection_stats(&append_stats, c);
1948 2168
         } else if (strncmp(subcommand, "topkeys", 7) == 0) {
1949 2169
             topkeys_t *tk = get_independent_stats(c)->topkeys;
1950 2170
             if (tk != NULL) {
@@ -3893,6 +4113,8 @@ static char *process_stat(conn *c, token_t *tokens, const size_t ntokens) {
3893 4113
         return NULL;
3894 4114
     } else if (strcmp(subcommand, "aggregate") == 0) {
3895 4115
         server_stats(&append_stats, c, true);
  4116
+    } else if (strncmp(subcommand, "connections", 11) == 0) {
  4117
+        connection_stats(&append_stats, c);
3896 4118
     } else if (strcmp(subcommand, "topkeys") == 0) {
3897 4119
         topkeys_t *tk = get_independent_stats(c)->topkeys;
3898 4120
         if (tk != NULL) {
@@ -7280,6 +7502,9 @@ int main (int argc, char **argv) {
7280 7502
         exit(EX_USAGE);
7281 7503
     }
7282 7504
 
  7505
+    /* allocate the connection array */
  7506
+    initialize_connections();
  7507
+
7283 7508
     /* lose root privileges if we have them */
7284 7509
     if (getuid() == 0 || geteuid() == 0) {
7285 7510
         if (username == 0 || *username == '\0') {
@@ -7359,13 +7584,6 @@ int main (int argc, char **argv) {
7359 7584
     /* initialize other stuff */
7360 7585
     stats_init();
7361 7586
 
7362  
-    if (!(conn_cache = cache_create("conn", sizeof(conn), sizeof(void*),
7363  
-                                    conn_constructor, conn_destructor))) {
7364  
-        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7365  
-                "Failed to create connection cache\n");
7366  
-        exit(EXIT_FAILURE);
7367  
-    }
7368  
-
7369 7587
     default_independent_stats = new_independent_stats();
7370 7588
 
7371 7589
 #ifndef __WIN32__

0 notes on commit 8c32bbf

Please sign in to comment.
Something went wrong with that request. Please try again.