/
zeromq-client.t
75 lines (55 loc) · 1.55 KB
/
zeromq-client.t
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use Cro::ZeroMQ::Message;
use Cro::ZeroMQ::Client;
use Cro::ZeroMQ::Service;
use Test;
# Echo socket
my Cro::Service $service = Cro::ZeroMQ::Service.rep(
bind => 'tcp://127.0.0.1:5432'
);
$service.start;
my $client = Cro::ZeroMQ::Client.req(
connect => 'tcp://127.0.0.1:5432'
);
my $reply = await $client.send(Cro::ZeroMQ::Message.new('test'));
ok $reply.body-text eq 'test', 'Client works';
$client = Cro::ZeroMQ::Client.req(
connect => 'tcp://127.0.0.1:5431'
);
dies-ok {
$client.send(Cro::ZeroMQ::Message.new('test'));
$client.send(Cro::ZeroMQ::Message.new('test'));
}, 'Req client roundtrip cannot be interracted';
$service.stop;
# Dealer part
class Replier does Cro::Transform {
method consumes() { Cro::ZeroMQ::Message }
method produces() { Cro::ZeroMQ::Message }
method transformer(Supply $messages --> Supply) {
supply {
whenever $messages {
emit Cro::ZeroMQ::Message.new(.body-text);
}
}
}
}
$service = Cro::ZeroMQ::Service.rep(
bind => 'tcp://127.0.0.1:5431',
Replier
);
$service.start;
$client = Cro::ZeroMQ::Client.dealer(
connect => 'tcp://127.0.0.1:5431'
);
my %h = :!a, :!b, :!c;
my $completion = Promise.new;
for <a b c>.pick(*).list {
start {
my $reply = await $client.send(Cro::ZeroMQ::Message.new($_));
%h{$reply.body-text} = True;
$completion.keep if %h<a> && %h<b> && %h<c>;
}
}
await Promise.anyof($completion, Promise.in(2));
is $completion.status, Kept, "Dealer client is working";;
$service.stop;
done-testing;