Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: tmm1/haproxy-dev
base: master
...
head fork: tmm1/haproxy-dev
compare: session-events
Checking mergeability… Don’t worry, you can still create the pull request.
  • 4 commits
  • 9 files changed
  • 0 commit comments
  • 1 contributor
View
22 doc/configuration.txt
@@ -10031,6 +10031,28 @@ shutdown sessions <backend>/<server>
maintenance mode, for instance. Such terminated sessions are reported with a
'K' flag in the logs.
+debug sess [proxy:<proxy_name>[:<server_name>]]
+
+ Dump a stream of events about sessions as they are added and removed.
+ The possible event formats are "Forward" and "Close":
+
+ "F <session_id> <in_peer> - <in_sock> | <out_sock> - <out_peer>\n"
+ "C <session_id>\n"
+
+ Streaming will continue until a new command is received or the
+ connection is closed. If <proxy_name> or <server_name> is specified, limit to
+ events concerning only the proxy and server specified.
+
+ This command is restricted and can only be issued on sockets configured
+ for levels "operator" or "admin".
+
+ Example:
+ >>> $ echo "set timeout cli 3600; debug sess" | socat stdio,ignoreeof /tmp/sock1
+ F 1 127.0.0.1:50869 - 127.0.0.1:9418 | 127.0.0.1:50870 - 127.0.0.1:6000
+ C 1
+ F 2 127.0.0.1:50874 - 127.0.0.1:9418 | 127.0.0.1:50875 - 127.0.0.1:6000
+ C 2
+
/*
* Local variables:
* fill-column: 79
View
5 include/proto/dumpstats.h
@@ -55,6 +55,8 @@
#define STAT_CLI_O_TAB 8 /* dump tables */
#define STAT_CLI_O_CLR 9 /* clear tables */
+#define STAT_CLI_EVENTS 8 /* event stream */
+
/* status codes (strictly 4 chars) used in the URL to display a message */
#define STAT_STATUS_UNKN "UNKN" /* an unknown error occured, shouldn't happen */
#define STAT_STATUS_DONE "DONE" /* the action is successful */
@@ -63,8 +65,11 @@
#define STAT_STATUS_DENY "DENY" /* action denied */
extern struct si_applet http_stats_applet;
+extern int stats_event_enabled;
void stats_io_handler(struct stream_interface *si);
+void stats_event_new_session(struct session *s);
+void stats_event_end_session(struct session *s);
#endif /* _PROTO_DUMPSTATS_H */
View
8 include/types/stream_interface.h
@@ -73,6 +73,8 @@ enum {
SI_FL_INDEP_STR = 0x0040, /* independant streams = don't update rex on write */
SI_FL_NOLINGER = 0x0080, /* may close without lingering. One-shot. */
SI_FL_SRC_ADDR = 0x1000, /* get the source ip/port with getsockname */
+ SI_FL_TO_SET = 0x2000, /* addr.to is set */
+ SI_FL_FROM_SET = 0x4000, /* addr.from is set */
};
/* target types */
@@ -167,6 +169,11 @@ struct stream_interface {
int bol; /* pointer to beginning of current line */
} errors;
struct {
+ struct list list; /* list of stats streams in the STAT_CLI_EVENTS state */
+ struct proxy *px; /* if not NULL, only send events associated with this proxy */
+ struct server *srv; /* if not NULL, only send events associated with this server */
+ } events;
+ struct {
void *target; /* table we want to dump, or NULL for all */
struct proxy *proxy; /* table being currently dumped (first if NULL) */
struct stksess *entry; /* last entry we were trying to dump (or first if NULL) */
@@ -189,6 +196,7 @@ struct stream_interface {
struct si_applet {
char *name; /* applet's name to report in logs */
void (*fct)(struct stream_interface *); /* internal I/O handler, may never be NULL */
+ void (*release)(struct stream_interface *); /* callback to release resources, may be NULL */
};
#endif /* _TYPES_STREAM_INTERFACE_H */
View
246 src/dumpstats.c
@@ -86,6 +86,7 @@ static const char stats_sock_usage_msg[] =
" disable : put a server or frontend in maintenance mode\n"
" enable : re-enable a server or frontend which is in maintenance mode\n"
" shutdown : kill a session or a frontend (eg:to release listening ports)\n"
+ " debug sess : stream events about proxied sessions\n"
"";
static const char stats_permission_denied_msg[] =
@@ -114,6 +115,77 @@ enum {
STAT_PX_ST_FIN,
};
+/* Keep track of sessions that want streaming events (STAT_CLI_EVENT).
+ */
+int stats_event_enabled = 0;
+static struct list stats_event_listeners = LIST_HEAD_INIT(stats_event_listeners);
+
+/* Add a session to the list of event listeners.
+ */
+static inline void stats_event_listener_add(struct stream_interface *si)
+{
+ LIST_ADDQ(&stats_event_listeners, &si->applet.ctx.events.list);
+ stats_event_enabled = 1;
+}
+
+/* Remove a session from the list of listeners, but only if it is a
+ * registered listener. This enables us to invoke the method on all
+ * disconnecting stats sockets to ensure they are cleaned up, regardless
+ * of how many times they switch between streaming and other commands.
+ */
+static inline void stats_event_listener_remove(struct stream_interface *si)
+{
+ int found = 0;
+ struct stream_interface *curr;
+ list_for_each_entry(curr, &stats_event_listeners, applet.ctx.events.list) {
+ if (curr == si) {
+ found = 1;
+ break;
+ }
+ }
+
+ if (found) {
+ si->applet.ctx.events.px = NULL;
+ si->applet.ctx.events.srv = NULL;
+ LIST_DEL(&si->applet.ctx.events.list);
+ }
+
+ if (LIST_ISEMPTY(&stats_event_listeners))
+ stats_event_enabled = 0;
+
+ /* Re-initialize stats output */
+ memset(&si->applet.ctx.stats, 0, sizeof(si->applet.ctx.stats));
+}
+
+/* Send a message to all registered event listeners.
+ */
+static inline void stats_event_listener_message_all(char *msg, struct session *s)
+{
+ struct stream_interface *curr;
+
+ list_for_each_entry(curr, &stats_event_listeners, applet.ctx.events.list) {
+ struct proxy *px;
+ struct server *srv;
+
+ if (!(curr->flags & SI_FL_DONT_WAKE) && curr->owner) {
+ /* filter by proxy and server if required */
+ if ((px = curr->applet.ctx.events.px)) {
+ if (s->be != px && s->fe != px)
+ continue; /* ignore */
+ if ((srv = curr->applet.ctx.events.srv)) {
+ if (target_srv(&s->target) != srv)
+ continue; /* ignore */
+ }
+ }
+
+ if (buffer_feed(curr->ib, msg) == -1) {
+ curr->ib->flags |= BF_SEND_DONTWAIT;
+ task_wakeup(curr->owner, TASK_WOKEN_MSG);
+ }
+ }
+ }
+}
+
/* This function is called from the session-level accept() in order to instanciate
* a new stats socket. It returns a positive value upon success, 0 if the connection
* needs to be closed and ignored, or a negative value upon critical failure.
@@ -772,7 +844,54 @@ static int stats_sock_parse_request(struct stream_interface *si, char *line)
args[arg] = line;
si->applet.ctx.stats.flags = 0;
- if (strcmp(args[0], "show") == 0) {
+ if (strcmp(args[0], "debug") == 0) {
+ if (strcmp(args[1], "sess") == 0) {
+ if (s->listener->perm.ux.level < ACCESS_LVL_OPER) {
+ si->applet.ctx.cli.msg = stats_permission_denied_msg;
+ si->applet.st0 = STAT_CLI_PRINT;
+ return 1;
+ }
+ if (*args[2] && !strncmp(args[2], "proxy", 5)) {
+ struct proxy *px = NULL;
+ struct server *srv = NULL;
+ char *px_name = args[2] + 6, *srv_name;
+
+ if ((srv_name = strchr(px_name, ':'))) {
+ *srv_name = 0;
+ srv_name += 1;
+ }
+
+ px = findproxy(px_name, PR_CAP_FE|PR_CAP_BE);
+ if (!px) {
+ si->applet.ctx.cli.msg = "Invalid proxy filter for event stream.";
+ si->applet.st0 = STAT_CLI_PRINT;
+ return 1;
+ }
+
+ if (srv_name && *srv_name) {
+ srv = findserver(px, srv_name);
+ if (!srv) {
+ si->applet.ctx.cli.msg = "Invalid server filter for event stream.";
+ si->applet.st0 = STAT_CLI_PRINT;
+ return 1;
+ }
+ }
+
+ si->applet.ctx.events.srv = srv;
+ si->applet.ctx.events.px = px;
+ } else {
+ si->applet.ctx.events.srv = NULL;
+ si->applet.ctx.events.px = NULL;
+ }
+
+ stats_event_listener_add(si);
+ si->applet.st0 = STAT_CLI_EVENTS;
+ }
+ else { /* not "sess" */
+ return 0;
+ }
+ }
+ else if (strcmp(args[0], "show") == 0) {
if (strcmp(args[1], "stat") == 0) {
if (*args[2] && *args[3] && *args[4]) {
si->applet.ctx.stats.flags |= STAT_BOUND;
@@ -824,7 +943,7 @@ static int stats_sock_parse_request(struct stream_interface *si, char *line)
else if (strcmp(args[1], "table") == 0) {
stats_sock_table_request(si, args, true);
}
- else { /* neither "stat" nor "info" nor "sess" nor "errors" no "table" */
+ else { /* neither "stat" nor "info" nor "sess" nor "errors" nor "table" */
return 0;
}
}
@@ -1346,6 +1465,14 @@ static int stats_sock_parse_request(struct stream_interface *si, char *line)
return 1;
}
+/* Callback to release a cli session.
+ */
+static void cli_session_release(struct stream_interface *si)
+{
+ /* remove if registered as event listener */
+ stats_event_listener_remove(si);
+}
+
/* This I/O handler runs as an applet embedded in a stream interface. It is
* used to processes I/O from/to the stats unix socket. The system relies on a
* state machine handling requests and various responses. We read a request,
@@ -1377,7 +1504,7 @@ static void cli_io_handler(struct stream_interface *si)
si->shutw(si);
break;
}
- else if (si->applet.st0 == STAT_CLI_GETREQ) {
+ else if (si->applet.st0 == STAT_CLI_GETREQ || si->applet.st0 == STAT_CLI_EVENTS) {
/* ensure we have some output room left in the event we
* would want to return some info right after parsing.
*/
@@ -1417,7 +1544,10 @@ static void cli_io_handler(struct stream_interface *si)
trash[len] = '\0';
+ if (si->applet.st0 == STAT_CLI_EVENTS)
+ stats_event_listener_remove(si);
si->applet.st0 = STAT_CLI_PROMPT;
+
if (len) {
if (strcmp(trash, "quit") == 0) {
si->applet.st0 = STAT_CLI_END;
@@ -1511,7 +1641,7 @@ static void cli_io_handler(struct stream_interface *si)
if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST) && (si->applet.st0 != STAT_CLI_GETREQ)) {
DPRINTF(stderr, "%s@%d: si to buf closed. req=%08x, res=%08x, st=%d\n",
__FUNCTION__, __LINE__, req->flags, res->flags, si->state);
- /* Other size has closed, let's abort if we have no more processing to do
+ /* Other side has closed, let's abort if we have no more processing to do
* and nothing more to consume. This is comparable to a broken pipe, so
* we forward the close to the request side so that it flows upstream to
* the client.
@@ -3725,6 +3855,112 @@ static int stats_table_request(struct stream_interface *si, bool show)
return 1;
}
+/* Called whenever a new session is successfully established (reaches
+ * SI_ST_EST). If there are any stats sockets listening in the
+ * STAT_CLI_EVENTS state, they will be notified of this session's unique
+ * id, along with the sockname and peername of both sides of the session.
+ */
+void stats_event_new_session(struct session *s)
+{
+ if (LIST_ISEMPTY(&stats_event_listeners))
+ return;
+
+ char addrs[4][INET6_ADDRSTRLEN + strlen(":65535") + 1] = {"","","",""};
+ int i;
+
+ struct stream_interface *si0 = &s->si[0], *si1 = &s->si[1];
+ socklen_t namelen;
+
+ /* si0 from/to = peer/sock */
+ if (!(si0->flags & SI_FL_FROM_SET)) {
+ namelen = sizeof(si0->addr.from);
+ getpeername(si0->fd, (struct sockaddr *)&si0->addr.from, &namelen);
+ si0->flags |= SI_FL_FROM_SET;
+ }
+ if (!(si0->flags & SI_FL_TO_SET)) {
+ namelen = sizeof(si0->addr.to);
+ getsockname(si0->fd, (struct sockaddr *)&si0->addr.to, &namelen);
+ si0->flags |= SI_FL_TO_SET;
+ }
+
+ /* si1 from/to = sock/peer (reversed) */
+ if (!(si1->flags & SI_FL_FROM_SET)) {
+ namelen = sizeof(si1->addr.from);
+ getsockname(si1->fd, (struct sockaddr *)&si1->addr.from, &namelen);
+ si1->flags |= SI_FL_FROM_SET;
+ }
+ if (!(si1->flags & SI_FL_TO_SET)) {
+ namelen = sizeof(si1->addr.to);
+ getpeername(si1->fd, (struct sockaddr *)&si1->addr.to, &namelen);
+ si1->flags |= SI_FL_TO_SET;
+ }
+
+ for(i = 0; i < 4; i++) {
+ struct sockaddr_storage *sock;
+ const void *sin_addr = NULL;
+ int port = 0;
+
+ switch (i) {
+ case 0: // inbound peer
+ sock = &si0->addr.from;
+ break;
+ case 1: // inbound sock
+ sock = &si0->addr.to;
+ break;
+ case 2: // outbound sock
+ sock = &si1->addr.from;
+ break;
+ case 3: // outbound peer
+ sock = &si1->addr.to;
+ break;
+ }
+
+ switch (sock->ss_family) {
+ case AF_INET:
+ sin_addr = (const void *)&((struct sockaddr_in *)sock)->sin_addr;
+ port = ntohs(((struct sockaddr_in *)sock)->sin_port);
+ break;
+ case AF_INET6:
+ sin_addr = (const void *)&((struct sockaddr_in6 *)sock)->sin6_addr;
+ port = ntohs(((struct sockaddr_in6 *)sock)->sin6_port);
+ break;
+ }
+
+ switch (sock->ss_family) {
+ case AF_INET:
+ case AF_INET6:
+ inet_ntop(sock->ss_family, sin_addr, addrs[i], sizeof(addrs[i]));
+ snprintf(addrs[i]+strlen(addrs[i]), sizeof(addrs[i])-strlen(addrs[i])-1, ":%d", port);
+ break;
+ case AF_UNIX:
+ sprintf(addrs[i], "unix:%d", s->listener->luid);
+ break;
+ default:
+ sprintf(addrs[i], "%s", "unknown");
+ }
+ }
+
+ snprintf(trash, sizeof(trash), "F %u %s - %s | %s - %s\n", s->uniq_id,
+ addrs[0], // inbound peer
+ addrs[1], // inbound sock
+ addrs[2], // outbound sock
+ addrs[3] // outbound peer
+ );
+ stats_event_listener_message_all(trash, s);
+}
+
+/* Called when the session argument's s->si[1]->state goes from SI_ST_EST
+ * to SI_ST_CLO. All stats listeners are notified of this destroy event.
+ */
+void stats_event_end_session(struct session *s)
+{
+ if (LIST_ISEMPTY(&stats_event_listeners))
+ return;
+
+ snprintf(trash, sizeof(trash), "C %u\n", s->uniq_id);
+ stats_event_listener_message_all(trash, s);
+}
+
/* print a line of text buffer (limited to 70 bytes) to <out>. The format is :
* <2 spaces> <offset=5 digits> <space or plus> <space> <70 chars max> <\n>
* which is 60 chars per line. Non-printable chars \t, \n, \r and \e are
@@ -3933,11 +4169,13 @@ static int stats_dump_errors_to_buffer(struct stream_interface *si)
struct si_applet http_stats_applet = {
.name = "<STATS>", /* used for logging */
.fct = http_stats_io_handler,
+ .release = NULL,
};
static struct si_applet cli_applet = {
.name = "<CLI>", /* used for logging */
.fct = cli_io_handler,
+ .release = cli_session_release,
};
static struct cfg_kw_list cfg_kws = {{ },{
View
1  src/frontend.c
@@ -54,6 +54,7 @@ void get_frt_addr(struct session *s)
if (get_original_dst(s->si[0].fd, (struct sockaddr_in *)&s->si[0].addr.to, &namelen) == -1)
getsockname(s->si[0].fd, (struct sockaddr *)&s->si[0].addr.to, &namelen);
+ s->si[0].flags |= SI_FL_TO_SET;
s->flags |= SN_FRT_ADDR_SET;
}
View
3  src/peers.c
@@ -1044,6 +1044,7 @@ static void peer_io_handler(struct stream_interface *si)
static struct si_applet peer_applet = {
.name = "<PEER>", /* used for logging */
.fct = peer_io_handler,
+ .release = peer_session_release,
};
/*
@@ -1079,7 +1080,6 @@ int peer_accept(struct session *s)
/* we have a dedicated I/O handler for the stats */
stream_int_register_handler(&s->si[1], &peer_applet);
copy_target(&s->target, &s->si[1].target); // for logging only
- s->si[1].release = peer_session_release;
s->si[1].applet.private = s;
s->si[1].applet.st0 = PEER_SESSION_ACCEPT;
@@ -1165,7 +1165,6 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
s->si[0].applet.st0 = PEER_SESSION_CONNECT;
stream_int_register_handler(&s->si[0], &peer_applet);
- s->si[0].release = peer_session_release;
s->si[1].fd = -1; /* just to help with debugging */
s->si[1].owner = t;
View
3  src/proto_tcp.c
@@ -439,10 +439,11 @@ int tcp_connect_server(struct stream_interface *si)
/* needs src ip/port for logging */
if (si->flags & SI_FL_SRC_ADDR) {
- socklen_t addrlen = sizeof(si->addr.to);
+ socklen_t addrlen = sizeof(si->addr.from);
if (getsockname(fd, (struct sockaddr *)&si->addr.from, &addrlen) == -1) {
Warning("Cannot get source address for logging.\n");
}
+ si->flags |= SI_FL_FROM_SET;
}
fdtab[fd].owner = si;
View
9 src/session.c
@@ -708,6 +708,9 @@ static void sess_establish(struct session *s, struct stream_interface *si)
rep->rto = s->be->timeout.server;
}
req->wex = TICK_ETERNITY;
+
+ if (unlikely(stats_event_enabled))
+ stats_event_new_session(s);
}
/* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, SI_ST_TAR.
@@ -2148,6 +2151,12 @@ struct task *process_session(struct task *t)
s->do_log(s);
}
+ if (unlikely(stats_event_enabled)) {
+ if (s->si[1].state == SI_ST_CLO &&
+ s->si[1].prev_state == SI_ST_EST)
+ stats_event_end_session(s);
+ }
+
/* the task MUST not be in the run queue anymore */
session_free(s);
task_delete(t);
View
2  src/stream_interface.c
@@ -316,7 +316,7 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_
si->connect = NULL;
set_target_applet(&si->target, app);
si->applet.state = 0;
- si->release = NULL;
+ si->release = app->release;
si->flags |= SI_FL_WAIT_DATA;
return si->owner;
}

No commit comments for this range

Something went wrong with that request. Please try again.