Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: tmm1/haproxy
base: master
...
head fork: tmm1/haproxy
compare: session-events-filter
Checking mergeability… Don't worry, you can still create the pull request.
  • 19 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
Commits on Mar 23, 2012
@tmm1 MEDIUM: stats: Add `show events` command.
Tracing TCP connections that make hops through haproxy is currently very
challenging. To get a list of proxied connection pairs inside haproxy,
one must use `show sess` to dump the session table, grab the fd pairs
for each session, resolve those to inodes via the process's file
descriptor table, and then resolve those inodes to addresses via the TCP
connection table. This is quite cumbersome and slow (especially when the
TCP connection table is huge), and does not scale when there are
hundreds or thousands of connections happening per second.

This patch adds a new `show events` command to the stats socket, which
streams events when sessions are established or destroyed. This allows
any interested party to subscribe to these events and maintain their
own session table about the state inside haproxy. This data can then be
used to augment data collected from other sources (like pcap), to follow
a connection through all the hops it makes.
f9b431a
Commits on Mar 24, 2012
@tmm1 fix sprintf usage (no overlapping buffers) 04394e2
@tmm1 restrict session end events to established sessions only 02b31f1
@tmm1 require both sides were connected for end events 0a7fb0f
@tmm1 fix uninitialized variable warnings 36337f2
@tmm1 check DONT_WAKE flag just in case 2b3c802
@tmm1 typo c9ab0c7
@tmm1 add comment for session type entry a13bde0
@tmm1 shuffle around event handlers ec38225
@tmm1 safer stats_event_listener_remove 74ccd3f
@tmm1 avoid calling event handlers when there are no listeners f96ad3d
@tmm1 move code around and add comments ec760d8
@tmm1 better comments 23837c1
@tmm1 event streaming requires operator or admin 7e1dd35
@tmm1 extract duplicated listener messaging loop 6d79ec4
@tmm1 commas are hard b878f88
@tmm1 oops, use the argument c484c32
@tmm1 document fields dbf70fd
Commits on Mar 25, 2012
@tmm1 allow filtering events by frontend or backend name f0ec00d
View
22 doc/configuration.txt
@@ -8526,6 +8526,28 @@ show errors [<iid>]
is the slash ('/') in header name "header/bizarre", which is not a valid
HTTP character for a header name.
+show events [<name>]
+
+ Dump a stream of events about sessions as they are added and removed.
+ The two possible event formats are:
+
+ "+ <session_id> <inbound_peer> <inbound_sock> <outbound_sock> <outbound_peer>\n"
+ "- <session_id>\n"
+
+ Streaming will continue until a new command is received or the
+ connection is closed. If <name> is specified, the limit the dump to events
+ concerning either frontend or backend whose NAME is <name>.
+
+ This command is restricted and can only be issued on sockets configured
+ for levels "operator" or "admin".
+
+ Example:
+ >>> $ echo "show events" | socat stdio,ignoreeof /tmp/sock1
+ + 1 127.0.0.1:50869 127.0.0.1:9418 127.0.0.1:50870 127.0.0.1:6000
+ - 1
+ + 2 127.0.0.1:50874 127.0.0.1:9418 127.0.0.1:50875 127.0.0.1:6000
+ - 2
+
show info
Dump info about haproxy status on current process.
View
5 include/proto/dumpstats.h
@@ -53,6 +53,8 @@
#define STAT_CLI_O_SESS 6 /* dump sessions */
#define STAT_CLI_O_ERR 7 /* dump errors */
+#define STAT_CLI_EVENTS 8 /* event stream */
+
/* status codes (strictly 4 chars) used in the URL to display a message */
#define STAT_STATUS_UNKN "UNKN" /* an unknown error occured, shouldn't happen */
#define STAT_STATUS_DONE "DONE" /* the action is successful */
@@ -60,9 +62,12 @@
#define STAT_STATUS_EXCD "EXCD" /* an error occured becayse the buffer couldn't store all data */
#define STAT_STATUS_DENY "DENY" /* action denied */
+extern int stats_event_enabled;
int stats_sock_parse_request(struct stream_interface *si, char *line);
void stats_io_handler(struct stream_interface *si);
+void stats_event_new_session(struct session *s);
+void stats_event_end_session(struct session *s);
int stats_dump_raw_to_buffer(struct session *s, struct buffer *rep);
int stats_dump_http(struct session *s, struct buffer *rep, struct uri_auth *uri);
int stats_dump_proxy(struct session *s, struct proxy *px, struct uri_auth *uri);
View
4 include/types/session.h
@@ -231,6 +231,10 @@ struct session {
int bol; /* pointer to beginning of current line */
} errors;
struct {
+ struct list list; /* list of stats sessions in the STAT_CLI_EVENTS state */
+ char *id; /* if not NULL, id of the be/fe proxy to filter on */
+ } events;
+ struct {
const char *msg; /* pointer to a persistent message to be returned in PRINT state */
} cli;
} data_ctx; /* used by stats I/O handlers to dump the stats */
View
162 src/dumpstats.c
@@ -62,6 +62,7 @@ const char stats_sock_usage_msg[] =
" show stat : report counters for each proxy and server\n"
" show errors : report last request and response errors for each proxy\n"
" show sess [id] : report the list of current sessions or dump this session\n"
+ " show events : stream events about proxied sessions\n"
" get weight : report a server's current weight\n"
" set weight : change a server's weight\n"
" set timeout : change a timeout setting\n"
@@ -73,6 +74,75 @@ const char stats_permission_denied_msg[] =
"Permission denied\n"
"";
+/* Keep track of sessions that want streaming events (STAT_CLI_EVENT).
+ */
+int stats_event_enabled = 0;
+static struct list stats_event_listeners = LIST_HEAD_INIT(stats_event_listeners);
+
+/* Add a session to the list of event listeners.
+ */
+static inline void stats_event_listener_add(struct session *s)
+{
+ LIST_ADDQ(&stats_event_listeners, &s->data_ctx.events.list);
+ stats_event_enabled = 1;
+}
+
+/* Remove a session from the list of listeners, but only if it is a
+ * registered listener. This enables us to invoke the method on all
+ * disconnecting stats sockets to ensure they are cleaned up, regardless
+ * of how many times they switch between streaming and other commands.
+ */
+static inline void stats_event_listener_remove(struct session *s)
+{
+ int found = 0;
+ struct session *curr;
+ list_for_each_entry(curr, &stats_event_listeners, data_ctx.events.list) {
+ if (curr == s) {
+ found = 1;
+ break;
+ }
+ }
+
+ if (found) {
+ if (s->data_ctx.events.id)
+ free(s->data_ctx.events.id);
+ s->data_ctx.events.id = NULL;
+ LIST_DEL(&s->data_ctx.events.list);
+ }
+
+ if (LIST_ISEMPTY(&stats_event_listeners))
+ stats_event_enabled = 0;
+
+ /* Re-initialize stats output */
+ memset(&s->data_ctx.stats, 0, sizeof(s->data_ctx.stats));
+}
+
+/* Send a message to all registered event listeners.
+ */
+static inline void stats_event_listener_message_all(char *msg, struct session *s)
+{
+ struct session *curr;
+
+ list_for_each_entry(curr, &stats_event_listeners, data_ctx.events.list) {
+ struct stream_interface *si = &curr->si[1];
+ char *px_id;
+
+ if (!(si->flags & SI_FL_DONT_WAKE) && si->owner) {
+ /* filter by proxy if required */
+ if ((px_id = curr->data_ctx.events.id)) {
+ if (strcmp(s->fe->id, px_id) != 0 ||
+ ((s->be->cap & PR_CAP_BE) && strcmp(s->be->id, px_id) != 0))
+ continue;
+ }
+
+ if (buffer_feed(si->ib, msg) == -1) {
+ si->ib->flags |= BF_SEND_DONTWAIT;
+ task_wakeup(si->owner, TASK_WOKEN_MSG);
+ }
+ }
+ }
+}
+
/* allocate a new stats frontend named <name>, and return it
* (or NULL in case of lack of memory).
*/
@@ -358,7 +428,20 @@ int stats_sock_parse_request(struct stream_interface *si, char *line)
s->data_state = DATA_ST_INIT;
si->st0 = STAT_CLI_O_ERR; // stats_dump_errors_to_buffer
}
- else { /* neither "stat" nor "info" nor "sess" nor "errors"*/
+ else if (strcmp(args[1], "events") == 0) {
+ if (s->listener->perm.ux.level < ACCESS_LVL_OPER) {
+ s->data_ctx.cli.msg = stats_permission_denied_msg;
+ si->st0 = STAT_CLI_PRINT;
+ return 1;
+ }
+ if (*args[2])
+ s->data_ctx.events.id = strdup(args[2]);
+ else
+ s->data_ctx.events.id = NULL;
+ stats_event_listener_add(s);
+ si->st0 = STAT_CLI_EVENTS;
+ }
+ else { /* neither "stat" nor "info" nor "sess" nor "errors" nor "events" */
return 0;
}
}
@@ -714,7 +797,7 @@ void stats_io_handler(struct stream_interface *si)
si->shutw(si);
break;
}
- else if (si->st0 == STAT_CLI_GETREQ) {
+ else if (si->st0 == STAT_CLI_GETREQ || si->st0 == STAT_CLI_EVENTS) {
/* ensure we have some output room left in the event we
* would want to return some info right after parsing.
*/
@@ -754,7 +837,11 @@ void stats_io_handler(struct stream_interface *si)
trash[len] = '\0';
+ if (si->st0 == STAT_CLI_EVENTS)
+ stats_event_listener_remove(s);
+
si->st0 = STAT_CLI_PROMPT;
+
if (len) {
if (strcmp(trash, "quit") == 0) {
si->st0 = STAT_CLI_END;
@@ -840,7 +927,7 @@ void stats_io_handler(struct stream_interface *si)
if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST) && (si->st0 != STAT_CLI_GETREQ)) {
DPRINTF(stderr, "%s@%d: si to buf closed. req=%08x, res=%08x, st=%d\n",
__FUNCTION__, __LINE__, req->flags, res->flags, si->state);
- /* Other size has closed, let's abort if we have no more processing to do
+ /* Other side has closed, let's abort if we have no more processing to do
* and nothing more to consume. This is comparable to a broken pipe, so
* we forward the close to the request side so that it flows upstream to
* the client.
@@ -866,12 +953,17 @@ void stats_io_handler(struct stream_interface *si)
si->ib->rex = TICK_ETERNITY;
si->ob->wex = TICK_ETERNITY;
+ /* no timeouts when streaming events */
+ if (si->st0 == STAT_CLI_EVENTS)
+ s->req->rex = TICK_ETERNITY;
+
out:
DPRINTF(stderr, "%s@%d: st=%d, rqf=%x, rpf=%x, rql=%d, rqs=%d, rl=%d, rs=%d\n",
__FUNCTION__, __LINE__,
si->state, req->flags, res->flags, req->l, req->send_max, res->l, res->send_max);
if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO)) {
+ stats_event_listener_remove(s);
/* check that we have released everything then unregister */
stream_int_unregister_handler(si);
}
@@ -3130,6 +3222,70 @@ int stats_dump_errors_to_buffer(struct session *s, struct buffer *rep)
return 1;
}
+/* Called whenever a new session is successfully established (reaches
+ * SI_ST_EST). If there are any stats sockets listening in the
+ * STAT_CLI_EVENTS state, they will be notified of this session's unique
+ * id, along with the sockname and peername of both sides of the session.
+ */
+void stats_event_new_session(struct session *s)
+{
+ if (LIST_ISEMPTY(&stats_event_listeners))
+ return;
+
+ char addrs[4][INET6_ADDRSTRLEN + strlen(":65535") + 1] = {"","","",""};
+ int i;
+
+ for(i = 0; i < 4; i++) {
+ struct sockaddr_storage sock;
+ socklen_t addr_size = sizeof(sock);
+ const void *sin_addr = NULL;
+ int port = 0;
+ int fd = s->si[ i/2 ].fd;
+
+ if (!(i%2==0 ? getpeername : getsockname)(fd, (struct sockaddr *)&sock, &addr_size)) {
+ switch (sock.ss_family) {
+ case AF_INET:
+ sin_addr = (const void *)&((struct sockaddr_in *)&sock)->sin_addr;
+ port = ntohs(((struct sockaddr_in *)&sock)->sin_port);
+ break;
+ case AF_INET6:
+ sin_addr = (const void *)&((struct sockaddr_in6 *)&sock)->sin6_addr;
+ port = ntohs(((struct sockaddr_in6 *)&sock)->sin6_port);
+ break;
+ }
+ switch (sock.ss_family) {
+ case AF_INET:
+ case AF_INET6:
+ inet_ntop(sock.ss_family, sin_addr, addrs[i], sizeof(addrs[i]));
+ snprintf(addrs[i]+strlen(addrs[i]), sizeof(addrs[i])-strlen(addrs[i])-1, ":%d", port);
+ break;
+ case AF_UNIX:
+ default:
+ sprintf(addrs[i], "%s", "unknown");
+ }
+ }
+ }
+
+ snprintf(trash, sizeof(trash), "+ %u %s %s %s %s\n", s->uniq_id,
+ addrs[0], // inbound peer
+ addrs[1], // inbound sock
+ addrs[3], // outbound sock
+ addrs[2] // outbound peer
+ );
+ stats_event_listener_message_all(trash, s);
+}
+
+/* Called when the session argument's s->si[1]->state goes from SI_ST_EST
+ * to SI_ST_CLO. All stats listeners are notified of this destroy event.
+ */
+void stats_event_end_session(struct session *s)
+{
+ if (LIST_ISEMPTY(&stats_event_listeners))
+ return;
+
+ snprintf(trash, sizeof(trash), "- %u\n", s->uniq_id);
+ stats_event_listener_message_all(trash, s);
+}
static struct cfg_kw_list cfg_kws = {{ },{
{ CFG_GLOBAL, "stats", stats_parse_global },
View
9 src/session.c
@@ -350,6 +350,9 @@ void sess_establish(struct session *s, struct stream_interface *si)
rep->analysers |= s->fe->fe_rsp_ana | s->be->be_rsp_ana;
rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
req->wex = TICK_ETERNITY;
+
+ if (unlikely(stats_event_enabled))
+ stats_event_new_session(s);
}
/* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, SI_ST_TAR.
@@ -1655,6 +1658,12 @@ struct task *process_session(struct task *t)
s->do_log(s);
}
+ if (unlikely(stats_event_enabled)) {
+ if (s->si[1].state == SI_ST_CLO &&
+ s->si[1].prev_state == SI_ST_EST)
+ stats_event_end_session(s);
+ }
+
/* the task MUST not be in the run queue anymore */
session_free(s);
task_delete(t);

No commit comments for this range

Something went wrong with that request. Please try again.