Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add pause-tube command

A new command pause-tube which results in no new jobs being reserved
from a tube until the given number of seconds has passed
  • Loading branch information...
commit d5d311d03676d82ea91ca33ea9f49dfb9e1ac7d0 1 parent b8a000e
Graham Barr gbarr authored
Showing with 123 additions and 13 deletions.
  1. +22 −0 doc/protocol.txt
  2. +96 −12 prot.c
  3. +1 −0  stat.h
  4. +2 −1  tube.c
  5. +2 −0  tube.h
22 doc/protocol.txt
View
@@ -484,6 +484,10 @@ to scalars. It contains these keys:
- "current-waiting" is the number of open connections that have issued a
reserve command while watching this tube but not yet received a response.
+ - "pause" is the number of seconds the tube has been paused for
+
+ - "cmd-pause-tube" is the cumulative number of pause-tube commands for this tube
+
The stats command gives statistical information about the system as a whole.
Its form is:
@@ -551,6 +555,8 @@ of strings to scalars. It contains these keys:
- "cmd-list-tubes-watched" is the cumulative number of list-tubes-watched
commands.
+ - "cmd-pause-tube" is the cumulative number of pause-tube commands
+
- "job-timeouts" is the cumulative count of times a job has timed out.
- "total-jobs" is the cumulative count of jobs created.
@@ -636,3 +642,19 @@ OK <bytes>\r\n
The quit command simply closes the connection. Its form is:
quit\r\n
+
+The pause-tube command can delay any new job being reserved for a given time. Its form is:
+
+pause-tube <tube-name> <delay>\r\n
+
+ - <tube> is the tube to pause
+
+ - <delay> is an integer number of seconds to wait before reserving any more
+ jobs from the queue
+
+There are two possible responses:
+
+ - "PAUSED\r\n" to indicate success.
+
+ - "NOT_FOUND\r\n" if the tube does not exist.
+
108 prot.c
View
@@ -70,6 +70,7 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
#define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
#define CMD_STATS_TUBE "stats-tube "
#define CMD_QUIT "quit"
+#define CMD_PAUSE_TUBE "pause-tube"
#define CONSTSTRLEN(m) (sizeof(m) - 1)
@@ -93,6 +94,7 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
#define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
+#define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
@@ -152,7 +154,8 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
#define OP_RESERVE_TIMEOUT 20
#define OP_TOUCH 21
#define OP_QUIT 22
-#define TOTAL_OPS 23
+#define OP_PAUSE_TUBE 23
+#define TOTAL_OPS 24
#define STATS_FMT "---\n" \
"current-jobs-urgent: %u\n" \
@@ -181,6 +184,7 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
"cmd-list-tubes: %" PRIu64 "\n" \
"cmd-list-tube-used: %" PRIu64 "\n" \
"cmd-list-tubes-watched: %" PRIu64 "\n" \
+ "cmd-pause-tube: %" PRIu64 "\n" \
"job-timeouts: %" PRIu64 "\n" \
"total-jobs: %" PRIu64 "\n" \
"max-job-size: %zu\n" \
@@ -211,6 +215,8 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
"current-using: %u\n" \
"current-watching: %u\n" \
"current-waiting: %u\n" \
+ "cmd-pause-tube: %u\n" \
+ "pause: %" PRIu64 "\n" \
"\r\n"
/* this number is pretty arbitrary */
@@ -255,7 +261,8 @@ static const char * op_names[] = {
CMD_PEEK_DELAYED,
CMD_RESERVE_TIMEOUT,
CMD_TOUCH,
- CMD_QUIT
+ CMD_QUIT,
+ CMD_PAUSE_TUBE
};
#endif
@@ -352,7 +359,7 @@ reserve_job(conn c, job j)
}
static job
-next_eligible_job()
+next_eligible_job(usec now)
{
tube t;
size_t i;
@@ -361,8 +368,12 @@ next_eligible_job()
dprintf("tubes.used = %zu\n", tubes.used);
for (i = 0; i < tubes.used; i++) {
t = tubes.items[i];
- dprintf("for %s t->waiting.used=%zu t->ready.used=%d\n",
- t->name, t->waiting.used, t->ready.used);
+ dprintf("for %s t->waiting.used=%zu t->ready.used=%d t->pause=%" PRIu64 "\n",
+ t->name, t->waiting.used, t->ready.used, t->pause);
+ if (t->pause) {
+ if (t->deadline_at > now) continue;
+ t->pause = 0;
+ }
if (t->waiting.used && t->ready.used) {
candidate = pq_peek(&t->ready);
if (!j || job_pri_cmp(candidate, j) < 0) j = candidate;
@@ -377,9 +388,10 @@ static void
process_queue()
{
job j;
+ usec now = now_usec();
dprintf("processing queue\n");
- while ((j = next_eligible_job())) {
+ while ((j = next_eligible_job(now))) {
dprintf("got eligible job %llu in %s\n", j->id, j->tube->name);
j = pq_take(&j->tube->ready);
ready_ct--;
@@ -408,12 +420,33 @@ delay_q_peek()
return j;
}
+static tube
+pause_tube_peek()
+{
+ int i;
+ tube t, nt = NULL;
+
+ for (i = 0; i < tubes.used; i++) {
+ t = tubes.items[i];
+ if (t->pause) {
+ if (!nt || t->deadline_at < nt->deadline_at) nt = t;
+ }
+ }
+
+ return nt;
+}
+
static void
set_main_delay_timeout()
{
- job j;
+ job j = delay_q_peek();
+ tube t = pause_tube_peek();
+ usec deadline_at = t ? t->deadline_at : 0;
+
+ if (j && (!deadline_at || j->deadline_at < deadline_at)) deadline_at = j->deadline_at;
- set_main_timeout((j = delay_q_peek()) ? j->deadline_at : 0);
+ dprintf("deadline_at=%" PRIu64 "\n", deadline_at);
+ set_main_timeout(deadline_at);
}
static int
@@ -710,6 +743,7 @@ which_cmd(conn c)
TEST_CMD(c->cmd, CMD_LIST_TUBE_USED, OP_LIST_TUBE_USED);
TEST_CMD(c->cmd, CMD_LIST_TUBES, OP_LIST_TUBES);
TEST_CMD(c->cmd, CMD_QUIT, OP_QUIT);
+ TEST_CMD(c->cmd, CMD_PAUSE_TUBE, OP_PAUSE_TUBE);
return OP_UNKNOWN;
}
@@ -822,6 +856,7 @@ fmt_stats(char *buf, size_t size, void *x)
op_ct[OP_LIST_TUBES],
op_ct[OP_LIST_TUBE_USED],
op_ct[OP_LIST_TUBES_WATCHED],
+ op_ct[OP_PAUSE_TUBE],
timeout_ct,
global_stat.total_jobs_ct,
job_data_size_limit,
@@ -892,6 +927,20 @@ read_ttr(usec *ttr, const char *buf, char **end)
return read_delay(ttr, buf, end);
}
+/* Read a tube name from the given buffer moving the buffer to the name start */
+static int
+read_tube_name(char **tubename, char *buf, char **end)
+{
+ size_t len;
+
+ while (buf[0] == ' ') buf++;
+ len = strspn(buf, NAME_CHARS);
+ if (len == 0) return -1;
+ if (tubename) *tubename = buf;
+ if (end) *end = buf + len;
+ return 0;
+}
+
static void
wait_for_job(conn c, int timeout)
{
@@ -1018,7 +1067,9 @@ fmt_stats_tube(char *buf, size_t size, tube t)
t->stat.total_jobs_ct,
t->using_ct,
t->watching_ct,
- t->stat.waiting_ct);
+ t->stat.waiting_ct,
+ t->stat.pause_ct,
+ t->pause / 1000000);
}
static void
@@ -1434,6 +1485,26 @@ dispatch_cmd(conn c)
case OP_QUIT:
conn_close(c);
break;
+ case OP_PAUSE_TUBE:
+ op_ct[type]++;
+
+ r = read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf);
+ if (r) return reply_msg(c, MSG_BAD_FORMAT);
+
+ r = read_delay(&delay, delay_buf, NULL);
+ if (r) return reply_msg(c, MSG_BAD_FORMAT);
+
+ *delay_buf = '\0';
+ t = tube_find(name);
+ if (!t) return reply_msg(c, MSG_NOTFOUND);
+
+ t->deadline_at = now_usec() + delay;
+ t->pause = delay;
+ t->stat.pause_ct++;
+ set_main_delay_timeout();
+
+ reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
+ break;
default:
return reply_msg(c, MSG_UNKNOWN_COMMAND);
}
@@ -1656,16 +1727,29 @@ h_delay()
{
int r;
job j;
- usec t;
+ usec now;
+ int i;
+ tube t;
- t = now_usec();
+ now = now_usec();
while ((j = delay_q_peek())) {
- if (j->deadline_at > t) break;
+ if (j->deadline_at > now) break;
j = delay_q_take();
r = enqueue_job(j, 0, 0);
if (r < 1) bury_job(j, 0); /* out of memory, so bury it */
}
+ for (i = 0; i < tubes.used; i++) {
+ t = tubes.items[i];
+
+ dprintf("h_delay for %s t->waiting.used=%zu t->ready.used=%d t->pause=%" PRIu64 "\n",
+ t->name, t->waiting.used, t->ready.used, t->pause);
+ if (t->pause && t->deadline_at <= now) {
+ t->pause = 0;
+ process_queue();
+ }
+ }
+
set_main_delay_timeout();
}
1  stat.h
View
@@ -30,6 +30,7 @@ struct stats {
unsigned int waiting_ct;
unsigned int buried_ct;
unsigned int reserved_ct;
+ unsigned int pause_ct;
uint64_t total_jobs_ct;
};
3  tube.c
View
@@ -47,8 +47,9 @@ make_tube(const char *name)
t->buried.prev = t->buried.next = &t->buried;
ms_init(&t->waiting, NULL, NULL);
- t->stat = (struct stats) {0, 0, 0, 0};
+ t->stat = (struct stats) {0, 0, 0, 0, 0};
t->using_ct = t->watching_ct = 0;
+ t->deadline_at = t->pause = 0;
return t;
}
2  tube.h
View
@@ -40,6 +40,8 @@ struct tube {
struct stats stat;
unsigned int using_ct;
unsigned int watching_ct;
+ usec pause;
+ usec deadline_at;
};
extern struct ms tubes;
Please sign in to comment.
Something went wrong with that request. Please try again.