Skip to content

Commit

Permalink
fastrouter heavy optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
roberto@goyle committed Mar 10, 2012
1 parent 10ff849 commit a3617d0
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 154 deletions.
30 changes: 30 additions & 0 deletions plugins/fastrouter/fastrouter.c
Expand Up @@ -490,6 +490,36 @@ void fastrouter_loop(int id) {

struct fastrouter_session *fr_session;

ufr.mapper = uwsgi_fr_map_use_void;

if (ufr.use_cache) {
ufr.mapper = uwsgi_fr_map_use_cache;
}
else if (ufr.pattern) {
ufr.mapper = uwsgi_fr_map_use_pattern;
}
else if (ufr.has_subscription_sockets) {
ufr.mapper = uwsgi_fr_map_use_subscription;
}
else if (ufr.base) {
ufr.mapper = uwsgi_fr_map_use_base;
}
else if (ufr.code_string_code && ufr.code_string_function) {
ufr.mapper = uwsgi_fr_map_use_cache;
}
else if (ufr.to_socket) {
ufr.mapper = uwsgi_fr_map_use_to;
}
else if (ufr.static_nodes) {
ufr.mapper = uwsgi_fr_map_use_static_nodes;
}
#ifdef UWSGI_SCTP
else if (ufr.has_sctp_sockets > 0) {
ufr.mapper = uwsgi_fr_map_use_sctp;
}
#endif



ufr.timeouts = uwsgi_init_rb_timer();

Expand Down
15 changes: 15 additions & 0 deletions plugins/fastrouter/fr.h
Expand Up @@ -16,9 +16,12 @@
#define del_check_timeout(x) rb_erase(&x->rbt, timeouts);
#define del_timeout(x) rb_erase(&x->timeout->rbt, ufr.timeouts); free(x->timeout);

struct fastrouter_session;

struct uwsgi_fastrouter {

int (*mapper)(struct fastrouter_session *, char **);

int has_sockets;
int has_subscription_sockets;
#ifdef UWSGI_SCTP
Expand Down Expand Up @@ -147,3 +150,15 @@ struct fastrouter_session {
void uwsgi_fastrouter_switch_events(struct fastrouter_session *, int intersting_fd, char **);
void close_session(struct fastrouter_session *);
void fr_get_hostname(char *, uint16_t, char *, uint16_t, void *);

int uwsgi_fr_map_use_void(struct fastrouter_session *, char **);
int uwsgi_fr_map_use_cache(struct fastrouter_session *, char **);
int uwsgi_fr_map_use_pattern(struct fastrouter_session *, char **);
int uwsgi_fr_map_use_subscription(struct fastrouter_session *, char **);
int uwsgi_fr_map_use_base(struct fastrouter_session *, char **);
int uwsgi_fr_map_use_cs(struct fastrouter_session *, char **);
int uwsgi_fr_map_use_to(struct fastrouter_session *, char **);
int uwsgi_fr_map_use_static_nodes(struct fastrouter_session *, char **);
#ifdef UWSGI_SCTP
int uwsgi_fr_map_use_sctp(struct fastrouter_session *, char **);
#endif
155 changes: 2 additions & 153 deletions plugins/fastrouter/fr_events.c
Expand Up @@ -5,11 +5,6 @@
extern struct uwsgi_server uwsgi;
extern struct uwsgi_fastrouter ufr;

#ifdef UWSGI_SCTP
extern struct uwsgi_fr_sctp_node **uwsgi_fastrouter_sctp_nodes;
extern struct uwsgi_fr_sctp_node **uwsgi_fastrouter_sctp_nodes_current;
#endif

void uwsgi_fastrouter_switch_events(struct fastrouter_session *fr_session, int interesting_fd, char **magic_table) {

socklen_t solen = sizeof(int);
Expand All @@ -25,8 +20,6 @@ void uwsgi_fastrouter_switch_events(struct fastrouter_session *fr_session, int i
ssize_t len;
char *post_tmp_buf[0xffff];

int tmp_socket_name_len;

switch (fr_session->status) {

case FASTROUTER_STATUS_RECV_HDR:
Expand Down Expand Up @@ -75,154 +68,10 @@ void uwsgi_fastrouter_switch_events(struct fastrouter_session *fr_session, int i
//uwsgi_log("requested domain %.*s\n", fr_session->hostname_len, fr_session->hostname);
#endif

// the mapper hook
choose_node:
if (ufr.use_cache) {
fr_session->instance_address = uwsgi_cache_get(fr_session->hostname, fr_session->hostname_len, &fr_session->instance_address_len);
char *cs_mod = uwsgi_str_contains(fr_session->instance_address, fr_session->instance_address_len, ',');
if (cs_mod) {
fr_session->modifier1 = uwsgi_str_num(cs_mod + 1, (fr_session->instance_address_len - (cs_mod - fr_session->instance_address)) - 1);
fr_session->instance_address_len = (cs_mod - fr_session->instance_address);
}
}
else if (ufr.pattern) {
magic_table['s'] = uwsgi_concat2n(fr_session->hostname, fr_session->hostname_len, "", 0);
fr_session->tmp_socket_name = magic_sub(ufr.pattern, ufr.pattern_len, &tmp_socket_name_len, magic_table);
free(magic_table['s']);
fr_session->instance_address_len = tmp_socket_name_len;
fr_session->instance_address = fr_session->tmp_socket_name;
}
else if (ufr.has_subscription_sockets) {
fr_session->un = uwsgi_get_subscribe_node(&ufr.subscriptions, fr_session->hostname, fr_session->hostname_len, ufr.subscription_regexp);
if (fr_session->un && fr_session->un->len) {
fr_session->instance_address = fr_session->un->name;
fr_session->instance_address_len = fr_session->un->len;
fr_session->modifier1 = fr_session->un->modifier1;
}
else if (ufr.subscriptions == NULL && ufr.cheap && !ufr.i_am_cheap) {
uwsgi_gateway_go_cheap("uWSGI fastrouter", ufr.queue, &ufr.i_am_cheap);
}
}
else if (ufr.base) {
fr_session->tmp_socket_name = uwsgi_concat2nn(ufr.base, ufr.base_len, fr_session->hostname, fr_session->hostname_len, &tmp_socket_name_len);
fr_session->instance_address_len = tmp_socket_name_len;
fr_session->instance_address = fr_session->tmp_socket_name;
}
else if (ufr.code_string_code && ufr.code_string_function) {
if (uwsgi.p[ufr.code_string_modifier1]->code_string) {
fr_session->instance_address = uwsgi.p[ufr.code_string_modifier1]->code_string("uwsgi_fastrouter", ufr.code_string_code, ufr.code_string_function, fr_session->hostname, fr_session->hostname_len);
if (fr_session->instance_address) {
fr_session->instance_address_len = strlen(fr_session->instance_address);
char *cs_mod = uwsgi_str_contains(fr_session->instance_address, fr_session->instance_address_len, ',');
if (cs_mod) {
fr_session->modifier1 = uwsgi_str_num(cs_mod + 1, (fr_session->instance_address_len - (cs_mod - fr_session->instance_address)) - 1);
fr_session->instance_address_len = (cs_mod - fr_session->instance_address);
}
}
}
}
else if (ufr.to_socket) {
fr_session->instance_address = ufr.to_socket->name;
fr_session->instance_address_len = ufr.to_socket->name_len;
}
else if (ufr.static_nodes) {
if (!ufr.current_static_node) {
ufr.current_static_node = ufr.static_nodes;
}

fr_session->static_node = ufr.current_static_node;

// is it a dead node ?
if (fr_session->static_node->custom > 0) {

// gracetime passed ?
if (fr_session->static_node->custom + ufr.static_node_gracetime <= (uint64_t) uwsgi_now()) {
fr_session->static_node->custom = 0;
}
else {
struct uwsgi_string_list *tmp_node = fr_session->static_node;
struct uwsgi_string_list *next_node = fr_session->static_node->next;
fr_session->static_node = NULL;
// needed for 1-node only setups
if (!next_node)
next_node = ufr.static_nodes;

while (tmp_node != next_node) {
if (!next_node) {
next_node = ufr.static_nodes;
}

if (tmp_node == next_node)
break;

if (next_node->custom == 0) {
fr_session->static_node = next_node;
break;
}
next_node = next_node->next;
}
}
}

if (fr_session->static_node) {

fr_session->instance_address = fr_session->static_node->value;
fr_session->instance_address_len = fr_session->static_node->len;
// set the next one
ufr.current_static_node = fr_session->static_node->next;
}
else {
// set the next one
ufr.current_static_node = ufr.current_static_node->next;
}

}
#ifdef UWSGI_SCTP
else if (ufr.has_sctp_sockets > 0) {


if (!*uwsgi_fastrouter_sctp_nodes_current)
*uwsgi_fastrouter_sctp_nodes_current = *uwsgi_fastrouter_sctp_nodes;

struct uwsgi_fr_sctp_node *ufsn = *uwsgi_fastrouter_sctp_nodes_current;
int choosen_fd = -1;
// find the first available server
while (ufsn) {
if (ufr.fr_table[ufsn->fd]->status == FASTROUTER_STATUS_SCTP_NODE_FREE) {
ufsn->requests++;
choosen_fd = ufsn->fd;
break;
}
if (ufsn->next == *uwsgi_fastrouter_sctp_nodes_current) {
break;
}

ufsn = ufsn->next;
}

// no nodes available
if (choosen_fd == -1) {
fr_session->retry = 1;
del_timeout(fr_session);
fr_session->timeout = add_fake_timeout(fr_session);
break;
}

struct sctp_sndrcvinfo sinfo;
memset(&sinfo, 0, sizeof(struct sctp_sndrcvinfo));
memcpy(&sinfo.sinfo_ppid, &fr_session->uh, sizeof(uint32_t));
sinfo.sinfo_stream = fr_session->fd;
len = sctp_send(choosen_fd, fr_session->buffer, fr_session->uh.pktsize, &sinfo, 0);

fr_session->instance_fd = choosen_fd;
fr_session->status = FASTROUTER_STATUS_SCTP_RESPONSE;
ufr.fr_table[fr_session->instance_fd]->status = FASTROUTER_STATUS_SCTP_RESPONSE;
ufr.fr_table[fr_session->instance_fd]->fd = fr_session->fd;

// round robin
*uwsgi_fastrouter_sctp_nodes_current = (*uwsgi_fastrouter_sctp_nodes_current)->next;
if (ufr.mapper(fr_session, magic_table))
break;
}
#endif

// no address found
if (!fr_session->instance_address_len) {
Expand Down

0 comments on commit a3617d0

Please sign in to comment.