Permalink
Browse files

first implementation of multi-request per connection (need optimization)

  • Loading branch information...
1 parent f6f8dbe commit 27fda413980a5b3b2ebb890c4525ea3dd8f217ac roberto@goyle committed Dec 12, 2010
Showing with 110 additions and 9 deletions.
  1. +60 −1 plugins/python/uwsgi_pymodule.c
  2. +39 −0 plugins/python/wsgi_headers.c
  3. +10 −4 utils.c
  4. +1 −4 uwsgirouter5.py
@@ -1142,6 +1142,12 @@ typedef struct {
int fd;
int timeout;
int close;
+ int started ;
+ int has_cl;
+ uint16_t size;
+ uint16_t sent;
+ uint8_t modifier1;
+ uint8_t modifier2;
} uwsgi_Iter;
@@ -1154,16 +1160,64 @@ PyObject* uwsgi_Iter_next(PyObject *self) {
int rlen;
uwsgi_Iter *ui = (uwsgi_Iter *)self;
char buf[4096];
+ int i = 4;
+ struct uwsgi_header uh;
+ char *ub = (char *) &uh ;
UWSGI_RELEASE_GIL
uwsgi_log("waiting for data\n");
+
+ if (!ui->started) {
+ memset(&uh, 0, 4);
+ while(i) {
+ rlen = uwsgi_waitfd(ui->fd, ui->timeout);
+ if (rlen > 0) {
+ rlen = read(ui->fd, ub , i);
+ if (rlen <= 0) {
+ goto clear;
+ }
+ else {
+ i -= rlen;
+ ub += rlen;
+ }
+ }
+ else {
+ goto clear;
+ }
+ }
+
+ ui->started = 1;
+
+ if (uh.modifier1 == 'H') {
+ ui->size = 0;
+ UWSGI_GET_GIL
+ return PyString_FromStringAndSize((char *) &uh, 4);
+ }
+ else {
+ ui->has_cl = 1;
+ ui->size = uh.pktsize;
+ ui->sent = 0;
+ uwsgi_log("NEED TO READ %d\n", uh.pktsize);
+ }
+ }
+
+ if (ui->sent >= ui->size && ui->has_cl) {
+ goto clear;
+ }
+
rlen = uwsgi_waitfd(ui->fd, ui->timeout);
if (rlen > 0) {
- rlen = read(ui->fd, buf, 4096);
+ if (ui->has_cl) {
+ rlen = read(ui->fd, buf, UMIN((ui->size-ui->sent), 4096));
+ }
+ else {
+ rlen = read(ui->fd, buf, 4096);
+ }
if (rlen < 0) {
uwsgi_error("read()");
}
else if (rlen > 0) {
+ ui->sent+=rlen;
UWSGI_GET_GIL
return PyString_FromStringAndSize(buf, rlen);
}
@@ -1176,6 +1230,7 @@ PyObject* uwsgi_Iter_next(PyObject *self) {
close(ui->fd);
}
+clear:
UWSGI_GET_GIL
PyErr_SetNone(PyExc_StopIteration);
@@ -1365,6 +1420,10 @@ PyObject *py_uwsgi_send_message(PyObject * self, PyObject * args) {
ui->fd = uwsgi_fd;
ui->timeout = timeout;
ui->close = close_fd;
+ ui->started = 0;
+ ui->has_cl = 0;
+ ui->sent = 0;
+ ui->size = 0;
return (PyObject *) ui;
@@ -13,11 +13,13 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
PyObject *headers, *head;
PyObject *h_key, *h_value;
int i, j;
+ struct uwsgi_header uh;
struct wsgi_request *wsgi_req = current_wsgi_req();
int base = 0;
int shift = 0;
+ int cl = -1;
// use writev()
@@ -51,34 +53,41 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
}
wsgi_req->hvec[0].iov_len = wsgi_req->protocol_len;
+ uh.pktsize = wsgi_req->hvec[0].iov_len;
wsgi_req->hvec[1].iov_base = " ";
wsgi_req->hvec[1].iov_len = 1;
+ uh.pktsize += wsgi_req->hvec[1].iov_len;
#ifdef PYTHREE
wsgi_req->hvec[2].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(head));
wsgi_req->hvec[2].iov_len = strlen(wsgi_req->hvec[2].iov_base);
#else
wsgi_req->hvec[2].iov_base = PyString_AsString(head);
wsgi_req->hvec[2].iov_len = PyString_Size(head);
#endif
+ uh.pktsize += wsgi_req->hvec[2].iov_len;
wsgi_req->status = atoi(wsgi_req->hvec[2].iov_base);
wsgi_req->hvec[3].iov_base = nl;
wsgi_req->hvec[3].iov_len = NL_SIZE;
+ uh.pktsize += wsgi_req->hvec[3].iov_len;
}
else {
// drop http status on cgi mode
base = 3;
wsgi_req->hvec[0].iov_base = "Status: ";
wsgi_req->hvec[0].iov_len = 8;
+ uh.pktsize = wsgi_req->hvec[0].iov_len;
#ifdef PYTHREE
wsgi_req->hvec[1].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(head));
wsgi_req->hvec[1].iov_len = strlen(wsgi_req->hvec[1].iov_base);
#else
wsgi_req->hvec[1].iov_base = PyString_AsString(head);
wsgi_req->hvec[1].iov_len = PyString_Size(head);
#endif
+ uh.pktsize += wsgi_req->hvec[1].iov_len;
wsgi_req->status = atoi(wsgi_req->hvec[1].iov_base);
wsgi_req->hvec[2].iov_base = nl;
wsgi_req->hvec[2].iov_len = NL_SIZE;
+ uh.pktsize += wsgi_req->hvec[2].iov_len;
}
@@ -115,24 +124,38 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
if (!h_value) {
goto clear;
}
+
#ifdef PYTHREE
wsgi_req->hvec[j].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(h_key));
wsgi_req->hvec[j].iov_len = strlen(wsgi_req->hvec[j].iov_base);
#else
wsgi_req->hvec[j].iov_base = PyString_AsString(h_key);
wsgi_req->hvec[j].iov_len = PyString_Size(h_key);
#endif
+ uh.pktsize += wsgi_req->hvec[j].iov_len;
wsgi_req->hvec[j + 1].iov_base = h_sep;
wsgi_req->hvec[j + 1].iov_len = H_SEP_SIZE;
+ uh.pktsize += wsgi_req->hvec[j+1].iov_len;
#ifdef PYTHREE
wsgi_req->hvec[j + 2].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(h_value));
wsgi_req->hvec[j + 2].iov_len = strlen(wsgi_req->hvec[j + 2].iov_base);
#else
wsgi_req->hvec[j + 2].iov_base = PyString_AsString(h_value);
wsgi_req->hvec[j + 2].iov_len = PyString_Size(h_value);
#endif
+
+ uh.pktsize += wsgi_req->hvec[j+2].iov_len;
+
+ if (wsgi_req->leave_open && cl == -1) {
+ if (!uwsgi_strncmp(wsgi_req->hvec[j].iov_base, wsgi_req->hvec[j].iov_len, "Content-Length", 14)) {
+ cl = atoi(wsgi_req->hvec[j + 2].iov_base);
+ uwsgi_log("DETECTED CL OF %d\n", cl);
+ }
+ }
wsgi_req->hvec[j + 3].iov_base = nl;
wsgi_req->hvec[j + 3].iov_len = NL_SIZE;
+
+ uh.pktsize += wsgi_req->hvec[j+3].iov_len;
//uwsgi_log( "%.*s: %.*s\n", wsgi_req->hvec[j].iov_len, (char *)wsgi_req->hvec[j].iov_base, wsgi_req->hvec[j+2].iov_len, (char *) wsgi_req->hvec[j+2].iov_base);
}
@@ -142,6 +165,22 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
wsgi_req->hvec[j].iov_base = nl;
wsgi_req->hvec[j].iov_len = NL_SIZE;
+ uh.pktsize += wsgi_req->hvec[j].iov_len;
+
+ if (cl != -1) {
+ // send uwsgi header only if response size is lower than 0xFFFF
+ if (cl <= 0xffff) {
+ uh.modifier1 = 0;
+ uh.modifier2 = 0;
+ uint16_t pktsize = cl;
+ uh.pktsize += pktsize;
+ write(wsgi_req->poll.fd, &uh, 4);
+ }
+ else {
+ wsgi_req->leave_open = 0;
+ }
+ }
+
UWSGI_RELEASE_GIL
wsgi_req->headers_size = writev(wsgi_req->poll.fd, wsgi_req->hvec, j + 1);
UWSGI_GET_GIL
View
14 utils.c
@@ -357,7 +357,7 @@ void uwsgi_close_request(struct wsgi_request *wsgi_req) {
}
else if (wsgi_req->leave_open) {
// send OOB data to signal peer of EOS
- send(wsgi_req->poll.fd, "\0", 1, MSG_OOB);
+ //send(wsgi_req->poll.fd, "\0", 1, MSG_OOB);
}
uwsgi.workers[0].requests++;
uwsgi.workers[uwsgi.mywid].requests++;
@@ -1182,14 +1182,20 @@ int uwsgi_waitfd(int fd, int timeout) {
uwsgi_error("poll()");
}
else if (ret > 0) {
+ if (upoll[0].revents & POLLIN) {
+ uwsgi_log("DETECTED DATA\n");
+ return ret;
+ }
+
if (upoll[0].revents & POLLPRI) {
+ uwsgi_log("DETECTED PRI DATA\n");
rlen = recv(fd, &oob, 1, MSG_OOB);
uwsgi_log("RECEIVE OOB DATA %d !!!\n", rlen);
+ if (rlen < 0) {
+ return -1;
+ }
return 0;
}
- else {
- return ret;
- }
}
return ret;
View
@@ -1,12 +1,9 @@
import uwsgi
-fd = uwsgi.connect("192.168.173.100:3033")
-for part in uwsgi.send_message(fd, 0, 4, {"leave_open":"1"}, 30):
- print part
+fd = uwsgi.connect("127.0.0.1:3033")
def application(e,s):
for part in uwsgi.send_message(fd, 0, 4, e, 30, e['wsgi.input'].fileno(), uwsgi.cl()):
- print part
yield part

0 comments on commit 27fda41

Please sign in to comment.