Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
support for per-vhost zmq router
Browse files Browse the repository at this point in the history
  • Loading branch information
Roberto De Ioris committed Aug 21, 2012
1 parent bdc6e81 commit 772156d
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 63 deletions.
35 changes: 27 additions & 8 deletions blastbeat.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

struct bb_virtualhost;
struct bb_session;
struct bb_router;

// a dealer is a blackend node connecting to blastbeat
struct bb_dealer {
Expand All @@ -77,6 +78,9 @@ struct bb_dealer {
int unauthorized;
int spawn_sent;
uint64_t load;

struct bb_router *router;

struct bb_dealer *next;
};

Expand Down Expand Up @@ -509,11 +513,30 @@ struct bb_hostname {
struct bb_hostname *next;
};

struct bb_router_io {
ev_io event;
struct bb_router *router;
};

struct bb_router_prepare {
ev_prepare prepare;
struct bb_router *router;
};

struct bb_router {
char *zmq;
struct bb_virtualhost *vhost;
void *router;
int zmq_fd;
struct bb_router_io zmq_io;
struct bb_router_prepare zmq_check;
struct bb_router *next;
};

// the main server structure
struct blastbeat_server {
struct bb_acceptor *acceptors;
struct bb_virtualhost *vhosts;
char *zmq;

float ping_freq;
float stats_freq;
Expand All @@ -535,8 +558,6 @@ struct blastbeat_server {
uint64_t startup_memory;
uint64_t cache_memory;

void *router;
int zmq_fd;
struct ev_loop *loop;

uint64_t max_fd;
Expand All @@ -554,13 +575,11 @@ struct blastbeat_server {
struct bb_hostname *hnht[BLASTBEAT_HOSTNAME_HTSIZE];

struct bb_dealer *dealers;
struct bb_router *routers;

ev_io event_zmq;
ev_timer pinger;
ev_timer stats;

ev_prepare zmq_check;

};

void bb_ini_config(char *);
Expand Down Expand Up @@ -594,8 +613,8 @@ struct bb_session *bb_session_new(struct bb_connection *);
void bb_connection_close(struct bb_connection *);
void bb_session_close(struct bb_session *);

void bb_raw_zmq_send_msg(struct bb_session *, char *, size_t, char *, size_t, char *, size_t, char *, size_t);
void bb_zmq_send_msg(struct bb_session *, char *, size_t, char *, size_t, char *, size_t, char *, size_t);
void bb_raw_zmq_send_msg(struct bb_dealer *, struct bb_session *, char *, size_t, char *, size_t, char *, size_t);
void bb_zmq_send_msg(struct bb_dealer *, struct bb_session *, char *, size_t, char *, size_t, char *, size_t);
void bb_zmq_receiver(struct ev_loop *, struct ev_io *, int);
void bb_zmq_check_cb(struct ev_loop *, struct ev_prepare *, int);

Expand Down
6 changes: 3 additions & 3 deletions blastbeat.ini
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ alias = 192.168.173.5:8083
; 1 MB cache
cache = 1
;timeout = 3
zmq = tcp://192.168.173.5:5001

; 100 kbit/s
bandwidth = 100

[blastbeat:192.168.173.5:8081]
certificate = quantal.crt
key = quantal.key
node = FOOBAR1
node = FOOBAR6
bind-ssl = 192.168.173.5:8081
;bind = 0.0.0.0:8084
zmq = tcp://192.168.173.5:5001

[blastbeat:localhost]
node = FOOBAR1
node = FOOBAR7
alias = 192.168.173.5:8084
29 changes: 28 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,28 @@ void bb_vhost_push_acceptor(struct bb_virtualhost *vhost, struct bb_acceptor *ac
}
}

static void bb_add_router(char *name, struct bb_virtualhost *vhost) {
struct bb_router *last_bbr = NULL, *bbr = blastbeat.routers;

while(bbr) {
last_bbr = bbr;
bbr = bbr->next;
}

bbr = bb_alloc(sizeof(struct bb_router));
memset(bbr, 0, sizeof(struct bb_router));

bbr->zmq = name;
bbr->vhost = vhost;

if (last_bbr) {
last_bbr->next = bbr;
}
else {
blastbeat.routers = bbr;
}
}

static void bb_main_config_add(char *key, char *value) {

is_opt( "bind") {
Expand All @@ -444,7 +466,7 @@ static void bb_main_config_add(char *key, char *value) {
}

is_opt( "zmq") {
blastbeat.zmq = value;
bb_add_router(value, NULL);
return;
}

Expand Down Expand Up @@ -550,6 +572,11 @@ static void bb_vhost_config_add(char *vhostname, char *key, char *value) {
return;
}

is_opt( "zmq") {
bb_add_router(value, vhost);
return;
}

is_opt( "bandwidth") {
// kbit/s
vhost->bandwidth = strtoll(value, NULL, 10) * 1000;
Expand Down
4 changes: 2 additions & 2 deletions src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ static int header_ptr_value_cb(http_parser *parser, const char *buf, size_t len)
}

int bb_http_recv_body(struct bb_session *bbs, char *buf, size_t len) {
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "body", 4, (char *) buf, len);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "body", 4, (char *) buf, len);
return 0;
}

Expand Down Expand Up @@ -349,7 +349,7 @@ static int bb_session_headers_complete(http_parser *parser) {
if (bb_uwsgi(bbs)) {
return -1;
}
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "uwsgi", 5, bbs->request.uwsgi_buf, bbs->request.uwsgi_pos);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "uwsgi", 5, bbs->request.uwsgi_buf, bbs->request.uwsgi_pos);
return 0;
}

