Skip to content

Commit

Permalink
full cleanup after protocol error; reorganize pubsub for clarity, rob…
Browse files Browse the repository at this point in the history
…ustness
  • Loading branch information
Chip committed Feb 19, 2011
1 parent 1a27634 commit 2a4640d
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 91 deletions.
15 changes: 11 additions & 4 deletions README
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ SYNOPSIS
port => 6379,
encoding => 'utf8',
on_error => sub { warn @_ },
on_cleanup => sub { warn "Connection closed: @_" },
);

# callback based
Expand Down Expand Up @@ -51,6 +52,12 @@ ESTABLISHING A CONNECTION
database-level error occurs. The error message will be passed to the
callback as the sole argument.

on_cleanup => $cb->($errmsg)
Optional. Callback that will be fired if a connection error occurs.
The error message will be passed to the callback as the sole
argument. After this callback, errors will be reported for all
outstanding requests.

METHODS
All methods supported by your version of Redis should be supported.

Expand All @@ -73,16 +80,16 @@ METHODS

# or...
$cv->cb(sub {
my($cv) = @_;
my($result, $err) = $cv->recv
my ($cv) = @_;
my ($result, $err) = $cv->recv
});

* Callback:

$redis->command(
# arguments,
sub {
my($result, $err) = @_;
my ($result, $err) = @_;
});

(Callback is a wrapper around the $cv approach.)
Expand Down Expand Up @@ -113,7 +120,7 @@ METHODS
with a callback:

my $cv = $redis->subscribe("test", sub {
my($message, $channel[, $actual_channel]) = @_;
my ($message, $channel[, $actual_channel]) = @_;
# ($actual_channel is provided for pattern subscriptions.)
});

Expand Down
201 changes: 114 additions & 87 deletions lib/AnyEvent/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Redis::Protocol;
use Carp qw(croak);
use Carp qw( croak confess );
use Encode ();

our $AUTOLOAD;

