Skip to content

Commit

Permalink
Fast file transfers, part 2: Profit from sendfile().
Browse files Browse the repository at this point in the history
  • Loading branch information
jonashaag committed Mar 23, 2011
1 parent 8773d32 commit 7bbd8fe
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -92,7 +92,7 @@ memwatch:
watch -n 0.5 \ watch -n 0.5 \
'cat /proc/$$(pgrep -n python)/cmdline | tr "\0" " " | head -c -1; \ 'cat /proc/$$(pgrep -n python)/cmdline | tr "\0" " " | head -c -1; \
echo; echo; \ echo; echo; \
tail -n +25 /proc/$$(pidof -s python)/smaps' tail -n +25 /proc/$$(pgrep -n python)/smaps'


upload: upload:
python setup.py sdist upload python setup.py sdist upload
2 changes: 2 additions & 0 deletions bjoern/filewrapper.c
Expand Up @@ -12,6 +12,7 @@ FileWrapper_New(PyObject* self, PyObject* args, PyObject* kwargs)
} }
Py_INCREF(file); Py_INCREF(file);
FileWrapper* wrapper = PyObject_NEW(FileWrapper, &FileWrapper_Type); FileWrapper* wrapper = PyObject_NEW(FileWrapper, &FileWrapper_Type);
PyFile_IncUseCount((PyFileObject*)file);
wrapper->file = file; wrapper->file = file;
return (PyObject*)wrapper; return (PyObject*)wrapper;
} }
Expand All @@ -31,6 +32,7 @@ FileWrapper_Iter(PyObject* self)
static void static void
FileWrapper_dealloc(PyObject* self) FileWrapper_dealloc(PyObject* self)
{ {
PyFile_DecUseCount((PyFileObject*)((FileWrapper*)self)->file);
Py_DECREF(((FileWrapper*)self)->file); Py_DECREF(((FileWrapper*)self)->file);
PyObject_FREE(self); PyObject_FREE(self);
} }
Expand Down
2 changes: 2 additions & 0 deletions bjoern/filewrapper.h
@@ -1,5 +1,7 @@
#include "common.h" #include "common.h"


#define FileWrapper_CheckExact(x) ((x)->ob_type == &FileWrapper_Type)

PyTypeObject FileWrapper_Type; PyTypeObject FileWrapper_Type;


typedef struct { typedef struct {
Expand Down
4 changes: 0 additions & 4 deletions bjoern/request.c
Expand Up @@ -61,10 +61,6 @@ void Request_clean(Request* request)
Py_DECREF(request->iterable); Py_DECREF(request->iterable);
} }
Py_XDECREF(request->iterator); Py_XDECREF(request->iterator);
if(request->headers)
assert(request->headers->ob_refcnt >= 1);
if(request->status)
assert(request->status->ob_refcnt >= 1);
Py_XDECREF(request->headers); Py_XDECREF(request->headers);
Py_XDECREF(request->status); Py_XDECREF(request->status);
} }
Expand Down
3 changes: 2 additions & 1 deletion bjoern/request.h
Expand Up @@ -15,6 +15,7 @@ typedef struct {
unsigned keep_alive : 1; unsigned keep_alive : 1;
unsigned response_length_unknown : 1; unsigned response_length_unknown : 1;
unsigned chunked_response : 1; unsigned chunked_response : 1;
unsigned use_sendfile : 1;
} request_state; } request_state;


