-
Notifications
You must be signed in to change notification settings - Fork 0
/
mug.gleam
292 lines (266 loc) · 7.17 KB
/
mug.gleam
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import gleam/bytes_builder.{type BytesBuilder}
import gleam/dynamic.{type Dynamic}
import gleam/erlang/atom
import gleam/erlang/charlist.{type Charlist}
import gleam/erlang/process
pub type Socket
type DoNotLeak
/// Errors that can occur when working with TCP sockets.
///
/// For more information on these errors see the Erlang documentation:
/// - https://www.erlang.org/doc/man/file#type-posix
/// - https://www.erlang.org/doc/man/inet#type-posix
///
pub type Error {
// https://www.erlang.org/doc/man/inet#type-posix
Closed
Timeout
Eaddrinuse
Eaddrnotavail
Eafnosupport
Ealready
Econnaborted
Econnrefused
Econnreset
Edestaddrreq
Ehostdown
Ehostunreach
Einprogress
Eisconn
Emsgsize
Enetdown
Enetunreach
Enopkg
Enoprotoopt
Enotconn
Enotty
Enotsock
Eproto
Eprotonosupport
Eprototype
Esocktnosupport
Etimedout
Ewouldblock
Exbadport
Exbadseq
Nxdomain
// https://www.erlang.org/doc/man/file#type-posix
Eacces
Eagain
Ebadf
Ebadmsg
Ebusy
Edeadlk
Edeadlock
Edquot
Eexist
Efault
Efbig
Eftype
Eintr
Einval
Eio
Eisdir
Eloop
Emfile
Emlink
Emultihop
Enametoolong
Enfile
Enobufs
Enodev
Enolck
Enolink
Enoent
Enomem
Enospc
Enosr
Enostr
Enosys
Enotblk
Enotdir
Enotsup
Enxio
Eopnotsupp
Eoverflow
Eperm
Epipe
Erange
Erofs
Espipe
Esrch
Estale
Etxtbsy
Exdev
}
pub type ConnectionOptions {
ConnectionOptions(
/// The hostname of the server to connect to.
host: String,
/// The port of the server to connect to.
port: Int,
/// A timeout in milliseconds for the connection to be established.
///
/// Note that if the operating system returns a timeout then this package
/// will also return a timeout, even if this timeout value has not been
/// reached yet.
timeout: Int,
)
}
/// Create a new set of connection options.
///
pub fn new(host: String, port port: Int) -> ConnectionOptions {
ConnectionOptions(host: host, port: port, timeout: 1000)
}
/// Specify a timeout for the connection to be established.
///
pub fn timeout(
options: ConnectionOptions,
milliseconds timeout: Int,
) -> ConnectionOptions {
ConnectionOptions(..options, timeout: timeout)
}
/// Establish a TCP connection to the server specified in the connection
/// options.
///
/// Returns an error if the connection could not be established.
///
/// The socket is created in passive mode, meaning the the `receive` function is
/// to be called to receive packets from the client. The
/// `receive_next_packet_as_message` function can be used to switch the socket
/// to active mode and receive the next packet as an Erlang message.
///
pub fn connect(options: ConnectionOptions) -> Result(Socket, Error) {
let gen_options = [
// When data is received on the socket queue it in the TCP stack rather than
// sending it as an Erlang message to the socket owner's inbox.
#(Active, dynamic.from(False)),
// We want the data from the socket as bit arrays please, not lists.
#(Mode, dynamic.from(Binary)),
]
let host = charlist.from_string(options.host)
gen_tcp_connect(host, options.port, gen_options, options.timeout)
}
type GenTcpOptionName {
Active
Mode
}
type ModeValue {
Binary
}
type ActiveValue {
Once
}
type GenTcpOption =
#(GenTcpOptionName, Dynamic)
@external(erlang, "gen_tcp", "connect")
fn gen_tcp_connect(
host: Charlist,
port: Int,
options: List(GenTcpOption),
timeout: Int,
) -> Result(Socket, Error)
/// Send a packet to the client.
///
pub fn send(socket: Socket, packet: BitArray) -> Result(Nil, Error) {
send_builder(socket, bytes_builder.from_bit_array(packet))
}
/// Send a packet to the client, the data in `BytesBuilder`. Using this function
/// is more efficient turning an `BytesBuilder` or a `StringBuilder` into a
/// `BitArray` to use with the `send` function.
///
@external(erlang, "mug_ffi", "send")
pub fn send_builder(socket: Socket, packet: BytesBuilder) -> Result(Nil, Error)
/// Receive a packet from the client.
///
/// Errors if the socket is closed, if the timeout is reached, or if there is
/// some other problem receiving the packet.
///
pub fn receive(
socket: Socket,
timeout_milliseconds timeout: Int,
) -> Result(BitArray, Error) {
gen_tcp_receive(socket, 0, timeout_milliseconds: timeout)
}
@external(erlang, "gen_tcp", "recv")
fn gen_tcp_receive(
socket: Socket,
read_bytes_num: Int,
timeout_milliseconds timeout: Int,
) -> Result(BitArray, Error)
/// Close the socket, ensuring that any data buffered in the socket is flushed to the operating system kernel socket first.
///
@external(erlang, "mug_ffi", "shutdown")
pub fn shutdown(socket: Socket) -> Result(Nil, Error)
/// Switch the socket to active mode, meaning that the next packet received on
/// the socket will be sent as an Erlang message to the socket owner's inbox.
///
/// This is useful for when you wish to have an OTP actor handle incoming
/// messages as using the `receive` function would result in the actor being
/// blocked and unable to handle other messages while waiting for the next
/// packet.
///
/// Messages will be send to the process that controls the socket, which is the
/// process that established the socket with the `connect` function.
///
pub fn receive_next_packet_as_message(socket: Socket) -> Nil {
set_socket_options(socket, [#(Active, dynamic.from(Once))])
Nil
}
@external(erlang, "inet", "setopts")
fn set_socket_options(socket: Socket, options: List(GenTcpOption)) -> DoNotLeak
/// Messages that can be sent by the socket to the process that controls it.
///
pub type TcpMessage {
/// A packet has been received from the client.
Packet(Socket, BitArray)
/// The socket has been closed by the client.
SocketClosed(Socket)
/// An error has occurred on the socket.
TcpError(Socket, Error)
}
/// Configure a selector to receive messages from TCP sockets.
///
/// Note this will receive messages from all TCP sockets that the process
/// controls, rather than any specific one. If you wish to only handle messages
/// from one socket then use one process per socket.
///
pub fn selecting_tcp_messages(
selector: process.Selector(t),
mapper: fn(TcpMessage) -> t,
) -> process.Selector(t) {
let tcp = atom.create_from_string("tcp")
let closed = atom.create_from_string("tcp_closed")
let error = atom.create_from_string("tcp_error")
selector
|> process.selecting_record3(tcp, unsafe_coerce_packet(mapper))
|> process.selecting_record2(closed, unsafe_coerce_closed(mapper))
|> process.selecting_record3(error, unsafe_coerce_to_tcp_error(mapper))
}
fn unsafe_coerce_packet(
mapper: fn(TcpMessage) -> t,
) -> fn(Dynamic, Dynamic) -> t {
fn(socket, data) {
Packet(unsafe_coerce_to_socket(socket), dynamic.unsafe_coerce(data))
|> mapper
}
}
fn unsafe_coerce_closed(mapper: fn(TcpMessage) -> t) -> fn(Dynamic) -> t {
fn(socket) {
SocketClosed(unsafe_coerce_to_socket(socket))
|> mapper
}
}
fn unsafe_coerce_to_tcp_error(
mapper: fn(TcpMessage) -> t,
) -> fn(Dynamic, Dynamic) -> t {
fn(socket, reason) {
mapper(TcpError(
unsafe_coerce_to_socket(socket),
dynamic.unsafe_coerce(reason),
))
}
}
fn unsafe_coerce_to_socket(socket: Dynamic) -> Socket {
dynamic.unsafe_coerce(socket)
}