Skip to content

Commit

Permalink
Updated pubsub code after a615378
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Henning Thorsen committed Nov 13, 2018
1 parent 71655ff commit 2c8f5d8
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 121 deletions.
1 change: 1 addition & 0 deletions .perltidyrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
-bt=2 # High brace tightness
-sbt=2 # High square bracket tightness
-isbc # Don't indent comments without leading space
-wn # Opening and closing containers to be "welded" together
128 changes: 56 additions & 72 deletions lib/Mojo/mysql/PubSub.pm
Original file line number Diff line number Diff line change
Expand Up @@ -9,121 +9,105 @@ has 'mysql';

sub DESTROY {
my $self = shift;
return unless $self->{db} and $self->mysql;
$self->mysql->db->query('delete from mojo_pubsub_subscribe where pid = ?', $self->_subscriber_pid);
return unless $self->{wait_db} and $self->mysql;
$self->mysql->db->query('delete from mojo_pubsub_subscribe where pid = ?', $self->{wait_db}->pid);
}

sub listen {
my ($self, $channel, $cb) = @_;
my $pid = $self->_subscriber_pid;
warn "listen channel:$channel subscriber:$pid\n" if DEBUG;
$self->mysql->db->query('replace mojo_pubsub_subscribe(pid, channel, ts) values (?, ?, current_timestamp)',
$pid, $channel);
my $sync_db = $self->mysql->db;
my $wait_pid = $self->_wait_db($sync_db)->pid;
warn qq|[PubSub] (@{[$wait_pid]}) listen "$channel"\n| if DEBUG;
$sync_db->query('replace mojo_pubsub_subscribe (pid, channel, ts) values (?, ?, current_timestamp)',
$wait_pid, $channel);
push @{$self->{chans}{$channel}}, $cb;
return $cb;
}

