Skip to content

Commit

Permalink
don't use EV_PERSIST for listen socket
Browse files Browse the repository at this point in the history
  • Loading branch information
kr committed Mar 7, 2011
1 parent 4066a5e commit 21ca254
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 50 deletions.
8 changes: 6 additions & 2 deletions beanstalkd.c
Expand Up @@ -259,7 +259,7 @@ opts(int argc, char **argv)
int
main(int argc, char **argv)
{
int r;
int r, l;
struct event_base *ev_base;
struct job binlog_jobs = {};

Expand All @@ -285,13 +285,17 @@ main(int argc, char **argv)

r = make_server_socket(host_addr, port);
if (r == -1) twarnx("make_server_socket()"), exit(111);
l = r;

if (user) su(user);
ev_base = event_init();
set_sig_handlers();
nudge_fd_limit();

unbrake((evh) h_accept);
r = listen(l, 1024);
if (r == -1) twarn("listen()");
accept_handler = (evh)h_accept;
unbrake();

binlog_jobs.prev = binlog_jobs.next = &binlog_jobs;
binlog_init(&binlog_jobs);
Expand Down
2 changes: 1 addition & 1 deletion conn.c
Expand Up @@ -282,7 +282,7 @@ conn_close(conn c)

cur_conn_ct--; /* stats */

unbrake(NULL);
unbrake();
remove_waiting_conn(c);
conn_remove(c);
if (has_reserved_job(c)) enqueue_reserved_jobs(c);
Expand Down
6 changes: 3 additions & 3 deletions dat.h
Expand Up @@ -281,9 +281,9 @@ void prot_replay_binlog(job binlog_jobs);

int make_server_socket(char *host_addr, char *port);

void brake();
void unbrake(evh h);
void set_main_timeout();
void unbrake();
extern int listening;
extern evh accept_handler;


extern char *binlog_dir;
Expand Down
42 changes: 7 additions & 35 deletions net.c
Expand Up @@ -32,8 +32,8 @@

static int listen_socket = -1;
static struct event listen_evq;
static evh accept_handler;
static int brakes_are_on = 1;
evh accept_handler;
int listening = 0;

int
make_server_socket(char *host, char *port)
Expand Down Expand Up @@ -130,45 +130,17 @@ make_server_socket(char *host, char *port)
}

void
brake()
{
int r;

if (brakes_are_on) return;
brakes_are_on = 1;
twarnx("too many connections; putting on the brakes");

r = event_del(&listen_evq);
if (r == -1) twarn("event_del()");

r = listen(listen_socket, 0);
if (r == -1) twarn("listen()");
}

void
unbrake(evh h)
unbrake()
{
int r;
struct timeval tv;

if (!brakes_are_on) return;
brakes_are_on = 0;
if (listening) return;
listening = 1;

accept_handler = h ? : accept_handler;
event_set(&listen_evq, listen_socket, EV_READ | EV_PERSIST,
event_set(&listen_evq, listen_socket, EV_READ,
accept_handler, &listen_evq);

set_main_timeout();

r = listen(listen_socket, 1024);
if (r == -1) twarn("listen()");
}

void
set_main_timeout()
{
int r;
struct timeval tv;

timeval_from_usec(&tv, 10 * MSEC);

r = event_add(&listen_evq, &tv);
Expand Down
20 changes: 11 additions & 9 deletions prot.c
Expand Up @@ -1744,7 +1744,7 @@ h_delay()
}
}

set_main_timeout();
unbrake();
}

void
Expand All @@ -1755,30 +1755,32 @@ h_accept(const int fd, const short which, struct event *ev)
socklen_t addrlen;
struct sockaddr_in6 addr;

if (which == EV_TIMEOUT) return h_delay(), set_main_timeout();
listening = 0;

if (which == EV_TIMEOUT) return h_delay(), unbrake();

addrlen = sizeof addr;
cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
if (cfd == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
if (errno == EMFILE) return brake(), v();
set_main_timeout();
if (errno == EMFILE) return;
unbrake();
return;
}

flags = fcntl(cfd, F_GETFL, 0);
if (flags < 0) return twarn("getting flags"), close(cfd), set_main_timeout();
if (flags < 0) return twarn("getting flags"), close(cfd), unbrake();

r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), set_main_timeout();
if (r < 0) return twarn("setting O_NONBLOCK"), close(cfd), unbrake();

c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
if (!c) return twarnx("make_conn() failed"), close(cfd), brake();
if (!c) return twarnx("make_conn() failed"), close(cfd), v();

dbgprintf("accepted conn, fd=%d\n", cfd);
r = conn_set_evq(c, EV_READ | EV_PERSIST, (evh) h_conn);
if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), brake();
set_main_timeout();
if (r == -1) return twarnx("conn_set_evq() failed"), close(cfd), v();
unbrake();
}

void
Expand Down

0 comments on commit 21ca254

Please sign in to comment.