Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Finish #852 - add a delay feature.

  • Loading branch information...
commit 024fe3c1c0611bd66c52680f5ea9908675478247 1 parent 2ae48bc
Keith Rarick authored
View
20 beanstalkd.c
@@ -689,6 +689,24 @@ h_conn(const int fd, const short which, conn c)
}
static void
+h_delay()
+{
+ int r;
+ job j;
+ time_t t;
+
+ t = time(NULL);
+ while ((j = delay_q_peek())) {
+ if (j->deadline > t) break;
+ j = delay_q_take();
+ r = enqueue_job(j, 0);
+ if (!r) bury_job(j); /* there was no room in the queue, so bury it */
+ }
+
+ set_main_timeout((j = delay_q_peek()) ? j->deadline : 0);
+}
+
+static void
h_accept(const int fd, const short which, struct event *ev)
{
conn c;
@@ -696,6 +714,8 @@ h_accept(const int fd, const short which, struct event *ev)
socklen_t addrlen;
struct sockaddr addr;
+ if (which == EV_TIMEOUT) return h_delay();
+
addrlen = sizeof addr;
cfd = accept(fd, &addr, &addrlen);
if (cfd == -1) {
View
4 job.c
@@ -55,13 +55,13 @@ job_pri_cmp(job a, job b)
int
job_delay_cmp(job a, job b)
{
- if (a->delay == b->delay) {
+ if (a->deadline == b->deadline) {
/* we can't just subtract because id has too many bits */
if (a->id > b->id) return 1;
if (a->id < b->id) return -1;
return 0;
}
- return a->delay - b->delay;
+ return a->deadline - b->deadline;
}
job
View
20 net.c
@@ -9,6 +9,8 @@
static int listen_socket = -1;
static struct event listen_evq;
static evh accept_handler;
+static time_t main_deadline = 0;
+static int brakes_are_on = 1;
int
make_server_socket(int host, int port)
@@ -51,6 +53,8 @@ brake()
{
int r;
+ if (brakes_are_on) return;
+ brakes_are_on = 1;
twarnx("too many connections; putting on the brakes");
r = event_del(&listen_evq);
@@ -65,17 +69,27 @@ unbrake(evh h)
{
int r;
+ if (!brakes_are_on) return;
+ brakes_are_on = 0;
twarnx("releasing the brakes");
accept_handler = h ? : accept_handler;
event_set(&listen_evq, listen_socket, EV_READ | EV_PERSIST,
accept_handler, &listen_evq);
- errno = 0;
- r = event_add(&listen_evq, NULL);
- if (r == -1) twarn("event_add()");
+ set_main_timeout(main_deadline);
r = listen(listen_socket, 1024);
if (r == -1) twarn("listen()");
}
+void
+set_main_timeout(time_t deadline)
+{
+ int r;
+ struct timeval tv = {deadline - time(NULL), 0};
+
+ main_deadline = deadline;
+ r = event_add(&listen_evq, deadline ? &tv : NULL);
+ if (r == -1) twarn("event_add()");
+}
View
1  net.h
@@ -15,5 +15,6 @@ int make_server_socket(int host, int port);
void brake();
void unbrake(evh h);
+void set_main_timeout(time_t deadline);
#endif /*net_h*/
View
7 pq.c
@@ -104,6 +104,13 @@ pq_take(pq q)
}
job
+pq_peek(pq q)
+{
+ if (q->used == 0) return NULL;
+ return q->heap[0];
+}
+
+job
pq_find(pq q, unsigned long long int id)
{
unsigned int i;
View
3  pq.h
@@ -21,6 +21,9 @@ int pq_give(pq q, job j);
/* return a job if the queue contains jobs, else NULL */
job pq_take(pq q);
+/* return a job if the queue contains jobs, else NULL */
+job pq_peek(pq q);
+
/* return a job that matches the given id, else NULL */
/* This is O(n), so don't do it much. */
job pq_find(pq q, unsigned long long int id);
View
16 prot.c
@@ -9,6 +9,7 @@
#include "conn.h"
#include "util.h"
#include "reserve.h"
+#include "net.h"
static pq ready_q;
static pq delay_q;
@@ -90,9 +91,11 @@ enqueue_job(job j, unsigned int delay)
int r;
if (delay) {
+ j->deadline = time(NULL) + delay;
r = pq_give(delay_q, j);
if (!r) return 0;
j->state = JOB_STATE_DELAY;
+ set_main_timeout(pq_peek(delay_q)->deadline);
} else {
r = pq_give(ready_q, j);
if (!r) return 0;
@@ -103,6 +106,18 @@ enqueue_job(job j, unsigned int delay)
return 1;
}
+job
+delay_q_peek()
+{
+ return pq_peek(delay_q);
+}
+
+job
+delay_q_take()
+{
+ return pq_take(delay_q);
+}
+
void
bury_job(job j)
{
@@ -157,6 +172,7 @@ job
peek_job(unsigned long long int id)
{
return pq_find(ready_q, id) ? :
+ pq_find(delay_q, id) ? :
find_reserved_job(id) ? :
find_reserved_job_in_list(&wait_queue, id) ? :
find_buried_job(id);
View
2  prot.h
@@ -25,6 +25,8 @@ conn remove_waiting_conn(conn c);
void enqueue_waiting_conn(conn c);
int enqueue_job(job j, unsigned int delay);
+job delay_q_peek();
+job delay_q_take();
void bury_job(job j);
int kick_job();
void process_queue();
Please sign in to comment.
Something went wrong with that request. Please try again.