Skip to content

Commit

Permalink
Avoid leaking connections
Browse files Browse the repository at this point in the history
  • Loading branch information
dgl committed Oct 14, 2012
1 parent 2a51766 commit 42c5ada
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
16 changes: 15 additions & 1 deletion lib/AnyEvent/Redis.pm
Expand Up @@ -2,7 +2,7 @@ package AnyEvent::Redis;

use strict;
use 5.008_001;
our $VERSION = '0.2302';
our $VERSION = '0.24';

use constant DEBUG => $ENV{ANYEVENT_REDIS_DEBUG};
use AnyEvent;
Expand All @@ -11,6 +11,7 @@ use AnyEvent::Socket;
use AnyEvent::Redis::Protocol;
use Carp qw( croak confess );
use Encode ();
use Scalar::Util qw(weaken);

our $AUTOLOAD;

Expand Down Expand Up @@ -81,6 +82,7 @@ sub connect {
}

return $cv if $self->{sock};
weaken $self;

$self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
my $fh = shift
Expand Down Expand Up @@ -110,6 +112,7 @@ sub connect {
my $is_pubsub = $command =~ /^p?(?:un)?subscribe\z/;
my $is_subscribe = $command =~ /^p?subscribe\z/;


# Are we already subscribed to anything?
if ($self->{sub} && %{$self->{sub}}) {
croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
Expand Down Expand Up @@ -142,6 +145,13 @@ sub connect {

warn $send if DEBUG;

# $self is weakened to avoid leaks, hold on to a strong copy
# controlled via a CV.
my $cmd_cv = AE::cv;
$cmd_cv->cb(sub {
my $strong_self = $self;
});

# pubsub is very different - get it out of the way first

if ($is_pubsub) {
Expand Down Expand Up @@ -178,6 +188,7 @@ sub connect {
warn "Exception in callback (ignored): $@" if $@;
delete $self->{sub}->{$res->[1]};
$self->all_cv->end;
$cmd_cv->send;

} else {
warn "Unknown pubsub action: $action";
Expand Down Expand Up @@ -246,6 +257,7 @@ sub connect {
}

$self->all_cv->end;
$cmd_cv->send;
});

delete $self->{multi_write};
Expand All @@ -267,6 +279,7 @@ sub connect {
}

$self->all_cv->end;
$cmd_cv->send;
});

} else {
Expand All @@ -287,6 +300,7 @@ sub connect {
warn "Exception in callback (ignored): $@" if $@;

$self->all_cv->end;
$cmd_cv->send;
});

$self->{multi_write} = 1 if $command eq 'multi';
Expand Down
36 changes: 15 additions & 21 deletions t/pubsub.t
Expand Up @@ -13,8 +13,6 @@ test_redis {

my $pub = AnyEvent::Redis->new(host => "127.0.0.1", port => $port);

my $all_cv = AE::cv;

# $pub is for publishing
# $sub is for subscribing

Expand All @@ -24,50 +22,46 @@ test_redis {
my $count = 0;
my $expected_count = 10;

my $sub1_cv = $sub->subscribe("test.1", sub {
$sub->all_cv->begin;
$sub->subscribe("test.1", sub {
my($message, $chan) = @_;
$x += $message;
if(++$count == $expected_count) {
$sub->unsubscribe("test.1");
is $x, $expected_x, "Messages received, values as expected";
}
});
$all_cv->begin;
$sub1_cv->cb(sub { $sub1_cv->recv; $all_cv->end });

for(1 .. $expected_count) {
my $cv = $pub->publish("test.1" => $_);
$expected_x += $_;
# Need to be sure a client has subscribed
$expected_x = 0, redo unless $cv->recv;
}

# Pattern subscription
my $y = 0;
my $expected_y = 0;

my $count2 = 0;
my $expected_count2 = 10;

my $sub2_cv = $sub->psubscribe("test.*", sub {
$sub->psubscribe("testp.*", sub {
my($message, $chan) = @_;
$y += $message;
if(++$count2 == $expected_count2) {
$sub->punsubscribe("test.*");
if(++$count2 == $expected_count) {
$sub->punsubscribe("testp.*");
is $y, $expected_y, "Messages received, values as expected";
}
});
$all_cv->begin;
$sub2_cv->cb(sub { $sub2_cv->recv; $all_cv->end });

for(1 .. $expected_count2) {
my $cv = $pub->publish("test.$_" => $_);
for(1 .. $expected_count) {
my $cv = $pub->publish("test.1" => $_);
$expected_x += $_;
# Need to be sure a client has subscribed
$expected_x = 0, redo unless $cv->recv;
}

for(1 .. $expected_count) {
my $cv = $pub->publish("testp.$_" => $_);
$expected_y += $_;
# Need to be sure a client has subscribed
$expected_y = 0, redo unless $cv->recv;
}

$all_cv->recv;
$sub->all_cv->end;
done_testing;
};

1 comment on commit 42c5ada

@michaelfig
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

           # $self is weakened to avoid leaks, hold on to a strong copy
           # controlled via a CV.
           my $cmd_cv = AE::cv;
           $cmd_cv->cb(sub {
               my $strong_self = $self;
              });

seems like it doesn't actually create a strong reference. I think you mean:

           my $cmd_cv = do { my $strong_self = $self; AE::cv { undef $strong_self } };

I verified this by firing off a few Redis commands, then dropping my Redis ref. With PERL_ANYEVENT_DEBUG_WRAP=1, I get a warning backtrace that says $self is undef during AnyEvent::Redis _expect or all_cv.

Thanks for AnyEvent::Redis: it's really fun.
Michael.

Please sign in to comment.