/
Posix.pm
391 lines (287 loc) · 11.2 KB
/
Posix.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
use v6;
=begin pod
=head1 NAME
MQ::Posix - Perl 6 interface for POSIX message queues
=head1 SYNOPSIS
=begin code
use MQ::Posix;
my $queue = MQ::Posix.new(name => 'test-queue', :create, :r );
react {
whenever $queue.Supply -> $buf {
say $buf.decode;
}
whenever signal(SIGINT) {
$queue.close;
$queue.unlink;
done;
}
}
=end code
And in some separate process:
=begin code
use MQ::Posix;
my $queue = MQ::Posix.new(name => 'test-queue', :create, :w );
await $queue.send("some test message", priority => 10);
$queue.close;
=end code
=head1 DESCRIPTION
POSIX message queues offer a mechanism for processes to reliably exchange
data in the form of messages
The messages are presented as a priority ordered queue with higher priority
messages being delivered first and messages of equal priority being delivered
in age order.
The mechanism is simple, having no provision for message metadata and so forth
and whilst reliable, unread messages do not persist beyond the lifetime of the
running kernel.
=head1 METHODS
=head2 method new
method new(Str :$name!, Bool :$r, Bool :$w, Bool :$create, Bool :$exclusive, Int :$max-messages, Int :$message-size, Int :$mode = 0o660)
The constructor of the class, C<$name> is the name of the queue and is required,
there may be different constraints on the name in different implementations but
in both B<Linux> and B<FreeBSD> it must conform to the requirements of a
filename. On or both of C<r> or C<w> must be provided to indicate whether
the queue should be readable, writable or both. If C<create> is supplied
the queue will be created if necessary, otherwise if the queue doesn't
exist an exception will be thrown. If C<exclusive> is supplied along with
C<create> an exception will be thrown if the queue already exists. C<$mode>
will be used as the mode of the queue if the queue is to be created, after
the application of the user file creation mask in effect.
C<$max-messages> and C<$message-size> will be used to set the queues attributes
if it is created if provided, otherwise the system defaults will be used.
The system defaults may differ from system to system. If the user is not
privileged and the values are higher than the configured limits then an
exception may be thrown when the queue is created - how to determine the
limits may differ from system to system, on Linux they can be obtained
and set through a C<sysctl> interface (or via C</proc/sys/fs/mqueue/> )
The queue itself may not be created immediately but rather when it first
needs to be used, so any exception may not be thrown at the time the
constructor is called.
=head2 method attributes
method attributes(--> MQ::Posix::Attr)
This returns an object describing the queue's attributes, they can't
be changed after the queue is created. The object has the attributes
C<message-size> which is the allowed maximum size of a message,
C<max-messages> is the maximum number of messages allowed in the queue
simulataneously and C<current-messages> the number of messages in
the queue.
=head2 method send
multi method send(Str $msg, Int $priority = 0 --> Promise)
multi method send(Buf $msg, Int $priority = 0 --> Promise)
multi method send(CArray $msg, Int $length, Int $priority = 0 --> Promise)
If the queue is opened for writing this will send the supplied message
with the specified priority, returning a Promise that will be kept
when the message is placed on the queue (as it may block if there are
C<max-messages> alreadt on the queue.) The Promise will be broken with
an exception if the queue is not opened for writing or if the message is
longer than C<message-size>.
=head2 method receive
method receive(--> Promise )
This returns a Promise that will be kept with the highest priority
message from the queue as a L<Buf> (you are free to decode or
marshal this as you wish as there is no mechanism to convey the
encoding.) it will be broken with an exception if the queue wasn't
opened for reading. The message will never exceed C<message-size> bytes.
=head2 method Supply
method Supply(--> Supply)
This provides a Supply onto which are emitted the messages as a L<Buf>
as they arrive on the queue. An exception will be thrown if the queue
isn't opened for reading. The first time this is called a new thread
will be started to feed the supply which will run until the queue is
closed.
In places which expect a Supply such as a C<whenever> this need not
be explicitly called and the object can be coerced instead,
=head2 method close
method close( --> Bool)
This closes the queue handle that will have been opened if the queue
was written to or read, after this has been called an exception
will be thrown if attempting to read or write. If C<Supply> was
called the thread it started will finish.
=head2 method unlink
method unlink( --> Bool)
This will remove the queue and it will no longer be able to be opened
by another process, any process that currently has it opened will still
be able to use it, and the queue will be removed when the last opener
closes it. An exception will be thrown if the queue was already removed
or if the effective user doesn't have permission.
=end pod
use NativeCall;
use NativeHelpers::Array;
class MQ::Posix {
my constant __syscall_slong_t = int64;
my constant mqd_t = int32;
my constant LIB = [ 'rt', v1 ];
constant ReadOnly = 0;
constant WriteOnly = 1;
constant ReadWrite = 2;
constant Create = 64;
constant Exclusive = 128;
class X::MQ is Exception {
has Str $.message;
}
class X::MQ::System is X::MQ {
has Int $.errno is required;
has Str $!message;
method message( --> Str) {
$!message //= self!strerror ~ " ({ $!errno })";
}
sub strerror_r(int32, CArray $buf is rw, size_t $buflen --> CArray) is native { * }
method !strerror(--> Str) {
my $array = CArray[uint8].new((0) xx 256);
my $out = strerror_r($!errno, $array, 256);
my $buff = copy-carray-to-buf($out, 256);
$buff.decode;
}
}
class X::MQ::Open is X::MQ::System {
}
class Attr is repr('CStruct') {
has __syscall_slong_t $.flags;
has __syscall_slong_t $.max-messages;
has __syscall_slong_t $.message-size;
has __syscall_slong_t $.current-messages;
has __syscall_slong_t $!__pad_1;
has __syscall_slong_t $!__pad_2;
has __syscall_slong_t $!__pad_3;
has __syscall_slong_t $!__pad_4;
}
has Str $.name is required;
has Int $!open-flags;
has Int $.max-messages;
has Int $.message-size;
has Int $.mode;
has Attr $.attributes;
has Int $!queue-descriptor;
has Promise $!open-promise;
my $errno := cglobal(Str, 'errno', int32);
sub mq_open(Str $name, int32 $oflag, int32 $mode, Attr $attr --> mqd_t ) is native(LIB) { * }
method queue-descriptor(--> mqd_t) {
my Attr $attr;
if ( $!open-flags & Create ) && ( $!message-size || $!max-messages ) {
$attr = Attr.new(message-size => $!message-size || 8192, max-messages => $!max-messages || 10);
}
$!queue-descriptor //= do {
my $fd = mq_open($!name, $!open-flags, $!mode, $attr);
if $fd < 0 {
X::MQ::Open.new(:$errno).throw;
}
$!open-promise = Promise.new;
$fd;
}
}
method r(--> Bool) {
?($!open-flags +& ( ReadOnly | ReadWrite));
}
method w(--> Bool) {
?($!open-flags +& ( WriteOnly | ReadWrite));
}
submethod BUILD(Str :$!name!, Bool :$r, Bool :$w, Bool :$create, Bool :$exclusive, Int :$!max-messages, Int :$!message-size, Int :$!mode = 0o660) {
if !$!name.starts-with('/') {
$!name = '/' ~ $!name;
}
$!open-flags = do if $r && $w {
ReadWrite;
}
elsif $w {
WriteOnly;
}
else {
ReadOnly;
}
if $create {
$!open-flags +|= Create;
if $exclusive {
$!open-flags +|= Exclusive;
}
}
}
class X::MQ::Close is X::MQ::System {
}
sub mq_close(mqd_t $mqdes --> int32 ) is native(LIB) { * }
method close( --> Bool) {
my Bool $rc = True;
if $!queue-descriptor.defined {
if mq_close($!queue-descriptor) < 0 {
X::MQ::Close.new(:$errno).throw;
}
$!open-promise.keep: True;
}
$rc;
}
class X::MQ::Attributes is X::MQ::System {
}
sub mq_getattr(mqd_t $mqdes, Attr $mqstat is rw --> int32 ) is native(LIB) { * }
method attributes(--> Attr) {
$!attributes //= do {
my $attrs = Attr.new;
if mq_getattr(self.queue-descriptor, $attrs) < 0 {
X::MQ::Attributes.new(:$errno).throw;
}
$attrs;
}
}
class X::MQ::Unlink is X::MQ::System {
}
sub mq_unlink(Str $name --> int32 ) is native(LIB) { * }
method unlink(--> Bool) {
if mq_unlink($!name) < 0 {
X::MQ::Unlink.new(:$errno).throw;
}
True;
}
class X::MQ::Receive is X::MQ::System {
}
sub mq_receive(mqd_t $mqdes, CArray[uint8] $msg_ptr is rw, size_t $msg_len, Pointer[uint32] $msg_prio --> ssize_t ) is native(LIB) { * }
method receive(--> Promise ) {
start {
my Int $msg-size = $.attributes.message-size;
my CArray $buf = CArray[uint8].new((8) xx $msg-size);
my $rc = mq_receive($.queue-descriptor, $buf, $msg-size, Pointer[uint32]);
if $rc < 0 {
X::MQ::Receive.new(:$errno).throw;
}
else {
copy-carray-to-buf($buf, $rc);
}
}
}
class X::MQ::Send is X::MQ::System {
}
sub mq_send(mqd_t $mqdes, CArray[uint8] $msg_ptr, size_t $msg_len, uint32 $msg_prio --> int32 ) is native(LIB) { * }
proto method send(|c) { * }
multi method send(Str $msg, Int $priority = 0 --> Promise) {
self.send(Buf.new($msg.encode.list), $priority);
}
multi method send(Buf $msg, Int $priority = 0 --> Promise) {
my CArray $carray = copy-buf-to-carray($msg);
self.send($carray, $msg.elems, $priority);
}
multi method send(CArray $msg, Int $length, Int $priority = 0 --> Promise) {
start {
if mq_send(self.queue-descriptor, $msg, $length, $priority ) < 0 {
X::MQ::Send.new(:$errno).throw;
}
else {
True;
}
}
}
has Supplier $!supplier;
has Supply $.Supply;
has Promise $!supply-promise;
method Supply(--> Supply) {
$!Supply //= do {
if !$!open-promise.defined {
sink self.queue-descriptor;
}
$!supplier = Supplier.new;
$!supply-promise = start {
while $!open-promise.status ~~ Planned {
$!supplier.emit: await self.receive;
}
$!supplier.done;
}
$!supplier.Supply;
}
}
}
# vim: expandtab shiftwidth=4 ft=perl6