-
Notifications
You must be signed in to change notification settings - Fork 3
/
Socket-Connection-Linux.pm6
139 lines (107 loc) · 3.29 KB
/
Socket-Connection-Linux.pm6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use v6;
#
# Copyright © 2018-2019 Joelle Maslak
# All Rights Reserved - See License
#
use StrictClass;
unit class Net::BGP::Socket-Connection-Linux:ver<0.1.1>:auth<cpan:JMASLAK>
does StrictClass;
use NativeCall;
enum States (
SOCKET_CLOSED => 0;
SOCKET_OPEN => 1;
);
has UInt:D $.my-port is required;
has Str:D $.my-host is required;
has Int:D $.peer-family is required;
has UInt:D $.peer-port is required where ^(2¹⁶);
has Str:D $.peer-host is required;
has Int:D $.socket-fd is required;
has Lock:D $!lock = Lock.new;
has Channel $!out-channel;
has States:D $.state is rw = SOCKET_OPEN;
# Aliases for socket-(port|host)
method socket-host { return $.my-host }
method socket-port { return $.my-port }
# write(int fd, const void *buf, size_t count)
sub native-write(int32, Pointer, int32 -->int32) is native is symbol('write') {*}
method write(buf8:D $buffer -->Nil) {
if $!state ≠ SOCKET_OPEN { die "Socket in wrong state" }
my $rv = native-write($!socket-fd, nativecast(Pointer, $buffer), $buffer.bytes);
if $rv ≠ $buffer {
self.close if $!state ≠ SOCKET_CLOSED;
}
}
method say(Str:D $str) {
if $!state ≠ SOCKET_OPEN { die "Socket in wrong state" }
self.print("$str\n");
}
method print(Str:D $str) {
if $!state ≠ SOCKET_OPEN { die "Socket in wrong state" }
self.write( buf8.new( $str.encode(:encoding('ascii')) ) );
}
# recv(int fd, void *buf, size_t len, int flags)
sub native-recv(int32, buf8 is rw, int32, int32 -->int32) is native is symbol('recv') {*}
method recv(-->buf8) {
if $!state ≠ SOCKET_OPEN { die "Socket in wrong state" }
my $buf = buf8.new( 0 xx (2¹⁶) );
my $rv = native-recv($!socket-fd, $buf, $buf.bytes, 0);
if $rv < 0 {
self.close if $!state ≠ SOCKET_CLOSED;
die("recv returned an error")
}
if $rv == 0 { return buf8.new; }
return $buf.subbuf(0..^($rv));
}
# Supply
method Supply(-->Supply:D) {
if $!state ≠ SOCKET_OPEN { die "Socket in wrong state" }
my $supplier = Supplier::Preserving.new;
my $supply = $supplier.Supply;
start {
while $!state == SOCKET_OPEN {
my $buf = self.recv;
if $buf.bytes == 0 {
self.close;
} else {
$supplier.emit($buf);
}
}
$supplier.done;
CATCH {
default {
$supplier.done;
self.close if $!state ≠ SOCKET_CLOSED;
}
}
}
return $supply;
}
# Output (Buffered)
method buffered-send(buf8:D $buffer -->Nil) {
if $!state ≠ SOCKET_OPEN { die "Socket in wrong state" }
$!lock.protect: {
if ! $!out-channel.defined {
$!out-channel = Channel.new;
start loop {
my $to-send = $!out-channel.receive;
self.write($to-send);
}
}
}
$!out-channel.send($buffer);
}
# close(int)
sub native-close(int32 -->int32) is native is symbol('close') {*}
method close(-->Nil) {
if $!state ≠ SOCKET_OPEN { return; }
my $rv = native-close($!socket-fd);
if $rv { die("close failed") }
$!state = SOCKET_CLOSED;
}
submethod DESTROY {
if self.state ≠ SOCKET_CLOSED { self.close }
CATCH {
default { }
}
}