typedef struct { typedef struct {
Expand All @@ -36,12 +37,12 @@ typedef struct {


request_state state; request_state state;


PyObject* status;
PyObject* headers; PyObject* headers;
PyObject* current_chunk; PyObject* current_chunk;
Py_ssize_t current_chunk_p; Py_ssize_t current_chunk_p;
PyObject* iterable; PyObject* iterable;
PyObject* iterator; PyObject* iterator;
PyObject* status;
} Request; } Request;


#define REQUEST_FROM_WATCHER(watcher) \ #define REQUEST_FROM_WATCHER(watcher) \
Expand Down
128 changes: 80 additions & 48 deletions bjoern/server.c
Expand Up @@ -5,6 +5,7 @@
#ifdef WANT_SIGINT_HANDLING #ifdef WANT_SIGINT_HANDLING
# include <sys/signal.h> # include <sys/signal.h>
#endif #endif
#include <sys/sendfile.h>
#include <ev.h> #include <ev.h>
#include "common.h" #include "common.h"
#include "wsgi.h" #include "wsgi.h"
Expand Down Expand Up @@ -33,6 +34,8 @@ static ev_io_callback ev_io_on_request;
static ev_io_callback ev_io_on_read; static ev_io_callback ev_io_on_read;
static ev_io_callback ev_io_on_write; static ev_io_callback ev_io_on_write;
static bool send_chunk(Request*); static bool send_chunk(Request*);
static bool do_sendfile(Request*);
static bool handle_nonzero_errno(Request*);


void server_run(const char* hostaddr, const int port) void server_run(const char* hostaddr, const int port)
{ {
Expand Down Expand Up @@ -187,46 +190,56 @@ ev_io_on_write(struct ev_loop* mainloop, ev_io* watcher, const int events)
Request* request = REQUEST_FROM_WATCHER(watcher); Request* request = REQUEST_FROM_WATCHER(watcher);


GIL_LOCK(0); GIL_LOCK(0);
assert(request->current_chunk);


if(send_chunk(request)) if(request->state.use_sendfile) {
goto out; /* sendfile */
if(request->current_chunk && send_chunk(request))
goto out;
/* abuse current_chunk_p to store the file fd */
request->current_chunk_p = PyObject_AsFileDescriptor(request->iterable);
if(do_sendfile(request))
goto out;
} else {
/* iterable */
if(send_chunk(request))
goto out;


if(request->iterator) { if(request->iterator) {
PyObject* next_chunk; PyObject* next_chunk;
next_chunk = wsgi_iterable_get_next_chunk(request); next_chunk = wsgi_iterable_get_next_chunk(request);
if(next_chunk) { if(next_chunk) {
if(request->state.chunked_response) { if(request->state.chunked_response) {
request->current_chunk = wrap_http_chunk_cruft_around(next_chunk); request->current_chunk = wrap_http_chunk_cruft_around(next_chunk);
Py_DECREF(next_chunk); Py_DECREF(next_chunk);
} else {
request->current_chunk = next_chunk;
}
assert(request->current_chunk_p == 0);
goto out;
} else { } else {
request->current_chunk = next_chunk; if(PyErr_Occurred()) {
PyErr_Print();
/* We can't do anything graceful here because at least one
* chunk is already sent... just close the connection */
DBG_REQ(request, "Exception in iterator, can not recover");
ev_io_stop(mainloop, &request->ev_watcher);
close(request->client_fd);
Request_free(request);
goto out;
}
Py_CLEAR(request->iterator);
} }
}

if(request->state.chunked_response) {
/* We have to send a terminating empty chunk + \r\n */
request->current_chunk = PyString_FromString("0\r\n\r\n");
assert(request->current_chunk_p == 0); assert(request->current_chunk_p == 0);
request->state.chunked_response = false;
goto out; goto out;
} else {
if(PyErr_Occurred()) {
PyErr_Print();
/* We can't do anything graceful here because at least one
* chunk is already sent... just close the connection */
DBG_REQ(request, "Exception in iterator, can not recover");
ev_io_stop(mainloop, &request->ev_watcher);
close(request->client_fd);
Request_free(request);
goto out;
}
Py_CLEAR(request->iterator);
} }
} }


if(request->state.chunked_response) {
/* We have to send a terminating empty chunk + \r\n */
request->current_chunk = PyString_FromString("0\r\n\r\n");
assert(request->current_chunk_p == 0);
request->state.chunked_response = false;
goto out;
}

ev_io_stop(mainloop, &request->ev_watcher); ev_io_stop(mainloop, &request->ev_watcher);
if(request->state.keep_alive) { if(request->state.keep_alive) {
DBG_REQ(request, "done, keep-alive"); DBG_REQ(request, "done, keep-alive");
Expand All @@ -249,38 +262,57 @@ static bool
send_chunk(Request* request) send_chunk(Request* request)
{ {
Py_ssize_t chunk_length; Py_ssize_t chunk_length;
Py_ssize_t sent_bytes; Py_ssize_t bytes_sent;


assert(request->current_chunk != NULL); assert(request->current_chunk != NULL);
assert(!(request->current_chunk_p == PyString_GET_SIZE(request->current_chunk) assert(!(request->current_chunk_p == PyString_GET_SIZE(request->current_chunk)
&& PyString_GET_SIZE(request->current_chunk) != 0)); && PyString_GET_SIZE(request->current_chunk) != 0));


sent_bytes = write( bytes_sent = write(
request->client_fd, request->client_fd,
PyString_AS_STRING(request->current_chunk) + request->current_chunk_p, PyString_AS_STRING(request->current_chunk) + request->current_chunk_p,
PyString_GET_SIZE(request->current_chunk) - request->current_chunk_p PyString_GET_SIZE(request->current_chunk) - request->current_chunk_p
); );


if(sent_bytes == -1) { if(bytes_sent == -1)
error: return handle_nonzero_errno(request);
if(errno == EAGAIN || errno == EWOULDBLOCK) {
/* Try again later */
return true;
} else {
/* Serious transmission failure. Hang up. */
fprintf(stderr, "Client %d hit errno %d\n", request->client_fd, errno);
Py_DECREF(request->current_chunk);
Py_XCLEAR(request->iterator);
request->state.keep_alive = false;
return false;
}
}


request->current_chunk_p += sent_bytes; request->current_chunk_p += bytes_sent;
if(request->current_chunk_p == PyString_GET_SIZE(request->current_chunk)) { if(request->current_chunk_p == PyString_GET_SIZE(request->current_chunk)) {
Py_CLEAR(request->current_chunk); Py_CLEAR(request->current_chunk);
request->current_chunk_p = 0; request->current_chunk_p = 0;
return false; return false;
} }
return true; return true;
} }

#define SENDFILE_CHUNK_SIZE 16*1024

static bool
do_sendfile(Request* request)
{
Py_ssize_t bytes_sent = sendfile(
request->client_fd,
request->current_chunk_p, /* current_chunk_p stores the file fd */
NULL, SENDFILE_CHUNK_SIZE
);
if(bytes_sent == -1)
return handle_nonzero_errno(request);
return bytes_sent != 0;
}

static bool
handle_nonzero_errno(Request* request)
{
if(errno == EAGAIN || errno == EWOULDBLOCK) {
/* Try again later */
return true;
} else {
/* Serious transmission failure. Hang up. */
fprintf(stderr, "Client %d hit errno %d\n", request->client_fd, errno);
Py_XDECREF(request->current_chunk);
Py_XCLEAR(request->iterator);
request->state.keep_alive = false;
return false;
}
}
8 changes: 8 additions & 0 deletions bjoern/wsgi.c
@@ -1,5 +1,6 @@
#include "common.h" #include "common.h"
#include "bjoernmodule.h" #include "bjoernmodule.h"
#include "filewrapper.h"
#include "wsgi.h" #include "wsgi.h"


static PyObject* (start_response)(PyObject* self, PyObject* args, PyObject *kwargs); static PyObject* (start_response)(PyObject* self, PyObject* args, PyObject *kwargs);
Expand Down Expand Up @@ -84,6 +85,13 @@ wsgi_call_application(Request* request)
Py_DECREF(retval); Py_DECREF(retval);
first_chunk = NULL; first_chunk = NULL;
} }
} else if(FileWrapper_CheckExact(retval)) {
request->state.use_sendfile = true;
request->iterable = ((FileWrapper*)retval)->file;
Py_INCREF(request->iterable);
Py_DECREF(retval);
request->iterator = NULL;
first_chunk = NULL;
} else { } else {
/* Generic iterable (list of length != 1, generator, ...) */ /* Generic iterable (list of length != 1, generator, ...) */
request->iterable = retval; request->iterable = retval;
Expand Down
2 changes: 1 addition & 1 deletion http-parser
Submodule http-parser updated from 5338fb to 967ab7

0 comments on commit 7bbd8fe

Please sign in to comment.