Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: add run_server to run eio servers #408

Merged
merged 2 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 30 additions & 0 deletions lib_eio/net.ml
Expand Up @@ -170,6 +170,8 @@ class virtual listening_socket = object
method virtual close : unit
end

type connection_handler = stream_socket -> Sockaddr.stream -> unit

let accept ~sw (t : #listening_socket) = t#accept ~sw

let accept_fork ~sw (t : #listening_socket) ~on_error handle =
Expand Down Expand Up @@ -267,3 +269,31 @@ let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
| exception (Exn.Io _ as ex) ->
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "connecting to %S:%s" host service

(* Run a server loop in a single domain. *)
let run_server_loop ~connections ~on_error ~stop listening_socket connection_handler =
Switch.run @@ fun sw ->
let rec accept () =
Semaphore.acquire connections;
accept_fork ~sw ~on_error listening_socket (fun conn addr ->
Fun.protect (fun () -> connection_handler conn addr)
~finally:(fun () -> Semaphore.release connections)
);
accept ()
in
match stop with
| None -> accept ()
| Some stop -> Fiber.first accept (fun () -> Promise.await stop)

let run_server ?(max_connections=Int.max_int) ?(additional_domains) ?stop ~on_error listening_socket connection_handler : 'a =
if max_connections <= 0 then invalid_arg "max_connections";
Switch.run @@ fun sw ->
let connections = Semaphore.make max_connections in
let run_server_loop () = run_server_loop ~connections ~on_error ~stop listening_socket connection_handler in
additional_domains |> Option.iter (fun (domain_mgr, domains) ->
if domains < 0 then invalid_arg "additional_domains";
for _ = 1 to domains do
Fiber.fork ~sw (fun () -> Domain_manager.run domain_mgr (fun () -> ignore (run_server_loop () : 'a)))
done;
);
run_server_loop ()
47 changes: 44 additions & 3 deletions lib_eio/net.mli
Expand Up @@ -182,17 +182,26 @@ val accept :
The new socket will be closed automatically when [sw] finishes, if not closed earlier.
If you want to handle multiple connections, consider using {!accept_fork} instead. *)

type connection_handler = stream_socket -> Sockaddr.stream -> unit
(** [connection_handler] handles incoming connections from a listening socket. *)

val accept_fork :
sw:Switch.t ->
#listening_socket ->
on_error:(exn -> unit) ->
(stream_socket -> Sockaddr.stream -> unit) ->
connection_handler ->
unit
(** [accept_fork socket fn] accepts a connection and handles it in a new fiber.
(** [accept_fork ~sw ~on_error socket fn] accepts a connection and handles it in a new fiber.

After accepting a connection to [socket], it runs [fn flow client_addr] in a new fiber.

[flow] will be closed when [fn] returns. *)
[flow] will be closed when [fn] returns. The new fiber is attached to [sw].

@param on_error Called if [connection_handler] raises an exception.
This is typically a good place to log the error and continue.
If the exception is an {!Eio.Io} error then the caller's address is added to it.
If you don't want to handle connection errors,
use [~on_error:raise] to cancel the caller's context. *)

val accept_sub :
sw:Switch.t ->
Expand All @@ -202,6 +211,38 @@ val accept_sub :
unit
[@@deprecated "Use accept_fork instead"]

(** {2 Running Servers} *)

val run_server :
?max_connections:int ->
?additional_domains:(#Domain_manager.t * int) ->
?stop:'a Promise.t ->
on_error:(exn -> unit) ->
#listening_socket ->
connection_handler ->
'a
(** [run_server ~on_error sock connection_handler] establishes a concurrent socket server [s].

It accepts incoming client connections on socket [sock] and handles them with {!accept_fork}
(see that for the description of [on_error] and [connection_handler]).

{b Running a Parallel Server}

By default [s] runs on a {e single} OCaml {!module:Domain}. However, if [additional_domains:(domain_mgr, domains)]
parameter is given, then [s] will spawn [domains] additional domains and run accept loops in those too.
In such cases you must ensure that [connection_handler] only accesses thread-safe values.
Note that having more than {!Domain.recommended_domain_count} domains in total is likely to result in bad performance.

@param max_connections The maximum number of concurrent connections accepted by [s] at any time.
The default is [Int.max_int].
@param stop Resolving this promise causes [s] to stop accepting new connections.
[run_server] will wait for all existing connections to finish and then return.
This is useful to upgrade a server without clients noticing.
To stop immediately, cancelling all connections, just cancel [s]'s fiber instead.
@param on_error Connection error handler (see {!accept_fork}).
@raise Invalid_argument if [max_connections <= 0].
if [additional_domains = (domain_mgr, domains)] is used and [domains < 0]. *)

(** {2 Datagram Sockets} *)

val datagram_socket :
Expand Down
230 changes: 229 additions & 1 deletion tests/network.md
Expand Up @@ -259,7 +259,7 @@ Extracting file descriptors from Eio objects:
traceln "Listening socket has Unix FD: %b" (Eio_unix.FD.peek_opt server <> None);
let have_client, have_server =
Fiber.pair
(fun () ->
(fun () ->
let flow = Eio.Net.connect ~sw net addr in
(Eio_unix.FD.peek_opt flow <> None)
)
Expand Down Expand Up @@ -710,3 +710,231 @@ TODO: This is wrong; see https://github.com/ocaml-multicore/eio/issues/342
- : unit = ()
```


## run_server

A simple connection handler for testing:
```ocaml
let handle_connection flow _addr =
let msg = read_all flow in
assert (msg = "Hi");
Fiber.yield ();
Eio.Flow.copy_string "Bye" flow
```

A mock listening socket that allows acceping `n_clients` clients, each of which writes "Hi",
and then allows `n_domains` further attempts, none of which ever completes:

```ocaml
let mock_listener ~n_clients ~n_domains =
let make_flow i () =
if n_domains > 1 then Fiber.yield () (* Load balance *)
else Fiber.check ();
let flow = Eio_mock.Flow.make ("flow" ^ string_of_int i) in
Eio_mock.Flow.on_read flow [`Return "Hi"; `Raise End_of_file];
flow, `Tcp (Eio.Net.Ipaddr.V4.loopback, 30000 + i)
in
let listening_socket = Eio_mock.Net.listening_socket "tcp/80" in
Eio_mock.Net.on_accept listening_socket (
List.init n_clients (fun i -> `Run (make_flow i)) @
List.init n_domains (fun _ -> `Run Fiber.await_cancel)
);
listening_socket
```

Start handling the connections, then begin a graceful shutdown,
allowing the connections to finish and then exiting:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let listening_socket = mock_listener ~n_clients:3 ~n_domains:1 in
let stop, set_stop = Promise.create () in
Fiber.both
(fun () ->
Eio.Net.run_server listening_socket handle_connection
~max_connections:10
~on_error:raise
~stop
)
(fun () ->
traceln "Begin graceful shutdown";
Promise.resolve set_stop ()
);;
+tcp/80: accepted connection from tcp:127.0.0.1:30000
+flow0: read "Hi"
+tcp/80: accepted connection from tcp:127.0.0.1:30001
+flow1: read "Hi"
+tcp/80: accepted connection from tcp:127.0.0.1:30002
+flow2: read "Hi"
+Begin graceful shutdown
+flow0: wrote "Bye"
+flow0: closed
+flow1: wrote "Bye"
+flow1: closed
+flow2: wrote "Bye"
+flow2: closed
- : unit = ()
```

Non-graceful shutdown, closing all connections still in progress:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let listening_socket = mock_listener ~n_clients:3 ~n_domains:1 in
Fiber.both
(fun () ->
Eio.Net.run_server listening_socket handle_connection
~max_connections:10
~on_error:raise
)
(fun () -> failwith "Simulated error");;
+tcp/80: accepted connection from tcp:127.0.0.1:30000
+flow0: read "Hi"
+tcp/80: accepted connection from tcp:127.0.0.1:30001
+flow1: read "Hi"
+tcp/80: accepted connection from tcp:127.0.0.1:30002
+flow2: read "Hi"
+flow0: closed
+flow1: closed
+flow2: closed
Exception: Failure "Simulated error".
```

To test support for multiple domains, we just run everything in one domain
to keep the output deterministic. We override `traceln` to log the (fake)
domain ID too:

```ocaml
let with_domain_tracing id fn =
let traceln ?__POS__ fmt =
Eio.Private.Debug.default_traceln ?__POS__ ("[%d] " ^^ fmt) id
in
Fiber.with_binding Eio.Private.Debug.v#traceln { traceln } fn

let fake_domain_mgr () = object (_ : #Eio.Domain_manager.t)
val mutable next_domain_id = 1

method run fn =
let self = next_domain_id in
next_domain_id <- next_domain_id + 1;
let cancelled, _ = Promise.create () in
with_domain_tracing self (fun () -> fn ~cancelled)

method run_raw _ = assert false
end
```

Handling the connections with 3 domains, with a graceful shutdown:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
with_domain_tracing 0 @@ fun () ->
let n_domains = 3 in
let listening_socket = mock_listener ~n_clients:10 ~n_domains in
let stop, set_stop = Promise.create () in
Fiber.both
(fun () ->
Eio.Net.run_server listening_socket handle_connection
~additional_domains:(fake_domain_mgr (), n_domains - 1)
~max_connections:10
~on_error:raise
~stop
)
(fun () ->
Fiber.yield ();
Promise.resolve set_stop ();
Fiber.yield (); (* Allow fibers to receive shutdown request *)
traceln "Requested graceful shutdown"
);;
+[1] tcp/80: accepted connection from tcp:127.0.0.1:30000
+[1] flow0: read "Hi"
+[2] tcp/80: accepted connection from tcp:127.0.0.1:30001
+[2] flow1: read "Hi"
+[0] tcp/80: accepted connection from tcp:127.0.0.1:30002
+[0] flow2: read "Hi"
+[1] flow0: wrote "Bye"
+[1] flow0: closed
+[1] tcp/80: accepted connection from tcp:127.0.0.1:30003
+[1] flow3: read "Hi"
+[2] flow1: wrote "Bye"
+[2] flow1: closed
+[2] tcp/80: accepted connection from tcp:127.0.0.1:30004
+[2] flow4: read "Hi"
+[0] flow2: wrote "Bye"
+[0] flow2: closed
+[0] tcp/80: accepted connection from tcp:127.0.0.1:30005
+[0] flow5: read "Hi"
+[0] Requested graceful shutdown
+[1] flow3: wrote "Bye"
+[1] flow3: closed
+[2] flow4: wrote "Bye"
+[2] flow4: closed
+[0] flow5: wrote "Bye"
+[0] flow5: closed
- : unit = ()
```

Handling the connections with 3 domains, aborting immediately:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
with_domain_tracing 0 @@ fun () ->
let n_domains = 3 in
let listening_socket = mock_listener ~n_clients:10 ~n_domains in
Fiber.both
(fun () ->
Eio.Net.run_server listening_socket handle_connection
~additional_domains:(fake_domain_mgr (), n_domains - 1)
~max_connections:10
~on_error:raise
)
(fun () -> Fiber.yield (); failwith "Simulated error");;
+[1] tcp/80: accepted connection from tcp:127.0.0.1:30000
+[1] flow0: read "Hi"
+[2] tcp/80: accepted connection from tcp:127.0.0.1:30001
+[2] flow1: read "Hi"
+[0] tcp/80: accepted connection from tcp:127.0.0.1:30002
+[0] flow2: read "Hi"
+[1] flow0: closed
+[2] flow1: closed
+[0] flow2: closed
Exception: Failure "Simulated error".
```

Limiting to 2 concurrent connections:

```ocaml
# Eio_mock.Backend.run @@ fun () ->
let listening_socket = mock_listener ~n_clients:10 ~n_domains:1 in
let stop, set_stop = Promise.create () in
Fiber.both
(fun () ->
Eio.Net.run_server listening_socket handle_connection
~max_connections:2
~on_error:raise
~stop
)
(fun () ->
for _ = 1 to 2 do Fiber.yield () done;
traceln "Begin graceful shutdown";
Promise.resolve set_stop ()
);;
+tcp/80: accepted connection from tcp:127.0.0.1:30000
+flow0: read "Hi"
+tcp/80: accepted connection from tcp:127.0.0.1:30001
+flow1: read "Hi"
+flow0: wrote "Bye"
+flow0: closed
+flow1: wrote "Bye"
+flow1: closed
+tcp/80: accepted connection from tcp:127.0.0.1:30002
+flow2: read "Hi"
+tcp/80: accepted connection from tcp:127.0.0.1:30003
+flow3: read "Hi"
+Begin graceful shutdown
+flow2: wrote "Bye"
+flow2: closed
+flow3: wrote "Bye"
+flow3: closed
- : unit = ()
```