Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 165 lines (128 sloc) 3.055 kb
8b5146a @rcaputo Prototype a Reflexive ZeroMQ socket, with basic publish and subscribe
authored
1 package ZmqSocket;
2
3 use Moose;
4 extends 'Reflex::Base';
5
6 use Errno qw(EAGAIN EINTR);
7 use ZeroMQ::Raw;
8 use ZeroMQ::Raw::Constants qw(
9 ZMQ_FD ZMQ_NOBLOCK ZMQ_PUB ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_POLLIN
10 ZMQ_EVENTS
11 );
12
65eb070 @rcaputo Modernize ZeroMQ examples to use event objects.
authored
13 # ZeroMQ message event. See ZmqMessage.pm in the eg directory.
14 use ZmqMessage;
15
8b5146a @rcaputo Prototype a Reflexive ZeroMQ socket, with basic publish and subscribe
authored
16 # ZeroMQ::Raw::Context
17
18 has thread_count => (
19 is => 'ro',
20 isa => 'Int',
21 default => 1,
22 );
23
24 # ZeroMQ::Raw::Socket
25
26 has socket_type => (
27 is => 'ro',
28 isa => 'Int',
29 required => 1,
30 );
31
32 # ZeroMQ::Raw::Bind
33
34 has endpoints => (
35 is => 'ro',
36 isa => 'ArrayRef[Str]',
37 required => 1,
38 );
39
40 ### Misc.
41
42 has _zmq_active => ( is => 'rw', isa => 'Bool', default => 1 );
43
44 has _zmq_context => (
45 is => 'ro',
46 isa => 'ZeroMQ::Raw::Context',
47 lazy => 1,
48 default => sub {
49 my $self = shift();
50 return ZeroMQ::Raw::Context->new( threads => $self->thread_count() );
51 },
52 );
53
54 has _zmq_socket => (
55 is => 'ro',
56 isa => 'ZeroMQ::Raw::Socket',
57 lazy => 1,
58 default => sub {
59 my $self = shift;
60
61 my $socket = ZeroMQ::Raw::Socket->new(
62 $self->_zmq_context(),
63 $self->socket_type(),
64 );
65
66 # TODO - Some better way to dispatch setup than if() statements.
67 #
68 # $self->publish( \@endpoints );
69 # Create the socket as ZMQ_PUB.
70 # Bind to the @endpoints.
71 #
72 # $self->subscribe( \@endpoints );
73 # Create the socket as ZMQ_SUB.
74 # Connect to the @endpoints.
75
76 if ($self->socket_type() == ZMQ_PUB) {
77 foreach (@{$self->endpoints()}) {
78 $! = 0;
79 $socket->bind($_) or warn "can't bind to $_ - $!";
80 }
81 return $socket;
82 }
83
84 if ($self->socket_type() == ZMQ_SUB) {
85 foreach (@{$self->endpoints()}) {
86 $socket->connect($_);
87 }
88
89 $socket->setsockopt(ZMQ_SUBSCRIBE, 'debug:');
90
91 return $socket;
92 }
93
94 die "unknown zmq socket type: " . $self->socket_type();
95 },
96 );
97
98 has _zmq_filehandle => (
99 is => 'ro',
100 isa => 'FileHandle',
101 lazy => 1,
102 default => sub {
103 my $self = shift();
104
105 # TODO - Is it necessary to open this socket for append?
106
107 open(
108 my $zmq_fh, "+<&=" . $self->_zmq_socket()->getsockopt(ZMQ_FD)
109 ) or die "filehandle creation failed: $!";
110
111 return $zmq_fh;
112 },
113 );
114
115 with 'Reflex::Role::Readable' => {
116 att_active => '_zmq_active',
117 att_handle => '_zmq_filehandle',
118 cb_ready => '_on_zmq_readable',
119 method_pause => 'pause_reading',
120 method_resume => 'resume_reading',
121 method_stop => 'stop_reading',
122 };
123
124 sub _on_zmq_readable {
125 my ($self, $args) = @_;
126
127 MESSAGE: while (1) {
128 return unless $self->_zmq_socket()->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN;
129
130 my $msg = ZeroMQ::Raw::Message->new();
131
132 unless ($self->_zmq_socket()->recv($msg, ZMQ_NOBLOCK)) {
133 $self->emit(
65eb070 @rcaputo Modernize ZeroMQ examples to use event objects.
authored
134 -name => "message",
135 -type => 'ZmqMessage',
136 message => $msg,
8b5146a @rcaputo Prototype a Reflexive ZeroMQ socket, with basic publish and subscribe
authored
137 );
138 next MESSAGE;
139 }
140
141 return if $! == EAGAIN or $! == EINTR;
142
143 $self->pause_reading();
144
145 $self->on_error(
146 {
147 errnum => ($! + 0),
148 errstr => "$!",
149 errfun => 'zmq_recv',
150 }
151 );
152
153 return;
154 }
155 }
156
157 sub send_scalar {
158 my ($self, $scalar) = @_;
159
160 my $message = ZeroMQ::Raw::Message->new_from_scalar($scalar);
161 $self->_zmq_socket()->send($message, 0) || 0;
162 }
163
164 1;
Something went wrong with that request. Please try again.