Skip to content

Commit

Permalink
Added reserve timeout support.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Jun 12, 2008
1 parent 615c445 commit 209a0d6
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 11 deletions.
13 changes: 10 additions & 3 deletions conn.c
Expand Up @@ -20,6 +20,7 @@
#include <stdio.h>
#include <time.h>
#include <errno.h>
#include <limits.h>

#include "conn.h"
#include "net.h"
Expand Down Expand Up @@ -84,6 +85,7 @@ make_conn(int fd, char start_state, tube use, tube watch)
c->state = start_state;
c->type = 0;
c->cmd_read = 0;
c->pending_timeout = -1;
c->in_job = c->out_job = NULL;
c->in_job_read = c->out_job_sent = 0;
c->prev = c->next = c; /* must be out of a linked list right now */
Expand Down Expand Up @@ -146,18 +148,23 @@ has_reserved_job(conn c)
int
conn_set_evq(conn c, const int events, evh handler)
{
int r, margin = 0;
struct timeval tv = {0, 0};
int r, margin = 0, should_timeout=0;
struct timeval tv = {INT_MAX, 0};

event_set(&c->evq, c->fd, events, handler, c);

if (conn_waiting(c)) margin = 1;
if (has_reserved_job(c)) {
time_t t = soonest_job(c)->deadline - time(NULL) - margin;
tv.tv_sec = t > 0 ? t : 0;
should_timeout = 1;
}
if (c->pending_timeout >= 0) {
tv.tv_sec = min(tv.tv_sec, c->pending_timeout);
should_timeout = 1;
}

r = event_add(&c->evq, has_reserved_job(c) ? &tv : NULL);
r = event_add(&c->evq, should_timeout ? &tv : NULL);
if (r == -1) return twarn("event_add() err %d", errno), -1;

return 0;
Expand Down
1 change: 1 addition & 0 deletions conn.h
Expand Up @@ -44,6 +44,7 @@ struct conn {
char state;
char type;
struct event evq;
int pending_timeout;

/* we cannot share this buffer with the reply line because we might read in
* command line data for a subsequent command, and we need to store it
Expand Down
18 changes: 16 additions & 2 deletions doc/protocol.txt
Expand Up @@ -194,13 +194,22 @@ A process that wants to consume jobs from the queue uses "reserve", "delete",

reserve\r\n

Alternatively, you can specify a timeout as follows:

reserve-with-timeout <seconds>\r\n

This will return a newly-reserved job. If no job is available to be reserved,
beanstalkd will wait to send a response until one becomes available. Once a
job is reserved for the client, the client has limited time to run (TTR) the
job before the job times out. When the job times out, the server will put the
job back into the ready queue. Both the TTR and the actual time left can be
found in response to the stats-job command.

A timeout value of 0 will cause the server to immediately return either a
response or TIMED_OUT. A positive value of timeout will limit the amount of
time the client will block on the reserve request until a job becomes
available.

During the TTR of a reserved job, the last second is kept by the server as a
safety margin, during which the client will not be made to wait for another
job. If the client issues a reserve command during the safety margin, or if
Expand All @@ -212,8 +221,13 @@ DEADLINE_SOON\r\n
This gives the client a chance to delete or release its reserved job before
the server automatically releases it.

Otherwise, the only response to this command is a successful reservation in
the form of a text line followed by the job body:
TIMED_OUT\r\n

If a non-negative timeout was specified and the timeout exceeded before a job
became available, the server will respond with TIMED_OUT.

Otherwise, the only other response to this command is a successful reservation
in the form of a text line followed by the job body:

RESERVED <id> <bytes>\r\n
<data>\r\n
Expand Down
30 changes: 24 additions & 6 deletions prot.c
Expand Up @@ -50,6 +50,7 @@ size_t job_data_size_limit = ((1 << 16) - 1);
#define CMD_PEEK_DELAYED "peek-delayed"
#define CMD_PEEK_BURIED "peek-buried"
#define CMD_RESERVE "reserve"
#define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
#define CMD_DELETE "delete "
#define CMD_RELEASE "release "
#define CMD_BURY "bury "
Expand All @@ -71,6 +72,7 @@ size_t job_data_size_limit = ((1 << 16) - 1);
#define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
#define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
#define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
Expand All @@ -89,6 +91,7 @@ size_t job_data_size_limit = ((1 << 16) - 1);
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_RESERVED "RESERVED"
#define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT "TIMED_OUT\r\n"
#define MSG_DELETED "DELETED\r\n"
#define MSG_RELEASED "RELEASED\r\n"
#define MSG_BURIED "BURIED\r\n"
Expand Down Expand Up @@ -137,7 +140,8 @@ size_t job_data_size_limit = ((1 << 16) - 1);
#define OP_STATS_TUBE 17
#define OP_PEEK_READY 18
#define OP_PEEK_DELAYED 19
#define TOTAL_OPS 20
#define OP_RESERVE_TIMEOUT 20
#define TOTAL_OPS 21

#define STATS_FMT "---\n" \
"current-jobs-urgent: %u\n" \
Expand All @@ -151,6 +155,7 @@ size_t job_data_size_limit = ((1 << 16) - 1);
"cmd-peek-delayed: %llu\n" \
"cmd-peek-buried: %llu\n" \
"cmd-reserve: %llu\n" \
"cmd-reserve-with-timeout: %llu\n" \
"cmd-delete: %llu\n" \
"cmd-release: %llu\n" \
"cmd-use: %llu\n" \
Expand Down Expand Up @@ -704,6 +709,7 @@ which_cmd(conn c)
TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED);
TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED);
TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT);
TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE);
TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE);
TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE);
Expand Down Expand Up @@ -809,6 +815,7 @@ fmt_stats(char *buf, size_t size, void *x)
op_ct[OP_PEEK_DELAYED],
op_ct[OP_PEEK_BURIED],
op_ct[OP_RESERVE],
op_ct[OP_RESERVE_TIMEOUT],
op_ct[OP_DELETE],
op_ct[OP_RELEASE],
op_ct[OP_USE],
Expand Down Expand Up @@ -882,13 +889,16 @@ read_ttr(unsigned int *ttr, const char *buf, char **end)
}

static void
wait_for_job(conn c)
wait_for_job(conn c, int timeout)
{
int r;

c->state = STATE_WAIT;
enqueue_waiting_conn(c);

/* Set the pending timeout to the requested timeout amount */
c->pending_timeout = timeout;

/* this conn is waiting, but we want to know if they hang up */
r = conn_update_evq(c, EV_READ | EV_PERSIST);
if (r == -1) return twarnx("update events failed"), conn_close(c);
Expand Down Expand Up @@ -1068,7 +1078,7 @@ find_or_make_tube(const char *name)
static void
dispatch_cmd(conn c)
{
int r, i;
int r, i, timeout = -1;
unsigned int count;
job j;
unsigned char type;
Expand Down Expand Up @@ -1189,9 +1199,13 @@ dispatch_cmd(conn c)

reply_job(c, j, MSG_FOUND);
break;
case OP_RESERVE:
case OP_RESERVE_TIMEOUT:
errno = 0;
timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
case OP_RESERVE: /* FALLTHROUGH */
/* don't allow trailing garbage */
if (c->cmd_len != CMD_RESERVE_LEN + 2) {
if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}

Expand All @@ -1203,7 +1217,7 @@ dispatch_cmd(conn c)
}

/* try to get a new job for this guy */
wait_for_job(c);
wait_for_job(c, timeout);
process_queue();
break;
case OP_DELETE:
Expand Down Expand Up @@ -1415,6 +1429,10 @@ h_conn_timeout(conn c)
if (should_timeout) {
dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
} else if (conn_waiting(c) && c->pending_timeout >= 0) {
dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c));
c->pending_timeout=-1;
return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
}
}

Expand Down

0 comments on commit 209a0d6

Please sign in to comment.