-
Notifications
You must be signed in to change notification settings - Fork 18
/
bin_prot_test.ml
103 lines (96 loc) · 3.27 KB
/
bin_prot_test.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
open Core.Std
open Async.Std
open Print
module Fd = Unix.Fd
(* Payload record exchanged between the writer and the reader process.
   [@@deriving bin_io] supplies the [bin_writer_test] and [bin_reader_test]
   values used when writing to and reading from the pipe. *)
type test = {
  a : int;
  b : float;
  c : bool array;
  d : (string * int64) list;
  e : nativeint array;
  f : [ `Foo | `Bar ] array;
  g : float array;
}
[@@deriving bin_io]
(* Sample payload that is sent repeatedly over the pipe.  It is deliberately
   bulky: very likely larger than a typical message such as an order or a
   fill. *)
let test =
  let is_even i = i mod 2 = 0 in
  {
    a = 42;
    b = 3.1;
    c = Array.init 10 ~f:is_even;
    d =
      [ ("asdf", 421311L)
      ; ("foo.bar", -123412L)
      ; ("asdfkasdfkjasdf", 0L)
      ];
    (* NOTE(review): shifting [Nativeint.one] to the right yields 1 at index
       0 and 0 for the following indices; confirm that [shift_left] was not
       what was intended here. *)
    e = Array.init 64 ~f:(Nativeint.shift_right Nativeint.one);
    f = Array.init 20 ~f:(fun i -> if is_even i then `Foo else `Bar);
    g = Array.init 20 ~f:(fun i -> float i);
  }
(* Number of messages the writer sends; the reader expects to receive
   exactly this many before it sees end-of-file. *)
let n_msgs = 100_000
(* Reader side of the test.

   Wraps [fd] in an Async [Reader], reads [n_msgs] binary-protocol messages
   from it and checks that each one equals [test].  Once all of them have
   arrived, one more read must report EOF; on success it prints
   "Reader success" and shuts down with exit code 0.

   Any deviation (premature EOF, a message that differs from [test], or
   extra data after the last expected message) raises [Failure] with a
   message saying which of these happened.

   This function starts the Async scheduler and never returns. *)
let start_reader fd =
  let reader = Reader.create fd in
  let read_next () = Reader.read_bin_prot reader bin_reader_test in
  (* Handler for the read issued after all expected messages were seen. *)
  let expect_eof = function
    | `Eof ->
      print_endline "Reader success";
      shutdown 0
    | `Ok _ ->
      failwithf "Reader: unexpected extra message after %d messages" n_msgs ()
  in
  let rec loop remaining =
    if remaining = 0 then
      (* All messages received: make sure we now get an EOF *)
      upon (read_next ()) expect_eof
    else
      (* Read another binary protocol message *)
      upon (read_next ()) (expect_message remaining)
  (* Handler for a read issued while [remaining] messages are still due. *)
  and expect_message remaining = function
    | `Ok v ->
      if v = test then loop (remaining - 1)
      else
        failwithf
          "Reader: received a message that differs from the expected value \
           (%d messages still expected)"
          remaining ()
    | `Eof ->
      failwithf "Reader: premature EOF, %d messages still expected"
        remaining ()
  in
  loop n_msgs;
  never_returns (Scheduler.go ())
(* Writer side of the test.

   Wraps [fd] in an Async [Writer], sends [n_msgs] copies of [test] using the
   binary protocol, closes the writer, and then waits for the reader child
   process [pid] to finish before shutting down with exit code 0.

   This function starts the Async scheduler and never returns. *)
let start_writer pid fd =
  let writer = Writer.create fd in
  (* Runs once every message has been queued.  Closing the writer is what
     flushes its buffers (easy to forget!) and closes the underlying file
     descriptor. *)
  let finish () =
    upon (Writer.close writer) (fun () ->
      printf "Writer success: %s bytes written\n%!"
        (Int63.to_string (Writer.bytes_written writer));
      (* Wait for reader to terminate successfully *)
      upon (Unix.waitpid_exn pid) (fun () ->
        printf "All successful: transmitted %d messages\n" n_msgs;
        shutdown 0))
  in
  let rec send remaining =
    if remaining = 0 then finish ()
    else begin
      (* Queue one more value on the writer using the binary protocol *)
      Writer.write_bin_prot writer bin_writer_test test;
      (* Performance-wise it is usually better to wait for a flush when the
         buffer is close to full before adding more messages; otherwise the
         buffer has to be grown (assuming the default size of 131_072),
         possibly without bounds. *)
      if Writer.bytes_to_write writer > 100_000
      then upon (Writer.flushed writer) (fun _ -> send (remaining - 1))
      else send (remaining - 1)
    end
  in
  send n_msgs;
  never_returns (Scheduler.go ())
(* Entry point: creates a pipe, forks, and runs the reader in the child
   process and the writer in the parent process.  Each side first closes the
   pipe end it does not use. *)
let () =
  (* The blocking Core [Unix] is used for [pipe], [fork] and [close]; it is
     aliased locally under a distinct name. *)
  let module Core_unix = Core.Std.Unix in
  (* Create pipe for communicating between reader and writer process *)
  let read_end, write_end = Core_unix.pipe () in
  (* Wrap a raw descriptor as an Async [Fd] of kind [Fifo] *)
  let as_fifo raw_fd label =
    Fd.create Fd.Kind.Fifo raw_fd (Info.of_string label)
  in
  (* Fork off reader process *)
  match Core_unix.fork () with
  | `In_the_parent child_pid ->
    (* We are the writer *)
    Core_unix.close read_end;
    start_writer child_pid (as_fifo write_end "<child writer>")
  | `In_the_child ->
    (* We are the reader *)
    Core_unix.close write_end;
    start_reader (as_fifo read_end "<parent reader>")