diff --git a/eg/chat/app.psgi b/eg/chat/app.psgi index 42ce3fb..ff07895 100644 --- a/eg/chat/app.psgi +++ b/eg/chat/app.psgi @@ -3,13 +3,39 @@ use warnings; use Tatsumaki; use Tatsumaki::Error; use Tatsumaki::Application; +use Tatsumaki::MessageQueue; use Time::HiRes; -package ChatPollHandler; +package ChatServerSentEventsHandler; use base qw(Tatsumaki::Handler); __PACKAGE__->asynchronous(1); -use Tatsumaki::MessageQueue; +sub get { + my($self, $channel) = @_; + + my $client_id = $self->request->param('client_id') || rand(1); + my $mq = Tatsumaki::MessageQueue->instance($channel); + + $self->server_sent_events_push(1); + + $mq->poll($client_id, sub { + my @events = @_; + for my $event (@events) { + if ($event->{type} eq 'message') { + $self->stream_write({ + event => $event->{type}, + data => $event + }); + } else { + $self->stream_write({ data => $event }); + } + } + }); +} + +package ChatPollHandler; +use base qw(Tatsumaki::Handler); +__PACKAGE__->asynchronous(1); sub get { my($self, $channel) = @_; @@ -87,6 +113,7 @@ use File::Basename; my $chat_re = '[\w\.\-]+'; my $app = Tatsumaki::Application->new([ + "/chat/($chat_re)/sse" => 'ChatServerSentEventsHandler', "/chat/($chat_re)/poll" => 'ChatPollHandler', "/chat/($chat_re)/mxhrpoll" => 'ChatMultipartPollHandler', "/chat/($chat_re)/post" => 'ChatPostHandler', diff --git a/eg/chat/templates/chat.html b/eg/chat/templates/chat.html index 99dd0cd..b5b9b30 100644 --- a/eg/chat/templates/chat.html +++ b/eg/chat/templates/chat.html @@ -1,5 +1,6 @@ % my $channel = $_[0]->{handler}->args->[0]; % my $mxhr = $_[0]->{handler}->request->param('mxhr'); +% my $sse = $_[0]->{handler}->request->param('sse'); Tatsumaki Chat demo @@ -71,8 +72,18 @@ }); s.load('/chat/<%= $channel %>/mxhrpoll'); } else { + +% if ($sse) { + var e = new EventSource("/chat/<%= $channel %>/sse"); + e.addEventListener("message", function (payload) { + var data = JSON.parse(payload.data) + onNewEvent(data); + }); +% } else { $.ev.handlers.message = onNewEvent; $.ev.loop('/chat/<%= $channel %>/poll?client_id=' + Math.random()); +% } + } if ($.cookie(cookieName)) @@ -128,4 +139,4 @@

Chat room: <%= $channel %>

- \ No newline at end of file + diff --git a/lib/Tatsumaki/Handler.pm b/lib/Tatsumaki/Handler.pm index c146a1a..3e4a310 100644 --- a/lib/Tatsumaki/Handler.pm +++ b/lib/Tatsumaki/Handler.pm @@ -16,6 +16,7 @@ has response => (is => 'rw', isa => 'Tatsumaki::Response', lazy_build => 1); has args => (is => 'rw', isa => 'ArrayRef'); has writer => (is => 'rw'); has mxhr => (is => 'rw', isa => 'Bool'); +has sse => (is => 'rw', isa => 'Bool'); has mxhr_boundary => (is => 'rw', isa => 'Str', lazy => 1, lazy_build => 1); has json => (is => 'rw', isa => 'JSON', lazy => 1, default => sub { JSON->new->utf8 }); has binary => (is => 'rw', isa => 'Bool'); @@ -44,6 +45,21 @@ sub asynchronous { sub nonblocking { shift->asynchronous(@_) } # alias +sub server_sent_events_push { + my ($self, $enable) = @_; + if ($enable) { + Carp::croak("asynchronous should be set to do Server-Sent Events push") + unless $self->is_asynchronous; + + $self->response->content_type('text/event-stream'); + $self->flush; + + return $self->sse(1); + } else { + return $self->sse; + } +} + sub multipart_xhr_push { my $self = shift; if ($_[0]) { @@ -169,6 +185,22 @@ sub get_chunk { if ($self->mxhr) { my $json = $self->json->encode($_[0]); return "Content-Type: application/json\n\n$json\n--" . $self->mxhr_boundary. "\n"; + } elsif ($self->sse) { + $self->response->content_type('text/event-stream'); + my $data = ref $_[0]->{data} + ? $self->json->encode($_[0]->{data}) + : defined $_[0]->{data} + ? $_[0]->{data} + : ""; + + if (defined $_[0]->{event}) { # TODO: `id` and `retry` and comment line + return join "\n", + "event: " . $_[0]->{event}, + "data: " . $data, + "\n"; + } else { + return "data: $data\n\n"; + } } else { $self->response->content_type('application/json'); return $self->json->encode($_[0]);