-
Notifications
You must be signed in to change notification settings - Fork 11
/
msg_chan.ml
90 lines (78 loc) · 2.41 KB
/
msg_chan.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
(* Copyright (C) 2015, Thomas Leonard
See the README file for details. *)
open Lwt
let unwrap_read = function
| Error e -> `Error (Format.asprintf "%a" Vchan_xen.pp_error e)
| Ok `Eof -> `Eof
| Ok (`Data x) -> `Ok x
let (>>!=) x f =
x >>= function
| `Ok y -> f y
| `Error msg -> fail (Failure msg)
| `Eof -> return `Eof
module Make (F : Formats.FRAMING) = struct
type t = {
domid : int;
vchan : Vchan_xen.flow;
mutable buffer : Cstruct.t;
read_lock : Lwt_mutex.t;
write_lock : Lwt_mutex.t;
}
let rec read_exactly t size =
let avail = Cstruct.len t.buffer in
if avail >= size then (
let retval = Cstruct.sub t.buffer 0 size in
t.buffer <- Cstruct.shift t.buffer size;
return (`Ok retval)
) else (
Vchan_xen.read t.vchan >|= unwrap_read >>!= fun buf ->
t.buffer <- Cstruct.append t.buffer buf;
read_exactly t size
)
let recv t =
Lwt_mutex.with_lock t.read_lock (fun () ->
read_exactly t F.header_size >>!= fun hdr ->
read_exactly t (F.body_size_from_header hdr) >>!= fun body ->
return (`Ok (hdr, body))
)
let recv_fixed t size =
Lwt_mutex.with_lock t.read_lock (fun () ->
read_exactly t size >>!= fun body ->
return (`Ok body)
)
let recv_raw t : Cstruct.t S.or_eof Lwt.t =
Lwt_mutex.with_lock t.read_lock @@ fun () ->
if Cstruct.len t.buffer > 0 then (
let data = t.buffer in
t.buffer <- Cstruct.create 0;
return (`Ok data)
) else (
Vchan_xen.read t.vchan >|= unwrap_read >>!= fun result ->
return (`Ok result)
)
let send t (buffers : Cstruct.t list) : unit S.or_eof Lwt.t =
Lwt_mutex.with_lock t.write_lock (fun () ->
Vchan_xen.writev t.vchan buffers >>= function
| Error `Closed -> return `Eof
| Error e -> fail (Failure (Format.asprintf "%a" Vchan_xen.pp_write_error e))
| Ok _ -> return @@ `Ok ()
)
let server ~domid ~port () =
Vchan_xen.server ~domid ~port () >|= fun vchan -> {
vchan;
domid;
buffer = Cstruct.create 0;
read_lock = Lwt_mutex.create ();
write_lock = Lwt_mutex.create ();
}
let client ~domid ~port () =
Vchan_xen.client ~domid ~port () >|= fun vchan -> {
vchan;
domid;
buffer = Cstruct.create 0;
read_lock = Lwt_mutex.create ();
write_lock = Lwt_mutex.create ();
}
let disconnect t : unit Lwt.t =
Vchan_xen.close t.vchan
end