Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fix leak when long-poll client immediately reconnects after disconnec…

…ting
  • Loading branch information...
commit 5d25334faaf7790886bf2ef54cb54ef38d491cf1 1 parent a7ddd78
@mash mash authored
Showing with 117 additions and 2 deletions.
  1. +10 −2 lib/Tatsumaki/MessageQueue.pm
  2. +34 −0 t/mq/leak.psgi
  3. +73 −0 t/mq/leak.t
View
12 lib/Tatsumaki/MessageQueue.pm
@@ -67,7 +67,8 @@ sub flush_events {
if ($client->{persistent}) {
$client->{cv}->cb($cb);
} else {
- $client->{timer} = AE::timer 30, 0, sub {
+ undef $client->{longpoll_timer};
+ $client->{reconnect_timer} = AE::timer 30, 0, sub {
Scalar::Util::weaken $self;
warn "Sweep $client_id (no long-poll reconnect)" if DEBUG;
undef $client;
@@ -92,12 +93,19 @@ sub poll_once {
+ { cv => AE::cv, persistent => 0, buffer => [] };
};
+ if ( $client->{longpoll_timer} ) {
+ # close last connection from the same client_id
+ $self->flush_events($client_id);
+ undef $client->{longpoll_timer};
+ }
+ undef $client->{reconnect_timer};
+
$client->{cv}->cb(sub { $cb->($_[0]->recv) });
# reset garbage collection timeout with the long-poll timeout
# $timeout = 0 is a valid timeout for interval-polling
$timeout = 55 unless defined $timeout;
- $client->{timer} = AE::timer $timeout || 55, 0, sub {
+ $client->{longpoll_timer} = AE::timer $timeout || 55, 0, sub {
Scalar::Util::weaken $self;
warn "Timing out $client_id long-poll" if DEBUG;
$self->flush_events($client_id);
View
34 t/mq/leak.psgi
@@ -0,0 +1,34 @@
+#!/usr/bin/perl
+use strict;
+use warnings;
+use Scalar::Util;
+use Tatsumaki::Application;
+
+package MainHandler;
+use base qw(Tatsumaki::Handler);
+__PACKAGE__->asynchronous(1);
+use Tatsumaki::MessageQueue;
+
+sub get {
+ my $self = shift;
+
+ my $mq = Tatsumaki::MessageQueue->instance( '1' );
+ $mq->poll_once(
+ 'me',
+ sub {
+ $self->write(\@_);
+ $self->finish;
+ },
+ 2 # poll for [sec]
+ );
+ Scalar::Util::weaken( $self );
+}
+
+package main;
+
+my $app = Tatsumaki::Application->new([
+ '/' => 'MainHandler',
+]);
+
+return $app;
+
View
73 t/mq/leak.t
@@ -0,0 +1,73 @@
+use Test::More;
+use Test::Requires qw/
+Test::TCP
+Proc::Guard
+Unix::Lsof
+FindBin
+Plack::Runner
+/;
+use Test::TCP qw/empty_port wait_port/;
+use strict;
+use warnings;
+
+use Tatsumaki::Application;
+use Tatsumaki::MessageQueue;
+
+use AnyEvent;
+use AnyEvent::HTTP;
+
+my ($tatsumaki,$port) = prepare();
+
+is( count_close_wait_for_port($port), 0 );
+
+cycle($port);
+cycle($port);
+
+sleep(2);
+
+is( count_close_wait_for_port($port), 0 );
+
+done_testing;
+
+sub prepare {
+ my $psgi = "$FindBin::Bin/leak.psgi";
+
+ my $async_port = empty_port();
+ my $async = proc_guard(
+ sub {
+ my $runner = Plack::Runner->new;
+ my $app = Tatsumaki::Application->new([ '/' => 'MainHandler' ]);
+ $runner->parse_options( qw/-p/, $async_port, qw/-s Twiggy -a/, $psgi );
+ $runner->run;
+ }
+ );
+ wait_port( $async_port );
+
+ return ($async, $async_port);
+}
+
+sub cycle {
+ my $port = shift;
+
+ my $cv = AE::cv;
+ my $request = http_get "http://localhost:$port/",
+ timeout => 1, # shorter than long-poll timeout
+ sub {
+ $cv->send;
+ };
+ $cv->wait;
+}
+
+# count CLOSE_WAIT file descriptors on $port
+sub count_close_wait_for_port {
+ my $port = shift;
+
+ my ($output, $error) = lsof(qw/-i/, ":$port");
+ my @values = values %$output;
+ my @files;
+ for my $value (@values) {
+ push( @files, @{ $value->{ files } } );
+ }
+
+ return scalar grep { $_->{ 'tcp/tpi info' }{ 'connection state' } eq 'CLOSE_WAIT'; } @files;
+}
Please sign in to comment.
Something went wrong with that request. Please try again.