Skip to content

Commit

Permalink
move cascading pub/sub into controller
Browse files Browse the repository at this point in the history
This handles the heirarchal tree of topics and forwards published
messages on leaf nodes to subscribers of the branch nodes.

Refs #34
  • Loading branch information
preaction committed Sep 1, 2016
1 parent 108c2ee commit 1b4b5c8
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 94 deletions.
101 changes: 7 additions & 94 deletions lib/Mercury.pm
Expand Up @@ -19,7 +19,6 @@ use Scalar::Util qw( refaddr );
use File::Basename qw( dirname );
use File::Spec::Functions qw( catdir );

my %pubsub_topics;
my %bus_topics;
my %pushpull;

Expand Down Expand Up @@ -72,56 +71,6 @@ sub send_bus_message {
return;
}

=method add_topic_subscriber
$c->add_topic_subscriber( $topic );
Add the current connection as a subscriber to the given topic. Connections can
be subscribed to only one topic, but they will receive all messages to
child topics as well.
=cut

sub add_topic_subscriber {
my ( $self, $topic ) = @_;
$pubsub_topics{ $topic }{ refaddr $self } = $self;
return;
}

=method remove_topic_subscriber
$c->remove_topic_subscriber( $topic );
Remove the current connection from the given topic. Must be called to clean up
the state.
=cut

sub remove_topic_subscriber {
my ( $self, $topic ) = @_;
delete $pubsub_topics{ $topic }{ refaddr $self };
return;
}

=method publish_topic_message
$c->publish_topic_message( $topic, $message );
Publish a message on the given topic. The message will be sent once to any subscriber
of this topic or any child topics.
=cut

sub publish_topic_message {
my ( $self, $topic, $message ) = @_;
my @parts = split m{/}, $topic;
my @topics = map { join '/', @parts[0..$_] } 0..$#parts;
for my $topic ( @topics ) {
$_->send( $message ) for values %{ $pubsub_topics{ $topic } };
}
return;
}

