-
Notifications
You must be signed in to change notification settings - Fork 9
/
mpz
executable file
·104 lines (81 loc) · 2.6 KB
/
mpz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env perl
#
# mpz: pipe over ZeroMQ using Message::Passing
#
# Use mpz --help for more information
#
# dependencies: cpanm -n Message::Passing::ZeroMQ
#
# Pedro Melo, 2012
# License: Artistic License v2
#
use strict;
use warnings;
use Message::Passing::DSL;
use Message::Passing::Output::ZeroMQ;
use Getopt::Long;
my $def_addr = 'tcp://127.0.0.1:12390';
my $input_mode = !-t \*STDIN;
my ($bind, $connect, $sub, $pub, $usage);
if ($input_mode) { $connect = $pub = 1 }
else { $bind = $sub = 1 }
GetOptions(
"bind" => \$bind,
"connect" => \$connect,
"publisher" => \$pub,
"subscriber" => \$sub,
"help" => \$usage,
"usage" => \$usage
) or usage();
usage() if $usage;
my $addr = shift || $def_addr;
my %opts = (socket_type => ($pub ? 'PUB' : 'SUB'));
$opts{socket_bind} = $addr if $bind;
$opts{connect} = $addr if $connect;
my $chain;
if ($input_mode) {
my $zout = Message::Passing::Output::ZeroMQ->new(%opts, linger => 1); ## linger until everything is sent
while (<>) {
chomp;
$zout->consume($_);
}
}
else {
run_message_server message_chain {
error_log(class => 'STDERR');
output output => (class => 'STDOUT');
input input => (%opts, class => 'ZeroMQ', output_to => 'output');
};
}
exit(0);
sub usage {
print <<"USAGE";
Usage: source | mpz <options> <addr> - or - mpz <options> <addr> [| optional_dest]
A UNIX pipe over ZeroMQ+Message::Passing. The twisted bit is that you
can have multiple sources all sending to the same destination, and
each source can start/stop at any time.
Options:
--help or --usage will print this message
--bind Bind the ZeroMQ socket to an address
--connect Connect the ZeroMQ socket to an address
Output mode defaults to --bind, while input defaults to --
connect.
--publisher Use PUB socket type, default for output mode
--subscriber Use SUB socket type, default for input mode
<addr> is the address to bind/connect, defaults to $def_addr
Examples:
Simple stream of a tail -F:
receiver: mpz tcp://server:port
sender: tail -F logfile | mpz tcp://server:port
Add an heartbeat:
receiver: mpz tcp://server:port
sender 1: while true ; do date ; sleep 1 ; done | mpz tcp://server:port
sender 2: tail -F logfile | mpz tcp://server:port
Add an heartbeat+multiple tail -F's:
receiver: mpz tcp://server:port
sender 1: while true ; do date ; sleep 1 ; done | mpz tcp://server:port
sender 2: tail -F logfile1 | mpz tcp://server:port
sender 3: tail -F logfile2 | mpz tcp://server:port
USAGE
exit(2);
}