Skip to content

Commit

Permalink
add reset method to Mojo::Pg::PubSub
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Mar 6, 2017
1 parent 87f2416 commit df9e83b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 19 deletions.
3 changes: 2 additions & 1 deletion Changes
@@ -1,5 +1,6 @@

3.03 2017-02-19
3.03 2017-03-06
- Added reset method to Mojo::Pg::PubSub.

3.02 2017-02-18
- Fixed quoting bugs in queries generated with SQL::Abstract.
Expand Down
27 changes: 13 additions & 14 deletions lib/Mojo/Pg/PubSub.pm
Expand Up @@ -6,7 +6,7 @@ use Scalar::Util 'weaken';

has 'pg';

sub DESTROY { Mojo::Util::_global_destruction() or shift->_cleanup }
sub DESTROY { Mojo::Util::_global_destruction() or shift->reset }

sub json { ++$_[0]{json}{$_[1]} and return $_[0] }

Expand All @@ -19,6 +19,12 @@ sub listen {

sub notify { $_[0]->_db->notify(_json(@_)) and return $_[0] }

sub reset {
my $self = shift;
$self->{db}->_unwatch;
delete @$self{qw(chans db pid)};
}

sub unlisten {
my ($self, $name, $cb) = @_;
my $chan = $self->{chans}{$name};
Expand All @@ -27,18 +33,9 @@ sub unlisten {
return $self;
}

sub _cleanup {
my $self = shift;
$self->{db}->_unwatch;
delete @$self{qw(chans db pid)};
}

sub _db {
my $self = shift;

# Fork-safety
$self->_cleanup unless ($self->{pid} //= $$) eq $$;

return $self->{db} if $self->{db};

my $db = $self->{db} = $self->pg->db;
Expand Down Expand Up @@ -92,10 +89,6 @@ pattern used by L<Mojo::Pg>. It is based on PostgreSQL notifications and allows
many consumers to share the same database connection, to avoid many common
scalability problems.
All subscriptions will be reset automatically and the database connection
re-established if a new process has been forked, this allows multiple processes
to share the same L<Mojo::Pg::PubSub> object safely.
=head1 EVENTS
L<Mojo::Pg::PubSub> inherits all events from L<Mojo::EventEmitter> and can
Expand Down Expand Up @@ -168,6 +161,12 @@ L</"json">.
Notify a channel. Automatic encoding of Perl values to JSON text can be
activated with L</"json">.
=head2 reset
$pubsub->reset;
Reset all subscriptions and the database connection.
=head2 unlisten
$pubsub = $pubsub->unlisten('foo');
Expand Down
6 changes: 2 additions & 4 deletions t/pubsub.t
Expand Up @@ -125,7 +125,7 @@ is_deeply \@test, [], 'no messages';
is_deeply \@test, ['works too'], 'right messages';
};

# Fork-safety
# Reset
$pg = Mojo::Pg->new($ENV{TEST_ONLINE});
@dbhs = @test = ();
$pg->pubsub->on(reconnect => sub { push @dbhs, pop->dbh });
Expand All @@ -134,14 +134,12 @@ ok $dbhs[0], 'database handle';
$pg->pubsub->notify(pstest => 'first');
is_deeply \@test, ['first'], 'right messages';
{
local $$ = -23;
$pg->pubsub->reset;
$pg->pubsub->notify(pstest => 'second');
ok $dbhs[1], 'database handle';
isnt $dbhs[0], $dbhs[1], 'different database handles';
is_deeply \@test, ['first'], 'right messages';
$pg->pubsub->listen(pstest => sub { push @test, pop });
$pg->pubsub->notify(pstest => 'third');
ok !$dbhs[2], 'no database handle';
is_deeply \@test, ['first', 'third'], 'right messages';
};

Expand Down

0 comments on commit df9e83b

Please sign in to comment.