-
Notifications
You must be signed in to change notification settings - Fork 6
/
glisten.gleam
215 lines (199 loc) · 5.99 KB
/
glisten.gleam
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import gleam/bytes_builder.{type BytesBuilder}
import gleam/dynamic.{type Dynamic}
import gleam/erlang/process.{type Selector, type Subject}
import gleam/option.{type Option, None, Some}
import gleam/result
import glisten/internal/acceptor.{Pool}
import glisten/internal/handler.{type ClientIp as InternalClientIp}
import glisten/socket.{
type Socket as InternalSocket, type SocketReason as InternalSocketReason,
Closed, Timeout,
}
import glisten/socket/options.{Certfile, Keyfile}
import glisten/transport.{type Transport}
import glisten/tcp
import glisten/ssl
import gleam/otp/actor
import gleam/otp/supervisor
/// Reasons that `serve` might fail
pub type StartError {
ListenerClosed
ListenerTimeout
AcceptorTimeout
AcceptorFailed(process.ExitReason)
AcceptorCrashed(Dynamic)
SystemError(SocketReason)
}
/// Your provided loop function with receive these message types as the
/// first argument.
pub type Message(user_message) {
/// These are messages received from the socket
Packet(BitArray)
/// These are any messages received from the selector returned from `on_init`
User(user_message)
}
pub type ClientIp =
InternalClientIp
pub type Socket =
InternalSocket
pub type SocketReason =
InternalSocketReason
/// This type holds useful bits of data for the active connection.
pub type Connection(user_message) {
Connection(
/// This will be optionally a tuple for the IPv4 of the other end of the
/// socket
client_ip: ClientIp,
socket: Socket,
/// This provides a uniform interface for both TCP and SSL methods.
transport: Transport,
subject: Subject(handler.Message(user_message)),
)
}
/// Sends a BytesBuilder message over the socket using the active transport
pub fn send(
conn: Connection(user_message),
msg: BytesBuilder,
) -> Result(Nil, SocketReason) {
transport.send(conn.transport, conn.socket, msg)
}
/// This is the shape of the function you need to provide for the `handler`
/// argument to `serve(_ssl)`.
pub type Loop(user_message, data) =
fn(Message(user_message), data, Connection(user_message)) ->
actor.Next(Message(user_message), data)
pub opaque type Handler(user_message, data) {
Handler(
on_init: fn() -> #(data, Option(Selector(user_message))),
loop: Loop(user_message, data),
on_close: Option(fn(data) -> Nil),
pool_size: Int,
)
}
fn map_user_selector(
selector: Selector(Message(user_message)),
) -> Selector(handler.LoopMessage(user_message)) {
process.map_selector(selector, fn(value) {
case value {
Packet(msg) -> handler.Packet(msg)
User(msg) -> handler.Custom(msg)
}
})
}
fn convert_loop(
loop: Loop(user_message, data),
) -> handler.Loop(user_message, data) {
fn(msg, data, conn: handler.Connection(user_message)) {
let conn =
Connection(conn.client_ip, conn.socket, conn.transport, conn.sender)
case msg {
handler.Packet(msg) -> {
case loop(Packet(msg), data, conn) {
actor.Continue(data, selector) ->
actor.Continue(data, option.map(selector, map_user_selector))
actor.Stop(reason) -> actor.Stop(reason)
}
}
handler.Custom(msg) -> {
case loop(User(msg), data, conn) {
actor.Continue(data, selector) ->
actor.Continue(data, option.map(selector, map_user_selector))
actor.Stop(reason) -> actor.Stop(reason)
}
}
}
}
}
/// Create a new handler for each connection. The required arguments mirror the
/// `actor.start` API from `gleam_otp`. The default pool is 10 accceptor
/// processes.
pub fn handler(
on_init: fn() -> #(data, Option(Selector(user_message))),
loop: Loop(user_message, data),
) -> Handler(user_message, data) {
Handler(on_init: on_init, loop: loop, on_close: None, pool_size: 10)
}
/// Adds a function to the handler to be called when the connection is closed.
pub fn with_close(
handler: Handler(user_message, data),
on_close: fn(data) -> Nil,
) -> Handler(user_message, data) {
Handler(..handler, on_close: Some(on_close))
}
/// Modify the size of the acceptor pool
pub fn with_pool_size(
handler: Handler(user_message, data),
size: Int,
) -> Handler(user_message, data) {
Handler(..handler, pool_size: size)
}
/// Start the TCP server with the given handler on the provided port
pub fn serve(
handler: Handler(user_message, data),
port: Int,
) -> Result(Subject(supervisor.Message), StartError) {
port
|> tcp.listen([])
|> result.map_error(fn(err) {
case err {
Closed -> ListenerClosed
Timeout -> ListenerTimeout
err -> SystemError(err)
}
})
|> result.then(fn(socket) {
Pool(
socket,
convert_loop(handler.loop),
handler.pool_size,
handler.on_init,
handler.on_close,
transport.Tcp,
)
|> acceptor.start_pool
|> result.map_error(fn(err) {
case err {
actor.InitTimeout -> AcceptorTimeout
actor.InitFailed(reason) -> AcceptorFailed(reason)
actor.InitCrashed(reason) -> AcceptorCrashed(reason)
}
})
})
}
/// Start the SSL server with the given handler on the provided port. The key
/// and cert files must be provided, valid, and readable by the current user.
pub fn serve_ssl(
handler: Handler(user_message, data),
port port: Int,
certfile certfile: String,
keyfile keyfile: String,
) -> Result(Subject(supervisor.Message), StartError) {
let assert Ok(_nil) = ssl.start()
port
|> ssl.listen([Certfile(certfile), Keyfile(keyfile)])
|> result.map_error(fn(err) {
case err {
Closed -> ListenerClosed
Timeout -> ListenerTimeout
err -> SystemError(err)
}
})
|> result.then(fn(socket) {
Pool(
socket,
convert_loop(handler.loop),
handler.pool_size,
handler.on_init,
handler.on_close,
transport.Ssl,
)
|> acceptor.start_pool
|> result.map_error(fn(err) {
case err {
actor.InitTimeout -> AcceptorTimeout
actor.InitFailed(reason) -> AcceptorFailed(reason)
actor.InitCrashed(reason) -> AcceptorCrashed(reason)
}
})
})
}