Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Support close() on reader/writer handles, bump ver.

Separates the reader/writer from the main connection object.

Terminating writes with `$w->write(undef)` no longer works.

Fix up a bunch of amateur XS mistakes in the process.
  • Loading branch information...
commit 087428303b18e7cba867defa23c185b7d8d6f830 1 parent 276260c
@stash authored
View
4 Changes
@@ -1,4 +1,8 @@
Revision history for Perl extension Feersum
+0.02 Fri Apr 23 10:55:54 2010
+ Renamed a good chunk of the functions & classes.
+ Made psgi.input and stream-writer objects separate from the main connection class.
+
0.01 Fri Apr 23 10:55:54 2010
Started project.
View
280 Feersum.xs
@@ -90,6 +90,9 @@ struct feer_conn {
int16_t receiving;
};
+typedef struct feer_conn feer_conn_handle; // for typemap
+
+#define dCONN struct feer_conn *c = (struct feer_conn *)w->data
static void try_conn_write(EV_P_ struct ev_io *w, int revents);
static void try_conn_read(EV_P_ struct ev_io *w, int revents);
@@ -111,6 +114,7 @@ static const char const *http_code_to_msg (int code);
static int prep_socket (int fd);
static HV *feer_stash, *feer_conn_stash;
+static HV *feer_conn_reader_stash = NULL, *feer_conn_writer_stash = NULL;
static SV *request_cb_cv = NULL;
static SV *shutdown_cb_cv = NULL;
@@ -289,7 +293,7 @@ new_feer_conn (EV_P_ int conn_fd)
}
// for use in the typemap:
-static struct feer_conn *
+INLINE_UNLESS_DEBUG static struct feer_conn *
sv_2feer_conn (SV *rv)
{
if (!sv_isa(rv,"Feersum::Connection"))
@@ -297,7 +301,7 @@ sv_2feer_conn (SV *rv)
return (struct feer_conn *)SvPVX(SvRV(rv));
}
-static SV*
+INLINE_UNLESS_DEBUG static SV*
feer_conn_2sv (struct feer_conn *c)
{
SV *rv = newRV_inc(c->self);
@@ -311,6 +315,41 @@ feer_conn_2sv (struct feer_conn *c)
return rv;
}
+INLINE_UNLESS_DEBUG static feer_conn_handle *
+sv_2feer_conn_handle (SV *rv, bool can_croak)
+{
+ trace("sv 2 conn_handle\n");
+ if (!SvROK(rv))
+ croak("Expected a reference");
+ // do not allow subclassing
+ SV *sv = SvRV(rv);
+ if (sv_isobject(rv) &&
+ (SvSTASH(sv) == feer_conn_writer_stash ||
+ SvSTASH(sv) == feer_conn_reader_stash))
+ {
+ UV uv = SvUV(sv);
+ if (uv == 0) {
+ if (can_croak) croak("Operation not allowed: Handle is closed.");
+ return NULL;
+ }
+ return INT2PTR(feer_conn_handle*,uv);
+ }
+
+ if (can_croak)
+ croak("Expected a Feersum::Connection::Writer or ::Reader object");
+ return NULL;
+}
+
+static SV *
+new_feer_conn_handle (struct feer_conn *c, bool is_writer)
+{
+ SV *sv;
+ SvREFCNT_inc(c->self);
+ sv = newRV_noinc(newSVuv(PTR2UV(c)));
+ sv_bless(sv, is_writer ? feer_conn_writer_stash : feer_conn_reader_stash);
+ return sv;
+}
+
static void
process_request_ready_rinq (void)
{
@@ -333,7 +372,7 @@ static void
prepare_cb (EV_P_ ev_prepare *w, int revents)
{
trace("prepare!\n");
- if (!ev_is_active(&accept_w)) {
+ if (!ev_is_active(&accept_w) && !shutting_down) {
ev_io_start(EV_A, &accept_w);
//ev_prepare_stop(EV_A, w);
}
@@ -354,8 +393,6 @@ idle_cb (EV_P_ ev_idle *w, int revents)
ev_idle_stop(EV_A, w);
}
-#define dCONN struct feer_conn *c = (struct feer_conn *)w->data
-
static void
try_conn_write(EV_P_ struct ev_io *w, int revents)
{
@@ -509,7 +546,7 @@ try_read_error:
dont_read_again:
c->receiving = RECEIVE_SHUTDOWN;
- shutdown(c->fd, SHUT_RD);
+ shutdown(c->fd, SHUT_RD); // TODO: respect keep-alive
ev_io_stop(EV_A, w);
return;
@@ -527,7 +564,17 @@ accept_cb (EV_P_ ev_io *w, int revents)
struct sockaddr_in sa;
socklen_t sl = sizeof(struct sockaddr_in);
- trace("accept! %08x %d\n", revents, revents & EV_READ);
+ trace("accept! revents=0x%08x\n", revents);
+ if (!(revents & EV_READ))
+ return;
+
+ if (shutting_down) {
+ // shouldn't get called, but be defensive
+ close(w->fd);
+ ev_io_stop(EV_A, w);
+ return;
+ }
+
while (1) {
sl = sizeof(struct sockaddr_in);
int fd = accept(w->fd, (struct sockaddr *)&sa, &sl);
@@ -894,25 +941,6 @@ call_request_callback (struct feer_conn *c)
c->in_callback--;
}
-void
-finish_shutdown(pTHX)
-{
- ev_idle_stop(EV_DEFAULT, &ei);
- ev_prepare_stop(EV_DEFAULT, &ep);
- ev_check_stop(EV_DEFAULT, &ec);
-
- trace("... was last conn, going to try shutdown\n");
- if (shutdown_cb_cv) {
- dSP;
- PUSHMARK(SP);
- call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
- trace("... ok, called that handler\n");
- SvREFCNT_dec(shutdown_cb_cv);
- shutdown_cb_cv = NULL;
- }
-
-}
-
MODULE = Feersum PACKAGE = Feersum
PROTOTYPES: ENABLE
@@ -961,27 +989,50 @@ graceful_shutdown (SV *self, SV *cb)
SvREFCNT_inc(shutdown_cb_cv);
trace("assigned shutdown handler %p\n", SvRV(cb));
- close(accept_w.fd);
- ev_io_stop(EV_DEFAULT, &accept_w);
shutting_down = 1;
+ ev_io_stop(EV_DEFAULT, &accept_w);
+ close(accept_w.fd);
}
void
DESTROY (SV *self)
PPCODE:
{
+ trace("DESTROY server\n");
if (request_cb_cv)
SvREFCNT_dec(request_cb_cv);
}
-MODULE = Feersum PACKAGE = Feersum::Connection
+MODULE = Feersum PACKAGE = Feersum::Connection::Handle
PROTOTYPES: ENABLE
+int
+fileno (feer_conn_handle *hdl)
+ CODE:
+ RETVAL = c->fd;
+ OUTPUT:
+ RETVAL
+
+void
+DESTROY (SV *self)
+ PPCODE:
+{
+ feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);
+ if (hdl == NULL) {
+ trace("DESTROY handle (closed) class=%s\n", HvNAME(SvSTASH(SvRV(ST(0)))));
+ }
+ else {
+ struct feer_conn *c = (struct feer_conn *)hdl;
+ trace("DESTROY handle fd=%d, class=%s\n", c->fd, HvNAME(SvSTASH(SvRV(ST(0)))));
+ SvREFCNT_dec(c->self);
+ }
+}
+
SV*
-read (struct feer_conn *c, SV *buf, size_t len, ...)
+read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
PROTOTYPE: $$$;$
- CODE:
+ PPCODE:
{
STRLEN buf_len, src_len;
char *buf_ptr, *src_ptr;
@@ -1044,8 +1095,86 @@ read (struct feer_conn *c, SV *buf, size_t len, ...)
XSRETURN_UNDEF;
}
+int
+write (feer_conn_handle *hdl, SV *body, ...)
+ CODE:
+{
+ if (c->responding != RESPOND_STREAMING)
+ croak("can only call write in streaming mode");
+
+ if (!SvOK(body)) {
+ XSRETURN_IV(0);
+ }
+
+ trace("write fd=%d c=%p, body=%p\n", c->fd, c, body);
+ if (SvROK(body)) {
+ SV *refd = SvRV(body);
+ if (SvOK(refd) && SvPOK(refd)) {
+ body = refd;
+ }
+ else {
+ croak("body must be a scalar, scalar ref or undef");
+ }
+ }
+ RETVAL = SvCUR(body);
+ add_sv_to_wbuf(c, body, 1);
+ conn_write_ready(c);
+}
+ OUTPUT:
+ RETVAL
+
+int
+_close (feer_conn_handle *hdl)
+ PROTOTYPE: $
+ ALIAS:
+ Feersum::Connection::Reader::close = 1
+ Feersum::Connection::Writer::close = 2
+ CODE:
+{
+ SV *hdl_sv = SvRV(ST(0));
+
+ switch (ix) {
+ case 1:
+ trace("close reader fd=%d, c=%p\n", c->fd, c);
+ RETVAL = shutdown(c->fd, SHUT_RD); // TODO: respect keep-alive
+ c->receiving = RECEIVE_SHUTDOWN;
+ break;
+ case 2:
+ trace("close writer fd=%d, c=%p\n", c->fd, c);
+ add_sv_to_wbuf(c, NULL, 1);
+ conn_write_ready(c);
+ c->responding = RESPOND_SHUTDOWN;
+ RETVAL = 1;
+ break;
+ default:
+ croak("cannot call _close directly");
+ }
+
+ // disassociate the handle from the conn
+ SvUVX(hdl_sv) = 0;
+ SvREFCNT_dec(c->self);
+}
+ OUTPUT:
+ RETVAL
+
+void
+_poll_cb (feer_conn_handle *hdl, CV *cb)
+ PROTOTYPE: $&
+ ALIAS:
+ Feersum::Connection::Reader::poll_cb = 1
+ Feersum::Connection::Writer::poll_cb = 2
+ PPCODE:
+{
+ croak("poll_cb is not yet supported (ix=%d)", ix);
+}
+
+MODULE = Feersum PACKAGE = Feersum::Connection
+
+PROTOTYPES: ENABLE
+
void
start_response (struct feer_conn *c, SV *message, AV *headers, int streaming)
+ PROTOTYPE: $$\@$
PPCODE:
{
const char *ptr;
@@ -1117,9 +1246,10 @@ start_response (struct feer_conn *c, SV *message, AV *headers, int streaming)
conn_write_ready(c);
}
-void
+int
write_whole_body (struct feer_conn *c, SV *body)
- PPCODE:
+ PROTOTYPE: $$
+ CODE:
{
int i;
const char *ptr;
@@ -1130,13 +1260,10 @@ write_whole_body (struct feer_conn *c, SV *body)
croak("can't use write_whole_body when in streaming mode");
if (!SvOK(body)) {
- sv_catpvn(c->wbuf, "Content-Length: 0" CRLFx2, 21);
- conn_write_ready(c);
- PUTBACK;
- return;
+ body = sv_2mortal(newSVpvn("",0));
+ body_is_string = 1;
}
-
- if (SvROK(body)) {
+ else if (SvROK(body)) {
SV *refd = SvRV(body);
if (SvOK(refd) && !SvROK(refd)) {
body = refd;
@@ -1150,9 +1277,10 @@ write_whole_body (struct feer_conn *c, SV *body)
body_is_string = 1;
}
- assert(c->wbuf);
+ RETVAL = c->wbuf ? SvCUR(c->wbuf) : 0;
if (body_is_string) {
+ if (!c->wbuf) c->wbuf = newSV(SvCUR(body) + 32);
sv_catpvf(c->wbuf, "Content-Length: %d" CRLFx2, SvCUR(body));
add_sv_to_wbuf(c,body,0);
}
@@ -1192,38 +1320,24 @@ write_whole_body (struct feer_conn *c, SV *body)
Safefree(svs);
}
+ RETVAL = SvCUR(c->wbuf) - RETVAL;
c->responding = RESPOND_SHUTDOWN;
conn_write_ready(c);
}
-
-void
-write (struct feer_conn *c, SV *body)
- PPCODE:
-{
- if (c->responding != RESPOND_STREAMING)
- croak("can only call write in streaming mode");
-
- if (!SvOK(body)) {
- trace("write fd=%d c=%p, body=undef\n", c->fd, c);
- add_sv_to_wbuf(c, NULL, 1);
- c->responding = RESPOND_SHUTDOWN;
- }
- else {
- trace("write fd=%d c=%p, body=%p\n", c->fd, c, body);
- if (SvROK(body)) {
- SV *refd = SvRV(body);
- if (SvOK(refd) && SvPOK(refd)) {
- body = refd;
- }
- else {
- croak("body must be a scalar, scalar ref or undef");
- }
- }
- add_sv_to_wbuf(c, body, 1);
- }
-
- conn_write_ready(c);
-}
+ OUTPUT:
+ RETVAL
+
+SV *
+_handle (struct feer_conn *c)
+ PROTOTYPE: $
+ ALIAS:
+ read_handle = 1
+ write_handle = 2
+ CODE:
+ if(!ix) croak("cannot call _handle directly");
+ RETVAL = new_feer_conn_handle(c, ix-1);
+ OUTPUT:
+ RETVAL
void
env (struct feer_conn *c, HV *e)
@@ -1248,7 +1362,7 @@ env (struct feer_conn *c, HV *e)
if (c->expected_cl >= 0) {
hv_store(e, "CONTENT_LENGTH", 14, newSViv(c->expected_cl), 0);
- hv_store(e, "psgi.input", 10, feer_conn_2sv(c), 0);
+ hv_store(e, "psgi.input", 10, new_feer_conn_handle(c,0), 0);
}
else {
hv_store(e, "CONTENT_LENGTH", 14, newSViv(0), 0);
@@ -1321,8 +1435,10 @@ env (struct feer_conn *c, HV *e)
int
fileno (struct feer_conn *c)
- PPCODE:
- XSRETURN_IV(c->fd);
+ CODE:
+ RETVAL = c->fd;
+ OUTPUT:
+ RETVAL
void
DESTROY (struct feer_conn *c)
@@ -1336,9 +1452,23 @@ DESTROY (struct feer_conn *c)
free(c->req);
}
if (c->fd) close(c->fd);
+
active_conns--;
- if (shutting_down && active_conns == 0) {
- finish_shutdown(aTHX);
+
+ if (shutting_down && active_conns <= 0) {
+ ev_idle_stop(EV_DEFAULT, &ei);
+ ev_prepare_stop(EV_DEFAULT, &ep);
+ ev_check_stop(EV_DEFAULT, &ec);
+
+ trace("... was last conn, going to try shutdown\n");
+ if (shutdown_cb_cv) {
+ PUSHMARK(SP);
+ call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
+ PUTBACK;
+ trace("... ok, called that handler\n");
+ SvREFCNT_dec(shutdown_cb_cv);
+ shutdown_cb_cv = NULL;
+ }
}
#ifdef DEBUG
sv_dump(c->self);
@@ -1353,6 +1483,8 @@ BOOT:
{
feer_stash = gv_stashpv("Feersum", 1);
feer_conn_stash = gv_stashpv("Feersum::Connection", 1);
+ feer_conn_writer_stash = gv_stashpv("Feersum::Connection::Writer",0);
+ feer_conn_reader_stash = gv_stashpv("Feersum::Connection::Reader",0);
I_EV_API("Feersum");
SV *ver_svs[2];
View
4 TODO
@@ -11,10 +11,6 @@ psgi.input streaming
* related: Connection: close bodies
* related: Transfer-Encoding: chunked bodies
-psgi.streaming
-
- * use close() rather than write(undef) to close-off a response
-
allow streaming responses to be HTTP/1.0 "Connection: close" rather than
HTTP/1.1 "Transfer-Encoding: chunked".
View
61 lib/Feersum.pm
@@ -11,14 +11,14 @@ Feersum - A scary-fast HTTP engine for Perl based on EV/libev
=cut
-our $VERSION = '0.01';
+our $VERSION = '0.02';
=head1 SYNOPSIS
use Feersum;
- my $endjinn = Feersum->new();
- $endjinn->use_socket($io_socket);
- $endjinn->request_handler(sub {
+ my $ngn = Feersum->new();
+ $ngn->use_socket($io_socket);
+ $ngn->request_handler(sub {
my $req = shift;
my $t; $t = EV::timer 2, 0, sub {
$req->send_response(
@@ -82,10 +82,11 @@ the C<write()> method (which really acts more like a buffered 'print').
# "1" tells Feersum to send a Transfer-Encoding: chunked response
my $req = shift;
$req->start_responding(200, \@headers, 1);
- $req->write(\"this is a reference to some shared chunk\n");
- $req->write("regular scalars are OK too\n");
+ my $w = $req->write_handle;
+ $w->write(\"this is a reference to some shared chunk\n");
+ $w->write("regular scalars are OK too\n");
# close off the stream
- $req->write(undef); # XXX: may change to ->close()
+ $w->close()
A PSGI-like environment hash is easy to obtain. Currently POST/PUT does not
stream input, but read() can be called on C<psgi.input> to get the body (which
@@ -98,7 +99,10 @@ for a callback to be registered on the arrival of more data.
$req->env(\%env);
if ($req->{REQUEST_METHOD} eq 'POST') {
my $body = '';
- $env{'psgi.input'}->read($body, $env{CONTENT_LENGTH});
+ my $r = delete $env{'psgi.input'};
+ $r->read($body, $env{CONTENT_LENGTH});
+ # optional: choose to stop receiving further input:
+ # $r->close();
}
The C<psgi.streaming> interface is emulated with a call to
@@ -110,18 +114,18 @@ any C<write> or C<start_response> calls.
my $req = shift;
$req->initiate_streaming(sub {
my $starter = shift;
- my $writer = $starter->(
+ my $w = $starter->(
"200 OK", ['Content-Type' => 'application/json']);
my $n = 0;
- $writer->write('[');
+ $w->write('[');
my $t; $t = EV::timer 1, 1, sub {
- $writer->write(q({"time":).time."},");
+ $w->write(q({"time":).time."},");
if ($n++ > 60) {
// stop the stream
- $writer->write("{}]");
- $writer->write(undef); # XXX: this may change to ->close()
+ $w->write("{}]");
+ $w->close();
undef $t;
}
};
@@ -160,6 +164,7 @@ sub DIED {
}
package Feersum::Connection;
+use strict;
sub send_response {
# my ($self, $msg, $hdrs, $body) = @_;
@@ -170,16 +175,36 @@ sub send_response {
sub initiate_streaming {
my $self = shift;
my $streamer = shift;
- Carp::croak "Feersum: Expected code reference argument to stream_response"
+ Carp::croak "Feersum: Expected coderef"
unless ref($streamer) eq 'CODE';
- my $start_cb = sub {
+ @_ = (sub {
$self->start_response($_[0],$_[1],1);
- return $self;
- };
- @_ = ($start_cb);
+ return $self->write_handle;
+ });
goto &$streamer;
}
+package Feersum::Connection::Handle;
+use strict;
+
+sub new {
+ Carp::croak "Cannot instantiate Feersum::Connection::Handles directly";
+}
+
+package Feersum::Connection::Reader;
+use strict;
+use base 'Feersum::Connection::Handle';
+
+sub write { Carp::croak "can't call write method on a read-only handle" }
+
+sub seek { Carp::carp "seek not supported."; return 0 }
+
+package Feersum::Connection::Writer;
+use strict;
+use base 'Feersum::Connection::Handle';
+
+sub read { Carp::croak "can't call read method on a write-only handle" }
+
package Feersum;
1;
View
12 t/05-streaming.t
@@ -2,7 +2,7 @@
use warnings;
use strict;
use constant CLIENTS => 10;
-use Test::More tests => 7 + 15 * CLIENTS;
+use Test::More tests => 7 + 16 * CLIENTS;
use Test::Exception;
use Test::Differences;
use Scalar::Util qw/blessed/;
@@ -38,9 +38,7 @@ $evh->request_handler(sub {
my $cnum = $env->{HTTP_X_CLIENT};
ok $cnum, "got client number";
- dies_ok {
- $r->write("some junk");
- } "calling write too early is wrong $cnum";
+ ok !$r->can('write'), "write method removed from connection object";
$cv->begin;
my $cb = $r->initiate_streaming(sub {
@@ -48,8 +46,8 @@ $evh->request_handler(sub {
my $start = shift;
is ref($start), 'CODE', "streaming handler got a code ref $cnum";
my $w = $start->("200 OK", ['Content-Type' => 'text/plain']);
- ok blessed($w) && $w->can('write'),
- "after starting, writer can write $cnum";
+ isa_ok($w, 'Feersum::Connection::Writer', "got a writer $cnum");
+ isa_ok($w, 'Feersum::Connection::Handle', "... it's a handle $cnum");
my $n = 0;
my $t; $t = AE::timer rand(),rand(), sub {
eval {
@@ -59,7 +57,7 @@ $evh->request_handler(sub {
pass "wrote chunk $n $cnum";
}
else {
- $w->write(undef);
+ $w->close();
pass "async writer finished $cnum";
dies_ok {
$w->write("after completion");
View
18 t/07-graceful-shutdown.t
@@ -2,7 +2,7 @@
use warnings;
use strict;
use constant CLIENTS => 12;
-use Test::More tests => 10 + 8 * CLIENTS;
+use Test::More tests => 10 + 9 * CLIENTS;
use Test::Exception;
use Test::Differences;
use Scalar::Util qw/blessed/;
@@ -44,18 +44,20 @@ $evh->request_handler(sub {
$cv->begin;
my $cb = $r->initiate_streaming(sub {
+ undef $r;
$started++;
my $start = shift;
is ref($start), 'CODE', "streaming handler got a code ref $cnum";
my $w = $start->("200 OK", ['Content-Type' => 'text/plain']);
- ok blessed($w) && $w->can('write'),
- "after starting, writer can write $cnum";
+ isa_ok($w, 'Feersum::Connection::Writer', "got a writer $cnum");
+ isa_ok($w, 'Feersum::Connection::Handle', "... it's a handle $cnum");
my $t; $t = AE::timer 1.5+rand(0.5), 0, sub {
lives_ok {
$w->write("So graceful!\n");
- $w->write(undef);
+ $w->close();
} "wrote after waiting a little $cnum";
- undef $t; # keep timer alive
+ undef $t; # keep timer alive until it runs
+ undef $w;
$cv->end;
$finished++;
};
@@ -94,10 +96,16 @@ sub client {
client($_) for (1..CLIENTS);
$cv->begin;
+my $death;
my $grace_t = AE::timer 1.0, 0, sub {
pass "calling for shutdown";
+ $death = AE::timer 2.5, 0, sub {
+ fail "SHUTDOWN TOOK TOO LONG";
+ exit 1;
+ };
$evh->graceful_shutdown(sub {
pass "all gracefully shut down, supposedly";
+ undef $death;
$cv->end;
});
};
View
6 typemap
@@ -1,9 +1,15 @@
struct feer_conn * T_feer_conn
+feer_conn_handle * T_feer_conn_handle
INPUT
T_feer_conn
$var = sv_2feer_conn($arg);
+T_feer_conn_handle
+ $var = sv_2feer_conn_handle($arg,1);
+ /* handle is really just a feer_conn struct: */
+ struct feer_conn *c = (struct feer_conn *)$var;
+
OUTPUT
T_feer_conn
$arg = feer_conn_2sv($var);
Please sign in to comment.
Something went wrong with that request. Please try again.