Skip to content

Commit

Permalink
Per-tube delay queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith Rarick committed Apr 9, 2008
1 parent 75964e9 commit df46041
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 27 deletions.
12 changes: 5 additions & 7 deletions doc/protocol.txt
Original file line number Diff line number Diff line change
Expand Up @@ -345,19 +345,15 @@ FOUND <id> <bytes>\r\n
- <data> is the job body -- a sequence of bytes of length <bytes> from the
previous line.

The kick command moves jobs into the ready queue. If there are any buried jobs
in the currently used tube, it will only kick buried jobs. Otherwise it will
kick delayed jobs. It looks like:
The kick command applies only to the currently used tube. It moves jobs into
the ready queue. If there are any buried jobs, it will only kick buried jobs.
Otherwise it will kick delayed jobs. It looks like:

kick <bound>\r\n

- <bound> is an integer upper bound on the number of jobs to kick. The server
will kick no more than <bound> jobs.

When the server kicks buried jobs, they are taken from the buried list in only
the currently used tube. However, when the server kicks delayed jobs, they are
taken from the global delay queue; thus jobs in any tube may be kicked.

The response is of the form:

KICKED <count>\r\n
Expand Down Expand Up @@ -441,6 +437,8 @@ to scalars. It contains these keys:
- "current-jobs-reserved" is the number of jobs reserved by all clients in
this tube.

- "current-jobs-delayed" is the number of delayed jobs in this tube.

- "current-jobs-buried" is the number of buried jobs in this tube.

- "total-jobs" is the cumulative count of jobs created in this tube.
Expand Down
82 changes: 62 additions & 20 deletions prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ size_t job_data_size_limit = ((1 << 16) - 1);
"current-jobs-urgent: %u\n" \
"current-jobs-ready: %u\n" \
"current-jobs-reserved: %u\n" \
"current-jobs-delayed: %u\n" \
"current-jobs-buried: %u\n" \
"total-jobs: %llu\n" \
"current-using: %u\n" \
Expand All @@ -171,8 +172,6 @@ size_t job_data_size_limit = ((1 << 16) - 1);
"kicks: %u\n" \
"\r\n"

static struct pq delay_q;

static unsigned int ready_ct = 0;
static struct stats global_stat = {0, 0, 0, 0, 0};

Expand Down Expand Up @@ -340,17 +339,42 @@ process_queue()
}
}

static job
delay_q_peek()
{
int i;
tube t;
job j = NULL, nj;

for (i = 0; i < tubes.used; i++) {
t = tubes.items[i];
nj = pq_peek(&t->delay);
if (!nj) continue;
if (!j || nj->deadline < j->deadline) j = nj;
}

return j;
}

static void
set_main_delay_timeout()
{
job j;

set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
}

static int
enqueue_job(job j, unsigned int delay)
{
int r;

if (delay) {
j->deadline = time(NULL) + delay;
r = pq_give(&delay_q, j);
r = pq_give(&j->tube->delay, j);
if (!r) return 0;
j->state = JOB_STATE_DELAYED;
set_main_timeout(pq_peek(&delay_q)->deadline);
set_main_delay_timeout();
} else {
r = pq_give(&j->tube->ready, j);
if (!r) return 0;
Expand Down Expand Up @@ -391,16 +415,11 @@ enqueue_reserved_jobs(conn c)
}
}

static job
delay_q_peek()
{
return pq_peek(&delay_q);
}

static job
delay_q_take()
{
return pq_take(&delay_q);
job j = delay_q_peek();
return j ? pq_take(&j->tube->delay) : NULL;
}

static job
Expand Down Expand Up @@ -434,17 +453,25 @@ kick_buried_job(tube t)
static unsigned int
get_delayed_job_ct()
{
return pq_used(&delay_q);
tube t;
size_t i;
unsigned int count = 0;

for (i = 0; i < tubes.used; i++) {
t = tubes.items[i];
count += pq_used(&t->delay);
}
return count;
}

static int
kick_delayed_job()
kick_delayed_job(tube t)
{
int r;
job j;

if (get_delayed_job_ct() < 1) return 0;
j = delay_q_take();
j = pq_take(&t->delay);
j->kick_ct++;
r = enqueue_job(j, 0);
if (r) return 1;
Expand All @@ -469,18 +496,18 @@ kick_buried_jobs(tube t, unsigned int n)

/* return the number of jobs successfully kicked */
static unsigned int
kick_delayed_jobs(unsigned int n)
kick_delayed_jobs(tube t, unsigned int n)
{
unsigned int i;
for (i = 0; (i < n) && kick_delayed_job(); ++i);
for (i = 0; (i < n) && kick_delayed_job(t); ++i);
return i;
}

static unsigned int
kick_jobs(tube t, unsigned int n)
{
if (buried_job_p(t)) return kick_buried_jobs(t, n);
return kick_delayed_jobs(n);
return kick_delayed_jobs(t, n);
}

static job
Expand Down Expand Up @@ -520,6 +547,21 @@ find_buried_job(unsigned long long int id)
return NULL;
}

static job
find_delayed_job(unsigned long long int id)
{
job j;
size_t i;
tube t;

for (i = 0; i < tubes.used; i++) {
t = tubes.items[i];
j = pq_find(&t->delay, id);
if (j) return j;
}
return NULL;
}

static job
remove_buried_job(unsigned long long int id)
{
Expand Down Expand Up @@ -591,7 +633,7 @@ peek_job(unsigned long long int id)
{
return find_reserved_job(id) ? :
peek_ready_job(id) ? :
pq_find(&delay_q, id) ? :
find_delayed_job(id) ? :
find_buried_job(id);
}

Expand Down Expand Up @@ -897,6 +939,7 @@ fmt_stats_tube(char *buf, size_t size, tube t)
t->stat.urgent_ct,
t->ready.used,
t->stat.reserved_ct,
pq_used(&t->delay),
t->stat.buried_ct,
t->stat.total_jobs_ct,
t->using_ct,
Expand Down Expand Up @@ -1465,7 +1508,7 @@ h_delay()
if (!r) bury_job(j); /* there was no room in the queue, so bury it */
}

set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
set_main_delay_timeout();
}

void
Expand Down Expand Up @@ -1504,7 +1547,6 @@ void
prot_init()
{
start_time = time(NULL);
pq_init(&delay_q, job_delay_cmp);

ms_init(&tubes, NULL, NULL);

Expand Down
1 change: 1 addition & 0 deletions tube.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ make_tube(const char *name)
if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");

pq_init(&t->ready, job_pri_cmp);
pq_init(&t->delay, job_delay_cmp);
t->buried = (struct job) { &t->buried, &t->buried, 0 };
ms_init(&t->waiting, NULL, NULL);

Expand Down
1 change: 1 addition & 0 deletions tube.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct tube {
unsigned int refs;
char name[MAX_TUBE_NAME_LEN];
struct pq ready;
struct pq delay;
struct job buried;
struct ms waiting; /* set of conns */
struct stats stat;
Expand Down

0 comments on commit df46041

Please sign in to comment.