diff --git a/main.ml b/main.ml index e5b9ca1..49d02cb 100644 --- a/main.ml +++ b/main.ml @@ -36,6 +36,9 @@ let info fmt = end ) fmt +let _nonpersistent = "NONPERSISTENT" +let _clone_on_boot_key = "clone-on-boot" + let backend_error name args = let open Storage_interface in let exnty = Exception.Backend_error (name, args) in @@ -106,6 +109,10 @@ let fork_exec_rpc root_dir script_name args response_of_rpc = end end +let script root_dir name kind script = match kind with +| `Volume -> Filename.(concat (concat root_dir name) script) +| `Datapath datapath -> Filename.(concat (concat (concat (dirname root_dir) "datapath") datapath) script) + module Attached_SRs = struct let sr_table : string String.Table.t ref = ref (String.Table.create ()) let state_path = ref None @@ -146,6 +153,34 @@ module Attached_SRs = struct return () end +module Datapath_plugins = struct + let table: Storage.Plugin.Types.query_result String.Table.t ref = ref (String.Table.create ()) + + let register root_dir name = + let args = Storage.Plugin.Types.Plugin.Query.In.make "register" in + let args = Storage.Plugin.Types.Plugin.Query.In.rpc_of_t args in + fork_exec_rpc root_dir (script root_dir name (`Datapath name) "Plugin.Query") args Storage.Plugin.Types.Plugin.Query.Out.t_of_rpc + >>= function + | Ok response -> + info "Registered datapath plugin %s" name + >>= fun () -> + Hashtbl.replace !table name response; + return () + | _ -> + info "Failed to register datapath plugin %s" name + >>= fun () -> + return () + + let unregister root_dir name = + Hashtbl.remove !table name; + return () + + let supports_feature scheme feature = + match Hashtbl.find !table scheme with + | None -> false + | Some query_result -> List.mem query_result.Storage.Plugin.Types.features feature +end + let vdi_of_volume x = let open Storage_interface in { vdi = x.Storage.Volume.Types.key; @@ -164,27 +199,56 @@ let vdi_of_volume x = persistent = true; } -let choose_datapath = function - | [] -> return (Error (missing_uri ())) - | uri :: _ -> - let uri' = Uri.of_string uri in - let domain = "0" in - begin match Uri.scheme uri' with - | None -> return (Error (missing_uri ())) - | Some scheme -> return (Ok (scheme, uri, domain)) - end - -let script root_dir name kind script = match kind with -| `Volume -> Filename.(concat (concat root_dir name) script) -| `Datapath datapath -> Filename.(concat (concat (concat (dirname root_dir) "datapath") datapath) script) - let stat root_dir name dbg sr vdi = let args = Storage.Volume.Types.Volume.Stat.In.make dbg sr vdi in let args = Storage.Volume.Types.Volume.Stat.In.rpc_of_t args in - let open Deferred.Result.Monad_infix in fork_exec_rpc root_dir (script root_dir name `Volume "Volume.stat") args Storage.Volume.Types.Volume.Stat.Out.t_of_rpc - >>= fun response -> - choose_datapath response.Storage.Volume.Types.uri + +let clone root_dir name dbg sr vdi = + let args = Storage.Volume.Types.Volume.Clone.In.make dbg sr vdi in + let args = Storage.Volume.Types.Volume.Clone.In.rpc_of_t args in + fork_exec_rpc root_dir (script root_dir name `Volume "Volume.clone") args Storage.Volume.Types.Volume.Clone.Out.t_of_rpc + +let destroy root_dir name dbg sr vdi = + let args = Storage.Volume.Types.Volume.Destroy.In.make dbg sr vdi in + let args = Storage.Volume.Types.Volume.Destroy.In.rpc_of_t args in + fork_exec_rpc root_dir (script root_dir name `Volume "Volume.destroy") args Storage.Volume.Types.Volume.Destroy.Out.t_of_rpc + +let set root_dir name dbg sr vdi k v = + let args = Storage.Volume.Types.Volume.Set.In.make dbg sr vdi k v in + let args = Storage.Volume.Types.Volume.Set.In.rpc_of_t args in + fork_exec_rpc root_dir (script root_dir name `Volume "Volume.set") args Storage.Volume.Types.Volume.Set.Out.t_of_rpc + +let unset root_dir name dbg sr vdi k = + let args = Storage.Volume.Types.Volume.Unset.In.make dbg sr vdi k in + let args = Storage.Volume.Types.Volume.Unset.In.rpc_of_t args in + fork_exec_rpc root_dir (script root_dir name `Volume "Volume.unset") args Storage.Volume.Types.Volume.Unset.Out.t_of_rpc + +let choose_datapath ?(persistent = true) response = + (* We can only use a URI with a valid scheme, since we use the scheme + to name the datapath plugin. *) + let possible = + List.filter_map ~f:(fun x -> + let uri = Uri.of_string x in + match Uri.scheme uri with + | None -> None + | Some scheme -> Some (scheme, x) + ) response.Storage.Volume.Types.uri in + (* We can only use URIs whose schemes correspond to registered plugins *) + let possible = List.filter ~f:(fun (scheme, _) -> Hashtbl.mem !Datapath_plugins.table scheme) possible in + (* If we want to be non-persistent, we prefer if the datapath plugin supports it natively *) + let preference_order = + if persistent + then possible + else + let supports_nonpersistent, others = List.partition_map ~f:(fun (scheme, uri) -> + if Datapath_plugins.supports_feature scheme _nonpersistent + then `Fst (scheme, uri) else `Snd (scheme, uri) + ) possible in + supports_nonpersistent @ others in + match preference_order with + | [] -> return (Error (missing_uri ())) + | (scheme, u) :: us -> return (Ok (scheme, u, "0")) (* Process a message *) let process root_dir name x = @@ -245,6 +309,12 @@ let process root_dir name x = "VDI_ATTACH"; "VDI_DETACH"; "VDI_ACTIVATE"; "VDI_DEACTIVATE"; "VDI_INTRODUCE" ] in + (* If we have the ability to clone a disk then we can provide + clone on boot. *) + let features = + if List.mem features "VDI_CLONE" + then "VDI_RESET_ON_BOOT/2" :: features + else features in let response = { driver = response.Storage.Plugin.Types.plugin; name = response.Storage.Plugin.Types.name; @@ -330,6 +400,13 @@ let process root_dir name x = let args = Storage.Volume.Types.SR.Ls.In.rpc_of_t args in fork_exec_rpc root_dir (script root_dir name `Volume "SR.ls") args Storage.Volume.Types.SR.Ls.Out.t_of_rpc >>= fun response -> + (* Filter out volumes which are clone-on-boot transients *) + let transients = List.fold ~f:(fun set x -> + match List.Assoc.find x.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> set + | Some transient -> Set.add set transient + ) ~init:(Set.empty ~comparator:String.comparator) response in + let response = List.filter ~f:(fun x -> not(Set.mem transients x.Storage.Volume.Types.key)) response in let response = List.map ~f:vdi_of_volume response in Deferred.Result.return (R.success (Args.SR.Scan.rpc_of_response response)) | { R.name = "VDI.create"; R.params = [ args ] } -> @@ -354,14 +431,19 @@ let process root_dir name x = let args = Args.VDI.Destroy.request_of_rpc args in Attached_SRs.find args.Args.VDI.Destroy.sr >>= fun sr -> - let args = Storage.Volume.Types.Volume.Destroy.In.make - args.Args.VDI.Destroy.dbg - sr - args.Args.VDI.Destroy.vdi in - let args = Storage.Volume.Types.Volume.Destroy.In.rpc_of_t args in - fork_exec_rpc root_dir (script root_dir name `Volume "Volume.destroy") args Storage.Volume.Types.Volume.Destroy.Out.t_of_rpc + stat root_dir name args.Args.VDI.Destroy.dbg sr args.Args.VDI.Destroy.vdi >>= fun response -> - Deferred.Result.return (R.success (Args.VDI.Destroy.rpc_of_response response)) + (* Destroy any clone-on-boot volume that might exist *) + ( match List.Assoc.find response.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> + return (Ok ()) + | Some temporary -> + (* Destroy the temporary disk we made earlier *) + destroy root_dir name args.Args.VDI.Destroy.dbg sr temporary + ) >>= fun () -> + destroy root_dir name args.Args.VDI.Destroy.dbg sr args.Args.VDI.Destroy.vdi + >>= fun () -> + Deferred.Result.return (R.success (Args.VDI.Destroy.rpc_of_response ())) | { R.name = "VDI.snapshot"; R.params = [ args ] } -> let open Deferred.Result.Monad_infix in let args = Args.VDI.Snapshot.request_of_rpc args in @@ -383,12 +465,7 @@ let process root_dir name x = Attached_SRs.find args.Args.VDI.Clone.sr >>= fun sr -> let vdi_info = args.Args.VDI.Clone.vdi_info in - let args = Storage.Volume.Types.Volume.Clone.In.make - args.Args.VDI.Clone.dbg - sr - vdi_info.vdi in - let args = Storage.Volume.Types.Volume.Clone.In.rpc_of_t args in - fork_exec_rpc root_dir (script root_dir name `Volume "Volume.clone") args Storage.Volume.Types.Volume.Clone.Out.t_of_rpc + clone root_dir name args.Args.VDI.Clone.dbg sr vdi_info.vdi >>= fun response -> let response = vdi_of_volume response in Deferred.Result.return (R.success (Args.VDI.Clone.rpc_of_response response)) @@ -431,9 +508,7 @@ let process root_dir name x = fork_exec_rpc root_dir (script root_dir name `Volume "Volume.resize") args Storage.Volume.Types.Volume.Resize.Out.t_of_rpc >>= fun () -> (* Now call Volume.stat to discover the size *) - let args = Storage.Volume.Types.Volume.Stat.In.make dbg sr vdi in - let args = Storage.Volume.Types.Volume.Stat.In.rpc_of_t args in - fork_exec_rpc root_dir (script root_dir name `Volume "Volume.stat") args Storage.Volume.Types.Volume.Stat.Out.t_of_rpc + stat root_dir name dbg sr vdi >>= fun response -> Deferred.Result.return (R.success (Args.VDI.Resize.rpc_of_response response.Storage.Volume.Types.virtual_size)) | { R.name = "VDI.stat"; R.params = [ args ] } -> @@ -442,12 +517,7 @@ let process root_dir name x = Attached_SRs.find args.Args.VDI.Stat.sr >>= fun sr -> let vdi = args.Args.VDI.Stat.vdi in - let args = Storage.Volume.Types.Volume.Stat.In.make - args.Args.VDI.Stat.dbg - sr - vdi in - let args = Storage.Volume.Types.Volume.Stat.In.rpc_of_t args in - fork_exec_rpc root_dir (script root_dir name `Volume "Volume.stat") args Storage.Volume.Types.Volume.Stat.Out.t_of_rpc + stat root_dir name args.Args.VDI.Stat.dbg sr vdi >>= fun response -> let response = vdi_of_volume response in Deferred.Result.return (R.success (Args.VDI.Stat.rpc_of_response response)) @@ -457,12 +527,7 @@ let process root_dir name x = Attached_SRs.find args.Args.VDI.Introduce.sr >>= fun sr -> let vdi = args.Args.VDI.Introduce.location in - let args = Storage.Volume.Types.Volume.Stat.In.make - args.Args.VDI.Introduce.dbg - sr - vdi in - let args = Storage.Volume.Types.Volume.Stat.In.rpc_of_t args in - fork_exec_rpc root_dir (script root_dir name `Volume "Volume.stat") args Storage.Volume.Types.Volume.Stat.Out.t_of_rpc + stat root_dir name args.Args.VDI.Introduce.dbg sr vdi >>= fun response -> let response = vdi_of_volume response in Deferred.Result.return (R.success (Args.VDI.Introduce.rpc_of_response response)) @@ -472,10 +537,16 @@ let process root_dir name x = Attached_SRs.find args.Args.VDI.Attach.sr >>= fun sr -> (* Discover the URIs using Volume.stat *) - stat root_dir name - args.Args.VDI.Attach.dbg - sr - args.Args.VDI.Attach.vdi + stat root_dir name args.Args.VDI.Attach.dbg sr args.Args.VDI.Attach.vdi + >>= fun response -> + (* If we have a clone-on-boot volume then use that instead *) + ( match List.Assoc.find response.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> + return (Ok response) + | Some temporary -> + stat root_dir name args.Args.VDI.Attach.dbg sr temporary + ) >>= fun response -> + choose_datapath response >>= fun (datapath, uri, domain) -> let args' = Storage.Datapath.Types.Datapath.Attach.In.make args.Args.VDI.Attach.dbg @@ -500,10 +571,16 @@ let process root_dir name x = Attached_SRs.find args.Args.VDI.Activate.sr >>= fun sr -> (* Discover the URIs using Volume.stat *) - stat root_dir name - args.Args.VDI.Activate.dbg - sr - args.Args.VDI.Activate.vdi + stat root_dir name args.Args.VDI.Activate.dbg sr args.Args.VDI.Activate.vdi + >>= fun response -> + (* If we have a clone-on-boot volume then use that instead *) + ( match List.Assoc.find response.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> + return (Ok response) + | Some temporary -> + stat root_dir name args.Args.VDI.Activate.dbg sr temporary + ) >>= fun response -> + choose_datapath response >>= fun (datapath, uri, domain) -> let args' = Storage.Datapath.Types.Datapath.Activate.In.make args.Args.VDI.Activate.dbg @@ -518,10 +595,15 @@ let process root_dir name x = Attached_SRs.find args.Args.VDI.Deactivate.sr >>= fun sr -> (* Discover the URIs using Volume.stat *) - stat root_dir name - args.Args.VDI.Deactivate.dbg - sr - args.Args.VDI.Deactivate.vdi + stat root_dir name args.Args.VDI.Deactivate.dbg sr args.Args.VDI.Deactivate.vdi + >>= fun response -> + ( match List.Assoc.find response.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> + return (Ok response) + | Some temporary -> + stat root_dir name args.Args.VDI.Deactivate.dbg sr temporary + ) >>= fun response -> + choose_datapath response >>= fun (datapath, uri, domain) -> let args' = Storage.Datapath.Types.Datapath.Deactivate.In.make args.Args.VDI.Deactivate.dbg @@ -536,10 +618,15 @@ let process root_dir name x = Attached_SRs.find args.Args.VDI.Detach.sr >>= fun sr -> (* Discover the URIs using Volume.stat *) - stat root_dir name - args.Args.VDI.Detach.dbg - sr - args.Args.VDI.Detach.vdi + stat root_dir name args.Args.VDI.Detach.dbg sr args.Args.VDI.Detach.vdi + >>= fun response -> + ( match List.Assoc.find response.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> + return (Ok response) + | Some temporary -> + stat root_dir name args.Args.VDI.Detach.dbg sr temporary + ) >>= fun response -> + choose_datapath response >>= fun (datapath, uri, domain) -> let args' = Storage.Datapath.Types.Datapath.Detach.In.make args.Args.VDI.Detach.dbg @@ -569,18 +656,74 @@ let process root_dir name x = let args = Args.VDI.Epoch_begin.request_of_rpc args in Attached_SRs.find args.Args.VDI.Epoch_begin.sr >>= fun sr -> - (* FIXME: will some backends do this in special ways? - See [djs55/xapi-storage#19] *) - Deferred.Result.return (R.success (Args.VDI.Epoch_begin.rpc_of_response ())) + (* Discover the URIs using Volume.stat *) + let persistent = args.Args.VDI.Epoch_begin.persistent in + stat root_dir name args.Args.VDI.Epoch_begin.dbg sr args.Args.VDI.Epoch_begin.vdi + >>= fun response -> + choose_datapath ~persistent response + >>= fun (datapath, uri, domain) -> + (* If non-persistent and the datapath plugin supports NONPERSISTENT + then we delegate this to the datapath plugin. Otherwise we will + make a temporary clone now and attach/detach etc this file. *) + if Datapath_plugins.supports_feature datapath _nonpersistent then begin + (* We delegate handling non-persistent disks to the datapath plugin. *) + let args = Storage.Datapath.Types.Datapath.Open.In.make + args.Args.VDI.Epoch_begin.dbg + uri persistent in + let args = Storage.Datapath.Types.Datapath.Open.In.rpc_of_t args in + fork_exec_rpc root_dir (script root_dir name (`Datapath datapath) "Datapath.open") args Storage.Datapath.Types.Datapath.Open.Out.t_of_rpc + >>= fun () -> + Deferred.Result.return (R.success (Args.VDI.Epoch_begin.rpc_of_response ())) + end else begin + (* We create a non-persistent disk here with Volume.clone, and store + the name of the cloned disk in the metadata of the original. *) + ( match List.Assoc.find response.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> + return (Ok ()) + | Some temporary -> + (* Destroy the temporary disk we made earlier *) + destroy root_dir name args.Args.VDI.Epoch_begin.dbg sr temporary + ) >>= fun () -> + clone root_dir name args.Args.VDI.Epoch_begin.dbg sr args.Args.VDI.Epoch_begin.vdi + >>= fun vdi -> + set root_dir name args.Args.VDI.Epoch_begin.dbg sr args.Args.VDI.Epoch_begin.vdi _clone_on_boot_key vdi.Storage.Volume.Types.key + >>= fun () -> + Deferred.Result.return (R.success (Args.VDI.Epoch_begin.rpc_of_response ())) + end | { R.name = "VDI.epoch_end"; R.params = [ args ] } -> let open Deferred.Result.Monad_infix in let args = Args.VDI.Epoch_end.request_of_rpc args in Attached_SRs.find args.Args.VDI.Epoch_end.sr >>= fun sr -> - (* FIXME: will some backends do this in special ways? - See [djs55/xapi-storage#19] *) - Deferred.Result.return (R.success (Args.VDI.Epoch_end.rpc_of_response ())) - + (* Discover the URIs using Volume.stat *) + stat root_dir name args.Args.VDI.Epoch_end.dbg sr args.Args.VDI.Epoch_end.vdi + >>= fun response -> + choose_datapath response + >>= fun (datapath, uri, domain) -> + if Datapath_plugins.supports_feature datapath _nonpersistent then begin + let args = Storage.Datapath.Types.Datapath.Close.In.make + args.Args.VDI.Epoch_end.dbg + uri in + let args = Storage.Datapath.Types.Datapath.Close.In.rpc_of_t args in + fork_exec_rpc root_dir (script root_dir name (`Datapath datapath) "Datapath.close") args Storage.Datapath.Types.Datapath.Close.Out.t_of_rpc + >>= fun () -> + Deferred.Result.return (R.success (Args.VDI.Epoch_end.rpc_of_response ())) + end else begin + match List.Assoc.find response.Storage.Volume.Types.keys _clone_on_boot_key with + | None -> + Deferred.Result.return (R.success (Args.VDI.Epoch_end.rpc_of_response ())) + | Some temporary -> + (* Destroy the temporary disk we made earlier *) + destroy root_dir name args.Args.VDI.Epoch_end.dbg sr temporary + >>= fun () -> + unset root_dir name args.Args.VDI.Epoch_end.dbg sr args.Args.VDI.Epoch_end.vdi _clone_on_boot_key + >>= fun () -> + Deferred.Result.return (R.success (Args.VDI.Epoch_end.rpc_of_response ())) + end + | { R.name = "VDI.set_persistent"; R.params = [ args ] } -> + let open Deferred.Result.Monad_infix in + (* We don't do anything until the VDI.epoch_begin *) + Deferred.Result.return (R.success (Args.VDI.Set_persistent.rpc_of_response ())) | { R.name = name } -> Deferred.return (Error (backend_error "UNIMPLEMENTED" [ name ]))) >>= function @@ -604,49 +747,43 @@ let get_ok = function Format.pp_print_flush fmt (); failwith (Buffer.contents b) -let create switch_path root_dir name = - if Hashtbl.mem servers name - then return () - else begin - info "Adding %s" name - >>= fun () -> - Protocol_async.Server.listen ~process:(process root_dir name) ~switch:switch_path ~queue:(Filename.basename name) () - >>= fun result -> - let server = get_ok result in - Hashtbl.add_exn servers name server; - return () - end - -let destroy switch_path name = - info "Removing %s" name - >>= fun () -> - if Hashtbl.mem servers name then begin - let t = Hashtbl.find_exn servers name in - Protocol_async.Server.shutdown ~t () >>= fun () -> - Hashtbl.remove servers name; - return () - end else return () let rec diff a b = match a with | [] -> [] | a :: aa -> if List.mem b a then diff aa b else a :: (diff aa b) -(* Ensure the right servers are started *) -let sync ~root_dir ~switch_path = - Sys.readdir root_dir - >>= fun names -> - let needed : string list = Array.to_list names in - let got_already : string list = Hashtbl.keys servers in - Deferred.all_ignore (List.map ~f:(create switch_path root_dir) (diff needed got_already)) - >>= fun () -> - Deferred.all_ignore (List.map ~f:(destroy switch_path) (diff got_already needed)) - -let main ~root_dir ~state_path ~switch_path = - Attached_SRs.reload state_path - >>= fun () -> - (* We watch and create queues for the Volume plugins only *) +let watch_volume_plugins ~root_dir ~switch_path = let root_dir = Filename.concat root_dir "volume" in + let create switch_path root_dir name = + if Hashtbl.mem servers name + then return () + else begin + info "Adding %s" name + >>= fun () -> + Protocol_async.Server.listen ~process:(process root_dir name) ~switch:switch_path ~queue:(Filename.basename name) () + >>= fun result -> + let server = get_ok result in + Hashtbl.add_exn servers name server; + return () + end in + let destroy switch_path name = + info "Removing %s" name + >>= fun () -> + if Hashtbl.mem servers name then begin + let t = Hashtbl.find_exn servers name in + Protocol_async.Server.shutdown ~t () >>= fun () -> + Hashtbl.remove servers name; + return () + end else return () in + let sync ~root_dir ~switch_path = + Sys.readdir root_dir + >>= fun names -> + let needed : string list = Array.to_list names in + let got_already : string list = Hashtbl.keys servers in + Deferred.all_ignore (List.map ~f:(create switch_path root_dir) (diff needed got_already)) + >>= fun () -> + Deferred.all_ignore (List.map ~f:(destroy switch_path) (diff got_already needed)) in Async_inotify.create ~recursive:false ~watch_new_dirs:false root_dir >>= fun (watch, _) -> sync ~root_dir ~switch_path @@ -677,6 +814,54 @@ let main ~root_dir ~state_path ~switch_path = loop () in loop () +let watch_datapath_plugins ~root_dir = + let root_dir = Filename.concat root_dir "datapath" in + let sync ~root_dir = + Sys.readdir root_dir + >>= fun names -> + let needed : string list = Array.to_list names in + let got_already : string list = Hashtbl.keys servers in + Deferred.all_ignore (List.map ~f:(Datapath_plugins.register root_dir) (diff needed got_already)) + >>= fun () -> + Deferred.all_ignore (List.map ~f:(Datapath_plugins.unregister root_dir) (diff got_already needed)) in + Async_inotify.create ~recursive:false ~watch_new_dirs:false root_dir + >>= fun (watch, _) -> + sync ~root_dir + >>= fun () -> + let pipe = Async_inotify.pipe watch in + let open Async_inotify.Event in + let rec loop () = + ( Pipe.read pipe >>= function + | `Eof -> + info "Received EOF from inotify event pipe" + >>= fun () -> + Shutdown.exit 1 + | `Ok (Created path) + | `Ok (Moved (Into path)) -> + Datapath_plugins.register root_dir (Filename.basename path) + | `Ok (Unlinked path) + | `Ok (Moved (Away path)) -> + Datapath_plugins.unregister root_dir (Filename.basename path) + | `Ok (Modified _) -> + return () + | `Ok (Moved (Move (path_a, path_b))) -> + Datapath_plugins.unregister root_dir (Filename.basename path_a) + >>= fun () -> + Datapath_plugins.register root_dir (Filename.basename path_b) + | `Ok Queue_overflow -> + sync ~root_dir + ) >>= fun () -> + loop () in + loop () + +let main ~root_dir ~state_path ~switch_path = + Attached_SRs.reload state_path + >>= fun () -> + Deferred.all_unit [ + watch_volume_plugins ~root_dir ~switch_path; + watch_datapath_plugins ~root_dir + ] + let main ~root_dir ~state_path ~switch_path = let (_: unit Deferred.t) = main ~root_dir ~state_path ~switch_path in never_returns (Scheduler.go ())