Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Implement per-tube statistics.

  • Loading branch information...
commit 7c451fb1848810038e8734cbc56ace9e8a19c896 1 parent 4ab2962
Keith Rarick authored
Showing with 170 additions and 49 deletions.
  1. +1 −0  conn.h
  2. +42 −0 doc/protocol.txt
  3. +92 −49 prot.c
  4. +30 −0 stat.h
  5. +3 −0  tube.c
  6. +2 −0  tube.h
View
1  conn.h
@@ -51,6 +51,7 @@
#define OP_IGNORE 13
#define OP_LIST_TUBES 14
#define OP_LIST_WATCHED_TUBES 15
+#define OP_STATS_TUBE 16
/* CONN_TYPE_* are bit masks */
#define CONN_TYPE_PRODUCER 1
View
42 doc/protocol.txt
@@ -392,6 +392,46 @@ to scalars. It contains these keys:
- "kicks" is the number of times this job has been kicked.
+The stats-tube command gives statistical information about the specified tube
+if it exists. Its form is:
+
+stats-tube <tube>\r\n
+
+ - <tube> is a name at most 200 bytes. Stats will be returned for this tube.
+
+The response is one of:
+
+ - "NOT_FOUND\r\n" if the tube does not exist.
+
+ - "OK <bytes>\r\n<data>\r\n"
+
+ - <bytes> is the size of the following data section in bytes.
+
+ - <data> is a sequence of bytes of length <bytes> from the previous line. It
+ is a YAML file with statistical information represented a dictionary.
+
+The stats-tube data is a YAML file representing a single dictionary of strings
+to scalars. It contains these keys:
+
+ - "name" is the tube's name.
+
+ - "current-jobs-urgent" is the number of ready jobs with priority < 1024 in
+ this tube.
+
+ - "current-jobs-ready" is the number of jobs in the ready queue in this tube.
+
+ - "current-jobs-reserved" is the number of jobs reserved by all clients in
+ this tube.
+
+ - "current-jobs-delayed" is the number of delayed jobs in this tube.
+
+ - "current-jobs-buried" is the number of buried jobs in this tube.
+
+ - "total-jobs" is the cumulative count of jobs created in this tube.
+
+ - "current-waiting" is the number of open connections that have issued a
+ reserve command while watching this tube but not yet received a response.
+
The stats command gives statistical information about the system as a whole.
Its form is:
@@ -438,6 +478,8 @@ of strings to scalars. It contains these keys:
- "cmd-stats-job" is the cumulative number of stats-job commands.
+ - "cmd-stats-tube" is the cumulative number of stats-tube commands.
+
- "cmd-list-tubes" is the cumulative number of list-tubes commands.
- "cmd-list-watched-tubes" is the cumulative number of list-watched-tubes
View
141 prot.c
@@ -25,6 +25,7 @@
#include <sys/uio.h>
#include <stdarg.h>
+#include "stat.h"
#include "prot.h"
#include "pq.h"
#include "ms.h"
@@ -58,6 +59,7 @@
#define CMD_IGNORE "ignore "
#define CMD_LIST_TUBES "list-tubes"
#define CMD_LIST_WATCHED_TUBES "list-watched-tubes"
+#define CMD_STATS_TUBE "stats-tube "
#define CONSTSTRLEN(m) (sizeof(m) - 1)
@@ -75,6 +77,7 @@
#define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_WATCHED_TUBES_LEN CONSTSTRLEN(CMD_LIST_WATCHED_TUBES)
+#define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
@@ -115,6 +118,7 @@
"cmd-kick: %llu\n" \
"cmd-stats: %llu\n" \
"cmd-stats-job: %llu\n" \
+ "cmd-stats-tube: %llu\n" \
"cmd-list-tubes: %llu\n" \
"cmd-list-watched-tubes: %llu\n" \
"job-timeouts: %llu\n" \
@@ -132,6 +136,17 @@
"uptime: %u\n" \
"\r\n"
+#define STATS_TUBE_FMT "---\n" \
+ "name: %s\n" \
+ "current-jobs-urgent: %u\n" \
+ "current-jobs-ready: %u\n" \
+ "current-jobs-reserved: %u\n" \
+ "current-jobs-delayed: %u\n" \
+ "current-jobs-buried: %u\n" \
+ "total-jobs: %llu\n" \
+ "current-waiting: %u\n" \
+ "\r\n"
+
#define JOB_STATS_FMT "---\n" \
"id: %llu\n" \
"tube: %s\n" \
@@ -152,7 +167,8 @@ static struct pq delay_q;
/* Doubly-linked list of waiting connections. */
static struct job graveyard = { &graveyard, &graveyard, 0 };
-static unsigned int buried_ct = 0, ready_ct = 0, urgent_ct = 0, waiting_ct = 0;
+static unsigned int ready_ct = 0;
+static struct stats global_stat = {0, 0, 0, 0, 0};
static tube default_tube;
static struct ms tubes;
@@ -162,9 +178,8 @@ static time_t start_time;
static unsigned long long int put_ct = 0, peek_ct = 0, reserve_ct = 0,
delete_ct = 0, release_ct = 0, bury_ct = 0, kick_ct = 0,
stats_job_ct = 0, stats_ct = 0, timeout_ct = 0,
- list_tubes_ct = 0, list_watched_tubes_ct = 0;
-
-static unsigned int cur_reserved_ct = 0;
+ list_tubes_ct = 0, stats_tube_ct = 0,
+ list_watched_tubes_ct = 0;
/* Doubly-linked list of connections with at least one reserved job. */
@@ -188,6 +203,7 @@ static const char * op_names[] = {
CMD_IGNORE,
CMD_LIST_TUBES,
CMD_LIST_WATCHED_TUBES,
+ CMD_STATS_TUBE,
};
#endif
@@ -247,13 +263,16 @@ reply_job(conn c, job j, const char *word)
conn
remove_waiting_conn(conn c)
{
+ tube t;
size_t i;
if (!(c->type & CONN_TYPE_WAITING)) return NULL;
c->type &= ~CONN_TYPE_WAITING;
- waiting_ct--;
+ global_stat.waiting_ct--;
for (i = 0; i < c->watch.used; i++) {
- ms_remove(&((tube) c->watch.items[i])->waiting, c);
+ t = c->watch.items[i];
+ t->stat.waiting_ct--;
+ ms_remove(&t->waiting, c);
}
return c;
}
@@ -262,7 +281,8 @@ static void
reserve_job(conn c, job j)
{
j->deadline = time(NULL) + j->ttr;
- cur_reserved_ct++; /* stats */
+ global_stat.reserved_ct++; /* stats */
+ j->tube->stat.reserved_ct++;
conn_insert(&running, c);
j->state = JOB_STATE_RESERVED;
job_insert(&c->reserved_jobs, j);
@@ -301,7 +321,10 @@ process_queue()
dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
j = pq_take(&j->tube->ready);
ready_ct--;
- if (j->pri < URGENT_THRESHOLD) urgent_ct--;
+ if (j->pri < URGENT_THRESHOLD) {
+ global_stat.urgent_ct--;
+ j->tube->stat.urgent_ct--;
+ }
reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
}
}
@@ -322,7 +345,10 @@ enqueue_job(job j, unsigned int delay)
if (!r) return 0;
j->state = JOB_STATE_READY;
ready_ct++;
- if (j->pri < URGENT_THRESHOLD) urgent_ct++;
+ if (j->pri < URGENT_THRESHOLD) {
+ global_stat.urgent_ct++;
+ j->tube->stat.urgent_ct++;
+ }
}
process_queue();
return 1;
@@ -332,7 +358,8 @@ static void
bury_job(job j)
{
job_insert(&graveyard, j);
- buried_ct++;
+ global_stat.buried_ct++;
+ j->tube->stat.buried_ct++;
j->state = JOB_STATE_BURIED;
j->bury_ct++;
}
@@ -347,7 +374,8 @@ enqueue_reserved_jobs(conn c)
j = job_remove(c->reserved_jobs.next);
r = enqueue_job(j, 0);
if (!r) bury_job(j);
- cur_reserved_ct--;
+ global_stat.reserved_ct--;
+ j->tube->stat.reserved_ct--;
if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
}
}
@@ -368,7 +396,10 @@ static job
remove_this_buried_job(job j)
{
j = job_remove(j);
- if (j) buried_ct--;
+ if (j) {
+ global_stat.buried_ct--;
+ j->tube->stat.buried_ct--;
+ }
return j;
}
@@ -467,12 +498,15 @@ remove_buried_job(unsigned long long int id)
static void
enqueue_waiting_conn(conn c)
{
+ tube t;
size_t i;
- waiting_ct++;
+ global_stat.waiting_ct++;
c->type |= CONN_TYPE_WAITING;
for (i = 0; i < c->watch.used; i++) {
- ms_append(&((tube) c->watch.items[i])->waiting, c);
+ t = c->watch.items[i];
+ t->stat.waiting_ct++;
+ ms_append(&t->waiting, c);
}
}
@@ -530,30 +564,6 @@ peek_job(unsigned long long int id)
find_buried_job(id);
}
-static unsigned int
-get_ready_job_ct()
-{
- return ready_ct;
-}
-
-static unsigned int
-get_buried_job_ct()
-{
- return buried_ct;
-}
-
-static unsigned int
-get_urgent_job_ct()
-{
- return urgent_ct;
-}
-
-static int
-count_cur_waiting()
-{
- return waiting_ct;
-}
-
static void
check_err(conn c, const char *s)
{
@@ -602,6 +612,7 @@ which_cmd(conn c)
TEST_CMD(c->cmd, CMD_BURY, OP_BURY);
TEST_CMD(c->cmd, CMD_KICK, OP_KICK);
TEST_CMD(c->cmd, CMD_JOBSTATS, OP_JOBSTATS);
+ TEST_CMD(c->cmd, CMD_STATS_TUBE, OP_STATS_TUBE);
TEST_CMD(c->cmd, CMD_STATS, OP_STATS);
TEST_CMD(c->cmd, CMD_USE, OP_USE);
TEST_CMD(c->cmd, CMD_WATCH, OP_WATCH);
@@ -657,6 +668,8 @@ enqueue_incoming_job(conn c)
/* we have a complete job, so let's stick it in the pqueue */
r = enqueue_job(j, j->delay);
put_ct++; /* stats */
+ global_stat.total_jobs_ct++;
+ j->tube->stat.total_jobs_ct++;
if (r) return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->id);
@@ -677,11 +690,11 @@ fmt_stats(char *buf, size_t size, void *x)
struct rusage ru = {{0, 0}, {0, 0}};
getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
return snprintf(buf, size, STATS_FMT,
- get_urgent_job_ct(),
- get_ready_job_ct(),
- cur_reserved_ct,
+ global_stat.urgent_ct,
+ ready_ct,
+ global_stat.reserved_ct,
get_delayed_job_ct(),
- get_buried_job_ct(),
+ global_stat.buried_ct,
put_ct,
peek_ct,
reserve_ct,
@@ -691,15 +704,16 @@ fmt_stats(char *buf, size_t size, void *x)
kick_ct,
stats_ct,
stats_job_ct,
+ stats_tube_ct,
list_tubes_ct,
list_watched_tubes_ct,
timeout_ct,
- total_jobs(),
+ global_stat.total_jobs_ct,
tubes.used,
count_cur_conns(),
count_cur_producers(),
count_cur_workers(),
- count_cur_waiting(),
+ global_stat.waiting_ct,
count_tot_conns(),
getpid(),
VERSION,
@@ -764,8 +778,10 @@ wait_for_job(conn c)
enqueue_waiting_conn(c);
}
+typedef int(*fmt_fn)(char *, size_t, void *);
+
static void
-do_stats(conn c, int(*fmt)(char *, size_t, void *), void *data)
+do_stats(conn c, fmt_fn fmt, void *data)
{
int r, stats_len;
@@ -816,10 +832,9 @@ do_list_tubes(conn c, ms l)
}
static int
-fmt_job_stats(char *buf, size_t size, void *jp)
+fmt_job_stats(char *buf, size_t size, job j)
{
time_t t;
- job j = (job) jp;
t = time(NULL);
return snprintf(buf, size, JOB_STATS_FMT,
@@ -837,6 +852,20 @@ fmt_job_stats(char *buf, size_t size, void *jp)
j->kick_ct);
}
+static int
+fmt_stats_tube(char *buf, size_t size, tube t)
+{
+ return snprintf(buf, size, STATS_TUBE_FMT,
+ t->name,
+ t->stat.urgent_ct,
+ t->ready.used,
+ t->stat.reserved_ct,
+ t->delay.used,
+ t->stat.buried_ct,
+ t->stat.total_jobs_ct,
+ t->stat.waiting_ct);
+}
+
static void
maybe_enqueue_incoming_job(conn c)
{
@@ -854,7 +883,10 @@ static job
remove_this_reserved_job(conn c, job j)
{
j = job_remove(j);
- if (j) cur_reserved_ct--;
+ if (j) {
+ global_stat.reserved_ct--;
+ j->tube->stat.reserved_ct--;
+ }
if (!job_list_any_p(&c->reserved_jobs)) conn_remove(c);
return j;
}
@@ -1104,7 +1136,18 @@ dispatch_cmd(conn c)
stats_job_ct++; /* stats */
if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
- do_stats(c, fmt_job_stats, j);
+ do_stats(c, (fmt_fn) fmt_job_stats, j);
+ break;
+ case OP_STATS_TUBE:
+ name = c->cmd + CMD_STATS_TUBE_LEN;
+ if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
+
+ t = find_tube(name);
+ if (!t) return reply_msg(c, MSG_NOTFOUND);
+
+ stats_tube_ct++; /* stats */
+
+ do_stats(c, (fmt_fn) fmt_stats_tube, t);
break;
case OP_LIST_TUBES:
/* don't allow trailing garbage */
View
30 stat.h
@@ -0,0 +1,30 @@
+/* stat.h - stats struct */
+
+/* Copyright (C) 2008 Keith Rarick and Philotic Inc.
+
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef stat_h
+#define stat_h
+
+struct stats {
+ unsigned int urgent_ct;
+ unsigned int waiting_ct;
+ unsigned int buried_ct;
+ unsigned int reserved_ct;
+ long long unsigned int total_jobs_ct;
+};
+
+#endif /*stat_h*/
View
3  tube.c
@@ -19,6 +19,7 @@
#include <stdlib.h>
#include <string.h>
+#include "stat.h"
#include "tube.h"
#include "prot.h"
#include "util.h"
@@ -41,6 +42,8 @@ make_tube(const char *name)
pq_init(&t->delay, job_delay_cmp);
ms_init(&t->waiting, NULL, NULL);
+ t->stat = (struct stats) {0, 0, 0, 0};
+
return t;
}
View
2  tube.h
@@ -21,6 +21,7 @@
typedef struct tube *tube;
+#include "stat.h"
#include "pq.h"
#include "ms.h"
@@ -31,6 +32,7 @@ struct tube {
char name[MAX_TUBE_NAME_LEN];
struct pq ready, delay;
struct ms waiting; /* set of conns */
+ struct stats stat;
};
tube make_tube(const char *name);
Please sign in to comment.
Something went wrong with that request. Please try again.