Permalink
Browse files

Support Websockets under Web::Hippie.

Essentially re-implements psgix.io support.
It's better this time, I hope.
  • Loading branch information...
1 parent 79a8372 commit f44386133ef6068adbf113aa2e6ce4bd2a9ef221 @stash committed Oct 9, 2010
Showing with 81 additions and 110 deletions.
  1. +42 −79 Feersum.xs
  2. +20 −15 lib/Feersum.pm
  3. +3 −2 lib/Feersum/Connection.pm
  4. +16 −14 t/54-psgix-io.t
View
@@ -117,8 +117,7 @@ struct feer_conn {
U16 in_callback;
U16 responding;
U16 receiving;
- U16 _reservedflags:12;
- U16 made_raw:1;
+ U16 _reservedflags:13;
U16 is_http11:1;
U16 poll_write_cb_is_io_handle:1;
U16 auto_cl:1;
@@ -136,7 +135,6 @@ static void feersum_start_response
static int feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body);
static void feersum_handle_psgi_response(
pTHX_ struct feer_conn *c, SV *ret, bool can_recurse);
-static void feersum_psgi_triplet(pTHX_ struct feer_conn *c, AV *psgi_triplet);
static int feersum_close_handle(pTHX_ struct feer_conn *c, bool is_writer);
static void start_read_watcher(struct feer_conn *c);
@@ -536,10 +534,6 @@ static SV *
new_feer_conn_handle (pTHX_ struct feer_conn *c, bool is_writer)
{
SV *sv;
- if (c->made_raw) {
- croak("psgix.io is active; cannot make a new conn handle");
- return;
- }
SvREFCNT_inc(c->self);
sv = newRV_noinc(newSVuv(PTR2UV(c)));
sv_bless(sv, is_writer ? feer_conn_writer_stash : feer_conn_reader_stash);
@@ -613,7 +607,7 @@ process_request_ready_rinq (void)
call_request_callback(c);
- if (!c->made_raw && c->wbuf_rinq) {
+ if (c->wbuf_rinq) {
// this was deferred until after the perl callback
conn_write_ready(c);
}
@@ -1014,6 +1008,7 @@ process_request_headers (struct feer_conn *c, int body_offset)
trace("processing headers %d minor_version=%d\n",c->fd,req->minor_version);
bool body_is_required;
+ bool next_req_follows = 0;
c->is_http11 = (req->minor_version == 1);
@@ -1023,11 +1018,11 @@ process_request_headers (struct feer_conn *c, int body_offset)
str_eq("HEAD", 4, req->method, req->method_len) ||
str_eq("DELETE", 6, req->method, req->method_len))
{
- // Not supposed to have a body. Additional bytes are either a mistake
- // or pipelined requests under HTTP/1.1
-
- // XXX ignore them for now
- goto got_it_all;
+ // Not supposed to have a body. Additional bytes are either a
+ // mistake, a websocket negotiation or pipelined requests under
+ // HTTP/1.1
+ next_req_follows = 1;
+ trace("next req follows fd=%d, boff=%d\n",c->fd,body_offset);
}
else if (str_eq("PUT", 3, req->method, req->method_len) ||
str_eq("POST", 4, req->method, req->method_len))
@@ -1041,23 +1036,23 @@ process_request_headers (struct feer_conn *c, int body_offset)
goto got_bad_request;
}
- // a body potentially follows the headers. Let feer_req retain its
- // pointers into rbuf and make a new scalar for more body data.
+ // a body or follow-on data potentially follows the headers. Let feer_req
+ // retain its pointers into rbuf and make a new scalar for more body data.
STRLEN from_len;
char *from = SvPV(c->rbuf,from_len);
from += body_offset;
int need = from_len - body_offset;
int new_alloc = (need > READ_INIT_FACTOR*READ_BUFSZ)
? need : READ_INIT_FACTOR*READ_BUFSZ-1;
trace("new rbuf for body %d need=%d alloc=%d\n",c->fd, need, new_alloc);
- SV *new_rbuf = newSV(new_alloc);
- if (need)
- sv_setpvn(new_rbuf, from, need);
- else
- SvPOK_on(new_rbuf);
+ SV *new_rbuf = newSVpvn(need ? from : "", need);
req->buf = c->rbuf;
c->rbuf = new_rbuf;
+ SvCUR_set(req->buf, body_offset);
+
+ if (next_req_follows)
+ goto got_it_all;
// determine how much we need to read
int i;
@@ -1148,11 +1143,6 @@ respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len,
{
SV *tmp;
- if (c->made_raw) {
- trace("wanted to respond with server error, but conn is raw\n");
- return;
- }
-
if (c->responding != RESPOND_NOT_STARTED) {
trouble("Tried to send server error but already responding!");
return;
@@ -1368,7 +1358,6 @@ feersum_env(pTHX_ struct feer_conn *c)
}
if (request_cb_is_psgi) {
- trace("making magical psgix.io env fd=%d\n",c->fd);
SV *fake_fh = newSViv(c->fd); // just some random dummy value
SV *selfref = sv_2mortal(feer_conn_2sv(c));
sv_magicext(fake_fh, selfref, PERL_MAGIC_ext, &psgix_io_vtbl, NULL, 0);
@@ -1464,11 +1453,6 @@ feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
STRLEN len;
I32 i;
- if (c->made_raw) {
- croak("psgix.io is active; cannot start_response");
- return;
- }
-
trace("start_response fd=%d streaming=%d\n", c->fd, streaming);
if (c->responding)
@@ -1506,8 +1490,8 @@ feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
message = sv_2mortal(newSVpvf("%d %.*s",code,len,ptr));
}
- // don't generate or strip Content-Length headers for 304 responses.
- c->auto_cl = (code == 304) ? 0 : 1;
+ // don't generate or strip Content-Length headers for 304 or 1xx
+ c->auto_cl = (code == 304 || (100 <= code && code <= 199)) ? 0 : 1;
add_const_to_wbuf(c, c->is_http11 ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
add_sv_to_wbuf(c, message);
@@ -1642,19 +1626,7 @@ static void
feersum_handle_psgi_response(
pTHX_ struct feer_conn *c, SV *ret, bool can_recurse)
{
- if (c->made_raw) { // psgix.io was invoked
- if (IsArrayRef(ret) && av_len((AV*)SvRV(ret))+1 > 0) {
- sv_setpvs(ERRSV,
- "psgix.io mode is active; can't send this response");
- call_died(aTHX_ c, "PSGI request");
- return;
- }
- else if (!SvOK(ret)) {
- return;
- }
- // else: fall through in case it's a non-array callback
- }
- else if (!SvOK(ret) || !SvROK(ret)) {
+ if (!SvOK(ret) || !SvROK(ret)) {
sv_setpvs(ERRSV, "Invalid PSGI response (expected reference)");
call_died(aTHX_ c, "PSGI request");
return;
@@ -1672,8 +1644,6 @@ feersum_handle_psgi_response(
return;
}
- if (c->made_raw) return; // fell through above
-
AV *psgi_triplet = (AV*)SvRV(ret);
if (av_len(psgi_triplet)+1 != 3) {
sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
@@ -1736,7 +1706,7 @@ feersum_close_handle (pTHX_ struct feer_conn *c, bool is_writer)
SvREFCNT_dec(c->rbuf);
c->rbuf = NULL;
}
- RETVAL = shutdown(c->fd, SHUT_RD); // TODO: respect keep-alive
+ RETVAL = shutdown(c->fd, SHUT_RD);
c->receiving = RECEIVE_SHUTDOWN;
}
@@ -1940,16 +1910,13 @@ static int
psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
{
dSP;
- struct feer_conn *c;
- SV *writer;
+
+ struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
+ sv_unmagic(sv, PERL_MAGIC_ext);
ENTER;
SAVETMPS;
- SV *conn_sv = sv_2mortal(newSVsv(mg->mg_obj));
- sv_unmagic(sv, PERL_MAGIC_ext);
-
- c = sv_2feer_conn(conn_sv);
trace("invoking psgix.io magic for fd=%d\n", c->fd);
PUSHMARK(SP);
@@ -1964,24 +1931,32 @@ psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
call_died(aTHX_ c, "psgix.io magic");
}
else {
- // set the scalar value of the glob (will get cleaned up l8r)
- GvSV(SvRV(sv)) = newRV_inc(c->self);
+ SV *io_glob = SvRV(sv);
+ GvSV(io_glob) = newRV_inc(c->self);
+
+ // put whatever remainder data into the socket buffer. For keepalive
+ // support the opposite operation is required; pull the data out of
+ // the socket buffer and back into feersum.
+ if (c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf)) {
+ STRLEN rbuf_len;
+ const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
+ IO *io = GvIOp(io_glob);
+ assert(io != NULL);
+ PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
+ sv_setpvs(c->rbuf, "");
+ }
stop_read_watcher(c);
stop_read_timer(c);
- stop_write_watcher(c);
-
- c->responding = RESPOND_STREAMING;
- c->receiving = RECEIVE_STREAMING;
- c->made_raw = 1;
+ // don't stop write watcher in case there's outstanding data.
}
PUTBACK;
FREETMPS;
LEAVE;
+ return 0;
}
-
MODULE = Feersum PACKAGE = Feersum
PROTOTYPES: ENABLE
@@ -2111,7 +2086,8 @@ DESTROY (SV *self)
struct feer_conn *c = (struct feer_conn *)hdl;
trace3("DESTROY handle fd=%d, class=%s\n", c->fd,
HvNAME(SvSTASH(SvRV(self))));
- feersum_close_handle(aTHX_ c, (ix == 2));
+ if (ix == 2) // only close the writer on destruction
+ feersum_close_handle(aTHX_ c, 1);
}
}
@@ -2123,8 +2099,6 @@ read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
STRLEN buf_len = 0, src_len = 0;
ssize_t offset;
char *buf_ptr, *src_ptr;
-
- if (c->made_raw) XSRETURN_UNDEF;
if (items == 4 && SvOK(ST(3)) && SvIOK(ST(3)))
offset = SvIV(ST(3));
@@ -2206,7 +2180,6 @@ write (feer_conn_handle *hdl, ...)
PROTOTYPE: $;$
CODE:
{
- if (c->made_raw) XSRETURN_UNDEF;
if (c->responding != RESPOND_STREAMING)
croak("can only call write in streaming mode");
@@ -2244,7 +2217,6 @@ seek (feer_conn_handle *hdl, ssize_t offset, ...)
int whence = SEEK_CUR;
if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
whence = SvIV(ST(2));
- if (c->made_raw) XSRETURN_UNDEF;
trace("seek fd=%d offset=%d whence=%d\n", c->fd, offset, whence);
@@ -2295,8 +2267,6 @@ close (feer_conn_handle *hdl)
Feersum::Connection::Writer::close = 2
CODE:
{
- if (c->made_raw) XSRETURN_UNDEF;
-
assert(ix);
RETVAL = feersum_close_handle(aTHX_ c, (ix == 2));
SvUVX(hdl_sv) = 0;
@@ -2312,7 +2282,6 @@ _poll_cb (feer_conn_handle *hdl, SV *cb)
Feersum::Connection::Writer::poll_cb = 2
PPCODE:
{
- if (c->made_raw) return;
if (ix < 1 || ix > 2)
croak("can't call _poll_cb directly");
else if (ix == 1)
@@ -2342,8 +2311,6 @@ SV *
start_streaming (struct feer_conn *c, SV *message, AV *headers)
PROTOTYPE: $$\@
CODE:
- if (c->made_raw)
- croak("psgix.io mode is active; can't start streaming");
feersum_start_response(aTHX_ c, message, headers, 1);
RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
OUTPUT:
@@ -2353,8 +2320,6 @@ int
send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
PROTOTYPE: $$\@$
CODE:
- if (c->made_raw)
- croak("psgix.io mode is active; can't send this response");
feersum_start_response(aTHX_ c, message, headers, 0);
RETVAL = feersum_write_whole_body(aTHX_ c, body);
OUTPUT:
@@ -2364,8 +2329,6 @@ void
_send_psgi_response (struct feer_conn *c, SV *psgi_triplet)
PROTOTYPE: $$
PPCODE:
- if (c->made_raw)
- croak("psgix.io mode is active; can't send this response");
feersum_handle_psgi_response(aTHX_ c, psgi_triplet, 0); // no recurse
void
@@ -2396,7 +2359,7 @@ DESTROY (struct feer_conn *c)
PPCODE:
{
int i;
- trace3("DESTROY conn fd=%d c=%p raw=%d\n", c->fd, c, (IV)c->made_raw);
+ trace3("DESTROY conn fd=%d c=%p\n", c->fd, c);
if (c->rbuf) SvREFCNT_dec(c->rbuf);
@@ -2417,7 +2380,7 @@ DESTROY (struct feer_conn *c)
if (c->sa) Safefree(c->sa);
- if (c->fd && !c->made_raw) {
+ if (c->fd) {
make_blocking(c->fd);
if(close(c->fd))
perror("close socket at destruction");
Oops, something went wrong.

0 comments on commit f443861

Please sign in to comment.