Browse files

Per-tube buried list; kick takes tube name.

  • Loading branch information...
1 parent d77875d commit c80708b48f25b2bb8556e2d3282b66f01e83b835 Keith Rarick committed Feb 26, 2008
Showing with 53 additions and 23 deletions.
  1. +1 −1 conn.h
  2. +7 −4 doc/protocol.txt
  3. +1 −1 job.c
  4. +41 −17 prot.c
  5. +1 −0 tube.c
  6. +2 −0 tube.h
View
2 conn.h
@@ -21,8 +21,8 @@
#include "event.h"
#include "ms.h"
-#include "job.h"
#include "tube.h"
+#include "job.h"
#define STATE_WANTCOMMAND 0
#define STATE_WANTDATA 1
View
11 doc/protocol.txt
@@ -331,15 +331,18 @@ FOUND <id> <bytes>\r\n
- <data> is the job body -- a sequence of bytes of length <bytes> from the
previous line.
-The kick command moves jobs into the ready queue. If there are any buried jobs,
-it will only kick buried jobs. Otherwise it will kick delayed jobs. It looks
-like:
+The kick command moves jobs into the ready queue. If there are any buried jobs
+in the specified tube, it will only kick buried jobs. Otherwise it will kick
+delayed jobs. It looks like:
-kick <bound>\r\n
+kick <bound> <tube>\r\n
- <bound> is an integer upper bound on the number of jobs to kick. The server
will kick no more than <bound> jobs.
+ - <tube> is a name at most 200 bytes. Buried jobs will be kicked from this
+ tube.
+
The response is of the form
KICKED <count>\r\n
View
2 job.c
@@ -19,8 +19,8 @@
#include <stdlib.h>
#include <string.h>
-#include "job.h"
#include "tube.h"
+#include "job.h"
#include "util.h"
static unsigned long long int next_id = 1;
View
58 prot.c
@@ -163,7 +163,6 @@
static struct pq delay_q;
-static struct job graveyard = { &graveyard, &graveyard, 0 };
static unsigned int ready_ct = 0;
static struct stats global_stat = {0, 0, 0, 0, 0};
@@ -205,9 +204,9 @@ static const char * op_names[] = {
#endif
static int
-buried_job_p()
+buried_job_p(tube t)
{
- return job_list_any_p(&graveyard);
+ return job_list_any_p(&t->buried);
}
static void
@@ -354,7 +353,7 @@ enqueue_job(job j, unsigned int delay)
static void
bury_job(job j)
{
- job_insert(&graveyard, j);
+ job_insert(&j->tube->buried, j);
global_stat.buried_ct++;
j->tube->stat.buried_ct++;
j->state = JOB_STATE_BURIED;
@@ -401,13 +400,13 @@ remove_this_buried_job(job j)
}
static int
-kick_buried_job()
+kick_buried_job(tube t)
{
int r;
job j;
- if (!buried_job_p()) return 0;
- j = remove_this_buried_job(graveyard.next);
+ if (!buried_job_p(t)) return 0;
+ j = remove_this_buried_job(t->buried.next);
j->kick_ct++;
r = enqueue_job(j, 0);
if (r) return 1;
@@ -446,10 +445,10 @@ kick_delayed_job()
/* return the number of jobs successfully kicked */
static unsigned int
-kick_buried_jobs(unsigned int n)
+kick_buried_jobs(tube t, unsigned int n)
{
unsigned int i;
- for (i = 0; (i < n) && kick_buried_job(); ++i);
+ for (i = 0; (i < n) && kick_buried_job(t); ++i);
return i;
}
@@ -463,30 +462,50 @@ kick_delayed_jobs(unsigned int n)
}
static unsigned int
-kick_jobs(unsigned int n)
+kick_jobs(tube t, unsigned int n)
{
- if (buried_job_p()) return kick_buried_jobs(n);
+ if (buried_job_p(t)) return kick_buried_jobs(t, n);
return kick_delayed_jobs(n);
}
static job
peek_buried_job()
{
- return buried_job_p() ? graveyard.next : NULL;
+ tube t;
+ size_t i;
+
+ for (i = 0; i < tubes.used; i++) {
+ t = tubes.items[i];
+ if (buried_job_p(t)) return t->buried.next;
+ }
+ return NULL;
}
static job
-find_buried_job(unsigned long long int id)
+find_buried_job_in_tube(tube t, unsigned long long int id)
{
job j;
- for (j = graveyard.next; j != &graveyard; j = j->next) {
+ for (j = t->buried.next; j != &t->buried; j = j->next) {
if (j->id == id) return j;
}
return NULL;
}
static job
+find_buried_job(unsigned long long int id)
+{
+ job j;
+ size_t i;
+
+ for (i = 0; i < tubes.used; i++) {
+ j = find_buried_job_in_tube(tubes.items[i], id);
+ if (j) return j;
+ }
+ return NULL;
+}
+
+static job
remove_buried_job(unsigned long long int id)
{
return remove_this_buried_job(find_buried_job(id));
@@ -1100,15 +1119,19 @@ dispatch_cmd(conn c)
break;
case OP_KICK:
errno = 0;
- count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
- if (end_buf == c->cmd + CMD_KICK_LEN) {
+ count = strtoul(c->cmd + CMD_KICK_LEN, &name, 10);
+ if (name++ == c->cmd + CMD_KICK_LEN) {
return reply_msg(c, MSG_BAD_FORMAT);
}
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
kick_ct++; /* stats */
- i = kick_jobs(count);
+ t = find_tube(name);
+ if (!t) return reply_msg(c, MSG_NOTFOUND);
+
+ i = kick_jobs(t, count);
+ t = NULL;
return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
case OP_STATS:
@@ -1144,6 +1167,7 @@ dispatch_cmd(conn c)
stats_tube_ct++; /* stats */
do_stats(c, (fmt_fn) fmt_stats_tube, t);
+ t = NULL;
break;
case OP_LIST_TUBES:
/* don't allow trailing garbage */
View
1 tube.c
@@ -39,6 +39,7 @@ make_tube(const char *name)
if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");
pq_init(&t->ready, job_pri_cmp);
+ t->buried = (struct job) { &t->buried, &t->buried, 0 };
ms_init(&t->waiting, NULL, NULL);
t->stat = (struct stats) {0, 0, 0, 0};
View
2 tube.h
@@ -22,6 +22,7 @@
typedef struct tube *tube;
#include "stat.h"
+#include "job.h"
#include "pq.h"
#include "ms.h"
@@ -31,6 +32,7 @@ struct tube {
unsigned int refs;
char name[MAX_TUBE_NAME_LEN];
struct pq ready;
+ struct job buried;
struct ms waiting; /* set of conns */
struct stats stat;
};

0 comments on commit c80708b

Please sign in to comment.