Skip to content

Commit

Permalink
added experimental SCTP/persistent connections support
Browse files Browse the repository at this point in the history
  • Loading branch information
roberto@precise64 committed Mar 3, 2012
1 parent 8518747 commit 9d66112
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 31 deletions.
2 changes: 1 addition & 1 deletion buildconf/default.ini
Expand Up @@ -6,7 +6,7 @@ json = auto
sqlite3 = auto
zeromq = auto
snmp = true
sctp = false
sctp = auto
spooler = true
embedded = true
udp = true
Expand Down
2 changes: 1 addition & 1 deletion plugins/php/php_plugin.c
Expand Up @@ -524,7 +524,7 @@ PHP_FUNCTION(uwsgi_signal) {
RETURN_NULL();
}

function_entry uwsgi_php_functions[] = {
zend_function_entry uwsgi_php_functions[] = {
PHP_FE(uwsgi_version, NULL)
PHP_FE(uwsgi_setprocname, NULL)
PHP_FE(uwsgi_worker_id, NULL)
Expand Down
112 changes: 112 additions & 0 deletions proto/sctp.c
@@ -0,0 +1,112 @@
/* async uwsgi protocol parser */

#include "../uwsgi.h"

extern struct uwsgi_server uwsgi;

int uwsgi_proto_sctp_parser(struct wsgi_request *wsgi_req) {

struct sctp_sndrcvinfo sinfo;
int msg_flags;

ssize_t len = sctp_recvmsg(wsgi_req->socket->fd, wsgi_req->buffer, uwsgi.buffer_size, NULL, NULL, &sinfo, &msg_flags);

if (len < 0) {
uwsgi_error("sctp_recvmsg()");
if (msg_flags == 0) {
// connection lost, retrigger it
close(wsgi_req->socket->fd);
wsgi_req->socket->fd = connect_to_sctp(wsgi_req->socket->name, wsgi_req->socket->queue);
}
return -1;
}
else if (len == 0) {
uwsgi_log("lost connection with the SCTP server\n");
// connection lost, retrigger it
close(wsgi_req->socket->fd);
wsgi_req->socket->fd = connect_to_sctp(wsgi_req->socket->name, wsgi_req->socket->queue);
return -2;
}

// check for a request stream
if (sinfo.sinfo_stream != 0) {
uwsgi_log("invalid SCTP stream id (must be 0)\n");
return -1;
}

memcpy(&wsgi_req->uh, &sinfo.sinfo_ppid, sizeof(uint32_t));

/* big endian ? */
#ifdef __BIG_ENDIAN__
wsgi_req->uh.pktsize = uwsgi_swap16(wsgi_req->uh.pktsize);
#endif

#ifdef UWSGI_DEBUG
uwsgi_debug("uwsgi payload size: %d (0x%X) modifier1: %d modifier2: %d\n", wsgi_req->uh.pktsize, wsgi_req->uh.pktsize, wsgi_req->uh.modifier1, wsgi_req->uh.modifier2);
#endif

/* check for max buffer size */
if (wsgi_req->uh.pktsize > uwsgi.buffer_size) {
uwsgi_log("invalid request block size: %d (max %d)...skip\n", wsgi_req->uh.pktsize, uwsgi.buffer_size);
return -1;
}

return UWSGI_OK;

}

ssize_t uwsgi_proto_sctp_writev_header(struct wsgi_request * wsgi_req, struct iovec * iovec, size_t iov_len) {
ssize_t wlen = writev(wsgi_req->poll.fd, iovec, iov_len);
if (wlen < 0) {
uwsgi_req_error("writev()");
return 0;
}
return wlen;
}

ssize_t uwsgi_proto_sctp_writev(struct wsgi_request * wsgi_req, struct iovec * iovec, size_t iov_len) {
ssize_t wlen = writev(wsgi_req->poll.fd, iovec, iov_len);
if (wlen < 0) {
uwsgi_req_error("writev()");
return 0;
}
return wlen;
}

ssize_t uwsgi_proto_sctp_write(struct wsgi_request * wsgi_req, char *buf, size_t len) {
ssize_t wlen = write(wsgi_req->poll.fd, buf, len);
if (wlen < 0) {
uwsgi_req_error("write()");
return 0;
}
return wlen;
}

ssize_t uwsgi_proto_sctp_write_header(struct wsgi_request * wsgi_req, char *buf, size_t len) {
ssize_t wlen = write(wsgi_req->poll.fd, buf, len);
if (wlen < 0) {
uwsgi_req_error("write()");
return 0;
}
return wlen;
}

// accept on persistent connections is a noop
int uwsgi_proto_sctp_accept(struct wsgi_request *wsgi_req, int fd) {
return wsgi_req->socket->fd;
}

void uwsgi_proto_sctp_close(struct wsgi_request *wsgi_req) {

struct sctp_sndrcvinfo sinfo;
memset(&sinfo, 0, sizeof(struct sctp_sndrcvinfo));
// stream 2 is used for closing requests
sinfo.sinfo_stream = 2;
memcpy(&sinfo.sinfo_ppid, &wsgi_req->uh, sizeof(uint32_t));

if (wsgi_req->async_post) {
fclose(wsgi_req->async_post);
}
sctp_send(wsgi_req->poll.fd, &wsgi_req->uh , sizeof(uint32_t), &sinfo, 0);
}

129 changes: 119 additions & 10 deletions socket.c
Expand Up @@ -136,20 +136,114 @@ int bind_to_unix(char *socket_name, int listen_queue, int chmod_socket, int abst

#ifdef UWSGI_SCTP

#define MAX_SCTP_ADDRESS 4
/* sctp address format sctp:127.0.0.1,192.168.0.17:3031 */
int bind_to_sctp(char *socket_name, int listen_queue, char *sctp_port) {
int connect_to_sctp(char *socket_names, int queue) {

char *peers = uwsgi_str(socket_names);
int addresses = 0;

struct sockaddr_in *sins;

// first step: count required addresses;
char *p = strtok(peers, ",");
while(p) {
#ifdef UWSGI_DEBUG
uwsgi_log("p = %s\n", p);
#endif
addresses++;
p = strtok(NULL, ",");
}

free(peers);
peers = uwsgi_str(socket_names);

sins = uwsgi_calloc(sizeof(struct sockaddr_in) * addresses);

addresses = 0;
p = strtok(peers, ",");
while(p) {
char *port = strchr(p, ':');
if (!port) {
uwsgi_log("invalid SCTP address/port, please fix it and restart\n");
goto clear;
}
sins[addresses].sin_family = AF_INET;
*port = 0;
sins[addresses].sin_addr.s_addr = inet_addr(p);
sins[addresses].sin_port = htons( atoi(port+1) );
addresses++;
p = strtok(NULL, ",");
}

int serverfd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);
if (serverfd < 0) {
uwsgi_error("socket()");
goto clear;
}

struct sctp_event_subscribe events;
struct sctp_initmsg initmsg;

memset(&initmsg, 0, sizeof(initmsg));
initmsg.sinit_max_instreams = 17;
initmsg.sinit_num_ostreams = 17;

if (setsockopt(serverfd, IPPROTO_SCTP,
SCTP_INITMSG, &initmsg, sizeof(initmsg))) {
uwsgi_error("setsockopt()");
goto clear;
}

memset( (void *)&events, 0, sizeof(events) );
events.sctp_data_io_event = 1;
/*
events.sctp_peer_error_event = 1;
events.sctp_shutdown_event = 1;
*/

if (setsockopt( serverfd, SOL_SCTP, SCTP_EVENTS,
(const void *)&events, sizeof(events) )) {
uwsgi_error("setsockopt()");
goto clear;
}

if (sctp_connectx(serverfd, (struct sockaddr *) sins, addresses, NULL)) {
uwsgi_error("sctp_connectx()");
goto clear;
}



free(sins);
free(peers);

event_queue_add_fd_read(queue, serverfd);

uwsgi_log("connected to SCTP server %s\n", socket_names);

return serverfd;

clear:
free(sins);
free(peers);
sleep(1);
return connect_to_sctp(socket_names, queue);
}

/* sctp address format 127.0.0.1:3031,192.168.0.17:3031 */
int bind_to_sctp(char *socket_name, int listen_queue) {
int serverfd;
struct sockaddr_in uws_addr[MAX_SCTP_ADDRESS];
int num_ip = 0;
//struct sockaddr_in uws_addr[MAX_SCTP_ADDRESS];

//int num_ip = 0;

struct sctp_initmsg sctp_im;


sctp_port[0] = 0;
memset(uws_addr, 0, sizeof(struct sockaddr_in) * MAX_SCTP_ADDRESS);
//sctp_port[0] = 0;
//memset(uws_addr, 0, sizeof(struct sockaddr_in) * MAX_SCTP_ADDRESS);
memset(&sctp_im, 0, sizeof(struct sctp_initmsg));

/*
while (socket_name != NULL && num_ip < MAX_SCTP_ADDRESS) {
char *ap;
while ((ap = strsep(&socket_name, ",")) != NULL) {
Expand All @@ -161,26 +255,30 @@ int bind_to_sctp(char *socket_name, int listen_queue, char *sctp_port) {
}
}
}
*/

serverfd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);
if (serverfd < 0) {
uwsgi_error("socket()");
uwsgi_nuclear_blast();
}

uwsgi_log("binding on %d SCTP interfaces on port: %d\n", num_ip, ntohs(uws_addr[0].sin_port));
//uwsgi_log("binding on %d SCTP interfaces on port: %d\n", num_ip, ntohs(uws_addr[0].sin_port));


/*
if (sctp_bindx(serverfd, (struct sockaddr *) uws_addr, num_ip, SCTP_BINDX_ADD_ADDR) != 0) {
uwsgi_error("sctp_bindx()");
uwsgi_nuclear_blast();
}
*/

sctp_im.sinit_max_instreams = 0xFFFF;
sctp_im.sinit_num_ostreams = 0xFFFF;
sctp_im.sinit_max_instreams = 17;
sctp_im.sinit_num_ostreams = 17;

if (setsockopt(serverfd, IPPROTO_SCTP, SCTP_INITMSG, &sctp_im, sizeof(sctp_im))) {
uwsgi_error("setsockopt()");
uwsgi_nuclear_blast();
}

if (listen(serverfd, listen_queue) != 0) {
Expand Down Expand Up @@ -1069,7 +1167,18 @@ void uwsgi_add_sockets_to_queue(int queue) {

struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
#ifdef UWSGI_SCTP
if (uwsgi_sock->fd == -1 && uwsgi_sock->proto_name && !strcmp(uwsgi_sock->proto_name, "sctp")) {
// continue until a connection is ready
uwsgi_sock->fd = connect_to_sctp(uwsgi_sock->name, queue);
uwsgi_sock->queue = queue;
}
else {
#endif
event_queue_add_fd_read(queue, uwsgi_sock->fd);
#ifdef UWSGI_SCTP
}
#endif
uwsgi_sock = uwsgi_sock->next;
}

Expand Down
11 changes: 11 additions & 0 deletions utils.c
Expand Up @@ -4032,6 +4032,17 @@ int uwsgi_file_to_string_list(char *filename, struct uwsgi_string_list **list) {
return 0;
}

void uwsgi_setup_post_buffering(void) {
uwsgi.async_post_buf = uwsgi_malloc(sizeof(char *) * uwsgi.cores);
if (!uwsgi.post_buffering_bufsize)
uwsgi.post_buffering_bufsize = 8192;
if (uwsgi.post_buffering_bufsize < uwsgi.post_buffering) {
uwsgi.post_buffering_bufsize = uwsgi.post_buffering;
uwsgi_log("setting request body buffering size to %d bytes\n", uwsgi.post_buffering_bufsize);
}

}

void uwsgi_emulate_cow_for_apps(int id) {
int i;
// check if we need to emulate fork() COW
Expand Down

0 comments on commit 9d66112

Please sign in to comment.