=route /bus/*topic
Establish a WebSocket message bus to send/receive messages on the given
Expand Down Expand Up @@ -158,52 +107,11 @@ sub route_websocket_bus {
} );
};

=route /sub/*topic
Establish a WebSocket to subscribe to the given C<topic>. Messages published
to the topic or any child topics will be sent to this subscriber.
=cut

sub route_websocket_sub {
my ( $c ) = @_;
Mojo::IOLoop->stream($c->tx->connection)->timeout(1200);

my $topic = $c->stash( 'topic' );
$c->add_topic_subscriber( $topic );

$c->on( finish => sub {
my ( $c ) = @_;
$c->remove_topic_subscriber( $topic );
} );
};

=route /pub/*topic
Establish a WebSocket to publish to the given C<topic>. Messages published to
the topic will be sent to all subscribers to the topic or any parent topics.
=cut

sub route_websocket_pub {
my ( $c ) = @_;
Mojo::IOLoop->stream($c->tx->connection)->timeout(1200);

my $topic = $c->stash( 'topic' );
$c->on( message => sub {
my ( $c, $message ) = @_;
$c->publish_topic_message( $topic, $message );
} );
}

sub startup {
my ( $app ) = @_;
$app->plugin( 'Config', { default => { broker => { } } } );
$app->commands->namespaces( [ 'Mercury::Command::mercury' ] );

$app->helper( add_topic_subscriber => \&add_topic_subscriber );
$app->helper( remove_topic_subscriber => \&remove_topic_subscriber );
$app->helper( publish_topic_message => \&publish_topic_message );
$app->helper( add_bus_peer => \&add_bus_peer );
$app->helper( remove_bus_peer => \&remove_bus_peer );
$app->helper( send_bus_message => \&send_bus_message );
Expand All @@ -229,8 +137,6 @@ sub startup {
} );
}

$r->websocket( '/sub/*topic' )->to( cb => \&route_websocket_sub )->name( 'sub' );
$r->websocket( '/pub/*topic' )->to( cb => \&route_websocket_pub )->name( 'pub' );
$r->websocket( '/bus/*topic' )->to( cb => \&route_websocket_bus )->name( 'bus' );

$app->plugin( 'Mercury' );
Expand All @@ -241,6 +147,13 @@ sub startup {
->to( controller => 'PushPull', action => 'pull' )
->name( 'pull' );

$r->websocket( '/pub/*topic' )
->to( controller => 'PubSub::Cascade', action => 'publish' )
->name( 'pub' );
$r->websocket( '/sub/*topic' )
->to( controller => 'PubSub::Cascade', action => 'subscribe' )
->name( 'sub' );

if ( $app->mode eq 'development' ) {
# Enable the example app
my $root = catdir( dirname( __FILE__ ), 'Mercury' );
Expand Down
2 changes: 2 additions & 0 deletions lib/Mercury/Controller/PubSub.pm
Expand Up @@ -25,6 +25,8 @@ For more information on the pub/sub pattern, see L<Mercury::Pattern::PubSub>.
=item L<Mercury::Pattern::PubSub>
=item L<Mercury::Controller::PubSub::Cascade>
=item L<Mercury>
=back
Expand Down
121 changes: 121 additions & 0 deletions lib/Mercury/Controller/PubSub/Cascade.pm
@@ -0,0 +1,121 @@
package Mercury::Controller::PubSub::Cascade;

# ABSTRACT: Pub/sub controller with a topic heirarchy and cascading

=head1 SYNOPSIS
# myapp.pl
use Mojolicious::Lite;
plugin 'Mercury';
websocket( '/pub/*topic' )
->to( controller => 'PubSub::Cascade', action => 'pub' );
websocket( '/sub/*topic' )
->to( controller => 'PubSub::Cascade', action => 'sub' );
=head1 DESCRIPTION
This controller enables a L<pub/sub pattern|Mercury::Pattern::PubSub> on
a pair of endpoints (L<publish|/publish> and L<subscribe|/subscribe>.
In this variant, topics are organized into a heirarchy. Subscribers can
subscribe to higher branch of the tree to recieve messages from all the
publishers on lower branches of the tree. So, a subscriber to C</foo>
will receive messages sent to C</foo>, C</foo/bar>, and C</foo/bar/baz>.
For more information on the pub/sub pattern, see L<Mercury::Pattern::PubSub>.
=head1 SEE ALSO
=over
=item L<Mercury::Pattern::PubSub>
=item L<Mercury::Controller::PubSub>
=item L<Mercury>
=back
=cut

use Mojo::Base 'Mojolicious::Controller';
use Mercury::Pattern::PubSub;

=method publish
$app->routes->websocket( '/pub/*topic' )
->to( controller => 'PubSub::Cascade', action => 'publish' );
Controller action to connect a websocket as a publisher. A publish
client sends messages through the socket. The message will be sent to
all of the connected subscribers for the topic and all parent topics.
This endpoint requires a C<topic> in the stash.
=cut

sub publish {
my ( $c ) = @_;
my $topic = $c->stash( 'topic' );
my $pattern = $c->_pattern( $topic );
$pattern->add_publisher( $c->tx );

# Send messages to parent topics
$c->tx->on( message => sub {
my ( $tx, $msg ) = @_;
my @parts = split m{/}, $topic;
my @patterns =
# Only pattern objects that have been created
grep { defined }
# Change topics into pattern objects
map { $c->mercury->pattern( 'PubSub::Cascade' => $_ ) }
# Build parent topics
map { join '/', @parts[0..$_] }
0..$#parts-1;

$_->send_message( $msg ) for @patterns;
} );

$c->rendered( 101 );
}

=method subscribe
$app->routes->websocket( '/sub/*topic' )
->to( controller => 'PubSub::Cascade', action => 'subscribe' );
Controller action to connect a websocket as a subscriber. A subscriber
will recieve every message sent by publishers to the current topic and
any child topics.
This endpoint requires a C<topic> in the stash.
=cut

sub subscribe {
my ( $c ) = @_;
my $pattern = $c->_pattern( $c->stash( 'topic' ) );
$pattern->add_subscriber( $c->tx );
$c->rendered( 101 );
}

#=method _pattern
#
# my $pattern = $c->_pattern( $topic );
#
# Get or create the L<Mercury::Pattern::PubSub> object for the given
# topic.
#
#=cut

sub _pattern {
my ( $c, $topic ) = @_;
my $pattern = $c->mercury->pattern( 'PubSub::Cascade' => $topic );
if ( !$pattern ) {
$pattern = Mercury::Pattern::PubSub->new;
$c->mercury->pattern( 'PubSub::Cascade' => $topic => $pattern );
}
return $pattern;
}

1;

0 comments on commit 1b4b5c8

Please sign in to comment.