sub new {
my($class, %args) = @_;
my ($class, %args) = @_;

my $host = delete $args{host} || '127.0.0.1';
my $port = delete $args{port} || 6379;
Expand All @@ -28,6 +28,7 @@ sub new {
bless {
host => $host,
port => $port,
pending_cvs => [],
%args,
}, $class;
}
Expand Down Expand Up @@ -60,8 +61,9 @@ sub cleanup {
delete $self->{sock};
$self->{on_error}->(@_) if $self->{on_error};
$self->{on_cleanup}->(@_) if $self->{on_cleanup};
my $ex = delete $self->{expected};
for (@$ex) {
for (splice(@{$self->{pending_cvs}}),
splice(@{$self->{multi_cvs} || []}))
{
eval { $_->croak(@_) };
warn "Exception in cleanup callback (ignored): $@" if $@;
}
Expand Down Expand Up @@ -105,42 +107,30 @@ sub connect {

$self->{cmd_cb} = sub {
my $command = lc shift;
my $not_subscribe = $command !~ /^p?subscribe\z/;
my $is_pubsub = $command =~ /^p?(?:un)?subscribe\z/;
my $is_subscribe = $command =~ /^p?subscribe\z/;

# Are we already subscribed to anything?
if ($self->{sub} && %{$self->{sub}}) {
croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
unless $command =~ /^p?(?:un)?subscribe\z/;
unless $is_pubsub;
}
# Are we already in a transaction?
if ($self->{multi_write}) {
croak "Use of pubsub or multi command in transaction is not supported"
if $command =~ /^p?(?:un)?subscribe\z|^multi\z/;
if $is_pubsub || $command eq 'multi';
} else {
croak "Can't 'exec' a transaction because none is pending"
if $command eq 'exec';
}

my($cv, $cb);
my ($cv, $cb);
if (@_) {
$cv = pop if ref $_[-1] && UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
$cb = pop if ref $_[-1] eq 'CODE';
}
$cv ||= AE::cv;
if ($cb && $not_subscribe) {
$cv->cb(sub {
my $cv = shift;
local $@;
eval {
my $res = $cv->recv;
$cb->($res);
};
if ($@) {
($self->{on_error} || sub { die @_ })->(my $err = $@);
}
});
}

$self->all_cv->begin;
croak "Must provide a CODE reference for subscriptions" if $is_subscribe && !$cb;

my $send = join("\r\n",
"*" . (1 + @_),
Expand All @@ -154,38 +144,106 @@ sub connect {

$hd->push_write($send);

if ($self->{sub} && %{$self->{sub}}) {
# pubsub is very different - get it out of the way first

if ($is_pubsub) {

my $already = $self->{sub} && %{$self->{sub}};

if ($is_subscribe) {
$self->{sub}->{$_} ||= [$cv, $cb] for @_;
}

if (!$already && @_) {
my $res_cb; $res_cb = sub {
$hd->push_read("AnyEvent::Redis::Protocol" => sub {
my ($res, $err) = @_;

if (ref $res) {
my $action = lc $res->[0];
warn "$action $res->[1]" if DEBUG;

if ($action eq 'message') {
$self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);

} elsif ($action eq 'pmessage') {
$self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);

} elsif ($action eq 'subscribe' || $action eq 'psubscribe') {
$self->{sub_count} = $res->[2];

# Remember subscriptions
$self->{sub}->{$_} ||= [$cv, $cb] for @_;
} elsif ($action eq 'unsubscribe' || $action eq 'punsubscribe') {
$self->{sub_count} = $res->[2];
eval { $self->{sub}->{$res->[1]}[0]->send };
warn "Exception in callback (ignored): $@" if $@;
delete $self->{sub}->{$res->[1]};
$self->all_cv->end;

} elsif ($command eq 'exec') {
} else {
warn "Unknown pubsub action: $action";
}
}

if ($self->{sub_count} || %{$self->{sub}}) {
# Carry on reading while we are subscribed
$res_cb->();
}
});
};

$res_cb->();
}

return $cv;
}

# non-pubsub from here on out

$cv->cb(sub {
my $cv = shift;
local $@;
eval {
my $res = $cv->recv;
$cb->($res);
};
if ($@) {
($self->{on_error} || sub { die @_ })->(my $err = $@);
}
}) if $cb;

$self->all_cv->begin;
push @{$self->{pending_cvs}}, $cv;

if ($command eq 'exec') {

# at end of transaction, expect bulk reply possibly including errors
$hd->push_read("AnyEvent::Redis::Protocol" => sub {
my ($res, $err) = @_;

$self->all_cv->end;
my $mcvs = delete $self->{multi_cvs} || [];
$self->_expect($cv);

my @mcvs = splice @{$self->{multi_cvs}};

if ($err || ref($res) ne 'ARRAY') {
for ($cv, @$mcvs) {
for ($cv, @mcvs) {
eval { $_->croak($res, 1) };
warn "Exception in callback (ignored): $@" if $@;
}
} else {
for my $i (0 .. $#$mcvs) {
for my $i (0 .. $#mcvs) {
my $r = $res->[$i];
eval {
ref($r) && UNIVERSAL::isa($r, 'AnyEvent::Redis::Error')
? $mcvs->[$i]->croak($$r)
: $mcvs->[$i]->send($r);
? $mcvs[$i]->croak($$r)
: $mcvs[$i]->send($r);
};
warn "Exception in callback (ignored): $@" if $@;
}
eval { $cv->send($res) };
warn "Exception in callback (ignored): $@" if $@;
}

$self->all_cv->end;
});

delete $self->{multi_write};
Expand All @@ -196,94 +254,63 @@ sub connect {
$hd->push_read("AnyEvent::Redis::Protocol" => sub {
my ($res, $err) = @_;

$self->all_cv->end;
$self->_expect($cv);

if (!$err && $res eq 'QUEUED') {
push @{$self->{multi_cvs}}, $cv;
}
else {
eval { $cv->croak($res) };
warn "Exception in callback (ignored): $@" if $@;
}

$self->all_cv->end;
});

} elsif ($not_subscribe) {
} else {

$hd->push_read("AnyEvent::Redis::Protocol" => sub {
my ($res, $err) = @_;

$self->_expect($cv);

if ($command eq 'info') {
$res = { map { split /:/, $_, 2 } split /\r\n/, $res };
} elsif ($command eq 'keys' && !ref $res) {
# Older versions of Redis (1.2) need this
$res = [split / /, $res];
}

$self->all_cv->end;
eval { $err ? $cv->croak($res) : $cv->send($res) };
warn "Exception in callback (ignored): $@" if $@;

$self->all_cv->end;
});

$self->{multi_write} = 1 if $command eq 'multi';

} else {
croak "Must provide a CODE reference for subscriptions" unless $cb;

# Remember subscriptions
$self->{sub}->{$_} ||= [$cv, $cb] for @_;

my $res_cb; $res_cb = sub {

$hd->push_read("AnyEvent::Redis::Protocol" => sub {
my ($res, $err) = @_;

if (ref $res) {
my $action = lc $res->[0];
warn "$action $res->[1]" if DEBUG;

if ($action eq 'message') {
$self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);

} elsif ($action eq 'pmessage') {
$self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);

} elsif ($action eq 'subscribe' || $action eq 'psubscribe') {
$self->{sub_count} = $res->[2];

} elsif ($action eq 'unsubscribe' || $action eq 'punsubscribe') {
$self->{sub_count} = $res->[2];
eval { $self->{sub}->{$res->[1]}[0]->send };
warn "Exception in callback (ignored): $@" if $@;
delete $self->{sub}->{$res->[1]};
$self->all_cv->end;

} else {
warn "Unknown pubsub action: $action";
}
}

if ($self->{sub_count} || %{$self->{sub}}) {
# Carry on reading while we are subscribed
$res_cb->();
}
});
};

$res_cb->();
}

return $cv;
};

for my $queue (@{$self->{connect_queue} || []}) {
my($cv, @args) = @$queue;
$self->{cmd_cb}->(@args, $cv);
my $cq = delete $self->{connect_queue} || [];
for my $q (@$cq) {
my $cv = shift @$q;
$self->{cmd_cb}->(@$q, $cv);
}

};

return $cv;
}

sub _expect {
my ($self, $cv) = @_;
my $p = shift @{$self->{pending_cvs} || []};
$p && $p == $cv or confess "BUG: mismatched CVs";
}

1;
__END__
Expand Down Expand Up @@ -386,8 +413,8 @@ There are two alternative approaches for handling results from commands:
# or...
$cv->cb(sub {
my($cv) = @_;
my($result, $err) = $cv->recv
my ($cv) = @_;
my ($result, $err) = $cv->recv
});
Expand All @@ -396,7 +423,7 @@ There are two alternative approaches for handling results from commands:
$redis->command(
# arguments,
sub {
my($result, $err) = @_;
my ($result, $err) = @_;
});
(Callback is a wrapper around the C<$cv> approach.)
Expand Down Expand Up @@ -429,7 +456,7 @@ subscription-related commands in a transaction.
The subscription methods (C<subscribe> and C<psubscribe>) must be used with a callback:
my $cv = $redis->subscribe("test", sub {
my($message, $channel[, $actual_channel]) = @_;
my ($message, $channel[, $actual_channel]) = @_;
# ($actual_channel is provided for pattern subscriptions.)
});
Expand Down

0 comments on commit 2a4640d

Please sign in to comment.