This repository has been archived by the owner on May 22, 2018. It is now read-only.
/
xmlrpc_client.ml
321 lines (290 loc) · 11.7 KB
/
xmlrpc_client.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
(*
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)
open Stringext
open Pervasiveext
open Threadext
(* Logger for this module's own messages (named "xmlrpc_client"). It is opened so
   that [debug] and [warn] can be called unqualified further down. *)
module D = Debug.Debugger(struct let name = "xmlrpc_client" end)
open D
(* Separate logger named "mscgen"; [Protocol.rpc] writes one
   "src=>dst [label=...]" line to it per call. *)
module E = Debug.Debugger(struct let name = "mscgen" end)
(** Optional hooks that [with_transport] invokes around a call made over an SSL
    transport. Each hook, when set, is given the transport's optional task id and
    the pid of the stunnel process carrying the call. Both start out unset. *)
module Internal = struct
(* Invoked just before the caller's function is run over the stunnel. *)
let set_stunnelpid_callback : (string option -> int -> unit) option ref = ref None
(* Invoked as the cleanup step of the [finally] that wraps the caller's function. *)
let unset_stunnelpid_callback : (string option -> int -> unit) option ref = ref None
end
(** User-agent string that [connect] and [xmlrpc] pass to [Http.Request.make]. *)
let user_agent = "xen-api-libs/1.0"
(** [connect ?session_id ?task_id ?subtask_of path] builds a version "1.0"
    [Http.Connect] request for [path] with keep-alive enabled. Each identifier
    that is supplied is added to the request's cookie list under its own name;
    [subtask_of] is additionally forwarded to [Http.Request.make] unchanged. *)
let connect ?session_id ?task_id ?subtask_of path =
  (* A supplied identifier yields a one-element cookie list, a missing one none. *)
  let cookie_of key = function
    | Some value -> [ key, value ]
    | None -> [] in
  let cookie =
    List.concat
      [ cookie_of "session_id" session_id;
        cookie_of "task_id" task_id;
        cookie_of "subtask_of" subtask_of ] in
  Http.Request.make ~user_agent ~version:"1.0" ~keep_alive:true ~cookie ?subtask_of
    Http.Connect path
(** [xmlrpc ... path] builds an [Http.Post] request for [path]. All optional
    arguments are forwarded to [Http.Request.make] unchanged, except [task_id],
    which (when given) is sent as a single [Http.Hdr.task_id] header. *)
let xmlrpc ?frame ?version ?keep_alive ?task_id ?cookie ?length ?auth ?subtask_of ?query ?body path =
  let headers =
    match task_id with
    | Some id -> Some [ Http.Hdr.task_id, id ]
    | None -> None in
  Http.Request.make
    ~user_agent ?frame ?version ?keep_alive ?cookie ?headers ?length ?auth ?subtask_of ?query ?body
    Http.Post path
(** Raised in place of [Unix.Unix_error (Unix.ECONNRESET, _, _)] by [with_http]
    and [Protocol.read_response]; a reset suggests the remote end crashed or
    restarted. *)
exception Connection_reset
(* Logger used for everything related to stunnel management. *)
module StunnelDebug = Debug.Debugger(struct let name = "stunnel" end)

(** [write_to_log line] writes [line] to the stunnel debug log after stripping
    the characters selected by [String.isspace]. [with_transport] passes this
    function to [Stunnel.connect] and [get_reusable_stunnel]. *)
let write_to_log line =
  let stripped = String.strip String.isspace line in
  StunnelDebug.debug "%s" stripped
(** [check_reusable fd] returns [true] if [fd] is connected to an HTTP server
    that answers a probe XMLRPC request as expected, and [false] otherwise
    (including when any exception is raised along the way).
    This is used to prevent us accidentally trying to reuse a connection which
    has been closed or left in some other inconsistent state. *)
let check_reusable (x: Unix.file_descr) =
  let probe_name = "system.isAlive" in
  let probe_uuid = Uuid.string_of_uuid (Uuid.make_uuid ()) in
  (* Backward compatibility: the uuid is folded into the method name so that a
     MESSAGE_METHOD_UNKNOWN failure naming this exact method can be recognised
     as the answer to this particular probe. *)
  let probe_func = Printf.sprintf "%s:%s" probe_name probe_uuid in
  let probe_args = [ XMLRPC.To.string probe_uuid ] in
  let request =
    let body = Xml.to_string (XMLRPC.To.methodCall probe_func probe_args) in
    xmlrpc ~version:"1.1" ~keep_alive:true ~body "/" in
  (* Does the decoded reply match one of the two answers we accept? *)
  let is_expected_reply = function
    | XMLRPC.Failure("MESSAGE_METHOD_UNKNOWN", [ param ]) ->
      (* The server must pre-date system.isAlive *)
      param = probe_func
    | XMLRPC.Success param ->
      (* The server must be a newer one with system.isAlive *)
      param = probe_args
    | _ -> false in
  let on_response response _ =
    match response.Http.Response.content_length with
    | None ->
      StunnelDebug.debug "check_reusable: no content-length from known-invalid URI: connection not reusable";
      false
    | Some len ->
      let len = Int64.to_int len in
      let payload = String.make len 'X' in
      let reader = Buf_io.of_fd x in
      Buf_io.really_input reader payload 0 len;
      if is_expected_reply (XMLRPC.From.methodResponse (Xml.parse_string payload))
      then true
      else begin
        StunnelDebug.debug "check_reusable: unexpected response: connection not reusable: %s" payload;
        false
      end in
  try
    Http_client.rpc x request on_response
  with exn ->
    StunnelDebug.debug "check_reusable: caught exception %s; assuming not reusable" (Printexc.to_string exn);
    false
(** Raised by [get_reusable_stunnel] when repeated attempts to connect an
    stunnel to a remote host and check the connection works have all failed. *)
exception Stunnel_connection_failed
(** [get_new_stunnel_id ()] returns the next value of a counter private to this
    function: 1 on the first call, then 2, and so on. The increment and read are
    performed inside [Mutex.execute] on a dedicated mutex. *)
let get_new_stunnel_id =
  let lock = Mutex.create () in
  let last_id = ref 0 in
  let next () =
    last_id := !last_id + 1;
    !last_id in
  fun () -> Mutex.execute lock next
(** Returns an stunnel to [host]:[port], either from the persistent cache or a
    fresh one which has been checked out and guaranteed to work.

    Cached stunnels are tried first; any that fails [check_reusable] is
    disconnected and the next one is tried. If none is usable, up to 10 fresh
    connections are attempted, pausing 10 seconds between consecutive attempts.

    [use_fork_exec_helper] and [write_to_log] are forwarded to [Stunnel.connect].

    @raise Stunnel_connection_failed if every fresh attempt fails. *)
let get_reusable_stunnel ?use_fork_exec_helper ?write_to_log host port verify_cert =
  let start_time = Unix.gettimeofday () in
  let found = ref None in
  (* 1. First check if there is a suitable stunnel in the cache. *)
  begin
    try
      while !found = None do
        let (x: Stunnel.t) = Stunnel_cache.remove host port verify_cert in
        if check_reusable x.Stunnel.fd
        then found := Some x
        else begin
          StunnelDebug.debug "get_reusable_stunnel: Found non-reusable stunnel in the cache. disconnecting from %s:%d" host port;
          Stunnel.disconnect x
        end
      done
    with Not_found -> () (* presumably raised by Stunnel_cache.remove when nothing matches *)
  end;
  match !found with
  | Some x ->
    StunnelDebug.debug "get_reusable_stunnel: got from cache in %.2f ms for %s:%d." ((Unix.gettimeofday () -. start_time) *. 1000.) host port;
    x
  | None ->
    StunnelDebug.debug "get_reusable_stunnel: stunnel cache is empty; creating a fresh connection to %s:%d" host port;
    (* 2. Create a fresh connection and make sure it works *)
    let start_time_2 = Unix.gettimeofday () in
    begin
      let max_attempts = 10 in
      let attempt_number = ref 0 in
      let delay = 10. in (* seconds *)
      (* Pause only when another attempt will follow: sleeping after the final
         attempt would merely postpone the failure. The log messages below are
         unchanged and still mention the delay on the final attempt. *)
      let pause_before_retry () =
        if !attempt_number < max_attempts then Thread.delay delay in
      while !found = None && (!attempt_number < max_attempts) do
        incr attempt_number;
        try
          let unique_id = get_new_stunnel_id () in
          let (x: Stunnel.t) = Stunnel.connect ~unique_id ?use_fork_exec_helper ?write_to_log ~verify_cert host port in
          if check_reusable x.Stunnel.fd
          then found := Some x
          else begin
            StunnelDebug.error "get_reusable_stunnel: fresh stunnel failed reusable check; delaying %.2f seconds before reconnecting to %s:%d (attempt %d / %d)" delay host port !attempt_number max_attempts;
            (* Release the broken stunnel first so it is not held open while we sleep. *)
            Stunnel.disconnect x;
            pause_before_retry ()
          end
        with e ->
          StunnelDebug.error "get_reusable_stunnel: fresh stunnel connection failed with exception: %s: delaying %.2f seconds before reconnecting to %s:%d (attempt %d / %d)" (Printexc.to_string e) delay host port !attempt_number max_attempts;
          pause_before_retry ()
      done
    end;
    begin match !found with
      | Some x ->
        let now = Unix.gettimeofday () in
        StunnelDebug.debug "get_reusable_stunnel: done in %.2f ms of which %.2f ms for new stunnel to %s:%d"
          ((now -. start_time) *. 1000.) ((now -. start_time_2) *. 1000.) host port;
        x
      | None ->
        StunnelDebug.error "get_reusable_stunnel: failed to acquire a working stunnel to connect to %s:%d" host port;
        raise Stunnel_connection_failed
    end
(** Settings for a connection made through stunnel (see [with_transport]). *)
module SSL = struct
  type t = {
    use_fork_exec_helper: bool; (* forwarded to Stunnel.connect *)
    use_stunnel_cache: bool;    (* take the stunnel from, and return it to, the cache *)
    verify_cert: bool;          (* forwarded to Stunnel.connect *)
    task_id: string option      (* handed to the stunnelpid callbacks *)
  }

  (** Build a settings record. Defaults: fork/exec helper on, stunnel cache off,
      certificate verification off, no task id. *)
  let make ?(use_fork_exec_helper=true) ?(use_stunnel_cache=false) ?(verify_cert=false) ?task_id () =
    { use_fork_exec_helper; use_stunnel_cache; verify_cert; task_id }

  (** Render the settings in record-like syntax, for logging. *)
  let to_string (x: t) =
    let task_id =
      match x.task_id with
      | Some id -> "Some " ^ id
      | None -> "None" in
    Printf.sprintf "{ use_fork_exec_helper = %b; use_stunnel_cache = %b; verify_cert = %b; task_id = %s }"
      x.use_fork_exec_helper x.use_stunnel_cache x.verify_cert task_id
end
(** The ways [with_transport] can reach a server. *)
type transport =
| Unix of string (* path handed to Unixext.open_connection_unix_fd *)
| TCP of string * int (* host and port, plain TCP *)
| SSL of SSL.t * string * int (* stunnel settings, host and port *)
(** Human-readable rendering of a transport, e.g. ["TCP host:80"]. *)
let string_of_transport transport =
  match transport with
  | Unix path -> "Unix " ^ path
  | TCP (host, port) -> Printf.sprintf "TCP %s:%d" host port
  | SSL (settings, host, port) ->
    let rendered = SSL.to_string settings in
    Printf.sprintf "SSL %s:%d %s" host port rendered
(** [transport_of_url url] picks the transport for the scheme part of [url]
    (the second component of the pair is ignored): a [File] scheme maps to
    [Unix], plain HTTP to [TCP] (default port 80) and HTTP with [ssl = true] to
    [SSL] with default settings (default port 443). *)
let transport_of_url (scheme, _) =
  let open Http.Url in
  match scheme with
  | File { path = socket_path } -> Unix socket_path
  | Http h ->
    (* Use the port from the URL when present, otherwise the scheme's default. *)
    let port_or fallback =
      match h.port with
      | Some p -> p
      | None -> fallback in
    if h.ssl
    then SSL (SSL.make (), h.host, port_or 443)
    else TCP (h.host, port_or 80)
(** [with_transport transport f] opens a connection as described by [transport],
    applies [f] to the connected file descriptor and returns its result,
    releasing the connection afterwards whether [f] returns or raises.

    - [Unix path] and [TCP (host, port)]: the socket is closed afterwards
      (TCP_NODELAY is enabled on TCP sockets before [f] runs).
    - [SSL (settings, host, port)]: an stunnel is taken from the cache (when
      [use_stunnel_cache] is set) or freshly connected. The
      [Internal.set_stunnelpid_callback] / [Internal.unset_stunnelpid_callback]
      hooks are called around [f]. Afterwards a cached stunnel goes back to the
      cache; an uncached one has its logfile unlinked and is disconnected.

    Any exception raised by [f] is logged and re-raised. For an uncached stunnel
    a [Connection_reset] additionally triggers [Stunnel.diagnose_failure]. *)
let with_transport transport f = match transport with
  | Unix path ->
    let fd = Unixext.open_connection_unix_fd path in
    finally
      (fun () -> f fd)
      (fun () -> Unix.close fd)
  | TCP (host, port) ->
    let fd = Unixext.open_connection_fd host port in
    finally
      (fun () ->
         Unixext.set_tcp_nodelay fd true;
         f fd)
      (fun () -> Unix.close fd)
  | SSL ({
      SSL.use_fork_exec_helper = use_fork_exec_helper;
      use_stunnel_cache = use_stunnel_cache;
      verify_cert = verify_cert;
      task_id = task_id}, host, port) ->
    let st_proc =
      if use_stunnel_cache
      then get_reusable_stunnel ~use_fork_exec_helper ~write_to_log host port verify_cert
      else
        let unique_id = get_new_stunnel_id () in
        Stunnel.connect ~use_fork_exec_helper ~write_to_log ~unique_id ~verify_cert ~extended_diagnosis:true host port in
    let s = st_proc.Stunnel.fd in
    let s_pid = Stunnel.getpid st_proc.Stunnel.pid in
    debug "stunnel pid: %d (cached = %b) connected to %s:%d" s_pid use_stunnel_cache host port;
    (* Call the {,un}set_stunnelpid_callback hooks around the remote call *)
    let with_recorded_stunnelpid task_opt s_pid action =
      debug "with_recorded_stunnelpid task_opt=%s s_pid=%d" (Opt.default "None" task_opt) s_pid;
      begin
        match !Internal.set_stunnelpid_callback with
        | Some callback -> callback task_opt s_pid
        | None -> ()
      end;
      finally action
        (fun () ->
           match !Internal.unset_stunnelpid_callback with
           | Some callback -> callback task_opt s_pid
           | None -> ()
        ) in
    with_recorded_stunnelpid task_id s_pid
      (fun () ->
         finally
           (fun () ->
              try
                f s
              with e ->
                warn "stunnel pid: %d caught %s" s_pid (Printexc.to_string e);
                (* Match on the constructor instead of comparing exception
                   values with polymorphic (=). *)
                begin match e with
                  | Connection_reset when not use_stunnel_cache ->
                    Stunnel.diagnose_failure st_proc
                  | _ -> ()
                end;
                raise e)
           (fun () ->
              if use_stunnel_cache
              then begin
                Stunnel_cache.add st_proc;
                debug "stunnel pid: %d (cached = %b) returned stunnel to cache" s_pid use_stunnel_cache;
              end else
                (* Disconnect even if removing the logfile fails, so that the
                   stunnel is never leaked; the unlink error still propagates. *)
                finally
                  (fun () -> Unix.unlink st_proc.Stunnel.logfile)
                  (fun () -> Stunnel.disconnect st_proc)
           )
      )
(** [with_http request f s] performs [request] over the file descriptor [s] via
    [Http_client.rpc] and returns the result of applying [f] to the pair
    (response, descriptor) supplied by it.
    @raise Connection_reset in place of [Unix.Unix_error (Unix.ECONNRESET, _, _)]. *)
let with_http request f s =
  (* Http_client.rpc calls back with two arguments; [f] expects them paired. *)
  let on_response response fd = f (response, fd) in
  try
    Http_client.rpc s request on_response
  with
  | Unix.Unix_error (Unix.ECONNRESET, _, _) -> raise Connection_reset
(** [curry2 f (a, b)] is [f a b]: it lets a two-argument function be applied to
    a pair. *)
let curry2 f = fun (a, b) -> f a b
(** A wire format usable with the [Protocol] functor: how to encode a request
    as a string and how to decode a response. *)
module type FORMAT = sig
(* Decoded reply. *)
type response
(* Decode a reply that has already been read into a string. *)
val response_of_string: string -> response
(* Decode a reply by reading it directly from a file descriptor. *)
val response_of_file_descr: Unix.file_descr -> response
(* Call to be sent. *)
type request
(* Encode a call as the HTTP request body. *)
val request_to_string: request -> string
(* Short rendering of a call; [Protocol.rpc] uses it as a log label. *)
val request_to_short_string: request -> string
end
(** [FORMAT] instance in which requests and responses are [Xml.xml] documents. *)
module XML = struct
  type request = Xml.xml
  type response = Xml.xml

  let request_to_string = Xml.to_string
  (* There is no shorter rendering for a raw document: reuse the full one. *)
  let request_to_short_string = request_to_string

  let response_of_string = Xml.parse_string
  let response_of_file_descr fd =
    let ic = Unix.in_channel_of_descr fd in
    Xml.parse_in ic
end
(** [FORMAT] instance in which requests are [Rpc.call] values and responses are
    [Rpc.response] values, converted by the [Xmlrpc] module. *)
module XMLRPC = struct
  type request = Rpc.call
  type response = Rpc.response

  let request_to_string call = Xmlrpc.string_of_call call
  (* The short form of a call is just its method name. *)
  let request_to_short_string { Rpc.name = name; _ } = name

  let response_of_string text = Xmlrpc.response_of_string text
  let response_of_file_descr fd =
    let ic = Unix.in_channel_of_descr fd in
    Xmlrpc.response_of_in_channel ic
end
(** Builds an RPC client for the wire format [F]. *)
module Protocol = functor(F: FORMAT) -> struct
  (** [read_response r s] decodes the body of HTTP response [r] from socket [s].

      When [r] carries a content length that is non-negative and fits in a
      string, exactly that many bytes are read and decoded with
      [F.response_of_string]. Otherwise (no length, or a length that is negative
      or too large) the body is decoded straight from [s] with
      [F.response_of_file_descr].

      @raise Connection_reset in place of
      [Unix.Unix_error (Unix.ECONNRESET, _, _)]. *)
  let read_response r s =
    (* Compare in the Int64 domain: converting with Int64.to_int first can wrap
       an oversized length into a value that wrongly passes the size check. *)
    let fits_in_string l =
      Int64.compare l 0L >= 0
      && Int64.compare l (Int64.of_int Sys.max_string_length) <= 0 in
    try
      match r.Http.Response.content_length with
      | Some l when fits_in_string l ->
        F.response_of_string (Unixext.really_read_string s (Int64.to_int l))
      | Some _ | None -> F.response_of_file_descr s
    with
    | Unix.Unix_error(Unix.ECONNRESET, _, _) -> raise Connection_reset

  (** [rpc ?srcstr ?dststr ~transport ~http req] sends [req] as the body of the
      HTTP request [http] over [transport] and returns the decoded response.
      [srcstr] and [dststr] (both defaulting to ["unset"]) are used only for the
      "src=>dst" line written to the mscgen log. *)
  let rpc ?(srcstr="unset") ?(dststr="unset") ~transport ~http req =
    E.debug "%s=>%s [label=\"%s\"];" srcstr dststr (F.request_to_short_string req) ;
    let body = F.request_to_string req in
    let http = { http with Http.Request.body = Some body } in
    with_transport transport (with_http http (curry2 read_response))
end
(** RPC client whose requests and responses are [Xml.xml] documents. *)
module XML_protocol = Protocol(XML)
(** RPC client whose requests are [Rpc.call] values and whose responses are
    [Rpc.response] values. *)
module XMLRPC_protocol = Protocol(XMLRPC)