Expand Down
69 changes: 52 additions & 17 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void bb_session_close(struct bb_session *bbs) {

// if linked to a dealer (and not in stealth mode), send a 'end' message
if (bbs->dealer && !bbs->stealth) {
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "end", 3, "", 0);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "end", 3, "", 0);
}
}

Expand Down Expand Up @@ -446,7 +446,12 @@ static void pinger_cb(struct ev_loop *loop, struct ev_timer *w, int revents) {

struct bb_dealer *bbd = blastbeat.dealers;
// get events before starting a potentially long write session
ev_feed_event(blastbeat.loop, &blastbeat.event_zmq, EV_READ);
struct bb_router *bbr = blastbeat.routers;
while(bbr) {
ev_feed_event(blastbeat.loop, &bbr->zmq_io.event, EV_READ);
bbr = bbr->next;
}

ev_tstamp now = bb_now;
while(bbd) {
ev_tstamp delta = now - bbd->last_seen;
Expand All @@ -455,10 +460,10 @@ static void pinger_cb(struct ev_loop *loop, struct ev_timer *w, int revents) {
bbd->status = BLASTBEAT_DEALER_OFF;
fprintf(stderr,"node \"%s\" is OFF\n", bbd->identity);
}
bb_raw_zmq_send_msg(NULL, bbd->identity, bbd->len, "", 0, "ping", 4, "", 0);
bb_raw_zmq_send_msg(bbd, NULL, "", 0, "ping", 4, "", 0);
}
if (!bbd->spawn_sent) {
bb_raw_zmq_send_msg(NULL, bbd->identity, bbd->len, "", 0, "spawn", 5, "", 0);
bb_raw_zmq_send_msg(bbd, NULL, "", 0, "spawn", 5, "", 0);
bbd->spawn_sent = 1;
}
bbd = bbd->next;
Expand Down Expand Up @@ -728,7 +733,7 @@ int main(int argc, char *argv[]) {
exit(1);
}

if (!blastbeat.zmq) {
if (!blastbeat.routers) {
fprintf(stderr, "config error: please specify at least one 'zmq' directive\n");
exit(1);
}
Expand Down Expand Up @@ -785,23 +790,53 @@ int main(int argc, char *argv[]) {

void *context = zmq_init (1);

blastbeat.router = zmq_socket(context, ZMQ_ROUTER);
struct bb_router *bbr = blastbeat.routers;
while(bbr) {
bbr->router = zmq_socket(context, ZMQ_ROUTER);

if (zmq_bind(blastbeat.router, blastbeat.zmq)) {
bb_error_exit("unable to bind to zmq socket: zmq_bind()");
}
if (zmq_bind(bbr->router, bbr->zmq)) {
bb_error_exit("unable to bind to zmq socket: zmq_bind()");
}

size_t opt_len = sizeof(int);
if (zmq_getsockopt(blastbeat.router, ZMQ_FD, &blastbeat.zmq_fd, &opt_len)) {
bb_error_exit("unable to configure zmq socket: zmq_getsockopt()");
}
size_t opt_len = sizeof(int);
if (zmq_getsockopt(bbr->router, ZMQ_FD, &bbr->zmq_fd, &opt_len)) {
bb_error_exit("unable to configure zmq socket: zmq_getsockopt()");
}

drop_privileges();
ev_io_init(&bbr->zmq_io.event, bb_zmq_receiver, bbr->zmq_fd, EV_READ);
bbr->zmq_io.router = bbr;
ev_io_start(blastbeat.loop, &bbr->zmq_io.event);

ev_io_init(&blastbeat.event_zmq, bb_zmq_receiver, blastbeat.zmq_fd, EV_READ);
ev_io_start(blastbeat.loop, &blastbeat.event_zmq);
ev_prepare_init(&bbr->zmq_check.prepare, bb_zmq_check_cb);
bbr->zmq_check.router = bbr;

ev_prepare_init(&blastbeat.zmq_check, bb_zmq_check_cb);
// here we map the router to the dealers
struct bb_virtualhost *vhost = bbr->vhost;
if (vhost) {
struct bb_vhost_dealer *bbvd = vhost->dealers;
while(bbvd) {
bbvd->dealer->router = bbr;
bbvd = bbvd->next;
}
}
else {
vhost = blastbeat.vhosts;
while(vhost) {
struct bb_vhost_dealer *bbvd = vhost->dealers;
while(bbvd) {
if (!bbvd->dealer->router) {
bbvd->dealer->router = bbr;
}
bbvd = bbvd->next;
}
vhost = vhost->next;
}
}

bbr = bbr->next;
}

drop_privileges();

// the first ping is after 1 second
ev_timer_init(&blastbeat.pinger, pinger_cb, 1.0, blastbeat.ping_freq);
Expand Down
6 changes: 3 additions & 3 deletions src/socketio.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ int bb_socketio_message(struct bb_session *bbs, char *buf, size_t len) {
// forward socket.io message to the right session
switch(buf[0]) {
case '3':
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "socket.io/msg", 13, sio_body, sio_len);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "socket.io/msg", 13, sio_body, sio_len);
break;
case '4':
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "socket.io/json", 14, sio_body, sio_len);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "socket.io/json", 14, sio_body, sio_len);
break;
case '5':
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "socket.io/event", 15, sio_body, sio_len);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "socket.io/event", 15, sio_body, sio_len);
break;
default:
fprintf(stderr,"SOCKET.IO MESSAGE TYPE: %c\n", buf[0]);
Expand Down
6 changes: 3 additions & 3 deletions src/spdy.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ static int bb_spdy_pass_body(struct bb_connection *bbc) {
return -1;
found:
if (!bbs->dealer) return -1;
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "body", 4, bbc->spdy_body_buf, bbc->spdy_length);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "body", 4, bbc->spdy_body_buf, bbc->spdy_length);
if (bbc->spdy_flags == 0x01) {
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "body", 4, "", 0);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "body", 4, "", 0);
}
return 0;

Expand Down Expand Up @@ -645,7 +645,7 @@ static int bb_manage_spdy_msg(struct bb_connection *bbc) {
// check for dealer as the host: header could be missing !!!
if (!bbs->dealer) return -1;
if (!bbs->request.no_uwsgi)
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "uwsgi", 5, bbs->request.uwsgi_buf, bbs->request.uwsgi_pos);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "uwsgi", 5, bbs->request.uwsgi_buf, bbs->request.uwsgi_pos);
break;
// RST
case 0x03:
Expand Down
2 changes: 1 addition & 1 deletion src/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void bb_websocket_pass(struct bb_session *bbs, char *buf, ssize_t len) {
bb_socketio_message(bbs->sio_session, buf, len);
return;
}
bb_zmq_send_msg(bbs, bbs->dealer->identity, bbs->dealer->len, (char *) &bbs->uuid_part1, BB_UUID_LEN, "websocket", 9, buf, len);
bb_zmq_send_msg(bbs->dealer, bbs, (char *) &bbs->uuid_part1, BB_UUID_LEN, "websocket", 9, buf, len);
}

int bb_manage_websocket(struct bb_session *bbs, char *buf, ssize_t len) {
Expand Down
Loading

0 comments on commit 772156d

Please sign in to comment.