Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support Server-Sent Events #23

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 29 additions & 2 deletions eg/chat/app.psgi
Expand Up @@ -3,13 +3,39 @@ use warnings;
use Tatsumaki;
use Tatsumaki::Error;
use Tatsumaki::Application;
use Tatsumaki::MessageQueue;
use Time::HiRes;

package ChatPollHandler;
package ChatServerSentEventsHandler;
use base qw(Tatsumaki::Handler);
__PACKAGE__->asynchronous(1);

use Tatsumaki::MessageQueue;
sub get {
my($self, $channel) = @_;

my $client_id = $self->request->param('client_id') || rand(1);
my $mq = Tatsumaki::MessageQueue->instance($channel);

$self->server_sent_events_push(1);

$mq->poll($client_id, sub {
my @events = @_;
for my $event (@events) {
if ($event->{type} eq 'message') {
$self->stream_write({
event => $event->{type},
data => $event
});
} else {
$self->stream_write({ data => $event });
}
}
});
}

package ChatPollHandler;
use base qw(Tatsumaki::Handler);
__PACKAGE__->asynchronous(1);

sub get {
my($self, $channel) = @_;
Expand Down Expand Up @@ -87,6 +113,7 @@ use File::Basename;

my $chat_re = '[\w\.\-]+';
my $app = Tatsumaki::Application->new([
"/chat/($chat_re)/sse" => 'ChatServerSentEventsHandler',
"/chat/($chat_re)/poll" => 'ChatPollHandler',
"/chat/($chat_re)/mxhrpoll" => 'ChatMultipartPollHandler',
"/chat/($chat_re)/post" => 'ChatPostHandler',
Expand Down
13 changes: 12 additions & 1 deletion eg/chat/templates/chat.html
@@ -1,5 +1,6 @@
% my $channel = $_[0]->{handler}->args->[0];
% my $mxhr = $_[0]->{handler}->request->param('mxhr');
% my $sse = $_[0]->{handler}->request->param('sse');
<html>
<head>
<title>Tatsumaki Chat demo</title>
Expand Down Expand Up @@ -71,8 +72,18 @@
});
s.load('/chat/<%= $channel %>/mxhrpoll');
} else {

% if ($sse) {
var e = new EventSource("/chat/<%= $channel %>/sse");
e.addEventListener("message", function (payload) {
var data = JSON.parse(payload.data)
onNewEvent(data);
});
% } else {
$.ev.handlers.message = onNewEvent;
$.ev.loop('/chat/<%= $channel %>/poll?client_id=' + Math.random());
% }

}

if ($.cookie(cookieName))
Expand Down Expand Up @@ -128,4 +139,4 @@ <h1 class="chat-room-name">Chat room: <%= $channel %></h1>
<div id="footer">Powered by <a href="http://github.com/miyagawa/Tatsumaki">Tatsumaki/<%= $Tatsumaki::VERSION %></a>.</div>

</body>
</html>
</html>
32 changes: 32 additions & 0 deletions lib/Tatsumaki/Handler.pm
Expand Up @@ -16,6 +16,7 @@ has response => (is => 'rw', isa => 'Tatsumaki::Response', lazy_build => 1);
has args => (is => 'rw', isa => 'ArrayRef');
has writer => (is => 'rw');
has mxhr => (is => 'rw', isa => 'Bool');
has sse => (is => 'rw', isa => 'Bool');
has mxhr_boundary => (is => 'rw', isa => 'Str', lazy => 1, lazy_build => 1);
has json => (is => 'rw', isa => 'JSON', lazy => 1, default => sub { JSON->new->utf8 });
has binary => (is => 'rw', isa => 'Bool');
Expand Down Expand Up @@ -44,6 +45,21 @@ sub asynchronous {

sub nonblocking { shift->asynchronous(@_) } # alias

sub server_sent_events_push {
my ($self, $enable) = @_;
if ($enable) {
Carp::croak("asynchronous should be set to do Server-Sent Events push")
unless $self->is_asynchronous;

$self->response->content_type('text/event-stream');
$self->flush;

return $self->sse(1);
} else {
return $self->sse;
}
}

sub multipart_xhr_push {
my $self = shift;
if ($_[0]) {
Expand Down Expand Up @@ -169,6 +185,22 @@ sub get_chunk {
if ($self->mxhr) {
my $json = $self->json->encode($_[0]);
return "Content-Type: application/json\n\n$json\n--" . $self->mxhr_boundary. "\n";
} elsif ($self->sse) {
$self->response->content_type('text/event-stream');
my $data = ref $_[0]->{data}
? $self->json->encode($_[0]->{data})
: defined $_[0]->{data}
? $_[0]->{data}
: "";

if (defined $_[0]->{event}) { # TODO: `id` and `retry` and comment line
return join "\n",
"event: " . $_[0]->{event},
"data: " . $data,
"\n";
} else {
return "data: $data\n\n";
}
} else {
$self->response->content_type('application/json');
return $self->json->encode($_[0]);
Expand Down