Skip to content
Browse files

Update Storage_interface clients to use new functor

Signed-off-by: Rob Hoes <rob.hoes@citrix.com>
  • Loading branch information...
1 parent 419d9d5 commit a0fbe9dd4efde9d3358d9fa75ada73f56aded172 @robhoes robhoes committed
View
11 ocaml/sm-cli/main.ml
@@ -22,11 +22,14 @@ open Xmlrpc_client
let url = ref (Http.Url.File ({ Http.Url.path = "/var/xapi/storage" }, "/"))
+module RPC = struct
let rpc call =
XMLRPC_protocol.rpc ~transport:(transport_of_url !url)
~http:(xmlrpc ~version:"1.0" ?auth:(Http.Url.auth_of !url) (Http.Url.uri_of !url)) call
+end
open Storage_interface
+module Client = Client(RPC)
let task = "sm-cli"
@@ -52,14 +55,14 @@ let _ =
let args = List.filter (not ++ (String.startswith "url=")) args |> List.tl in
match args with
| [ "sr-list" ] ->
- let srs = Client.SR.list rpc ~task in
+ let srs = Client.SR.list ~task in
List.iter
(fun sr ->
Printf.printf "%s\n" sr
) srs
| [ "sr-scan"; sr ] ->
if Array.length Sys.argv < 3 then usage_and_exit ();
- begin match Client.SR.scan rpc ~task ~sr with
+ begin match Client.SR.scan ~task ~sr with
| Success (Vdis vs) ->
List.iter
(fun v ->
@@ -96,14 +99,14 @@ let _ =
then Some (String.sub k l (String.length k - l), v)
else None) kvpairs in
- begin match Client.VDI.create rpc ~task ~sr ~vdi_info ~params with
+ begin match Client.VDI.create ~task ~sr ~vdi_info ~params with
| Success (Vdi v) ->
Printf.printf "%s\n" (string_of_vdi_info v)
| x ->
Printf.fprintf stderr "Unexpected result: %s\n" (string_of_result x)
end
| [ "vdi-destroy"; sr; vdi ] ->
- begin match Client.VDI.destroy rpc ~task ~sr ~vdi with
+ begin match Client.VDI.destroy ~task ~sr ~vdi with
| Success Unit -> ()
| x ->
Printf.fprintf stderr "Unexpected result: %s\n" (string_of_result x)
View
34 ocaml/xapi/storage_access.ml
@@ -426,6 +426,8 @@ let unbind ~__context ~pbd =
let rpc call = Storage_mux.Server.process None call
+module Client = Client(struct let rpc = rpc end)
+
let start () =
let open Storage_impl.Local_domain_socket in
start Xapi_globs.storage_unix_domain_socket Storage_mux.Server.process
@@ -480,7 +482,8 @@ let of_vbd ~__context ~vbd ~domid =
let is_attached ~__context ~vbd ~domid =
let rpc, task, dp, sr, vdi = of_vbd ~__context ~vbd ~domid in
let open Vdi_automaton in
- match Client.VDI.stat rpc ~task ~sr ~vdi () with
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
+ match C.VDI.stat ~task ~sr ~vdi () with
| Success (Stat { superstate = Detached }) -> false
| Success _ -> true
| Failure _ as r -> error "Unable to query state of VDI: %s, %s" vdi (string_of_result r); false
@@ -489,7 +492,8 @@ let is_attached ~__context ~vbd ~domid =
useful for executing Storage_interface.Client.VDI functions *)
let on_vdi ~__context ~vbd ~domid f =
let rpc, task, dp, sr, vdi = of_vbd ~__context ~vbd ~domid in
- let dp = Client.DP.create rpc task dp in
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
+ let dp = C.DP.create task dp in
f rpc task dp sr vdi
let reset ~__context ~vm =
@@ -499,7 +503,7 @@ let reset ~__context ~vm =
let sr = Db.SR.get_uuid ~__context ~self:(Db.PBD.get_SR ~__context ~self:pbd) in
info "Resetting all state associated with SR: %s" sr;
expect_unit (fun () -> ())
- (Client.SR.reset rpc (Ref.string_of task) sr);
+ (Client.SR.reset (Ref.string_of task) sr);
Db.PBD.set_currently_attached ~__context ~self:pbd ~value:false;
) (System_domains.pbd_of_vm ~__context ~vm)
@@ -511,13 +515,14 @@ let attach_and_activate ~__context ~vbd ~domid ~hvm f =
let read_write = Db.VBD.get_mode ~__context ~self:vbd = `RW in
let result = on_vdi ~__context ~vbd ~domid
(fun rpc task dp sr vdi ->
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
expect_params
(fun path ->
expect_unit
(fun () ->
f path
- ) (Client.VDI.activate rpc task dp sr vdi)
- ) (Client.VDI.attach rpc task dp sr vdi read_write)
+ ) (C.VDI.activate task dp sr vdi)
+ ) (C.VDI.attach task dp sr vdi read_write)
) in
Qemu_blkfront.create ~__context ~self:vbd ~read_write hvm;
result
@@ -534,24 +539,25 @@ let deactivate_and_detach ~__context ~vbd ~domid ~unplug_frontends =
automatically detached and deactivated. *)
on_vdi ~__context ~vbd ~domid
(fun rpc task dp sr vdi ->
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
expect_unit (fun () -> ())
- (Client.DP.destroy rpc task dp false)
+ (C.DP.destroy task dp false)
)
let diagnostics ~__context =
expect_string (fun x -> x)
- (Storage_interface.Client.DP.diagnostics rpc ())
+ (Client.DP.diagnostics ())
let dp_destroy ~__context dp allow_leak =
let task = Context.get_task_id __context in
expect_unit (fun () -> ())
- (Client.DP.destroy rpc (Ref.string_of task) dp allow_leak)
+ (Client.DP.destroy (Ref.string_of task) dp allow_leak)
(* Set my PBD.currently_attached fields in the Pool database to match the local one *)
let resynchronise_pbds ~__context ~pbds =
let task = Context.get_task_id __context in
- let srs = Client.SR.list rpc (Ref.string_of task) in
+ let srs = Client.SR.list (Ref.string_of task) in
debug "Currently-attached SRs: [ %s ]" (String.concat "; " srs);
List.iter
(fun self ->
@@ -569,7 +575,7 @@ let resynchronise_pbds ~__context ~pbds =
(* This is a layering violation. The layers are:
xapi: has a pool-wide view
storage_impl: has a host-wide view of SRs and VDIs
- SM: has a SR-wide view
+ SM: has a SR-wide viep
Unfortunately the SM is storing some of its critical state (VDI-host locks) in the xapi
metadata rather than on the backend storage. The xapi metadata is generally not authoritative
and must be synchronised against the state of the world. Therefore we must synchronise the
@@ -620,14 +626,14 @@ let refresh_local_vdi_activations ~__context =
(fun () -> Hashtbl.replace Builtin_impl.VDI.vdi_read_write key (ro_rw = RW)) in
let task = Ref.string_of (Context.get_task_id __context) in
- let srs = Client.SR.list rpc task in
+ let srs = Client.SR.list task in
List.iter
(fun (vdi_ref, vdi_rec) ->
let sr = Db.SR.get_uuid ~__context ~self:vdi_rec.API.vDI_SR in
let vdi = vdi_rec.API.vDI_location in
if List.mem sr srs
then
- match Client.VDI.stat rpc ~task ~sr ~vdi () with
+ match Client.VDI.stat ~task ~sr ~vdi () with
| Success (Stat { superstate = Activated RO }) ->
lock_vdi (vdi_ref, vdi_rec) RO;
remember (sr, vdi) RO
@@ -666,11 +672,11 @@ let destroy_sr ~__context ~sr =
bind ~__context ~pbd;
let task = Ref.string_of (Context.get_task_id __context) in
expect_unit (fun () -> ())
- (Client.SR.attach rpc task (Db.SR.get_uuid ~__context ~self:sr) pbd_t.API.pBD_device_config);
+ (Client.SR.attach task (Db.SR.get_uuid ~__context ~self:sr) pbd_t.API.pBD_device_config);
(* The current backends expect the PBD to be temporarily set to currently_attached = true *)
Db.PBD.set_currently_attached ~__context ~self:pbd ~value:true;
expect_unit (fun () -> ())
- (Client.SR.destroy rpc task (Db.SR.get_uuid ~__context ~self:sr));
+ (Client.SR.destroy task (Db.SR.get_uuid ~__context ~self:sr));
(* All PBDs are clearly currently_attached = false now *)
Db.PBD.set_currently_attached ~__context ~self:pbd ~value:false;
unbind ~__context ~pbd
View
58 ocaml/xapi/storage_mux.ml
@@ -76,40 +76,64 @@ module Mux = struct
version = "0.1";
features = [];
}
-
module DP = struct
let create context ~task ~id = id (* XXX: is this pointless? *)
let destroy context ~task ~dp ~allow_leak =
(* Tell each plugin about this *)
- fail_or choose (multicast (Client.DP.destroy ~task ~dp ~allow_leak))
+ fail_or choose (multicast (fun rpc ->
+ let module C = Client(struct let rpc = rpc end) in
+ C.DP.destroy ~task ~dp ~allow_leak))
let diagnostics context () =
let combine results =
let all = List.fold_left (fun acc (sr, result) ->
Printf.sprintf "For SR: %s" sr :: (string_of_result result) :: acc) [] results in
Success (String (String.concat "\n" all)) in
- fail_or combine (multicast (fun rpc -> Client.DP.diagnostics rpc ()))
+ fail_or combine (multicast (fun rpc ->
+ let module C = Client(struct let rpc = rpc end) in
+ C.DP.diagnostics ()))
end
-
module SR = struct
- let attach context ~task ~sr = Client.SR.attach (of_sr sr) ~task ~sr
- let detach context ~task ~sr = Client.SR.detach (of_sr sr) ~task ~sr
- let destroy context ~task ~sr = Client.SR.destroy (of_sr sr) ~task ~sr
- let scan context ~task ~sr = Client.SR.scan (of_sr sr) ~task ~sr
+ let attach context ~task ~sr =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.SR.attach ~task ~sr
+ let detach context ~task ~sr =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.SR.detach ~task ~sr
+ let destroy context ~task ~sr =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.SR.destroy ~task ~sr
+ let scan context ~task ~sr =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.SR.scan ~task ~sr
let list context ~task =
- List.fold_left (fun acc (sr, list) -> list @ acc) [] (multicast (Client.SR.list ~task))
+ List.fold_left (fun acc (sr, list) -> list @ acc) [] (multicast (fun rpc ->
+ let module C = Client(struct let rpc = rpc end) in
+ C.SR.list ~task))
let reset context ~task ~sr = assert false
end
module VDI = struct
let create context ~task ~sr ~vdi_info ~params =
- Client.VDI.create (of_sr sr) ~task ~sr ~vdi_info ~params
-
- let stat context ~task ~sr ~vdi = Client.VDI.stat (of_sr sr) ~task ~sr ~vdi
- let destroy context ~task ~sr ~vdi = Client.VDI.destroy (of_sr sr) ~task ~sr ~vdi
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.VDI.create ~task ~sr ~vdi_info ~params
+
+ let stat context ~task ~sr ~vdi =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.VDI.stat ~task ~sr ~vdi
+ let destroy context ~task ~sr ~vdi =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.VDI.destroy ~task ~sr ~vdi
let attach context ~task ~dp ~sr ~vdi ~read_write =
- Client.VDI.attach (of_sr sr) ~task ~dp ~sr ~vdi ~read_write
- let activate context ~task ~dp ~sr ~vdi = Client.VDI.activate (of_sr sr) ~task ~dp ~sr ~vdi
- let deactivate context ~task ~dp ~sr ~vdi = Client.VDI.deactivate (of_sr sr) ~task ~dp ~sr ~vdi
- let detach context ~task ~dp ~sr ~vdi = Client.VDI.detach (of_sr sr) ~task ~dp ~sr ~vdi
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.VDI.attach ~task ~dp ~sr ~vdi ~read_write
+ let activate context ~task ~dp ~sr ~vdi =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.VDI.activate ~task ~dp ~sr ~vdi
+ let deactivate context ~task ~dp ~sr ~vdi =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.VDI.deactivate ~task ~dp ~sr ~vdi
+ let detach context ~task ~dp ~sr ~vdi =
+ let module C = Client(struct let rpc = of_sr sr end) in
+ C.VDI.detach ~task ~dp ~sr ~vdi
end
end
View
36 ocaml/xapi/storage_proxy.ml
@@ -24,29 +24,31 @@ end
module Proxy = functor(RPC: RPC) -> struct
type context = Smint.request
- let query _ = Client.query RPC.rpc
+ module Client = Client(RPC)
+
+ let query _ = Client.query
module DP = struct
- let create _ = Client.DP.create RPC.rpc
- let destroy _ = Client.DP.destroy RPC.rpc
- let diagnostics _ = Client.DP.diagnostics RPC.rpc
+ let create _ = Client.DP.create
+ let destroy _ = Client.DP.destroy
+ let diagnostics _ = Client.DP.diagnostics
end
module SR = struct
- let attach _ = Client.SR.attach RPC.rpc
- let detach _ = Client.SR.detach RPC.rpc
- let reset _ = Client.SR.reset RPC.rpc
- let destroy _ = Client.SR.destroy RPC.rpc
- let scan _ = Client.SR.scan RPC.rpc
- let list _ = Client.SR.list RPC.rpc
+ let attach _ = Client.SR.attach
+ let detach _ = Client.SR.detach
+ let reset _ = Client.SR.reset
+ let destroy _ = Client.SR.destroy
+ let scan _ = Client.SR.scan
+ let list _ = Client.SR.list
end
module VDI = struct
- let attach _ = Client.VDI.attach RPC.rpc
- let activate _ = Client.VDI.activate RPC.rpc
- let deactivate _ = Client.VDI.deactivate RPC.rpc
- let detach _ = Client.VDI.detach RPC.rpc
+ let attach _ = Client.VDI.attach
+ let activate _ = Client.VDI.activate
+ let deactivate _ = Client.VDI.deactivate
+ let detach _ = Client.VDI.detach
- let stat _ = Client.VDI.stat RPC.rpc
+ let stat _ = Client.VDI.stat
- let create _ = Client.VDI.create RPC.rpc
- let destroy _ = Client.VDI.destroy RPC.rpc
+ let create _ = Client.VDI.create
+ let destroy _ = Client.VDI.destroy
end
end
View
3 ocaml/xapi/system_domains.ml
@@ -146,7 +146,8 @@ let queryable ip port () =
let open Xmlrpc_client in
let rpc = XMLRPC_protocol.rpc ~transport:(TCP(ip, port)) ~http:(xmlrpc ~version:"1.0" "/") in
try
- let q = Storage_interface.Client.query rpc () in
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
+ let q = C.query () in
info "%s:%s:%s at %s:%d" q.Storage_interface.name q.Storage_interface.vendor q.Storage_interface.version ip port;
true
with _ -> false
View
6 ocaml/xapi/xapi_pbd.ml
@@ -103,6 +103,8 @@ let check_sharing_constraint ~__context ~self =
[ Ref.string_of self; Ref.string_of (Db.PBD.get_host ~__context ~self:(List.hd others)) ]))
end
+module C = Storage_interface.Client(struct let rpc = Storage_access.rpc end)
+
let plug ~__context ~self =
let currently_attached = Db.PBD.get_currently_attached ~__context ~self in
if not currently_attached then
@@ -113,7 +115,7 @@ let plug ~__context ~self =
let task = Ref.string_of (Context.get_task_id __context) in
let device_config = Db.PBD.get_device_config ~__context ~self in
Storage_access.expect_unit (fun () -> ())
- (Storage_interface.Client.SR.attach Storage_access.rpc task (Db.SR.get_uuid ~__context ~self:sr) device_config);
+ (C.SR.attach task (Db.SR.get_uuid ~__context ~self:sr) device_config);
Db.PBD.set_currently_attached ~__context ~self ~value:true;
end
@@ -164,7 +166,7 @@ let unplug ~__context ~self =
end;
let task = Ref.string_of (Context.get_task_id __context) in
Storage_access.expect_unit (fun () -> ())
- (Storage_interface.Client.SR.detach Storage_access.rpc task (Db.SR.get_uuid ~__context ~self:sr));
+ (C.SR.detach task (Db.SR.get_uuid ~__context ~self:sr));
Storage_access.unbind ~__context ~pbd:self;
Db.PBD.set_currently_attached ~__context ~self ~value:false
end
View
4 ocaml/xapi/xapi_sr.ml
@@ -519,9 +519,9 @@ let scan ~__context ~sr =
let open Storage_access in
let task = Context.get_task_id __context in
let open Storage_interface in
-
+ let module C = Client(struct let rpc = rpc end) in
let sr' = Ref.string_of sr in
- match Client.SR.scan rpc ~task:(Ref.string_of task) ~sr:(Db.SR.get_uuid ~__context ~self:sr) with
+ match C.SR.scan ~task:(Ref.string_of task) ~sr:(Db.SR.get_uuid ~__context ~self:sr) with
| Success (Vdis vs) ->
let db_vdis = Db.VDI.get_records_where ~__context ~expr:(Eq(Field "SR", Literal sr')) in
update_vdis ~__context ~sr:sr db_vdis vs;
View
9 ocaml/xapi/xapi_vdi.ml
@@ -235,7 +235,9 @@ let create ~__context ~name_label ~name_description
if virtual_size < vi.virtual_size
then info "sr:%s vdi:%s requested virtual size %Ld < actual virtual size %Ld" (Ref.string_of sR) vi.vdi virtual_size vi.virtual_size;
newvdi ~__context ~sr:sR vi
- ) (Client.VDI.create rpc ~task:(Ref.string_of task) ~sr:(Db.SR.get_uuid ~__context ~self:sR)
+ ) (
+ let module C = Client(struct let rpc = rpc end) in
+ C.VDI.create ~task:(Ref.string_of task) ~sr:(Db.SR.get_uuid ~__context ~self:sR)
~vdi_info ~params:sm_config)
(* Make the database record only *)
@@ -387,8 +389,9 @@ let destroy ~__context ~self =
let open Storage_access in
let open Storage_interface in
let task = Context.get_task_id __context in
- expect_unit (fun () -> ())
- (Client.VDI.destroy rpc ~task:(Ref.string_of task) ~sr:(Db.SR.get_uuid ~__context ~self:sr) ~vdi:location);
+ expect_unit (fun () -> ()) (
+ let module C = Client(struct let rpc = rpc end) in
+ C.VDI.destroy ~task:(Ref.string_of task) ~sr:(Db.SR.get_uuid ~__context ~self:sr) ~vdi:location);
(* destroy all the VBDs now rather than wait for the GC thread. This helps
prevent transient glitches but doesn't totally prevent races. *)
View
9 ocaml/xapi/xapi_vm_migrate.ml
@@ -281,8 +281,9 @@ let transmitter ~xal ~__context is_localhost_migration fd vm_migrate_failed host
then Storage_access.Qemu_blkfront.destroy ~__context ~self:vbd;
Storage_access.on_vdi ~__context ~vbd ~domid
(fun rpc task datapath_id sr vdi ->
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
Storage_access.expect_unit (fun () -> ())
- (Storage_interface.Client.VDI.deactivate rpc task datapath_id sr vdi)
+ (C.VDI.deactivate task datapath_id sr vdi)
)
) vbds;
@@ -404,8 +405,9 @@ let receiver ~__context ~localhost is_localhost_migration fd vm xc xs memory_req
let read_write = Db.VBD.get_mode ~__context ~self:vbd = `RW in
Storage_access.on_vdi ~__context ~vbd ~domid
(fun rpc task datapath_id sr vdi ->
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
Storage_access.expect_params (fun _ -> ())
- (Storage_interface.Client.VDI.attach rpc task datapath_id sr vdi read_write)
+ (C.VDI.attach task datapath_id sr vdi read_write)
)
) (Storage_access.vbd_attach_order ~__context vbds);
with exn ->
@@ -475,8 +477,9 @@ let receiver ~__context ~localhost is_localhost_migration fd vm xc xs memory_req
(fun vbd ->
Storage_access.on_vdi ~__context ~vbd ~domid
(fun rpc task datapath_id sr vdi ->
+ let module C = Storage_interface.Client(struct let rpc = rpc end) in
Storage_access.expect_unit (fun () -> ())
- (Storage_interface.Client.VDI.activate rpc task datapath_id sr vdi)
+ (C.VDI.activate task datapath_id sr vdi)
)
) vbds;

0 comments on commit a0fbe9d

Please sign in to comment.
Something went wrong with that request. Please try again.