Permalink
Browse files

add retry_on_sqlstate feature, add query_seq slot for tracing, rewrit…

…e connection-pool handling code
  • Loading branch information...
1 parent 0308eee commit 862692195e0ffa8262cc68f21da8d1ccb8eef15f root committed May 16, 2012
Showing with 76 additions and 24 deletions.
  1. +2 −1 lib/AnyEvent/Pg.pm
  2. +74 −23 lib/AnyEvent/Pg/Pool.pm
View
@@ -365,7 +365,8 @@ sub _on_consume_input {
my $cmdRows = $result->cmdRows // '<undef>';
my $rows = $result->rows // '<undef>';
my $cols = $result->columns // '<undef>';
- $self->_debug("calling on_result status: $status, conn status: $conn_status, cmdRows: $cmdRows, columns: $cols, rows: $rows");
+ my $sqlstate = $result->errorField('sqlstate') // '<undef>';
+ $self->_debug("calling on_result status: $status, sqlstate: $sqlstate, conn status: $conn_status, cmdRows: $cmdRows, columns: $cols, rows: $rows");
}
$self->_maybe_callback($cq, 'on_result', $result);
}
@@ -20,10 +20,12 @@ sub _debug {
my $connecting = keys %{$pool->{connecting}};
my $idle = keys %{$pool->{idle}};
my $busy = keys %{$pool->{busy}};
+ my $delayed = ($pool->{delay_watcher} ? 1 : 0);
+ my $total = keys %{$pool->{conns}};
local ($ENV{__DIE__}, $@);
my ($pkg, $file, $line, $method) = (caller 1);
$method =~ s/.*:://;
- warn "[$pool c:$connecting/i:$idle/b:$busy]\@${pkg}::$method> @_ at $file line $line\n";
+ warn "[$pool c:$connecting/i:$idle/b:$busy|t:$total|d:$delayed]\@${pkg}::$method> @_ at $file line $line\n";
}
@@ -54,18 +56,26 @@ sub new {
connecting => {},
queue => [],
seq => 1,
+ query_seq => 1,
};
bless $pool, $class;
AE::postpone { $pool->_on_start };
$pool;
}
+sub is_dead { shift->{dead} }
+
sub _on_start {}
sub push_query {
my ($pool, %opts) = @_;
my %query;
+ my $retry_on_sqlstate = delete $opts{retry_on_sqlstate};
+ $retry_on_sqlstate = { map { $_ => 1 } @$retry_on_sqlstate }
+ if ref($retry_on_sqlstate) eq 'ARRAY';
+ $query{retry_on_sqlstate} = $retry_on_sqlstate // {};
$query{$_} = delete $opts{$_} for qw(on_result on_error on_done query args max_retries);
+ $query{seq} = $pool->{query_seq}++;
my $query = \%query;
my $queue = $pool->{queue};
push @$queue, $query;
@@ -93,6 +103,11 @@ sub _check_queue {
while (!$pool->_is_queue_empty) {
$debug and $debug & 8 and $pool->_debug('processing first query from the query');
unless (%$idle) {
+ if ($pool->{dead}) {
+ my $query = shift @{$pool->{queue}};
+ $pool->_maybe_callback($query, 'on_error');
+ next;
+ }
$debug and $debug & 8 and $pool->_debug('starting new connection');
$pool->_start_new_conn;
return;
@@ -115,6 +130,8 @@ sub _check_queue {
$debug and $debug & 8 and $pool->_debug('queue is empty!');
}
+my %error_severiry_fatal = map { $_ => 1 } qw(FATAL PANIC);
+
sub _on_query_result {
my ($pool, $seq, $conn, $result) = @_;
my $query = $pool->{current}{$seq};
@@ -123,32 +140,45 @@ sub _on_query_result {
$result->status == Pg::PQ::PGRES_FATAL_ERROR and
$pool->_debug("errorDescription:\n" . Dumper [$result->errorDescription]);
}
- if ($query->{fatal_error_seen}) {
- $debug and $debug & 8 and $pool->_debug("fatal_error_seen is set, ignoring later on_result");
+ if ($query->{retry}) {
+ $debug and $debug & 8 and $pool->_debug("retry is set, ignoring later on_result");
}
else {
- if ($result->status == Pg::PQ::PGRES_FATAL_ERROR and $result->errorField('severity') ne 'ERROR') {
- $pool->_debug("this is a real FATAL error, skipping the on_result callback");
- $query->{fatal_error_seen}++;
- }
- else {
- $query->{max_retries} = 0;
- $pool->_maybe_callback($query, 'on_result', $conn, $result);
+ if ($query->{max_retries} and $result->status == Pg::PQ::PGRES_FATAL_ERROR) {
+ if ($query->{retry_on_sqlstate}{$result->errorField('sqlstate')}) {
+ $pool->_debug("this is a retry-able error, skipping the on_result callback");
+ $query->{retry} = 1;
+ return;
+ }
+ if ($error_severiry_fatal{$result->errorField('severity')}) {
+ $pool->_debug("this is a real FATAL error, skipping the on_result callback");
+ $query->{retry} = 1;
+ return;
+ }
}
+ $query->{max_retries} = 0;
+ $pool->_maybe_callback($query, 'on_result', $conn, $result);
}
}
sub _on_query_done {
my ($pool, $seq, $conn) = @_;
my $query = delete $pool->{current}{$seq};
- $pool->_maybe_callback($query, ($query->{fatal_error_seen} ? 'on_error' : 'on_done'), $conn);
+ if (delete $query->{retry}) {
+ $debug and $debug & 8 and "unshifting failed query into queue";
+ $query->{max_retries}--;
+ unshift @{$pool->{queue}}, $query;
+ }
+ else {
+ $pool->_maybe_callback($query, 'on_done', $conn);
+ }
}
sub _start_new_conn {
my $pool = shift;
- if (keys %{$pool->{conns}} < $pool->{size} and
- $pool->{conn_retries} < $pool->{max_conn_retries} and
- !%{$pool->{connecting}} and
+ if (keys %{$pool->{conns}} < $pool->{size} and
+ !%{$pool->{connecting}} and
+ $pool->{conn_retries} <= $pool->{max_conn_retries} and
!$pool->{delay_watcher}) {
my $seq = $pool->{seq}++;
my $conn = AnyEvent::Pg->new($pool->{conninfo},
@@ -163,9 +193,10 @@ sub _start_new_conn {
$pool->{connecting}{$seq} = 1;
}
else {
- $debug and $debug & 8 and $pool->_debug('not starting new connection, conns: ' . (scalar keys %{$pool->{conns}}) .
- ", retries: $pool->{conn_retries}, connecting: ".(scalar keys %{$pool->{connecting}}) .
- ", delay watcher: $pool->{delay_watcher}");
+ $debug and $debug & 8 and $pool->_debug('not starting new connection, conns: '
+ . (scalar keys %{$pool->{conns}})
+ . ", retries: $pool->{conn_retries}, connecting: "
+ . (scalar keys %{$pool->{connecting}}));
}
}
@@ -187,9 +218,10 @@ sub _on_conn_error {
. "\$conn: $conn, \$pool->{conns}{$seq}: "
. ($pool->{conns}{$seq} // '<undef>'));
}
- delete $pool->{busy}{$seq}
+ delete $pool->{busy}{$seq} or delete $pool->{idle}{$seq}
or croak "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);
delete $pool->{conns}{$seq};
+ $pool->_maybe_callback('on_connect_error', $conn) if $pool->{dead};
$pool->_check_queue;
}
@@ -203,25 +235,44 @@ sub _on_conn_connect {
sub _on_conn_connect_error {
my ($pool, $seq, $conn) = @_;
$debug and $debug & 8 and $pool->_debug("unable to connect to database");
+
# the connection object will be removed from the Pool on the
# on_error callback that will be called just after this one
# returns:
delete $pool->{connecting}{$seq};
$pool->{busy}{$seq} = 1;
- if ($pool->_is_queue_empty) {
- $pool->{conn_retries} = 0;
+
+ if ($pool->{delay_watcher}) {
+ $debug and $debug & 8 and $pool->_debug("a delayed reconnection is already queued");
}
else {
- if ($pool->{conn_retries}++ < $pool->{max_conn_retries}) {
- $debug and $debug & 8 and $pool->_debug("starting timer for delayed reconnection");
+ # This failed connection is not counted against the limit
+ # unless it is the only connection remaining. Effectively the
+ # module will keep going until all the connections become
+ # broken and no more connections can be established.
+ $pool->{conn_retries}++ unless keys(%{$pool->{conns}}) > 1;
+
+ if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
+ $debug and $debug & 8 and $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s");
$pool->{delay_watcher} = AE::timer $pool->{conn_delay}, 0, sub { $pool->_on_delayed_reconnect };
}
else {
- $pool->_maybe_callback('on_connect_error', $conn);
+ # giving up!
+ $debug and $debug & 8 and $pool->_debug("it has been imposible to connect to the database, giving up!!!");
+ $pool->{dead} = 1;
+ # processing continues on the on_conn_error callback
}
}
}
+sub _on_fatal_connect_error {
+ my ($pool, $conn) = @_;
+ # This error is fatal. After it happens, everything is going to
+ # fail.
+ $pool->{dead} = 1;
+
+}
+
sub _on_delayed_reconnect {
my $pool = shift;
$debug and $debug & 8 and $pool->_debug("_on_delayed_reconnect called");

0 comments on commit 8626921

Please sign in to comment.