sub notify {
my ($self, $channel, $payload) = @_;
my $db = $self->mysql->db;
$payload //= '';
warn "notify channel:$channel $payload\n" if DEBUG;
$self->_init($db) unless $self->{init}++;
$db->query('insert into mojo_pubsub_notify(channel, payload) values (?, ?)', $channel, $payload);
my $sync_db = $self->mysql->db;
warn qq|[PubSub] channel:$channel <<< "@{[$payload // '']}"\n| if DEBUG;
$self->_init($sync_db) unless $self->{init};
$sync_db->query('insert into mojo_pubsub_notify (channel, payload) values (?, ?)', $channel, $payload // '');
return $self;
}

sub unlisten {
my ($self, $channel, $cb) = @_;
my $pid = $self->_subscriber_pid;
warn "unlisten channel:$channel subscriber:$pid\n" if DEBUG;

my $chan = $self->{chans}{$channel};
@$chan = grep { $cb ne $_ } @$chan;
return $self if @$chan;
$self->mysql->db->query('delete from mojo_pubsub_subscribe where pid = ? and channel = ?', $pid, $channel);

my $sync_db = $self->mysql->db;
my $wait_pid = $self->_wait_db($sync_db)->pid;
warn qq|[PubSub] ($wait_pid) unlisten "$channel"\n| if DEBUG;
$sync_db->query('delete from mojo_pubsub_subscribe where pid = ? and channel = ?', $wait_pid, $channel);
delete $self->{chans}{$channel};
return $self;
}

sub _notifications {
my $self = shift;
my $result = $self->{db}->query('select id, channel, payload from mojo_pubsub_notify where id > ?', $self->{last_id});
sub _init {
my ($self, $sync_db) = @_;
$self->mysql->migrations->name('pubsub')->from_data->migrate;
$sync_db->query('delete from mojo_pubsub_notify where ts < date_add(current_timestamp, interval -10 minute)');
$sync_db->query('delete from mojo_pubsub_subscribe where ts < date_add(current_timestamp, interval -1 hour)');
$self->{init} = 1;
}

sub _notifications {
my ($self, $sync_db) = @_;
my $result
= $sync_db->query('select id, channel, payload from mojo_pubsub_notify where id > ? order by id', $self->{last_id});
while (my $row = $result->array) {
my ($id, $channel, $payload) = @$row;
$self->{last_id} = $id;
next unless exists $self->{chans}{$channel};
warn "received $id on $channel: $payload\n" if DEBUG;
warn qq/[PubSub] channel:$channel >>> "$payload"\n/ if DEBUG;
for my $cb (@{$self->{chans}{$channel}}) { $self->$cb($payload) }
}
}

sub _init {
my ($self, $db) = @_;

$self->mysql->migrations->name('pubsub')->from_data->migrate;

# cleanup old subscriptions and notifications
$db->query('delete from mojo_pubsub_notify where ts < date_add(current_timestamp, interval -10 minute)');
$db->query('delete from mojo_pubsub_subscribe where ts < date_add(current_timestamp, interval -1 hour)');
}

sub _subscriber_pid {
my $self = shift;
sub _wait_db {
my ($self, $sync_db) = @_;

# Fork-safety
if (($self->{pid} //= $$) ne $$) {
my $pid = $self->{db}->pid if $self->{db};
warn 'forked subscriber pid:' . ($pid || 'N/A') . "\n" if DEBUG;
$self->{db}->disconnect if $pid;
delete @$self{qw(chans init pid db)};
}
delete @$self{qw(wait_db chans pid)} if ($self->{pid} //= $$) ne $$;

return $self->{db}->pid if $self->{db};
return $self->{wait_db} if $self->{wait_db};

$self->{db} = $self->mysql->db;
my $pid = $self->{db}->pid;
$self->_init($sync_db) unless $self->{init};
my $wait_db = $self->{wait_db} = $self->mysql->db;
$sync_db->query('replace mojo_pubsub_subscribe (pid, channel) values (?, ?)', $wait_db->pid, $_)
for keys %{$self->{chans}};

$self->_init($self->{db}) unless $self->{init}++;

if (defined $self->{last_id}) {

# read unread notifications
$self->_notifications;
if ($self->{last_id}) {
$self->_notifications($sync_db);
}
else {
# get id of the last message
my $array = $self->{db}->query('select id from mojo_pubsub_notify order by id desc limit 1')->array;
$self->{last_id} = defined $array ? $array->[0] : 0;
my $last = $sync_db->query('select id from mojo_pubsub_notify order by id desc limit 1')->array;
$self->{last_id} = defined $last ? $last->[0] : 0;
}

# re-subscribe
$self->{db}->query('replace mojo_pubsub_subscribe(pid, channel) values (?, ?)', $pid, $_) for keys %{$self->{chans}};

weaken $self->{db}->{mysql};
weaken $wait_db->{mysql};
weaken $self;

my $cb;
$cb = sub {
my ($db, $err, $result) = @_;
if ($err) {
warn "wake up error: $err" if DEBUG;
eval { $db->disconnect };
delete $self->{db};
eval { $self->_subscriber_pid };
}
elsif ($self and $self->{db}) {
$self->_notifications;
$db->query('update mojo_pubsub_subscribe set ts = current_timestamp where pid = ?', $pid);
$db->query('select sleep(600)', $cb);
}
my ($db, $err, $res) = @_;
return unless $self;
warn qq|[PubSub] (@{[$db->pid]}) sleep(600) @{[$err ? "!!! $err" : $res->array->[0]]}\n| if DEBUG;
my $sync_db = $self->mysql->db;
return (delete $self->{wait_db}, $self->_wait_db($sync_db)) if $err;
$res->finish;
$db->query('select sleep(600)', $cb);
$sync_db->query('update mojo_pubsub_subscribe set ts = current_timestamp where pid = ?', $db->pid);
$self->_notifications($self->mysql->db);
};
$self->{db}->query('select sleep(600)', $cb);

warn "reconnect subscriber pid: $pid\n" if DEBUG;
$self->emit(reconnect => $self->{db});

return $pid;
warn qq|[PubSub] (@{[$wait_db->pid]}) reconnect\n| if DEBUG;
$self->emit(reconnect => $wait_db);
return $wait_db->query('select sleep(600)', $cb);
}

1;
Expand Down Expand Up @@ -253,7 +237,7 @@ drop table mojo_pubsub_notify;
drop table if exists mojo_pubsub_subscribe;
drop table if exists mojo_pubsub_notify;
create table mojo_pubsub_subscribe(
create table mojo_pubsub_subscribe (
id integer auto_increment primary key,
pid integer not null,
channel varchar(64) not null,
Expand All @@ -262,7 +246,7 @@ create table mojo_pubsub_subscribe(
key ts_idx(ts)
);
create table mojo_pubsub_notify(
create table mojo_pubsub_notify (
id integer auto_increment primary key,
channel varchar(64) not null,
payload text,
Expand Down
90 changes: 41 additions & 49 deletions t/pubsub.t
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
BEGIN { $ENV{MOJO_REACTOR} = 'Mojo::Reactor::Poll' }
use Mojo::Base -strict;
use Mojo::IOLoop;
use Mojo::mysql;
use Test::More;

plan skip_all => 'TEST_ONLINE=mysql://root@/test' unless $ENV{TEST_ONLINE};

my $mysql = Mojo::mysql->new($ENV{TEST_ONLINE});
my (@pids, @test, $first, $second);
my (@pids, @payload);

{
my @warn;
Expand All @@ -18,53 +17,46 @@ my (@pids, @test, $first, $second);

$ENV{MOJO_PUBSUB_EXPERIMENTAL} = 1;

$first = $mysql->pubsub->listen(test => sub { push @test, pop });
Mojo::IOLoop->delay(
sub {
Mojo::IOLoop->timer(0.05, shift->begin);
$mysql->pubsub->notify(test => 'first');
},
sub {
Mojo::IOLoop->timer(0.05, shift->begin);
is_deeply \@test, ['first'], 'right messages 1';
$mysql->db->query('insert into mojo_pubsub_notify(channel, payload) values (?, ?)', 'test', 'second');
},
sub {
Mojo::IOLoop->timer(0.05, shift->begin);
is_deeply \@test, ['first', 'second'], 'right messages 1-2';
$mysql->db->query('insert into mojo_pubsub_notify(channel, payload) values (?, ?), (?, ?), (?, ?)',
'test', 'third', 'test', 'fourth', 'test', 'fifth');
},
sub {
Mojo::IOLoop->timer(0.05, shift->begin);
is_deeply \@test, ['first', 'second', 'third', 'fourth', 'fifth'], 'right messages 1-5';
@test = ();

# Second subscribe
$second = $mysql->pubsub->listen(test => sub { push @test, pop });
$mysql->pubsub->notify('test')->notify(test => 'first');
},
sub {
Mojo::IOLoop->timer(0.05, shift->begin);
is_deeply \@test, ['', '', 'first', 'first'], 'right messages dual';
$mysql->pubsub->unlisten(test => $first)->notify(test => 'second');
},
sub {
Mojo::IOLoop->timer(0.05, shift->begin);
is_deeply \@test, ['', '', 'first', 'first', 'second'], 'right messages single';

@test = ();
$mysql->pubsub->{db}->{dbh}->{Warn} = 0;
$mysql->db->query('kill ?', $pids[0]);
$mysql->pubsub->notify(test => 'works');
},
sub {
ok $pids[1], 'second database pid';
isnt $pids[0], $pids[1], 'different database pids';
is_deeply \@test, ['works'], 'right message after reconnect';
}
)->wait;
$mysql->pubsub->notify(test => 'skipped_message');
my $sa = $mysql->pubsub->listen(test => sub { push @payload, a => pop });
$mysql->pubsub->notify(test => 'm1');
wait_for(1 => 'one subscriber');
is_deeply \@payload, [a => 'm1'], 'right message m1';

$mysql->db->query('insert into mojo_pubsub_notify (channel, payload) values (?, ?)', 'test', 'm2');
wait_for(1 => 'one subscriber');
is_deeply \@payload, [a => 'm2'], 'right message m2';

$mysql->db->query('insert into mojo_pubsub_notify (channel, payload) values (?, ?), (?, ?), (?, ?), (?, ?)',
'test', 'm3', 'test', 'm4', 'skipped_channel', 'x1', 'test', 'm5');
wait_for(3 => 'skipped channel');
is_deeply \@payload, [map { (a => "m$_") } 3 .. 5], 'right messages 3..5';

my $sb = $mysql->pubsub->listen(test => sub { push @payload, b => pop });
$mysql->pubsub->notify(test => undef)->notify(test => 'd2');
wait_for(4, 'two subscribers');
is_deeply \@payload, [map { (a => $_, b => $_) } ('', 'd2')], 'right messages undef + d2';

$mysql->pubsub->unlisten(test => $sa)->notify(test => 'u1');
wait_for(1 => 'unlisten');
is_deeply \@payload, [b => 'u1'], 'right message after unlisten';

$mysql->pubsub->{db}{dbh}{Warn} = 0;
$mysql->db->query('kill ?', $pids[0]);
$mysql->pubsub->notify(test => 'k1');
wait_for(1 => 'reconnect');
isnt $pids[0], $pids[1], 'different database pids';
is_deeply \@payload, [b => 'k1'], 'right message after reconnect';

$mysql->migrations->name('pubsub')->from_data('Mojo::mysql::PubSub')->migrate(0);

done_testing();
done_testing;

sub wait_for {
my ($n, $descr) = @_;
note "[$n] $descr";
@payload = ();
my $tid = Mojo::IOLoop->recurring(0.05 => sub { @payload == $n * 2 and Mojo::IOLoop->stop });
Mojo::IOLoop->start;
Mojo::IOLoop->remove($tid);
}

0 comments on commit 2c8f5d8

Please sign in to comment.