Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

psgix.io support!

* See docs; psgix.io is a magic scalar.
* updated DIED to use confess instead of warn (for when people use psgix.io
  incorrectly).
* sockets aren't `shutdown(fd, SHUT_RD)` anymore to support psgix.io (just
  close() now).
  • Loading branch information...
commit a77c85d8bd132e5f472d8a9af1a1449c6ad66b50 1 parent f182c75
Jeremy Stashewsky authored
132 Feersum.xs
View
@@ -117,7 +117,8 @@ struct feer_conn {
U16 in_callback;
U16 responding;
U16 receiving;
- U16 _reservedflags:13;
+ U16 _reservedflags:12;
+ U16 made_raw:1;
U16 is_http11:1;
U16 poll_write_cb_is_io_handle:1;
U16 auto_cl:1;
@@ -171,6 +172,7 @@ static int prep_socket (int fd);
static HV *feer_stash, *feer_conn_stash;
static HV *feer_conn_reader_stash = NULL, *feer_conn_writer_stash = NULL;
+static MGVTBL psgix_io_vtbl;
static SV *request_cb_cv = NULL;
static bool request_cb_is_psgi = 0;
@@ -528,9 +530,13 @@ sv_2feer_conn_handle (SV *rv, bool can_croak)
}
static SV *
-new_feer_conn_handle (struct feer_conn *c, bool is_writer)
+new_feer_conn_handle (pTHX_ struct feer_conn *c, bool is_writer)
{
SV *sv;
+ if (c->made_raw) {
+ croak("psgix.io is active; cannot make a new conn handle");
+ return;
+ }
SvREFCNT_inc(c->self);
sv = newRV_noinc(newSVuv(PTR2UV(c)));
sv_bless(sv, is_writer ? feer_conn_writer_stash : feer_conn_reader_stash);
@@ -604,7 +610,7 @@ process_request_ready_rinq (void)
call_request_callback(c);
- if (c->wbuf_rinq) {
+ if (!c->made_raw && c->wbuf_rinq) {
// this was deferred until after the perl callback
conn_write_ready(c);
}
@@ -773,7 +779,6 @@ try_write_shutdown:
c->responding = RESPOND_SHUTDOWN;
stop_write_watcher(c);
make_blocking(c->fd);
- //shutdown(c->fd, SHUT_WR);
if(close(c->fd))
perror("close socket at shutdown");
c->fd = 0;
@@ -878,7 +883,6 @@ try_read_error:
stop_read_watcher(c);
stop_read_timer(c);
stop_write_watcher(c);
- //shutdown(c->fd, SHUT_RDWR);
if (close(c->fd))
perror("close on read error");
c->fd = 0;
@@ -894,7 +898,6 @@ dont_read_again:
c->receiving = RECEIVE_SHUTDOWN;
stop_read_watcher(c);
stop_read_timer(c);
- shutdown(c->fd, SHUT_RD);
goto try_read_cleanup;
try_read_again_reset_timer:
@@ -926,7 +929,6 @@ conn_read_timeout (EV_P_ ev_timer *w, int revents)
trace("read timeout %d\n", c->fd);
if (c->responding == RESPOND_NOT_STARTED) {
- shutdown(c->fd, SHUT_RD);
const char *msg;
if (c->receiving == RECEIVE_HEADERS) {
msg = "Headers took too long.";
@@ -942,7 +944,6 @@ conn_read_timeout (EV_P_ ev_timer *w, int revents)
stop_read_watcher(c);
stop_read_timer(c);
make_blocking(c->fd);
- //shutdown(c->fd, SHUT_RDWR);
if(close(c->fd))
perror("close socket at read timeout");
c->fd = 0;
@@ -1144,6 +1145,11 @@ respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len,
{
SV *tmp;
+ if (c->made_raw) {
+ trace("wanted to respond with server error, but conn is raw\n");
+ return;
+ }
+
if (c->responding != RESPOND_NOT_STARTED) {
trouble("Tried to send server error but already responding!");
return;
@@ -1162,7 +1168,6 @@ respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len,
stop_read_watcher(c);
stop_read_timer(c);
- shutdown(c->fd, SHUT_RD);
c->responding = RESPOND_SHUTDOWN;
c->receiving = RECEIVE_SHUTDOWN;
conn_write_ready(c);
@@ -1294,6 +1299,8 @@ feersum_init_tmpl_env(pTHX)
hv_stores(e, "HTTP_IF_MODIFIED_SINCE", &PL_sv_placeholder);
hv_stores(e, "HTTP_IF_NONE_MATCH", &PL_sv_placeholder);
hv_stores(e, "HTTP_CACHE_CONTROL", &PL_sv_placeholder);
+
+ hv_stores(e, "psgix.io", &PL_sv_placeholder);
feersum_tmpl_env = e;
}
@@ -1351,12 +1358,20 @@ feersum_env(pTHX_ struct feer_conn *c)
if (c->expected_cl > 0) {
hv_stores(e, "CONTENT_LENGTH", newSViv(c->expected_cl));
- hv_stores(e, "psgi.input", new_feer_conn_handle(c,0));
+ hv_stores(e, "psgi.input", new_feer_conn_handle(aTHX_ c,0));
}
else if (request_cb_is_psgi) {
// TODO: make psgi.input a valid, but always empty stream for PSGI mode?
}
+ if (request_cb_is_psgi) {
+ trace("making magical psgix.io env fd=%d\n",c->fd);
+ SV *fake_fh = newSViv(c->fd); // just some random dummy value
+ SV *selfref = sv_2mortal(feer_conn_2sv(c));
+ sv_magicext(fake_fh, selfref, PERL_MAGIC_ext, &psgix_io_vtbl, NULL, 0);
+ hv_stores(e, "psgix.io", fake_fh);
+ }
+
{
const char *qpos = r->path;
SV *pinfo, *qstr;
@@ -1446,6 +1461,11 @@ feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
STRLEN len;
I32 i;
+ if (c->made_raw) {
+ croak("psgix.io is active; cannot start_response");
+ return;
+ }
+
trace("start_response fd=%d streaming=%d\n", c->fd, streaming);
if (c->responding)
@@ -1618,18 +1638,32 @@ feersum_start_psgi_streaming(pTHX_ struct feer_conn *c, SV *streamer)
static void
feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret)
{
- if (!SvOK(ret) || !SvROK(ret)) {
- sv_setpvs(ERRSV, "Invalid PSGI response (expected defined)");
+ if (c->made_raw) { // psgix.io was invoked
+ if (IsArrayRef(ret) && av_len((AV*)SvRV(ret))+1 > 0) {
+ sv_setpvs(ERRSV,
+ "psgix.io mode is active; can't send this response");
+ call_died(aTHX_ c, "PSGI request");
+ return;
+ }
+ else if (!SvOK(ret)) {
+ return;
+ }
+ // else: fall through in case it's a non-array callback
+ }
+ else if (!SvOK(ret) || !SvROK(ret)) {
+ sv_setpvs(ERRSV, "Invalid PSGI response (expected reference)");
call_died(aTHX_ c, "PSGI request");
return;
}
- if (!IsArrayRef(ret)) {
- trace("PSGI response code-ref, c=%p cv=%p\n", c, ret);
+ if (SvOK(ret) && !IsArrayRef(ret)) {
+ trace("PSGI response non-array, c=%p ret=%p\n", c, ret);
feersum_start_psgi_streaming(aTHX_ c, ret);
return;
}
+ if (c->made_raw) return; // fell through above
+
AV *psgi_triplet = (AV*)SvRV(ret);
if (av_len(psgi_triplet)+1 != 3) {
sv_setpvs(ERRSV, "Invalid PSGI array response (expected triplet)");
@@ -1692,6 +1726,7 @@ call_request_callback (struct feer_conn *c)
dSP;
int flags;
c->in_callback++;
+ SvREFCNT_inc(c->self);
trace("request callback c=%p\n", c);
@@ -1737,6 +1772,7 @@ call_request_callback (struct feer_conn *c)
}
c->in_callback--;
+ SvREFCNT_dec(c->self);
}
static void
@@ -1757,7 +1793,7 @@ call_poll_callback (struct feer_conn *c, bool is_write)
ENTER;
SAVETMPS;
PUSHMARK(SP);
- XPUSHs(sv_2mortal(new_feer_conn_handle(c, is_write)));
+ XPUSHs(sv_2mortal(new_feer_conn_handle(aTHX_ c, is_write)));
PUTBACK;
call_sv(cb, G_DISCARD|G_EVAL|G_VOID);
SPAGAIN;
@@ -1857,6 +1893,52 @@ done_pump_io:
c->in_callback--;
}
+static int
+psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
+{
+ dSP;
+ struct feer_conn *c;
+ SV *writer;
+
+ ENTER;
+ SAVETMPS;
+
+ SV *conn_sv = sv_2mortal(newSVsv(mg->mg_obj));
+ sv_unmagic(sv, PERL_MAGIC_ext);
+
+ c = sv_2feer_conn(conn_sv);
+ trace("invoking psgix.io magic for fd=%d\n", c->fd);
+
+ PUSHMARK(SP);
+ XPUSHs(sv);
+ mXPUSHs(newSViv(c->fd));
+ PUTBACK;
+
+ call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
+ SPAGAIN;
+
+ if (SvTRUE(ERRSV)) {
+ call_died(aTHX_ c, "psgix.io magic");
+ }
+ else {
+ // set the scalar value of the glob (will get cleaned up l8r)
+ GvSV(SvRV(sv)) = newRV_inc(c->self);
+
+ stop_read_watcher(c);
+ stop_read_timer(c);
+ stop_write_watcher(c);
+
+ c->responding = RESPOND_STREAMING;
+ c->receiving = RECEIVE_STREAMING;
+ c->made_raw = 1;
+ }
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+}
+
+
MODULE = Feersum PACKAGE = Feersum
PROTOTYPES: ENABLE
@@ -1992,6 +2074,8 @@ read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
STRLEN buf_len = 0, src_len = 0;
ssize_t offset;
char *buf_ptr, *src_ptr;
+
+ if (c->made_raw) XSRETURN_UNDEF;
if (items == 4 && SvOK(ST(3)) && SvIOK(ST(3)))
offset = SvIV(ST(3));
@@ -2072,6 +2156,7 @@ STRLEN
write (feer_conn_handle *hdl, SV *body, ...)
CODE:
{
+ if (c->made_raw) XSRETURN_UNDEF;
if (c->responding != RESPOND_STREAMING)
croak("can only call write in streaming mode");
@@ -2109,6 +2194,7 @@ seek (feer_conn_handle *hdl, ssize_t offset, ...)
int whence = SEEK_CUR;
if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
whence = SvIV(ST(2));
+ if (c->made_raw) XSRETURN_UNDEF;
trace("seek fd=%d offset=%d whence=%d\n", c->fd, offset, whence);
@@ -2159,6 +2245,8 @@ close (feer_conn_handle *hdl)
Feersum::Connection::Writer::close = 2
CODE:
{
+ if (c->made_raw) XSRETURN_UNDEF;
+
switch (ix) {
case 1:
trace("close reader fd=%d, c=%p\n", c->fd, c);
@@ -2200,6 +2288,7 @@ _poll_cb (feer_conn_handle *hdl, SV *cb)
Feersum::Connection::Writer::poll_cb = 2
PPCODE:
{
+ if (c->made_raw) return;
if (ix < 1 || ix > 2)
croak("can't call _poll_cb directly");
else if (ix == 1)
@@ -2229,8 +2318,10 @@ SV *
start_streaming (struct feer_conn *c, SV *message, AV *headers)
PROTOTYPE: $$\@
CODE:
+ if (c->made_raw)
+ croak("psgix.io mode is active; can't start streaming");
feersum_start_response(aTHX_ c, message, headers, 1);
- RETVAL = new_feer_conn_handle(c, 1); // RETVAL gets mortalized
+ RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
OUTPUT:
RETVAL
@@ -2238,6 +2329,8 @@ int
send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
PROTOTYPE: $$\@$
CODE:
+ if (c->made_raw)
+ croak("psgix.io mode is active; can't send this response");
feersum_start_response(aTHX_ c, message, headers, 0);
RETVAL = feersum_write_whole_body(aTHX_ c, body);
OUTPUT:
@@ -2271,7 +2364,7 @@ DESTROY (struct feer_conn *c)
PPCODE:
{
int i;
- trace3("DESTROY conn %d %p\n", c->fd, c);
+ trace3("DESTROY conn fd=%d c=%p raw=%d\n", c->fd, c, (IV)c->made_raw);
if (c->rbuf) SvREFCNT_dec(c->rbuf);
@@ -2292,7 +2385,7 @@ DESTROY (struct feer_conn *c)
if (c->sa) Safefree(c->sa);
- if (c->fd) {
+ if (c->fd && !c->made_raw) {
make_blocking(c->fd);
if(close(c->fd))
perror("close socket at destruction");
@@ -2341,4 +2434,7 @@ BOOT:
SvREADONLY_on(psgi_serv10);
psgi_serv11 = newSVpvs("HTTP/1.1");
SvREADONLY_on(psgi_serv11);
+
+ Zero(&psgix_io_vtbl, 1, MGVTBL);
+ psgix_io_vtbl.svt_get = psgix_io_svt_get;
}
4 TODO
View
@@ -53,11 +53,13 @@ multiple Feersum threads, one Perl thread?
WebSocket support (v1.1)
* http://www.whatwg.org/specs/web-socket-protocol/
+ * Support psgix.io and Web::Hippie now (1.0), but would be good to
+ accelerate it.
* Do the handshake in C/XS, call request_handler once request is complete.
* I/O is done using the streaming interface (buffered)
* requires random numbers (drand48?) and an MD5 implementation (link
openssl? use the guts of Digest::MD5 somehow?)
* make this a separate module since if it brings in an openssl deps.
- * will this work for PSGI? psgix.web_socket or something?
+ * will this work for PSGI? magic psgix.web_socket or something?
Release t/Utils.pm's "simple_client" as "anyevent::anotherhttp" or something?
40 lib/Feersum.pm
View
@@ -35,7 +35,7 @@ sub use_socket {
# overload this to catch Feersum errors and exceptions thrown by request
# callbacks.
-sub DIED { warn "DIED: $@"; }
+sub DIED { Carp::confess "DIED: $@"; }
1;
__END__
@@ -156,6 +156,8 @@ dynamic ones:
psgix.input.buffered => 1,
psgix.output.buffered => 1,
psgix.body.scalar_refs => 1,
+ # warning: read notes below on this extension:
+ psgix.io => \$magical_io_socket,
Note that SCRIPT_NAME is always blank (but defined). PATH_INFO will contain
the path part of the requested URI.
@@ -190,18 +192,27 @@ it will be immediately flushed to the socket.
};
};
-=head3 PSGI extensions
+=head2 PSGI extensions
+
+=over 4
+
+=item psgix.body.scalar_refs
Scalar refs in the response body are supported, and is indicated as an via the
-C<psgix.body.scalar_refs> env variable. Passing by reference is
+B<psgix.body.scalar_refs> env variable. Passing by reference is
B<significantly> faster than copying a value onto the return stack or into an
array. It's also very useful when broadcasting a message to many connected
-clients.
+clients. This is a Feersum-native feature exposed to PSGI apps; very few
+other PSGI handlers will support this.
+
+=item psgix.output.buffered
Calls to C<< $w->write() >> will never block. This behaviour is indicated by
-C<psgix.output.buffered> in the PSGI env hash.
+B<psgix.output.buffered> in the PSGI env hash.
+
+=item psgix.input.buffered
-C<psgix.input.buffered> is also set, which means that calls to read on the
+B<psgix.input.buffered> is also set, which means that calls to read on the
input handle will also never block. Feersum currently buffers the entire
input before calling the callback.
@@ -213,6 +224,23 @@ works similarly to the method on the "writer" object, although that isn't
currently part of the PSGI 1.03 spec. The callback will be called once data
has been buffered.
+=item psgix.io
+
+The raw socket extension B<psgix.io> is provided in order to support
+L<Web::Hippie>. To obtain the L<IO::Socket> corresponding to this connection,
+read this environment variable.
+
+B<Caution>: This environment variable is magical! Reading the value of this
+environment variable will activate raw socket mode. Once activated, the usual
+means of responding to a request are B<disabled>.
+
+PSGI apps must return undef or a streaming callback once psgix.io has been
+activated. Returning a response triplet will call the C<Feersum::DIED>
+function (default behaviour is to confess). Trying to call the streaming
+starter callback will croak.
+
+=back
+
=head2 The Feersum-native interface
The Feersum-native interface is inspired by PSGI, but is inherently
27 lib/Feersum/Connection.pm
View
@@ -1,6 +1,8 @@
package Feersum::Connection;
use strict;
use Carp qw/croak/;
+use IO::Socket::INET;
+use Scalar::Util qw/reftype/;
sub new {
croak "Cannot instantiate Feersum::Connection directly";
@@ -41,13 +43,30 @@ sub _initiate_streaming_psgi {
return;
});
- if (ref($streamer) eq 'CODE') {
- goto &$streamer;
- }
- # Maybe it's callable but not a CODE-ref:
+ goto &$streamer if (reftype($streamer) eq 'CODE');
+ # assumes callable but not a CODE-ref:
$streamer->(@_);
}
+sub _raw {
+ # don't shift; need to modify $_[0] directly;
+ my $fileno = $_[1];
+ # Hack to make gensyms via new_from_fd() show up in the Feersum package.
+ # This may or may not save memory (HEKs?) over true gensyms.
+ no warnings 'redefine';
+ local *IO::Handle::gensym = sub {
+ no strict;
+ my $pkg = "Feersum::";
+ my $name = "FEER$fileno";
+ my $fullname = $pkg . $name;
+ my $gv = \*{$fullname};
+ delete $$pkg{$name};
+ $gv;
+ };
+ $_[0] = IO::Socket::INET->new_from_fd($fileno, '+<');
+ return;
+}
+
1;
__END__
158 t/54-psgix-io.t
View
@@ -0,0 +1,158 @@
+#!perl
+use warnings;
+use strict;
+use constant CLIENTS => 10;
+use constant ROUNDS => 4;
+use Test::More tests => 3 + ROUNDS*(
+ CLIENTS*3 + # server setup
+ CLIENTS*3 + # client setup
+ CLIENTS + # server msg
+ CLIENTS + # client send
+ CLIENTS*CLIENTS + # client msg
+ 4 # round
+);
+use lib 't'; use Utils;
+
+BEGIN { use_ok('Feersum') };
+
+my ($socket,$port) = get_listen_socket();
+ok $socket, "made listen socket";
+
+my $evh = Feersum->new();
+{
+ no warnings 'redefine';
+ *Feersum::DIED = sub {
+ my $err = shift;
+ fail "Died during request handler: $err";
+ };
+}
+$evh->use_socket($socket);
+
+our $CRLF = "\015\012";
+my $app = sub {
+ my $env = shift;
+ is $env->{HTTP_UPGRADE}, 'chatz', "server setup: got an upgrade req";
+ my $n = $env->{HTTP_X_CLIENT};
+ if ($n % 2) {
+ # test psgi.streaming
+ return sub {
+ my $respond = shift;
+ do_chat($n,$env);
+ };
+ }
+ else {
+ # test traditional responses (result should be ignored)
+ do_chat($n,$env);
+ return [];
+ }
+};
+$evh->psgi_request_handler($app);
+
+my $cv;
+
+# read lines, broadcast to other server-side handles
+my @ss_handles;
+sub do_chat {
+ my ($n, $env) = @_;
+ $cv->begin;
+ my $fh = $env->{'psgix.io'};
+ isa_ok $fh, 'IO::Socket', "server setup: $n fh";
+
+ # use AnyEvent::Handle here specifically as that's what Web::Hippie
+ # uses.
+ my $h = AnyEvent::Handle->new(fh => $fh);
+ $ss_handles[$n] = $h;
+ $h->{guard} = guard { pass "server setup: $n destroyed" };
+ $h->push_write("HTTP/1.1 101 Switching Protocols$CRLF$CRLF");
+ $h->push_read(line => sub {
+ my $line = $_[1];
+ is $line, "hello from $n", "server msg: read a line for server $n";
+ $line .= "\n";
+ $ss_handles[$_]->push_write($line) for (1..CLIENTS);
+ $cv->end;
+ });
+ $h->on_error(sub {
+ diag "server handle error: $_[2]";
+ $h->destroy; # important self-ref
+ $cv->croak("server handle: ".$_[2]);
+ });
+}
+
+for my $round (1..ROUNDS) {
+ $cv = AE::cv;
+
+ # connect a number of clients, keeping the handle in a client-side handles
+ # array. Once all of the clients are connected ($connect_cv synchronizes
+ # them) send a "hello from" line for each client. Check that every client
+ # gets every message.
+ my @cs_handles;
+ my $connect_cv = AE::cv(sub {
+ pass "round $round : all clients connected, sending chats...";
+ eval {
+ for my $n (1 .. CLIENTS) {
+ my $h = $cs_handles[$n];
+ $h->push_write("hello from $n\n");
+ pass "client send: wrote to $n";
+ }
+ };
+ warn "during connect cv: $@" if $@;
+ });
+
+ $connect_cv->begin;
+ for my $n (1 .. CLIENTS) {
+ $connect_cv->begin;
+ $cv->begin;
+ my $h = AnyEvent::Handle->new(
+ connect => ['127.0.0.1',$port],
+ timeout => 3,
+ );
+ $cs_handles[$n] = $h;
+ $h->{guard} = guard { pass "client setup: $n destroyed" };
+
+ $h->on_error(sub {
+ diag "client handle error: $_[2]";
+ $h->destroy;
+ $connect_cv->croak("client handle: ".$_[2]);
+ $cv->croak("client handle: ".$_[2]);
+ });
+
+ $h->push_write("GET / HTTP/1.1$CRLF".
+ "Upgrade: chatz$CRLF".
+ "X-Client: $n$CRLF".
+ $CRLF
+ );
+
+ # one line for the upgrade, CLIENTS lines for the chatting
+ $h->push_read(line => qr/$CRLF$CRLF/, sub {
+ my $line = $_[1];
+ is $line, 'HTTP/1.1 101 Switching Protocols',
+ "client setup: client $n got upgraded";
+ $connect_cv->end;
+ });
+ my $to_read = CLIENTS;
+ $h->push_read(line => sub {
+ my $line = $_[1];
+ $to_read--;
+ like $line, qr/^hello from \d+$/,
+ "client msg: $n got a chat, $to_read left";
+ unless ($to_read) {
+ pass "client setup: client $n is done";
+ $cv->end;
+ }
+ }) for (1 .. CLIENTS);
+ }
+ $connect_cv->end;
+
+ $connect_cv->recv;
+ pass "round: all connected";
+ $cv->recv;
+ pass "round: done round $round";
+ $_->destroy() for grep {defined} @cs_handles;
+ @cs_handles = ();
+ $_->destroy() for grep {defined} @ss_handles;
+ @ss_handles = ();
+ pass "round: cleaned up round $round";
+}
+
+pass "all done";
+done_testing;
Please sign in to comment.
Something went wrong with that request. Please try again.