Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Added mpz: UNIX pipe with pub/sub semantincs over ZeroMQ+Message::Pas…

…sing

See --help for more info.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
  • Loading branch information...
commit 9db6059256fd55a01ec8be497c51be186113c894 1 parent afb11c8
Pedro Melo authored

Showing 1 changed file with 104 additions and 0 deletions. Show diff stats Hide diff stats

  1. +104 0 bin/mpz
104 bin/mpz
... ... @@ -0,0 +1,104 @@
  1 +#!/usr/bin/env perl
  2 +#
  3 +# mpz: pipe over ZeroMQ using Message::Passing
  4 +#
  5 +# Use mpz --help for more information
  6 +#
  7 +# dependencies: cpanm -n Message::Passing::ZeroMQ
  8 +#
  9 +# Pedro Melo, 2012
  10 +# License: Artistic License v2
  11 +#
  12 +
  13 +use strict;
  14 +use warnings;
  15 +use Message::Passing::DSL;
  16 +use Message::Passing::Output::ZeroMQ;
  17 +use Getopt::Long;
  18 +
  19 +my $def_addr = 'tcp://127.0.0.1:12390';
  20 +my $input_mode = !-t \*STDIN;
  21 +my ($bind, $connect, $sub, $pub, $usage);
  22 +
  23 +if ($input_mode) { $connect = $pub = 1 }
  24 +else { $bind = $sub = 1 }
  25 +
  26 +GetOptions(
  27 + "bind" => \$bind,
  28 + "connect" => \$connect,
  29 + "publisher" => \$pub,
  30 + "subscriber" => \$sub,
  31 + "help" => \$usage,
  32 + "usage" => \$usage
  33 +) or usage();
  34 +usage() if $usage;
  35 +
  36 +my $addr = shift || $def_addr;
  37 +
  38 +my %opts = (socket_type => ($pub ? 'PUB' : 'SUB'));
  39 +$opts{socket_bind} = $addr if $bind;
  40 +$opts{connect} = $addr if $connect;
  41 +
  42 +my $chain;
  43 +if ($input_mode) {
  44 + my $zout = Message::Passing::Output::ZeroMQ->new(%opts, linger => 1); ## linger until everything is sent
  45 + while (<>) {
  46 + chomp;
  47 + $zout->consume($_);
  48 + }
  49 +}
  50 +else {
  51 + run_message_server message_chain {
  52 + error_log(class => 'STDERR');
  53 + output output => (class => 'STDOUT');
  54 + input input => (%opts, class => 'ZeroMQ', output_to => 'output');
  55 + };
  56 +}
  57 +exit(0);
  58 +
  59 +sub usage {
  60 + print <<"USAGE";
  61 +Usage: source | mpz <options> <addr> - or - mpz <options> <addr> [| optional_dest]
  62 +
  63 + A UNIX pipe over ZeroMQ+Message::Passing. The twisted bit is that you
  64 + can have multiple sources all sending to the same destination, and
  65 + each source can start/stop at any time.
  66 +
  67 + Options:
  68 + --help or --usage will print this message
  69 +
  70 + --bind Bind the ZeroMQ socket to an address
  71 + --connect Connect the ZeroMQ socket to an address
  72 +
  73 + Output mode defaults to --bind, while input defaults to --
  74 + connect.
  75 +
  76 + --publisher Use PUB socket type, default for output mode
  77 + --subscriber Use SUB socket type, default for input mode
  78 +
  79 + <addr> is the address to bind/connect, defaults to $def_addr
  80 +
  81 + Examples:
  82 +
  83 + Simple stream of a tail -F:
  84 +
  85 + receiver: mpz tcp://server:port
  86 + sender: tail -F logfile | mpz tcp://server:port
  87 +
  88 + Add an heartbeat:
  89 +
  90 + receiver: mpz tcp://server:port
  91 + sender 1: while true ; do date ; sleep 1 ; done | mpz tcp://server:port
  92 + sender 2: tail -F logfile | mpz tcp://server:port
  93 +
  94 + Add an heartbeat+multiple tail -F's:
  95 +
  96 + receiver: mpz tcp://server:port
  97 + sender 1: while true ; do date ; sleep 1 ; done | mpz tcp://server:port
  98 + sender 2: tail -F logfile1 | mpz tcp://server:port
  99 + sender 3: tail -F logfile2 | mpz tcp://server:port
  100 +
  101 +
  102 +USAGE
  103 + exit(2);
  104 +}

0 comments on commit 9db6059

Please sign in to comment.
Something went wrong with that request. Please try again.