Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

remove backlog handler: instead flush all buffer in the first connection

  • Loading branch information...
commit a73c4307c09093d41620cb94b8b721cf1211cdfe 1 parent 11f8803
@miyagawa authored
Showing with 32 additions and 54 deletions.
  1. +0 −16 eg/demo.pl
  2. +0 −10 eg/templates/chat.html
  3. +32 −28 lib/Tatsumaki/MessageQueue.pm
View
16 eg/demo.pl
@@ -129,21 +129,6 @@ sub format_message {
$text;
}
-package ChatBacklogHandler;
-use base qw(Tatsumaki::Handler);
-__PACKAGE__->asynchronous(1);
-
-sub get {
- my($self, $channel) = @_;
-
- my $mq = Tatsumaki::MessageQueue->instance($channel);
- $mq->poll_backlog(20, sub {
- my @events = @_;
- $self->write(\@events);
- $self->finish;
- });
-}
-
package ChatRoomHandler;
use base qw(Tatsumaki::Handler);
@@ -161,7 +146,6 @@ package main;
'/chat/(\w+)/poll' => 'ChatPollHandler',
'/chat/(\w+)/mxhrpoll' => 'ChatMultipartPollHandler',
'/chat/(\w+)/post' => 'ChatPostHandler',
- '/chat/(\w+)/backlog' => 'ChatBacklogHandler',
'/chat/(\w+)' => 'ChatRoomHandler',
'/' => 'MainHandler',
]);
View
10 eg/templates/chat.html
@@ -70,16 +70,6 @@
if ($.cookie(cookieName))
$('#ident').attr('value', $.cookie(cookieName));
- $.ajax({
- url: '/chat/<?= $channel ?>/backlog?' + Math.random(),
- method: 'get',
- dataType: 'json',
- success: function(r) {
- $.each(r, function(i, e) {
- onNewEvent(e);
- });
- }
- });
});
</script>
<link rel="stylesheet" href="/static/screen.css" />
View
60 lib/Tatsumaki/MessageQueue.pm
@@ -23,12 +23,6 @@ sub append_backlog {
$self->backlog([ splice @new_backlog, 0, $BacklogLength ]);
}
-sub poll_backlog {
- my($self, $length, $cb) = @_;
- my @events = grep defined, @{$self->backlog}[0..$length-1];
- $cb->(reverse @events);
-}
-
sub publish {
my($self, @events) = @_;
@@ -39,13 +33,7 @@ sub publish {
if ($cb) {
# currently listening: flush and send the events right away
my @ev = (@{$session->{buffer}}, @events);
- try {
- $session->{cv}->send(@ev);
- $session->{cv} = AE::cv;
- $session->{buffer} = [];
- } catch {
- /Tatsumaki::Error::ClientDisconnect/ and delete $self->sessions->{$sid};
- };
+ $self->flush_events($sid, @ev);
} else {
# between long poll comet: buffer the events
# TODO: limit buffer length
@@ -54,35 +42,51 @@ sub publish {
if ($session->{persistent}) {
$session->{cv}->cb($cb); # poll forever
- } elsif ($cb or !$session->{timer}) {
- # no reconnection for 30 seconds: clear the session
- $session->{timer} = AE::timer 30, 0, sub {
- delete $self->sessions->{$sid};
- };
}
}
$self->append_backlog(@events);
}
+sub flush_events {
+ my($self, $sid, @events) = @_;
+
+ my $session = $self->sessions->{$sid} or return;
+ try {
+ $session->{cv}->send(@events);
+ $session->{cv} = AE::cv;
+ $session->{buffer} = [];
+
+ # no reconnection for 30 seconds: clear the session
+ $session->{timer} = AE::timer 30, 0, sub {
+ delete $self->sessions->{$sid};
+ } unless $session->{persistent};
+ } catch {
+ /Tatsumaki::Error::ClientDisconnect/ and delete $self->sessions->{$sid};
+ };
+}
+
sub poll_once {
my($self, $sid, $cb, $timeout) = @_;
- my $session = $self->sessions->{$sid} ||= {
- cv => AE::cv, persistent => 0, buffer => [],
+ my $is_new;
+ my $session = $self->sessions->{$sid} ||= do {
+ $is_new = 1;
+ + { cv => AE::cv, persistent => 0, buffer => [] };
};
+
$session->{cv}->cb(sub { $cb->($_[0]->recv) });
# reset garbage collection timeout with the long-poll timeout
$session->{timer} = AE::timer $timeout || 55, 0, sub {
- Scalar::Util::weaken $session;
- try {
- $session->{cv}->send();
- $session->{cv} = AE::cv;
- $session->{timer} = undef;
- } catch {
- /Tatsumaki::Error::ClientDisconnect/ and delete $self->sessions->{$sid};
- };
+ Scalar::Util::weaken $self;
+ $self->flush_events($sid);
};
+
+ # flush backlog for a new session
+ if ($is_new) {
+ my @events = reverse grep defined, @{$self->backlog};
+ $self->flush_events($sid, @events) if @events;
+ }
}
sub poll {
Please sign in to comment.
Something went wrong with that request. Please try again.