From 9f0fbc26d700fee7e69620146ed392f4ade37302 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Thu, 15 Oct 2015 11:38:05 +0100 Subject: [PATCH 01/18] Add initial fist-point module to Xenvmd Signed-off-by: Jon Ludlam --- idl/xenvm_interface.ml | 7 ++++++- xenvmd/fist.ml | 17 +++++++++++++++++ xenvmd/fist.mli | 8 ++++++++ xenvmd/xenvmd.ml | 8 ++++++++ 4 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 xenvmd/fist.ml create mode 100644 xenvmd/fist.mli diff --git a/idl/xenvm_interface.ml b/idl/xenvm_interface.ml index 756b88a..839001e 100644 --- a/idl/xenvm_interface.ml +++ b/idl/xenvm_interface.ml @@ -1,8 +1,8 @@ (* XenVM LVM type thing *) exception HostNotCreated - exception HostStillConnecting of string +exception UnknownFistPoint of string let _journal_name = "xenvm_journal" @@ -79,3 +79,8 @@ module Host = struct external all: unit -> host list = "" end + +module Fist = struct + external set : string -> bool -> unit = "" + external list : unit -> (string * bool) list = "" +end diff --git a/xenvmd/fist.ml b/xenvmd/fist.ml new file mode 100644 index 0000000..337fa18 --- /dev/null +++ b/xenvmd/fist.ml @@ -0,0 +1,17 @@ +(* Fist points for testing *) + +type t = string + +let dummy = "dummy" + +let all = Hashtbl.create 10 + +let _ = + Hashtbl.replace all dummy false + +let get k = Hashtbl.find all k +let set k v = Hashtbl.replace all k v +let list () = Hashtbl.fold (fun k v acc -> (k,v)::acc) all [] + +let t_of_string str : t option = + if Hashtbl.mem all str then Some str else None diff --git a/xenvmd/fist.mli b/xenvmd/fist.mli new file mode 100644 index 0000000..16093da --- /dev/null +++ b/xenvmd/fist.mli @@ -0,0 +1,8 @@ +type t + +val dummy : t + +val get : t -> bool +val set : t -> bool -> unit +val list : unit -> (string * bool) list +val t_of_string : string -> t option diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index d3d003a..c5e773b 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -87,6 +87,14 @@ module Impl = struct let destroy context ~name = Host.destroy name let all context () = Host.all () end + + module Fist = struct + let set context point value = + match Fist.t_of_string point with + | Some t -> return (Fist.set t value) + | None -> raise (Xenvm_interface.UnknownFistPoint point) + let list context () = return (Fist.list ()) + end end module XenvmServer = Xenvm_interface.ServerM(Impl) From 86ef164d9b2e51b96e1b3de51ad61779bbf0e1e4 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Thu, 15 Oct 2015 11:49:56 +0100 Subject: [PATCH 02/18] Add some fistpoint failures to freepool.ml Signed-off-by: Jon Ludlam --- xenvmd/fist.ml | 11 ++++++++++- xenvmd/fist.mli | 6 ++++++ xenvmd/freepool.ml | 6 ++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/xenvmd/fist.ml b/xenvmd/fist.ml index 337fa18..619394b 100644 --- a/xenvmd/fist.ml +++ b/xenvmd/fist.ml @@ -1,13 +1,19 @@ (* Fist points for testing *) type t = string +exception FistPointHit of string let dummy = "dummy" +let freepool_fail_point0 = "freepool_fail_point0" +let freepool_fail_point1 = "freepool_fail_point1" +let freepool_fail_point2 = "freepool_fail_point2" let all = Hashtbl.create 10 let _ = - Hashtbl.replace all dummy false + Hashtbl.replace all dummy false; + Hashtbl.replace all freepool_fail_point1 false + let get k = Hashtbl.find all k let set k v = Hashtbl.replace all k v @@ -15,3 +21,6 @@ let list () = Hashtbl.fold (fun k v acc -> (k,v)::acc) all [] let t_of_string str : t option = if Hashtbl.mem all str then Some str else None + +let maybe_exn k = if get k then raise (FistPointHit k) +let maybe_lwt_fail k = if get k then Lwt.fail (FistPointHit k) else Lwt.return () diff --git a/xenvmd/fist.mli b/xenvmd/fist.mli index 16093da..90d7476 100644 --- a/xenvmd/fist.mli +++ b/xenvmd/fist.mli @@ -1,8 +1,14 @@ type t val dummy : t +val freepool_fail_point0 : t +val freepool_fail_point1 : t +val freepool_fail_point2 : t val get : t -> bool val set : t -> bool -> unit val list : unit -> (string * bool) list val t_of_string : string -> t option + +val maybe_exn : t -> unit +val maybe_lwt_fail : t -> unit Lwt.t diff --git a/xenvmd/freepool.ml b/xenvmd/freepool.ml index 4835796..31021ea 100644 --- a/xenvmd/freepool.ml +++ b/xenvmd/freepool.ml @@ -103,6 +103,8 @@ let perform_expand_free ef connected_host = >>= fun () -> Lwt_list.iter_s (fun msg -> debug "%s" msg) !msgs >>= fun () -> + Fist.maybe_lwt_fail Fist.freepool_fail_point1 + >>= fun () -> read (fun vg -> let current_allocation = allocation_of_lv vg connected_host.Host.free_LV_uuid in let old_allocation = ef.old_allocation in @@ -112,6 +114,10 @@ let perform_expand_free ef connected_host = Rings.FromLVM.push connected_host.Host.from_LVM allocation >>= fun pos -> Rings.FromLVM.p_advance connected_host.Host.from_LVM pos + >>= fun result -> + Fist.maybe_lwt_fail Fist.freepool_fail_point2 + >>= fun () -> + Lwt.return result let perform t = debug "%s" (Journal.Op.sexp_of_t t |> Sexplib.Sexp.to_string_hum) From 265904588b9f6e437f956b7847ece4b21470cba0 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Thu, 15 Oct 2015 14:41:02 +0100 Subject: [PATCH 03/18] Extract some more logic into a separate module: Hostdb Signed-off-by: Jon Ludlam --- xenvmd/freepool.ml | 100 +++++++++++---------- xenvmd/host.ml | 217 ++++++++++++++++++++++----------------------- xenvmd/host.mli | 9 -- xenvmd/hostdb.ml | 21 +++++ xenvmd/hostdb.mli | 13 +++ 5 files changed, 191 insertions(+), 169 deletions(-) create mode 100644 xenvmd/hostdb.ml create mode 100644 xenvmd/hostdb.mli diff --git a/xenvmd/freepool.ml b/xenvmd/freepool.ml index 31021ea..b072018 100644 --- a/xenvmd/freepool.ml +++ b/xenvmd/freepool.ml @@ -8,7 +8,7 @@ open Errors let xenvmd_generation_tag = "xenvmd_gen" let lvm_op_of_free_allocation vg connected_host allocation = - let freeid = connected_host.Host.free_LV_uuid in + let freeid = connected_host.Hostdb.free_LV_uuid in let lv = Lvm.Vg.LVs.find freeid vg.Lvm.Vg.lvs in let size = Lvm.Lv.size_in_extents lv in let segments = Lvm.Lv.Segment.linear size allocation in @@ -60,11 +60,11 @@ let perform_expand_free ef connected_host = maybe_write (fun vg -> - let current_allocation = allocation_of_lv vg connected_host.Host.free_LV_uuid in + let current_allocation = allocation_of_lv vg connected_host.Hostdb.free_LV_uuid in let new_space = Lvm.Pv.Allocator.sub current_allocation ef.old_allocation |> Lvm.Pv.Allocator.size in if new_space = 0L then begin try - let lv = Lvm.Vg.LVs.find connected_host.Host.free_LV_uuid vg.Lvm.Vg.lvs in (* Not_found here caught by the try-catch block *) + let lv = Lvm.Vg.LVs.find connected_host.Hostdb.free_LV_uuid vg.Lvm.Vg.lvs in (* Not_found here caught by the try-catch block *) let extent_size = vg.Lvm.Vg.extent_size in (* in sectors *) let extent_size_mib = Int64.(div (mul extent_size (of_int sector_size)) (mul 1024L 1024L)) in let old_gen = List.fold_left @@ -79,7 +79,7 @@ let perform_expand_free ef connected_host = | `Error (`OnlyThisMuchFree (needed_extents, free_extents)) -> msgs := !msgs @ [ Printf.sprintf "LV %s expansion required, but number of free extents (%Ld) is less than needed extents (%Ld)" - connected_host.Host.free_LV free_extents needed_extents; + connected_host.Hostdb.free_LV free_extents needed_extents; "Expanding to use all the available space."]; vg.Lvm.Vg.free_space in @@ -89,16 +89,16 @@ let perform_expand_free ef connected_host = match old_gen with | Some g -> [ - Lvm.Redo.Op.LvRemoveTag (connected_host.Host.free_LV_uuid, tag_of_generation g); - Lvm.Redo.Op.LvAddTag (connected_host.Host.free_LV_uuid, tag_of_generation (g+1))] + Lvm.Redo.Op.LvRemoveTag (connected_host.Hostdb.free_LV_uuid, tag_of_generation g); + Lvm.Redo.Op.LvAddTag (connected_host.Hostdb.free_LV_uuid, tag_of_generation (g+1))] | None -> [ - Lvm.Redo.Op.LvAddTag (connected_host.Host.free_LV_uuid, tag_of_generation 1)] + Lvm.Redo.Op.LvAddTag (connected_host.Hostdb.free_LV_uuid, tag_of_generation 1)] in `Ok (op1::genops) | `Error x -> `Error x with | Not_found -> - `Error (`Msg (Printf.sprintf "Couldn't find the free LV for host: %s" connected_host.Host.free_LV)) + `Error (`Msg (Printf.sprintf "Couldn't find the free LV for host: %s" connected_host.Hostdb.free_LV)) end else `Ok []) >>= fun () -> Lwt_list.iter_s (fun msg -> debug "%s" msg) !msgs @@ -106,14 +106,14 @@ let perform_expand_free ef connected_host = Fist.maybe_lwt_fail Fist.freepool_fail_point1 >>= fun () -> read (fun vg -> - let current_allocation = allocation_of_lv vg connected_host.Host.free_LV_uuid in + let current_allocation = allocation_of_lv vg connected_host.Hostdb.free_LV_uuid in let old_allocation = ef.old_allocation in let new_extents = Lvm.Pv.Allocator.sub current_allocation old_allocation in Lwt.return new_extents) >>= fun allocation -> - Rings.FromLVM.push connected_host.Host.from_LVM allocation + Rings.FromLVM.push connected_host.Hostdb.from_LVM allocation >>= fun pos -> - Rings.FromLVM.p_advance connected_host.Host.from_LVM pos + Rings.FromLVM.p_advance connected_host.Hostdb.from_LVM pos >>= fun result -> Fist.maybe_lwt_fail Fist.freepool_fail_point2 >>= fun () -> @@ -125,10 +125,10 @@ let perform t = match t with | Journal.Op.ExpandFree ef -> begin - match Host.get_connected_host ef.Journal.Op.host with - | Some connected_host -> + match Hostdb.get ef.Journal.Op.host with + | Hostdb.Connected connected_host -> perform_expand_free ef connected_host - | None -> + | _ -> error "Journalled entry exists, but the host does not!" end @@ -167,50 +167,54 @@ let resend_free_volumes () = ( read (fun x -> return (`Ok x)) ) >>= fun lvm -> - let hosts = Host.get_connected_hosts () in + let all = Hostdb.all () in Lwt_list.iter_s - (fun (host, connected_host) -> + (function + | host, Hostdb.Connected connected_host -> (* XXX: need to lock the host somehow. Ideally we would still service other queues while one host is locked. *) - let from_lvm = connected_host.Host.from_LVM in - let freeid = connected_host.Host.free_LV_uuid in - let freename = connected_host.Host.free_LV in - Rings.FromLVM.p_state from_lvm - >>= function - | `Running -> return () - | `Suspended -> - let rec wait () = - Rings.FromLVM.p_state from_lvm - >>= function - | `Suspended -> - Lwt_unix.sleep 5. - >>= fun () -> - wait () - | `Running -> return () in - wait () - >>= fun () -> - fatal_error "resend_free_volumes" - ( match try Some(Lvm.Vg.LVs.find freeid lvm.Lvm.Vg.lvs) with _ -> None with - | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) - | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) - >>= fun allocation -> - Rings.FromLVM.push from_lvm allocation - >>= fun pos -> - Rings.FromLVM.p_advance from_lvm pos - ) hosts + let from_lvm = connected_host.Hostdb.from_LVM in + let freeid = connected_host.Hostdb.free_LV_uuid in + let freename = connected_host.Hostdb.free_LV in + Rings.FromLVM.p_state from_lvm + >>= begin function + | `Running -> return () + | `Suspended -> + let rec wait () = + Rings.FromLVM.p_state from_lvm + >>= function + | `Suspended -> + Lwt_unix.sleep 5. + >>= fun () -> + wait () + | `Running -> return () in + wait () + >>= fun () -> + fatal_error "resend_free_volumes" + ( match try Some(Lvm.Vg.LVs.find freeid lvm.Lvm.Vg.lvs) with _ -> None with + | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) + | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) + >>= fun allocation -> + Rings.FromLVM.push from_lvm allocation + >>= fun pos -> + Rings.FromLVM.p_advance from_lvm pos + end + | host, _ -> + Lwt.return () + ) all let top_up_host config host connected_host = let open Config.Xenvmd in sector_size >>= fun sector_size -> read (Lwt.return) >>= fun vg -> - match try Some(Lvm.Vg.LVs.find connected_host.Host.free_LV_uuid vg.Lvm.Vg.lvs) with _ -> None with + match try Some(Lvm.Vg.LVs.find connected_host.Hostdb.free_LV_uuid vg.Lvm.Vg.lvs) with _ -> None with | Some lv -> let extent_size = vg.Lvm.Vg.extent_size in (* in sectors *) let extent_size_mib = Int64.(div (mul extent_size (of_int sector_size)) (mul 1024L 1024L)) in let size_mib = Int64.mul (Lvm.Lv.size_in_extents lv) extent_size_mib in if size_mib < config.host_low_water_mark then begin info "LV %s is %Ld MiB < low_water_mark %Ld MiB; allocating" - connected_host.Host.free_LV size_mib config.host_low_water_mark + connected_host.Hostdb.free_LV size_mib config.host_low_water_mark >>= fun () -> match !journal with | Some j -> @@ -229,8 +233,10 @@ let top_up_host config host connected_host = error "Host has disappeared!" let top_up_free_volumes config = - let hosts = Host.get_connected_hosts () in + let hosts = Hostdb.all () in Lwt_list.iter_s - (fun (host, connected_host) -> - top_up_host config host connected_host + (function + | host, Hostdb.Connected connected_host -> + top_up_host config host connected_host + | _ -> Lwt.return () ) hosts diff --git a/xenvmd/host.ml b/xenvmd/host.ml index 648ba5f..b796ecb 100644 --- a/xenvmd/host.ml +++ b/xenvmd/host.ml @@ -1,27 +1,6 @@ open Lwt open Log -type connected_host = { - mutable state : Xenvm_interface.connection_state; - to_LVM : Rings.ToLVM.consumer; - from_LVM : Rings.FromLVM.producer; - free_LV : string; - free_LV_uuid : Lvm.Vg.LVs.key; -} - -module StringSet = Set.Make(String) -let connecting_hosts : StringSet.t ref = ref StringSet.empty -let connected_hosts : (string, connected_host) Hashtbl.t = Hashtbl.create 11 - -let get_connected_hosts () = - Hashtbl.fold (fun k v acc -> (k,v)::acc) connected_hosts [] - -let get_connected_host host = - try - Some (Hashtbl.find connected_hosts host) - with Not_found -> - None - let connected_tag = "xenvm_connected" (* Conventional names of the metadata volumes *) @@ -105,26 +84,27 @@ let connect name = let fromLVM = fromLVM name in let freeLVM = freeLVM name in - let is_connecting = StringSet.mem name (!connecting_hosts) in - - begin if Hashtbl.mem connected_hosts name then begin - let connected_host = Hashtbl.find connected_hosts name in - match connected_host.state with - | Xenvm_interface.Failed msg -> - info "Connection to host %s has failed with %s: retrying" name msg - >>= fun () -> - Lwt.return true - | x -> - info "Connction to host %s in state %s" name (Jsonrpc.to_string (Xenvm_interface.rpc_of_connection_state x)) - >>= fun () -> - Lwt.return false - end else Lwt.return true end + begin match Hostdb.get name with + | Hostdb.Connected connected_host -> begin + match connected_host.Hostdb.state with + | Xenvm_interface.Failed msg -> + info "Connection to host %s has failed with %s: retrying" name msg + >>= fun () -> + Lwt.return true + | x -> + info "Connction to host %s in state %s" name (Jsonrpc.to_string (Xenvm_interface.rpc_of_connection_state x)) + >>= fun () -> + Lwt.return false + end + | Hostdb.Disconnected -> return true (* We should try again *) + | Hostdb.Connecting -> return false (* If we're already connecting, no need to do anything *) + end >>= fun try_again -> - if is_connecting || (not try_again) then begin + if (not try_again) then begin return () end else begin - connecting_hosts := StringSet.add name !connecting_hosts; + Hostdb.set name Hostdb.Connecting; match Vg_io.find vg toLVM, Vg_io.find vg fromLVM, Vg_io.find vg freeLVM with | Some toLVM_id, Some fromLVM_id, Some freeLVM_id -> (* Persist at this point that we're going to connect this host *) @@ -152,18 +132,17 @@ let connect name = | `Ok disk -> Rings.FromLVM.attach_as_producer ~name ~disk () >>= fun (initial_state, fromLVM_q) -> - let connected_host = { + let connected_host = Hostdb.({ state = Xenvm_interface.Resuming_to_LVM; from_LVM = fromLVM_q; to_LVM = toLVM_q; free_LV = freeLVM; free_LV_uuid = (Vg_io.Volume.metadata_of freeLVM_id).Lvm.Lv.id - } in - Hashtbl.replace connected_hosts name connected_host; - connecting_hosts := StringSet.remove name !connecting_hosts; + }) in + Hostdb.set name (Hostdb.Connected connected_host); ( if initial_state = `Suspended then begin - connected_host.state <- Xenvm_interface.Resending_free_blocks; + connected_host.Hostdb.state <- Xenvm_interface.Resending_free_blocks; debug "The Rings.FromLVM queue was already suspended: resending the free blocks" >>= fun () -> let allocation = Lvm.Lv.to_allocation (Vg_io.Volume.metadata_of freeLVM_id) in @@ -191,17 +170,16 @@ let connect name = (fun () -> background_t () >>= fun connected_host -> - connected_host.state <- Xenvm_interface.Connected; + connected_host.Hostdb.state <- Xenvm_interface.Connected; return () ) (fun e -> let msg = Printexc.to_string e in error "Connecting to %s failed with: %s" name msg >>= fun () -> begin - try - let connected_host = Hashtbl.find connected_hosts name in - connected_host.state <- Xenvm_interface.Failed msg; - with Not_found -> () + match Hostdb.get name with + | Hostdb.Connected ch -> ch.Hostdb.state <- Xenvm_interface.Failed msg + | _ -> () end; return ()) ); @@ -209,7 +187,7 @@ let connect name = | _, _, _ -> info "At least one of host %s's volumes does not exist" name >>= fun () -> - connecting_hosts := StringSet.remove name !connecting_hosts; + Hostdb.set name Hostdb.Disconnected; fail Xenvm_interface.HostNotCreated end @@ -217,11 +195,10 @@ let connect name = let flush_m = Lwt_mutex.create () let flush_already_locked name = - if not (Hashtbl.mem connected_hosts name) - then return () - else begin - let connected_host = Hashtbl.find connected_hosts name in - let to_lvm = connected_host.to_LVM in + match Hostdb.get name with + | Hostdb.Disconnected | Hostdb.Connecting -> return () + | Hostdb.Connected connected_host -> + let to_lvm = connected_host.Hostdb.to_LVM in Rings.ToLVM.pop to_lvm >>= fun (pos, items) -> debug "Rings.FromLVM queue %s has %d items" name (List.length items) @@ -231,43 +208,43 @@ let flush_already_locked name = >>= fun () -> Vg_io.write (fun vg -> let id = (Lvm.Vg.LVs.find_by_name volume vg.Lvm.Vg.lvs).Lvm.Lv.id in - let free_id = connected_host.free_LV_uuid in + let free_id = connected_host.Hostdb.free_LV_uuid in Lvm.Vg.do_op vg (Lvm.Redo.Op.(LvTransfer(free_id, id, segments))) ) >>= fun _ -> Lwt.return () ) items >>= fun () -> Rings.ToLVM.c_advance to_lvm pos - end let disconnect ~cooperative name = - if StringSet.mem name !connecting_hosts then + match Hostdb.get name with + | Hostdb.Disconnected -> return () + | Hostdb.Connecting -> fail (Xenvm_interface.(HostStillConnecting (Jsonrpc.to_string (rpc_of_connection_state Xenvm_interface.Resuming_to_LVM)))) - else - if Hashtbl.mem connected_hosts name then begin - let connected_host = Hashtbl.find connected_hosts name in - match connected_host.state with - | Xenvm_interface.Connected -> - let to_lvm = connected_host.to_LVM in - ( if cooperative then begin - debug "Cooperative disconnect: suspending Rings.ToLVM queue for %s" name - >>= fun () -> - Rings.ToLVM.suspend to_lvm - end else return () - ) >>= fun () -> - (* There may still be updates in the Rings.ToLVM queue *) - debug "Flushing Rings.ToLVM queue for %s" name - >>= fun () -> - Lwt_mutex.with_lock flush_m (fun () -> flush_already_locked name) - >>= fun () -> - let toLVM = toLVM name in - Vg_io.write (fun vg -> - Lvm.Vg.remove_tag vg toLVM connected_tag - ) >>= fun _ -> - Hashtbl.remove connected_hosts name; - return () - | x -> - fail (Xenvm_interface.(HostStillConnecting (Jsonrpc.to_string (rpc_of_connection_state x)))) - end else return () + | Hostdb.Connected connected_host -> + begin + match connected_host.Hostdb.state with + | Xenvm_interface.Connected -> + let to_lvm = connected_host.Hostdb.to_LVM in + ( if cooperative then begin + debug "Cooperative disconnect: suspending Rings.ToLVM queue for %s" name + >>= fun () -> + Rings.ToLVM.suspend to_lvm + end else return () + ) >>= fun () -> + (* There may still be updates in the Rings.ToLVM queue *) + debug "Flushing Rings.ToLVM queue for %s" name + >>= fun () -> + Lwt_mutex.with_lock flush_m (fun () -> flush_already_locked name) + >>= fun () -> + let toLVM = toLVM name in + Vg_io.write (fun vg -> + Lvm.Vg.remove_tag vg toLVM connected_tag + ) >>= fun _ -> + Hostdb.set name Hostdb.Disconnected; + return () + | x -> + fail (Xenvm_interface.(HostStillConnecting (Jsonrpc.to_string (rpc_of_connection_state x)))) + end let destroy name = disconnect ~cooperative:false name @@ -287,34 +264,40 @@ let destroy name = Lwt.return () let all () = - let list = Hashtbl.fold (fun n c acc -> (n,c)::acc) connected_hosts [] in + let all = Hostdb.all () in Lwt_list.map_s - (fun (name, connected_host) -> - let lv = toLVM name in - let t = connected_host.to_LVM in - ( Rings.ToLVM.c_state t >>= function - | `Suspended -> return true - | `Running -> return false ) >>= fun suspended -> - Rings.ToLVM.c_debug_info t - >>= fun debug_info -> - let toLVM = { Xenvm_interface.lv; suspended; debug_info } in - let lv = fromLVM name in - let t = connected_host.from_LVM in - ( Rings.FromLVM.p_state t >>= function - | `Suspended -> return true - | `Running -> return false ) >>= fun suspended -> - Rings.FromLVM.p_debug_info t - >>= fun debug_info -> - let fromLVM = { Xenvm_interface.lv; suspended; debug_info } in - Vg_io.read (fun vg -> - try - let lv = Lvm.Vg.LVs.find_by_name (freeLVM name) vg.Lvm.Vg.lvs in - return (Lvm.Lv.size_in_extents lv) - with Not_found -> return 0L - ) >>= fun freeExtents -> - let connection_state = Some connected_host.state in - return { Xenvm_interface.name; connection_state; fromLVM; toLVM; freeExtents } - ) list + (fun (name,state) -> + match state with + | Hostdb.Connected connected_host -> + let lv = toLVM name in + let t = connected_host.Hostdb.to_LVM in + ( Rings.ToLVM.c_state t >>= function + | `Suspended -> return true + | `Running -> return false ) >>= fun suspended -> + Rings.ToLVM.c_debug_info t + >>= fun debug_info -> + let toLVM = { Xenvm_interface.lv; suspended; debug_info } in + let lv = fromLVM name in + let t = connected_host.Hostdb.from_LVM in + ( Rings.FromLVM.p_state t >>= function + | `Suspended -> return true + | `Running -> return false ) >>= fun suspended -> + Rings.FromLVM.p_debug_info t + >>= fun debug_info -> + let fromLVM = { Xenvm_interface.lv; suspended; debug_info } in + Vg_io.read (fun vg -> + try + let lv = Lvm.Vg.LVs.find_by_name (freeLVM name) vg.Lvm.Vg.lvs in + return (Lvm.Lv.size_in_extents lv) + with Not_found -> return 0L + ) >>= fun freeExtents -> + let connection_state = Some connected_host.Hostdb.state in + return (Some ({ Xenvm_interface.name; connection_state; fromLVM; toLVM; freeExtents })) + | _ -> return None + ) all + >>= fun l -> + Lwt.return (List.rev (List.fold_left (fun acc entry -> match entry with Some e -> e::acc | None -> acc) [] l)) + let reconnect_all () = Vg_io.read (fun vg -> @@ -341,11 +324,19 @@ let flush_one host = (fun () -> flush_already_locked host) let flush_all () = - let hosts = Hashtbl.fold (fun h _ acc -> h::acc) connected_hosts [] in - Lwt_list.iter_s flush_one hosts + let all = Hostdb.all () in + Lwt_list.iter_s + (fun (h, state) -> + match state with + | Hostdb.Connected _ -> flush_one h + | _ -> return ()) all let shutdown () = - let hosts = Hashtbl.fold (fun h _ acc -> h::acc) connected_hosts [] in - Lwt_list.iter_s (disconnect ~cooperative:true) hosts + let all = Hostdb.all () in + Lwt_list.iter_s + (fun (h, state) -> + match state with + | Hostdb.Connected _ -> disconnect ~cooperative:true h + | _ -> return ()) all >>= fun () -> Vg_io.sync () diff --git a/xenvmd/host.mli b/xenvmd/host.mli index 0bd5be7..d4d0924 100644 --- a/xenvmd/host.mli +++ b/xenvmd/host.mli @@ -1,12 +1,3 @@ -type connected_host = { - mutable state : Xenvm_interface.connection_state; - to_LVM : Rings.ToLVM.consumer; - from_LVM : Rings.FromLVM.producer; - free_LV : string; - free_LV_uuid : Lvm.Vg.LVs.key; -} -val get_connected_hosts : unit -> (string * connected_host) list -val get_connected_host : string -> connected_host option val create : string -> unit Lwt.t val connect : string -> unit Lwt.t val disconnect : cooperative:bool -> string -> unit Lwt.t diff --git a/xenvmd/hostdb.ml b/xenvmd/hostdb.ml new file mode 100644 index 0000000..a085fd0 --- /dev/null +++ b/xenvmd/hostdb.ml @@ -0,0 +1,21 @@ +type connected_host = { + mutable state : Xenvm_interface.connection_state; + to_LVM : Rings.ToLVM.consumer; + from_LVM : Rings.FromLVM.producer; + free_LV : string; + free_LV_uuid : Lvm.Vg.LVs.key; +} + +type state = + | Disconnected + | Connecting + | Connected of connected_host + +let state : (string, state) Hashtbl.t = Hashtbl.create 11 + +let get host = try Hashtbl.find state host with _ -> Disconnected +let set host st = + if st=Disconnected + then Hashtbl.remove state host + else Hashtbl.replace state host st +let all () = Hashtbl.fold (fun k v acc -> (k,v)::acc) state [] diff --git a/xenvmd/hostdb.mli b/xenvmd/hostdb.mli new file mode 100644 index 0000000..9b364a5 --- /dev/null +++ b/xenvmd/hostdb.mli @@ -0,0 +1,13 @@ +type connected_host = { + mutable state : Xenvm_interface.connection_state; + to_LVM : Rings.ToLVM.consumer; + from_LVM : Rings.FromLVM.producer; + free_LV : string; + free_LV_uuid : Lvm.Vg.LVs.key; +} +type state = Disconnected | Connecting | Connected of connected_host + +val get : string -> state +val set : string -> state -> unit +val all : unit -> (string * state) list + From 9305b15576623f0f2c2fbf4de5dda4334fdd63dd Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Thu, 15 Oct 2015 15:01:29 +0100 Subject: [PATCH 04/18] Move some code around a little. No semantic change Signed-off-by: Jon Ludlam --- xenvmd/host.ml | 75 ++++++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/xenvmd/host.ml b/xenvmd/host.ml index b796ecb..9940cc6 100644 --- a/xenvmd/host.ml +++ b/xenvmd/host.ml @@ -3,6 +3,43 @@ open Log let connected_tag = "xenvm_connected" +(* Hold this mutex when actively flushing from the Rings.ToLVM queues *) +let flush_m = Lwt_mutex.create () + +let flush_already_locked name = + match Hostdb.get name with + | Hostdb.Disconnected | Hostdb.Connecting -> return () + | Hostdb.Connected connected_host -> + let to_lvm = connected_host.Hostdb.to_LVM in + Rings.ToLVM.pop to_lvm + >>= fun (pos, items) -> + debug "Rings.FromLVM queue %s has %d items" name (List.length items) + >>= fun () -> + Lwt_list.iter_s (function { ExpandVolume.volume; segments } -> + debug "Expanding volume %s" volume + >>= fun () -> + Vg_io.write (fun vg -> + let id = (Lvm.Vg.LVs.find_by_name volume vg.Lvm.Vg.lvs).Lvm.Lv.id in + let free_id = connected_host.Hostdb.free_LV_uuid in + Lvm.Vg.do_op vg (Lvm.Redo.Op.(LvTransfer(free_id, id, segments))) + ) >>= fun _ -> Lwt.return () + ) items + >>= fun () -> + Rings.ToLVM.c_advance to_lvm pos + +let flush_one host = + Lwt_mutex.with_lock flush_m + (fun () -> flush_already_locked host) + +let flush_all () = + let all = Hostdb.all () in + Lwt_list.iter_s + (fun (h, state) -> + match state with + | Hostdb.Connected _ -> flush_one h + | _ -> return ()) all + + (* Conventional names of the metadata volumes *) let toLVM host = host ^ "-toLVM" let fromLVM host = host ^ "-fromLVM" @@ -191,30 +228,6 @@ let connect name = fail Xenvm_interface.HostNotCreated end -(* Hold this mutex when actively flushing from the Rings.ToLVM queues *) -let flush_m = Lwt_mutex.create () - -let flush_already_locked name = - match Hostdb.get name with - | Hostdb.Disconnected | Hostdb.Connecting -> return () - | Hostdb.Connected connected_host -> - let to_lvm = connected_host.Hostdb.to_LVM in - Rings.ToLVM.pop to_lvm - >>= fun (pos, items) -> - debug "Rings.FromLVM queue %s has %d items" name (List.length items) - >>= fun () -> - Lwt_list.iter_s (function { ExpandVolume.volume; segments } -> - debug "Expanding volume %s" volume - >>= fun () -> - Vg_io.write (fun vg -> - let id = (Lvm.Vg.LVs.find_by_name volume vg.Lvm.Vg.lvs).Lvm.Lv.id in - let free_id = connected_host.Hostdb.free_LV_uuid in - Lvm.Vg.do_op vg (Lvm.Redo.Op.(LvTransfer(free_id, id, segments))) - ) >>= fun _ -> Lwt.return () - ) items - >>= fun () -> - Rings.ToLVM.c_advance to_lvm pos - let disconnect ~cooperative name = match Hostdb.get name with | Hostdb.Disconnected -> return () @@ -319,18 +332,6 @@ let reconnect_all () = Lwt_list.iter_s (fun (host, was_connected) -> if was_connected then connect host else disconnect ~cooperative:false host) host_states -let flush_one host = - Lwt_mutex.with_lock flush_m - (fun () -> flush_already_locked host) - -let flush_all () = - let all = Hostdb.all () in - Lwt_list.iter_s - (fun (h, state) -> - match state with - | Hostdb.Connected _ -> flush_one h - | _ -> return ()) all - let shutdown () = let all = Hostdb.all () in Lwt_list.iter_s @@ -340,3 +341,5 @@ let shutdown () = | _ -> return ()) all >>= fun () -> Vg_io.sync () + + From e4e41ac18bdec9463d26b2fb06ddd976a41cb29b Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 16 Oct 2015 13:17:06 +0100 Subject: [PATCH 05/18] Add an adaptive delay to the local allocator Signed-off-by: Jon Ludlam --- idl/errors.ml | 3 +++ xenvm-local-allocator/local_allocator.ml | 12 ++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/idl/errors.ml b/idl/errors.ml index f13ecfd..9195c40 100644 --- a/idl/errors.ml +++ b/idl/errors.ml @@ -33,6 +33,9 @@ let fatal_error msg m = m >>= function | `Error `Retry -> fatal_error_t (msg ^ ": queue temporarily unavailable") | `Ok x -> return x +let delayfn n = + if n>10 then 5.0 else (float_of_int n *. 0.5) + let rec retry_forever f = f () >>= function diff --git a/xenvm-local-allocator/local_allocator.ml b/xenvm-local-allocator/local_allocator.ml index abf4eec..6db232f 100644 --- a/xenvm-local-allocator/local_allocator.ml +++ b/xenvm-local-allocator/local_allocator.ml @@ -113,12 +113,16 @@ module FreePool = struct FromLVM.resume from_lvm >>= fun () -> - let rec loop_forever () = + (* n here is the number of times we've been + around the loop without activity. we use + it to calculate the delay until next + poll. *) + let rec loop_forever n = FromLVM.pop from_lvm >>= fun (pos, ts) -> let open FreeAllocation in ( if ts = [] then begin - Lwt_unix.sleep 5. + Lwt_unix.sleep (delayfn n) end else return () ) >>= fun () -> Lwt_list.iter_s @@ -130,7 +134,7 @@ module FreePool = struct >>= fun () -> FromLVM.c_advance from_lvm pos >>= fun () -> - loop_forever () in + loop_forever (if ts = [] then n+1 else 0) in return loop_forever end @@ -341,7 +345,7 @@ let main mock_dm config daemon socket journal fromLVM toLVM = FreePool.start config vg >>= fun forever_fun -> - let (_: unit Lwt.t) = forever_fun () in + let (_: unit Lwt.t) = forever_fun 0 in let (_: unit Lwt.t) = wait_for_shutdown_forever () in (* Called to extend a single device. This function decides what needs to be From e56b27f9879266f0c01ddc5779a4106383212430 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 16 Oct 2015 13:17:55 +0100 Subject: [PATCH 06/18] Put trace logging into its own 'section' Signed-off-by: Jon Ludlam --- idl/log.ml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/idl/log.ml b/idl/log.ml index 96453ec..f9fb54a 100644 --- a/idl/log.ml +++ b/idl/log.ml @@ -11,6 +11,8 @@ let info fmt = Lwt_log.info_f fmt let warn fmt = Lwt_log.warning_f fmt let error fmt = Lwt_log.error_f fmt +let trace_section = Lwt_log.Section.make "trace" + let trace ts = let string_of_key = function | `Producer -> "producer" @@ -25,4 +27,4 @@ let trace ts = Printf.sprintf "%s.%s := %s" queue (string_of_key key) (string_of_value value) | `Get (__, queue, key, value) -> Printf.sprintf "%s.%s == %s" queue (string_of_key key) (string_of_value value) in - info "%s" (String.concat ", " (List.map one ts)) + Lwt_log.info_f ~section:trace_section "%s" (String.concat ", " (List.map one ts)) From 36dca4dbed88771e8004de29babc5738bc7407e1 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 16 Oct 2015 13:20:40 +0100 Subject: [PATCH 07/18] Add adaptive error retry backoff Signed-off-by: Jon Ludlam --- idl/errors.ml | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/idl/errors.ml b/idl/errors.ml index 9195c40..4503b50 100644 --- a/idl/errors.ml +++ b/idl/errors.ml @@ -37,14 +37,16 @@ let delayfn n = if n>10 then 5.0 else (float_of_int n *. 0.5) let rec retry_forever f = - f () - >>= function - | `Ok x -> return (`Ok x) - | `Error `Retry -> - Lwt_unix.sleep 5. - >>= fun () -> - retry_forever f - | `Error x -> return (`Error x) + let rec inner n = + f () + >>= function + | `Ok x -> return (`Ok x) + | `Error `Retry -> + Lwt_unix.sleep (delayfn n) + >>= fun () -> + inner (n+1) + | `Error x -> return (`Error x) + in inner 0 let wait_for f result = let new_f () = From b795cbbda183c00561004e7ac1feaf2ea5244f9d Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 16 Oct 2015 13:30:23 +0100 Subject: [PATCH 08/18] CA-185884: Move to thread-per-host in xenvmd Also uses adaptive delay logic Signed-off-by: Jon Ludlam --- xenvmd/freepool.ml | 70 ++++++++++++++++++++++++++------------------- xenvmd/freepool.mli | 2 ++ xenvmd/host.ml | 35 +++++++++++++++++++---- xenvmd/host.mli | 4 +-- xenvmd/xenvmd.ml | 31 +++++++------------- 5 files changed, 84 insertions(+), 58 deletions(-) diff --git a/xenvmd/freepool.ml b/xenvmd/freepool.ml index b072018..eb2a0cb 100644 --- a/xenvmd/freepool.ml +++ b/xenvmd/freepool.ml @@ -162,43 +162,45 @@ let shutdown () = | None -> return () -let resend_free_volumes () = +let resend_free_volume_to connected_host = fatal_error "resend_free_volumes unable to read LVM metadata" ( read (fun x -> return (`Ok x)) ) >>= fun lvm -> + (* XXX: need to lock the host somehow. Ideally we would still service + other queues while one host is locked. *) + let from_lvm = connected_host.Hostdb.from_LVM in + let freeid = connected_host.Hostdb.free_LV_uuid in + let freename = connected_host.Hostdb.free_LV in + Rings.FromLVM.p_state from_lvm + >>= begin function + | `Running -> return () + | `Suspended -> + let rec wait () = + Rings.FromLVM.p_state from_lvm + >>= function + | `Suspended -> + Lwt_unix.sleep 0.5 + >>= fun () -> + wait () + | `Running -> return () in + wait () + >>= fun () -> + fatal_error "resend_free_volumes" + ( match try Some(Lvm.Vg.LVs.find freeid lvm.Lvm.Vg.lvs) with _ -> None with + | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) + | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) + >>= fun allocation -> + Rings.FromLVM.push from_lvm allocation + >>= fun pos -> + Rings.FromLVM.p_advance from_lvm pos + end +let resend_free_volumes () = let all = Hostdb.all () in Lwt_list.iter_s (function | host, Hostdb.Connected connected_host -> - (* XXX: need to lock the host somehow. Ideally we would still service - other queues while one host is locked. *) - let from_lvm = connected_host.Hostdb.from_LVM in - let freeid = connected_host.Hostdb.free_LV_uuid in - let freename = connected_host.Hostdb.free_LV in - Rings.FromLVM.p_state from_lvm - >>= begin function - | `Running -> return () - | `Suspended -> - let rec wait () = - Rings.FromLVM.p_state from_lvm - >>= function - | `Suspended -> - Lwt_unix.sleep 5. - >>= fun () -> - wait () - | `Running -> return () in - wait () - >>= fun () -> - fatal_error "resend_free_volumes" - ( match try Some(Lvm.Vg.LVs.find freeid lvm.Lvm.Vg.lvs) with _ -> None with - | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) - | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) - >>= fun allocation -> - Rings.FromLVM.push from_lvm allocation - >>= fun pos -> - Rings.FromLVM.p_advance from_lvm pos - end + resend_free_volume_to connected_host | host, _ -> Lwt.return () ) all @@ -226,11 +228,17 @@ let top_up_host config host connected_host = extra_size=config.host_allocation_quantum }) >>|= fun wait -> wait.Journal.J.sync () + >>= fun () -> + Lwt.return true | None -> error "No journal configured!" - end else return () + >>= fun () -> + Lwt.return false + end else return false | None -> error "Host has disappeared!" + >>= fun () -> + Lwt.return false let top_up_free_volumes config = let hosts = Hostdb.all () in @@ -238,5 +246,7 @@ let top_up_free_volumes config = (function | host, Hostdb.Connected connected_host -> top_up_host config host connected_host + >>= fun _ -> + Lwt.return () | _ -> Lwt.return () ) hosts diff --git a/xenvmd/freepool.mli b/xenvmd/freepool.mli index b93ab53..dd8252d 100644 --- a/xenvmd/freepool.mli +++ b/xenvmd/freepool.mli @@ -1,4 +1,6 @@ val start : string -> unit Lwt.t val shutdown : unit -> unit Lwt.t +val top_up_host : Config.Xenvmd.t -> string -> Hostdb.connected_host -> bool Lwt.t +val resend_free_volume_to : Hostdb.connected_host -> unit Lwt.t val resend_free_volumes : unit -> unit Lwt.t val top_up_free_volumes : Config.Xenvmd.t -> unit Lwt.t diff --git a/xenvmd/host.ml b/xenvmd/host.ml index 9940cc6..e9407ac 100644 --- a/xenvmd/host.ml +++ b/xenvmd/host.ml @@ -8,7 +8,7 @@ let flush_m = Lwt_mutex.create () let flush_already_locked name = match Hostdb.get name with - | Hostdb.Disconnected | Hostdb.Connecting -> return () + | Hostdb.Disconnected | Hostdb.Connecting -> return false | Hostdb.Connected connected_host -> let to_lvm = connected_host.Hostdb.to_LVM in Rings.ToLVM.pop to_lvm @@ -26,6 +26,8 @@ let flush_already_locked name = ) items >>= fun () -> Rings.ToLVM.c_advance to_lvm pos + >>= fun () -> + Lwt.return (List.length items > 0) let flush_one host = Lwt_mutex.with_lock flush_m @@ -36,9 +38,29 @@ let flush_all () = Lwt_list.iter_s (fun (h, state) -> match state with - | Hostdb.Connected _ -> flush_one h + | Hostdb.Connected _ -> flush_one h >>= fun _ -> Lwt.return () | _ -> return ()) all +let service_host config host = + let rec inner n = + match Hostdb.get host with + | Hostdb.Connected connected_host -> + (* 0. Have any local allocators restarted? *) + Freepool.resend_free_volume_to connected_host + >>= fun () -> + (* 1. Do any of the host free LVs need topping up? *) + Freepool.top_up_host config host connected_host + >>= fun topped_up -> + (* 2. Are there any pending LVM updates from hosts? *) + flush_one host + >>= fun la_activity -> + let activity = topped_up || la_activity in + Lwt_unix.sleep (Errors.delayfn n) + >>= fun () -> + inner (if activity then 0 else n+1) + | _ -> + Lwt.return () in + inner 0 (* Conventional names of the metadata volumes *) let toLVM host = host ^ "-toLVM" @@ -113,7 +135,7 @@ let create name = let sexp_of_exn e = Sexplib.Sexp.Atom (Printexc.to_string e) -let connect name = +let connect config name = Vg_io.myvg >>= fun vg -> info "Registering host %s" name >>= fun () -> @@ -208,6 +230,7 @@ let connect name = background_t () >>= fun connected_host -> connected_host.Hostdb.state <- Xenvm_interface.Connected; + let _ = service_host config name in return () ) (fun e -> let msg = Printexc.to_string e in @@ -248,7 +271,7 @@ let disconnect ~cooperative name = debug "Flushing Rings.ToLVM queue for %s" name >>= fun () -> Lwt_mutex.with_lock flush_m (fun () -> flush_already_locked name) - >>= fun () -> + >>= fun _ -> let toLVM = toLVM name in Vg_io.write (fun vg -> Lvm.Vg.remove_tag vg toLVM connected_tag @@ -312,7 +335,7 @@ let all () = Lwt.return (List.rev (List.fold_left (fun acc entry -> match entry with Some e -> e::acc | None -> acc) [] l)) -let reconnect_all () = +let reconnect_all config = Vg_io.read (fun vg -> debug "Reconnecting" >>= fun () -> @@ -330,7 +353,7 @@ let reconnect_all () = vg.Lvm.Vg.lvs [] |> Lwt.return ) >>= fun host_states -> Lwt_list.iter_s (fun (host, was_connected) -> - if was_connected then connect host else disconnect ~cooperative:false host) host_states + if was_connected then connect config host else disconnect ~cooperative:false host) host_states let shutdown () = let all = Hostdb.all () in diff --git a/xenvmd/host.mli b/xenvmd/host.mli index d4d0924..9117d93 100644 --- a/xenvmd/host.mli +++ b/xenvmd/host.mli @@ -1,8 +1,8 @@ val create : string -> unit Lwt.t -val connect : string -> unit Lwt.t +val connect : Config.Xenvmd.t -> string -> unit Lwt.t val disconnect : cooperative:bool -> string -> unit Lwt.t val destroy : string -> unit Lwt.t val all : unit -> Xenvm_interface.host list Lwt.t -val reconnect_all : unit -> unit Lwt.t +val reconnect_all : Config.Xenvmd.t -> unit Lwt.t val flush_all : unit -> unit Lwt.t val shutdown : unit -> unit Lwt.t diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index c5e773b..8a3dcb3 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -12,7 +12,8 @@ module Impl = struct let ignore_result _ = Lwt.return () type context = { - stoppers : (unit Lwt.u) list + stoppers : (unit Lwt.u) list; + config : Config.Xenvmd.t } let get context () = @@ -82,7 +83,7 @@ module Impl = struct module Host = struct let create context ~name = Host.create name - let connect context ~name = Host.connect name + let connect context ~name = Host.connect context.config name let disconnect context ~cooperative ~name = Host.disconnect ~cooperative name let destroy context ~name = Host.destroy name let all context () = Host.all () @@ -101,9 +102,9 @@ module XenvmServer = Xenvm_interface.ServerM(Impl) open Cohttp_lwt_unix -let handler ~info stoppers (ch,conn) req body = +let handler ~info stoppers config (ch,conn) req body = Cohttp_lwt_body.to_string body >>= fun bodystr -> - XenvmServer.process {Impl.stoppers} (Jsonrpc.call_of_string bodystr) >>= fun result -> + XenvmServer.process {Impl.stoppers; config} (Jsonrpc.call_of_string bodystr) >>= fun result -> Server.respond_string ~status:`OK ~body:(Jsonrpc.string_of_response result) () let maybe_write_pid config = @@ -136,28 +137,17 @@ let run port sock_path config = >>= fun () -> Freepool.start Xenvm_interface._journal_name >>= fun () -> - Host.reconnect_all () + Host.reconnect_all config >>= fun () -> (* Create a snapshot cache of the metadata for the stats thread *) Vg_io.read return >>= fun vg -> let stats_vg_cache = ref vg in - let rec service_queues () = - (* 0. Have any local allocators restarted? *) - Freepool.resend_free_volumes () - >>= fun () -> - (* 1. Do any of the host free LVs need topping up? *) - Freepool.top_up_free_volumes config - >>= fun () -> - (* 2. Are there any pending LVM updates from hosts? *) - Host.flush_all () - >>= fun () -> - (* 3. Update the metadata snapshot for the stats collection *) + let rec service_stats () = Vg_io.read return >>= fun vg -> stats_vg_cache := vg; - Lwt_unix.sleep 5. >>= fun () -> - service_queues () in + service_stats () in (* See below for a description of 'stoppers' and 'stop' *) let service_http stoppers mode stop = @@ -169,7 +159,7 @@ let run port sock_path config = Lwt.ignore_result (debug "Listening for HTTP request on: %s\n" ty); let info = Printf.sprintf "Served by Cohttp/Lwt listening on %s" ty in let conn_closed (ch,conn) = () in - let callback = handler ~info stoppers in + let callback = handler ~info stoppers config in let c = Server.make ~callback ~conn_closed () in (* Listen for regular API calls *) Server.create ~mode ~stop c in @@ -209,7 +199,7 @@ let run port sock_path config = | None -> () end; - Lwt.join ((service_queues ())::threads) in + Lwt.join ((service_stats ())::threads) in Lwt_main.run t @@ -262,6 +252,7 @@ let main port sock_path config daemon = let config = t_of_sexp (Sexplib.Sexp.load_sexp config) in let config = { config with listenPort = match port with None -> config.listenPort | Some x -> Some x } in let config = { config with listenPath = match sock_path with None -> config.listenPath | Some x -> Some x } in + Lwt_log.add_rule "*" Lwt_log.Debug; if daemon then daemonize config; From a181ca122a8c761461b0ebf6abc17bc27b4d2bb4 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 16 Oct 2015 15:05:48 +0100 Subject: [PATCH 09/18] Log when we hit a fist point Signed-off-by: Jon Ludlam --- xenvmd/fist.ml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/xenvmd/fist.ml b/xenvmd/fist.ml index 619394b..3d324df 100644 --- a/xenvmd/fist.ml +++ b/xenvmd/fist.ml @@ -12,7 +12,9 @@ let all = Hashtbl.create 10 let _ = Hashtbl.replace all dummy false; - Hashtbl.replace all freepool_fail_point1 false + Hashtbl.replace all freepool_fail_point0 false; + Hashtbl.replace all freepool_fail_point1 false; + Hashtbl.replace all freepool_fail_point2 false let get k = Hashtbl.find all k @@ -23,4 +25,7 @@ let t_of_string str : t option = if Hashtbl.mem all str then Some str else None let maybe_exn k = if get k then raise (FistPointHit k) -let maybe_lwt_fail k = if get k then Lwt.fail (FistPointHit k) else Lwt.return () +let maybe_lwt_fail k = + if get k then + Lwt.(Log.error "Causing Lwt thread failure due to fist point: %s" k >>= fun () -> Lwt.fail (FistPointHit k)) + else Lwt.return () From 5e65ebc86031af85773b1013e51b2d79d20dbae0 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 16 Oct 2015 15:06:49 +0100 Subject: [PATCH 10/18] xenvmd: Add a command line option to log to a file Signed-off-by: Jon Ludlam --- xenvmd/xenvmd.ml | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index 8a3dcb3..ced2d3d 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -92,7 +92,9 @@ module Impl = struct module Fist = struct let set context point value = match Fist.t_of_string point with - | Some t -> return (Fist.set t value) + | Some t -> + debug "Setting fist point: %s=%b" point value >>= fun () -> + return (Fist.set t value) | None -> raise (Xenvm_interface.UnknownFistPoint point) let list context () = return (Fist.list ()) end @@ -127,8 +129,15 @@ let maybe_write_pid config = exit 1 end -let run port sock_path config = +let run port sock_path config log_filename = let t = + (match log_filename with + | Some f -> + Lwt_log.file ~mode:`Truncate ~file_name:f () >>= fun logger -> + Lwt_log.default := logger; + Lwt.return () + | None -> Lwt.return ()) + >>= fun () -> maybe_write_pid config >>= fun () -> info "Started with configuration: %s" (Sexplib.Sexp.to_string_hum (Config.Xenvmd.sexp_of_t config)) @@ -246,7 +255,7 @@ let daemonize config = end; Lwt_daemon.daemonize () -let main port sock_path config daemon = +let main port sock_path config daemon log = let open Config.Xenvmd in Sys.(set_signal sigterm (Signal_handle (fun _ -> exit (128+sigterm)))); let config = t_of_sexp (Sexplib.Sexp.load_sexp config) in @@ -255,9 +264,15 @@ let main port sock_path config daemon = Lwt_log.add_rule "*" Lwt_log.Debug; + let log_filename = + match log with + | Some f -> if Filename.is_relative f then (Some (Filename.concat (Unix.getcwd ()) f)) else (Some f) + | None -> None + in + if daemon then daemonize config; - run port sock_path config + run port sock_path config log_filename open Cmdliner @@ -286,13 +301,17 @@ let daemon = let doc = "Detach from the terminal and run as a daemon" in Arg.(value & flag & info ["daemon"] ~docv:"DAEMON" ~doc) +let log = + let doc = "Log to a file rather than syslog/stdout" in + Arg.(value & opt (some string) None & info [ "log" ] ~docv:"LOGFILE" ~doc) + let cmd = let doc = "Start a XenVM daemon" in let man = [ `S "EXAMPLES"; `P "TODO"; ] in - Term.(pure main $ port $ sock_path $ config $ daemon), + Term.(pure main $ port $ sock_path $ config $ daemon $ log), Term.info "xenvmd" ~version:"0.1" ~doc ~man let _ = From 99985ad8720938bf8f4ff4f2dfe64f8d49cb7a8c Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 16 Oct 2015 15:07:35 +0100 Subject: [PATCH 11/18] Add a test to check for problems with FromLVM In particular, if an error occurs after the ExtendPool message has been written to the ring but before it has been removed from the journal, the same blocks will be sent next time the journal is replayed, leading to double allocation of the same blocks. This test case detects that problem. Signed-off-by: Jon Ludlam --- test/test.ml | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/test/test.ml b/test/test.ml index 7ea7b69..2325fb8 100644 --- a/test/test.ml +++ b/test/test.ml @@ -91,7 +91,7 @@ let with_xenvmd ?existing_vg ?(cleanup_vg=true) (f : string -> string -> 'a) = } in Sexplib.Sexp.to_string_hum (Config.Xenvmd.sexp_of_t config) |> file_of_string "test.xenvmd.conf"; - let _ = Lwt_preemptive.detach (fun () -> xenvmd [ "--config"; "./test.xenvmd.conf" ]) () in + let _ = Lwt_preemptive.detach (fun () -> xenvmd [ "--config"; "./test.xenvmd.conf"; "--log"; "xenvmd.log"]) () in wait_for_xenvmd_to_start vg; Xenvm_client.Rpc.uri := "file://local/services/xenvmd/" ^ vg; Xenvm_client.unix_domain_socket_path := "/tmp/xenvmd"; @@ -156,6 +156,12 @@ let wait_for_local_allocator_to_start host = else (Lwt_unix.sleep 0.1 >>= fun () -> retry ()) in retry () +let write_to_file thread filename = + thread + >>= fun log -> + Lwt_io.(with_file output filename + (fun chan -> write chan log)) + let lvchange_offline = "lvchange vg/lv --offline: check that we can activate volumes offline" >:: fun () -> @@ -551,7 +557,7 @@ let inparallel fns = Lwt_preemptive.detach (fun () -> ignore(fn ())) ()) fns) let la_extend_multi device = - "Extend an LV with the local allocator" >:: + "Extend 2 LVs on different hosts with the local allocator" >:: (fun () -> ignore(Lwt_main.run ( let lvname = "test2" in @@ -592,10 +598,68 @@ let la_extend_multi device = Lwt.choose [la_dead; (Lwt_unix.sleep 30.0 >>= fun () -> Lwt.fail Timeout)] ))) + +let la_extend_multi_fist device = + "Extend 2 LVs on different hosts with the local allocator" >:: + (fun () -> + ignore(Lwt_main.run ( + let lvname = "test4" in + let lvname2 = "test5" in + let la_thread_1 = start_local_allocator 1 [device] in + let la_thread_2 = start_local_allocator 2 [device] in + wait_for_local_allocator_to_start 1 >>= fun () -> + wait_for_local_allocator_to_start 2 >>= fun () -> + set_vg_info device vg 2; + inparallel [(fun () -> xenvm ["lvcreate"; "-n"; lvname; "-L"; "4"; vg]); + (fun () -> xenvm ["lvcreate"; "-n"; lvname2; "-L"; "4"; vg])] + >>= fun () -> + Client.Fist.set "freepool_fail_point2" true + >>= fun () -> + Client.Fist.list () + >>= fun list -> + List.iter (fun (x,b) -> Printf.printf "%s: %b\n%!" x b) list; + inparallel [(fun () -> xenvm ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname]); + (fun () -> xenvm ~host:2 ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname2])] + >>= fun () -> + Lwt_unix.sleep 10.0 + >>= fun () -> + Client.Fist.set "freepool_fail_point2" false + >>= fun () -> + inparallel [(fun () -> xenvm ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname]); + (fun () -> xenvm ~host:2 ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname2])] + >>= fun () -> + inparallel [(fun () -> xenvm ["host-disconnect"; vg; "host1"] |> ignore); + (fun () -> xenvm ["host-disconnect"; vg; "host2"] |> ignore)] + >>= fun () -> + Client.get_lv lvname >>= fun (myvg, lv) -> + Client.get_lv lvname2 >>= fun (_, lv2) -> + ignore(myvg,lv,lv2); + let size = Lvm.Lv.size_in_extents lv in + let size2 = Lvm.Lv.size_in_extents lv2 in + ignore(xenvm ["lvchange"; "-an"; Printf.sprintf "%s/%s" vg lvname]); + ignore(xenvm ["lvchange"; "-an"; Printf.sprintf "%s/%s" vg lvname2]); + Printf.printf "Sanity checking VG\n%!"; + Client.get () >>= fun myvg -> + Common.sanity_check myvg; + Printf.printf "final size=%Ld final_size2=%Ld\n%!" size size2; + assert_equal ~msg:"Unexpected final size" + ~printer:Int64.to_string 66L size; + assert_equal ~msg:"Unexpected final size" + ~printer:Int64.to_string 66L size2; + + let la_dead = la_thread_1 >>= fun _ -> la_thread_2 >>= fun _ -> Lwt.return () in + Lwt.choose [la_dead; (Lwt_unix.sleep 30.0 >>= fun () -> Lwt.fail Timeout)] + >>= fun () -> + write_to_file la_thread_1 "local_allocator.1.log" + >>= fun () -> + write_to_file la_thread_2 "local_allocator.2.log" + ))) + let local_allocator_suite device = "Commands which require the local allocator" >::: [ (* la_start device; la_extend device;*) la_extend_multi device; + la_extend_multi_fist device; ] let _ = From 572046068177e67d6a7b2dce8b7a32c15a9531a5 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Mon, 19 Oct 2015 16:12:52 +0100 Subject: [PATCH 12/18] Define fist points in the IDL Signed-off-by: Jon Ludlam --- idl/xenvm_interface.ml | 9 +++++++-- test/test.ml | 6 +++--- xenvmd/fist.ml | 32 +++++++++++--------------------- xenvmd/fist.mli | 10 ++-------- xenvmd/freepool.ml | 4 ++-- xenvmd/xenvmd.ml | 7 +------ 6 files changed, 26 insertions(+), 42 deletions(-) diff --git a/idl/xenvm_interface.ml b/idl/xenvm_interface.ml index 839001e..7e6d967 100644 --- a/idl/xenvm_interface.ml +++ b/idl/xenvm_interface.ml @@ -59,6 +59,11 @@ type host = { freeExtents: int64; } +type fist = + | FreePool0 + | FreePool1 + | FreePool2 + module Host = struct external create: name:string -> unit = "" @@ -81,6 +86,6 @@ module Host = struct end module Fist = struct - external set : string -> bool -> unit = "" - external list : unit -> (string * bool) list = "" + external set : fist -> bool -> unit = "" + external list : unit -> (fist * bool) list = "" end diff --git a/test/test.ml b/test/test.ml index 2325fb8..bb7923d 100644 --- a/test/test.ml +++ b/test/test.ml @@ -613,17 +613,17 @@ let la_extend_multi_fist device = inparallel [(fun () -> xenvm ["lvcreate"; "-n"; lvname; "-L"; "4"; vg]); (fun () -> xenvm ["lvcreate"; "-n"; lvname2; "-L"; "4"; vg])] >>= fun () -> - Client.Fist.set "freepool_fail_point2" true + Client.Fist.set Xenvm_interface.FreePool2 true >>= fun () -> Client.Fist.list () >>= fun list -> - List.iter (fun (x,b) -> Printf.printf "%s: %b\n%!" x b) list; + List.iter (fun (x,b) -> Printf.printf "%s: %b\n%!" (Rpc.to_string (Xenvm_interface.rpc_of_fist x)) b) list; inparallel [(fun () -> xenvm ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname]); (fun () -> xenvm ~host:2 ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname2])] >>= fun () -> Lwt_unix.sleep 10.0 >>= fun () -> - Client.Fist.set "freepool_fail_point2" false + Client.Fist.set Xenvm_interface.FreePool2 false >>= fun () -> inparallel [(fun () -> xenvm ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname]); (fun () -> xenvm ~host:2 ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname2])] diff --git a/xenvmd/fist.ml b/xenvmd/fist.ml index 3d324df..f38df0c 100644 --- a/xenvmd/fist.ml +++ b/xenvmd/fist.ml @@ -1,31 +1,21 @@ (* Fist points for testing *) -type t = string -exception FistPointHit of string - -let dummy = "dummy" -let freepool_fail_point0 = "freepool_fail_point0" -let freepool_fail_point1 = "freepool_fail_point1" -let freepool_fail_point2 = "freepool_fail_point2" +type t = Xenvm_interface.fist -let all = Hashtbl.create 10 +exception FistPointHit of string -let _ = - Hashtbl.replace all dummy false; - Hashtbl.replace all freepool_fail_point0 false; - Hashtbl.replace all freepool_fail_point1 false; - Hashtbl.replace all freepool_fail_point2 false - +let string_of_t t = Rpc.to_string (Xenvm_interface.rpc_of_fist t) -let get k = Hashtbl.find all k -let set k v = Hashtbl.replace all k v -let list () = Hashtbl.fold (fun k v acc -> (k,v)::acc) all [] +let all : (t * bool) list ref = ref [] -let t_of_string str : t option = - if Hashtbl.mem all str then Some str else None +let get k = try List.assoc k !all with _ -> false +let set k v = all := (k,v) :: (List.filter (fun (k',_) -> k' <> k) !all) +let list () = !all -let maybe_exn k = if get k then raise (FistPointHit k) +let maybe_exn k = if get k then raise (FistPointHit (string_of_t k)) let maybe_lwt_fail k = if get k then - Lwt.(Log.error "Causing Lwt thread failure due to fist point: %s" k >>= fun () -> Lwt.fail (FistPointHit k)) + let str = string_of_t k in + Lwt.(Log.error "Causing Lwt thread failure due to fist point: %s" str + >>= fun () -> Lwt.fail (FistPointHit str)) else Lwt.return () diff --git a/xenvmd/fist.mli b/xenvmd/fist.mli index 90d7476..4a4c498 100644 --- a/xenvmd/fist.mli +++ b/xenvmd/fist.mli @@ -1,14 +1,8 @@ -type t - -val dummy : t -val freepool_fail_point0 : t -val freepool_fail_point1 : t -val freepool_fail_point2 : t +type t = Xenvm_interface.fist val get : t -> bool val set : t -> bool -> unit -val list : unit -> (string * bool) list -val t_of_string : string -> t option +val list : unit -> (t * bool) list val maybe_exn : t -> unit val maybe_lwt_fail : t -> unit Lwt.t diff --git a/xenvmd/freepool.ml b/xenvmd/freepool.ml index eb2a0cb..1d05db8 100644 --- a/xenvmd/freepool.ml +++ b/xenvmd/freepool.ml @@ -103,7 +103,7 @@ let perform_expand_free ef connected_host = >>= fun () -> Lwt_list.iter_s (fun msg -> debug "%s" msg) !msgs >>= fun () -> - Fist.maybe_lwt_fail Fist.freepool_fail_point1 + Fist.maybe_lwt_fail Xenvm_interface.FreePool1 >>= fun () -> read (fun vg -> let current_allocation = allocation_of_lv vg connected_host.Hostdb.free_LV_uuid in @@ -115,7 +115,7 @@ let perform_expand_free ef connected_host = >>= fun pos -> Rings.FromLVM.p_advance connected_host.Hostdb.from_LVM pos >>= fun result -> - Fist.maybe_lwt_fail Fist.freepool_fail_point2 + Fist.maybe_lwt_fail Xenvm_interface.FreePool2 >>= fun () -> Lwt.return result diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index ced2d3d..7d0c753 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -90,12 +90,7 @@ module Impl = struct end module Fist = struct - let set context point value = - match Fist.t_of_string point with - | Some t -> - debug "Setting fist point: %s=%b" point value >>= fun () -> - return (Fist.set t value) - | None -> raise (Xenvm_interface.UnknownFistPoint point) + let set context point value = return (Fist.set point value) let list context () = return (Fist.list ()) end end From 7025ac8230e2ce35f8b960fc67f9becd20d6e911 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Wed, 21 Oct 2015 15:18:36 +0100 Subject: [PATCH 13/18] Add option to local allocator to log to a file Signed-off-by: Jon Ludlam --- xenvm-local-allocator/local_allocator.ml | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/xenvm-local-allocator/local_allocator.ml b/xenvm-local-allocator/local_allocator.ml index 6db232f..c9f6b23 100644 --- a/xenvm-local-allocator/local_allocator.ml +++ b/xenvm-local-allocator/local_allocator.ml @@ -197,7 +197,7 @@ let targets_of x = >>= fun () -> return (`Error `Retry) -let main mock_dm config daemon socket journal fromLVM toLVM = +let main mock_dm config daemon socket journal fromLVM toLVM log = let open Config.Local_allocator in let config = t_of_sexp (Sexplib.Sexp.load_sexp config) in let config = { config with @@ -207,6 +207,12 @@ let main mock_dm config daemon socket journal fromLVM toLVM = fromLVM = (match fromLVM with None -> config.fromLVM | Some x -> x); } in + let log_filename = + match log with + | Some f -> if Filename.is_relative f then (Some (Filename.concat (Unix.getcwd ()) f)) else (Some f) + | None -> None + in + Lwt_log.add_rule "*" Lwt_log.Debug; Lwt_log.default := Lwt_log.channel ~close_mode:`Keep ~channel:Lwt_io.stdout (); begin match mock_dm with @@ -230,6 +236,15 @@ let main mock_dm config daemon socket journal fromLVM toLVM = in let t = + (match log_filename with + | Some f -> + Lwt_log.file ~mode:`Truncate ~file_name:f () >>= fun logger -> + Lwt_log.default := logger; + Lwt.return () + | None -> + Lwt.return () + ) + >>= fun () -> info "Starting local allocator thread" >>= fun () -> debug "Loaded configuration: %s" (Sexplib.Sexp.to_string_hum (sexp_of_t config)) >>= fun () -> @@ -504,11 +519,15 @@ let mock_dm_arg = let doc = "Enable mock interfaces on device mapper." in Arg.(value & opt (some string) None & info ["mock-devmapper"] ~doc) +let log = + let doc = "Log to a file rather than syslog/stdout" in + Arg.(value & opt (some string) None & info [ "log" ] ~docv:"LOGFILE" ~doc) + let () = Sys.(set_signal sigpipe Signal_ignore); Sys.(set_signal sigterm (Signal_handle (fun _ -> exit (128+sigterm)))); - let t = Term.(pure main $ mock_dm_arg $ config $ daemon $ socket $ journal $ fromLVM $ toLVM) in + let t = Term.(pure main $ mock_dm_arg $ config $ daemon $ socket $ journal $ fromLVM $ toLVM $ log) in match Term.eval (t, info) with | `Error _ -> exit 1 | _ -> exit 0 From f0d3c83187f725d8d494c1305ef62c12a93c6b94 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Wed, 21 Oct 2015 15:19:38 +0100 Subject: [PATCH 14/18] Log to a file in the test code, and fix an incorrect test condition. Signed-off-by: Jon Ludlam --- test/test.ml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test.ml b/test/test.ml index bb7923d..1731a97 100644 --- a/test/test.ml +++ b/test/test.ml @@ -144,7 +144,7 @@ let start_local_allocator host devices = Sexplib.Sexp.to_string_hum (Config.Local_allocator.sexp_of_t config) |> file_of_string config_file; ignore(xenvm [ "host-connect"; vg; hostname]); - let la_thread = Lwt_preemptive.detach (fun () -> local_allocator ~host [ "--config"; config_file ]) () in + let la_thread = Lwt_preemptive.detach (fun () -> local_allocator ~host [ "--config"; config_file; "--log"; Printf.sprintf "la.%d.log" host ]) () in la_thread let wait_for_local_allocator_to_start host = @@ -643,9 +643,9 @@ let la_extend_multi_fist device = Common.sanity_check myvg; Printf.printf "final size=%Ld final_size2=%Ld\n%!" size size2; assert_equal ~msg:"Unexpected final size" - ~printer:Int64.to_string 66L size; + ~printer:Int64.to_string 250L size; assert_equal ~msg:"Unexpected final size" - ~printer:Int64.to_string 66L size2; + ~printer:Int64.to_string 250L size2; let la_dead = la_thread_1 >>= fun _ -> la_thread_2 >>= fun _ -> Lwt.return () in Lwt.choose [la_dead; (Lwt_unix.sleep 30.0 >>= fun () -> Lwt.fail Timeout)] From bac96d22a863c2654d170bce57457c02ee9fde88 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Wed, 21 Oct 2015 15:24:03 +0100 Subject: [PATCH 15/18] CA-186251: Add a generation to free pool allocation messages This allows the local allocator to ignore any resends of the same set of blocks. Signed-off-by: Jon Ludlam --- idl/freeAllocation.ml | 5 +- xenvm-local-allocator/local_allocator.ml | 20 ++++-- xenvmd/freepool.ml | 80 ++++++++++++++++++------ xenvmd/freepool.mli | 2 + xenvmd/host.ml | 12 +--- 5 files changed, 85 insertions(+), 34 deletions(-) diff --git a/idl/freeAllocation.ml b/idl/freeAllocation.ml index aa22215..501b2c0 100644 --- a/idl/freeAllocation.ml +++ b/idl/freeAllocation.ml @@ -1,7 +1,10 @@ open Sexplib.Std module T = struct - type t = Lvm.Pv.Allocator.t with sexp + type t = { + blocks : Lvm.Pv.Allocator.t; + generation : int + } with sexp (** Physical blocks which should be included in the free pool *) end diff --git a/xenvm-local-allocator/local_allocator.ml b/xenvm-local-allocator/local_allocator.ml index c9f6b23..c287cf5 100644 --- a/xenvm-local-allocator/local_allocator.ml +++ b/xenvm-local-allocator/local_allocator.ml @@ -40,13 +40,23 @@ module FreePool = struct let m = Lwt_mutex.create () let c = Lwt_condition.create () let free = ref [] + let generation = ref (-1) - let add extents = + let add gen extents = Lwt_mutex.with_lock m (fun () -> - free := Lvm.Pv.Allocator.merge !free extents; - Lwt_condition.broadcast c (); - return () + begin + if gen <= !generation + then + (* Ignore updates we've already seen *) + () + else begin + free := Lvm.Pv.Allocator.merge !free extents; + generation := gen + end + end; + Lwt_condition.broadcast c (); + return () ) (* Allocate up to [nr_extents]. Blocks if there is no space free. Can return @@ -129,7 +139,7 @@ module FreePool = struct (fun t -> sexp_of_t t |> Sexplib.Sexp.to_string_hum |> debug "FreePool: received new allocation: %s" >>= fun () -> - add t + add t.FreeAllocation.generation t.FreeAllocation.blocks ) ts >>= fun () -> FromLVM.c_advance from_lvm pos diff --git a/xenvmd/freepool.ml b/xenvmd/freepool.ml index 1d05db8..bd71c71 100644 --- a/xenvmd/freepool.ml +++ b/xenvmd/freepool.ml @@ -28,6 +28,21 @@ let tag_of_generation n = | `Ok x -> x | `Error (`Msg y) -> failwith y +let generation_of_lv lv = + List.fold_left + (fun acc tag -> + match generation_of_tag tag with + | None -> acc + | x -> x) None lv.Lvm.Lv.tags + +let create name size = + let creation_host = Unix.gethostname () in + let creation_time = Unix.gettimeofday () |> Int64.of_float in + Vg_io.write (fun vg -> + Lvm.Vg.create vg name size ~creation_host ~creation_time + ~tags:[tag_of_generation 1 |> Lvm.Name.Tag.to_string ] + ) + let perform_expand_free ef connected_host = let open Journal.Op in sector_size >>= fun sector_size -> @@ -67,12 +82,7 @@ let perform_expand_free ef connected_host = let lv = Lvm.Vg.LVs.find connected_host.Hostdb.free_LV_uuid vg.Lvm.Vg.lvs in (* Not_found here caught by the try-catch block *) let extent_size = vg.Lvm.Vg.extent_size in (* in sectors *) let extent_size_mib = Int64.(div (mul extent_size (of_int sector_size)) (mul 1024L 1024L)) in - let old_gen = List.fold_left - (fun acc tag -> - match generation_of_tag tag with - | None -> acc - | x -> x) None lv.Lvm.Lv.tags - in + let old_gen = generation_of_lv lv in let allocation = match Lvm.Pv.Allocator.find vg.Lvm.Vg.free_space Int64.(div ef.extra_size extent_size_mib) with | `Ok allocation -> allocation @@ -106,12 +116,24 @@ let perform_expand_free ef connected_host = Fist.maybe_lwt_fail Xenvm_interface.FreePool1 >>= fun () -> read (fun vg -> + let lv = Lvm.Vg.LVs.find connected_host.Hostdb.free_LV_uuid vg.Lvm.Vg.lvs in let current_allocation = allocation_of_lv vg connected_host.Hostdb.free_LV_uuid in let old_allocation = ef.old_allocation in let new_extents = Lvm.Pv.Allocator.sub current_allocation old_allocation in - Lwt.return new_extents) - >>= fun allocation -> - Rings.FromLVM.push connected_host.Hostdb.from_LVM allocation + let generation = generation_of_lv lv in + match generation with + | None -> (* This really should never happen *) + error "Expecting a generation count in the tags of LV: %s" connected_host.Hostdb.free_LV + >>= fun () -> + Lwt.fail (Failure "Generation count missing") + | Some g -> + Lwt.return (new_extents, g)) + >>= fun (allocation, generation) -> + let op = { + FreeAllocation.blocks = allocation; + FreeAllocation.generation = generation; + } in + Rings.FromLVM.push connected_host.Hostdb.from_LVM op >>= fun pos -> Rings.FromLVM.p_advance connected_host.Hostdb.from_LVM pos >>= fun result -> @@ -162,15 +184,42 @@ let shutdown () = | None -> return () -let resend_free_volume_to connected_host = +let send_allocation_to connected_host = fatal_error "resend_free_volumes unable to read LVM metadata" ( read (fun x -> return (`Ok x)) ) - >>= fun lvm -> + >>= fun vg -> (* XXX: need to lock the host somehow. Ideally we would still service other queues while one host is locked. *) + let from_lvm = connected_host.Hostdb.from_LVM in let freeid = connected_host.Hostdb.free_LV_uuid in let freename = connected_host.Hostdb.free_LV in + + fatal_error "resend_free_volumes" + ( match try Some(Lvm.Vg.LVs.find freeid vg.Lvm.Vg.lvs) with _ -> None with + | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) + | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) + >>= fun allocation -> + let lv = Lvm.Vg.LVs.find freeid vg.Lvm.Vg.lvs in + (match generation_of_lv lv with + | Some g -> Lwt.return g + | None -> + error "Expecting a generation count in the tags of LV: %s" connected_host.Hostdb.free_LV + >>= fun () -> + Lwt.fail (Failure "Generation count missing") + ) + >>= fun generation -> + let op = { + FreeAllocation.blocks = allocation; + FreeAllocation.generation = generation; + } in + Rings.FromLVM.push from_lvm op + >>= fun pos -> + Rings.FromLVM.p_advance from_lvm pos + + +let resend_free_volume_to connected_host = + let from_lvm = connected_host.Hostdb.from_LVM in Rings.FromLVM.p_state from_lvm >>= begin function | `Running -> return () @@ -185,14 +234,7 @@ let resend_free_volume_to connected_host = | `Running -> return () in wait () >>= fun () -> - fatal_error "resend_free_volumes" - ( match try Some(Lvm.Vg.LVs.find freeid lvm.Lvm.Vg.lvs) with _ -> None with - | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) - | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) - >>= fun allocation -> - Rings.FromLVM.push from_lvm allocation - >>= fun pos -> - Rings.FromLVM.p_advance from_lvm pos + send_allocation_to connected_host end let resend_free_volumes () = diff --git a/xenvmd/freepool.mli b/xenvmd/freepool.mli index dd8252d..52b43e9 100644 --- a/xenvmd/freepool.mli +++ b/xenvmd/freepool.mli @@ -1,6 +1,8 @@ val start : string -> unit Lwt.t val shutdown : unit -> unit Lwt.t val top_up_host : Config.Xenvmd.t -> string -> Hostdb.connected_host -> bool Lwt.t +val send_allocation_to : Hostdb.connected_host -> unit Lwt.t val resend_free_volume_to : Hostdb.connected_host -> unit Lwt.t val resend_free_volumes : unit -> unit Lwt.t val top_up_free_volumes : Config.Xenvmd.t -> unit Lwt.t +val create : string -> int64 -> Lvm.Vg.metadata Lwt.t diff --git a/xenvmd/host.ml b/xenvmd/host.ml index e9407ac..407202f 100644 --- a/xenvmd/host.ml +++ b/xenvmd/host.ml @@ -127,9 +127,8 @@ let create name = Vg_io.Volume.disconnect disk >>= fun () -> (* Create the freeLVM LV at the end - we can use the existence of this as a flag to show that we've finished host creation *) - Vg_io.write (fun vg -> - Lvm.Vg.create vg freeLVM size ~creation_host ~creation_time - ) >>= fun _ -> + Freepool.create freeLVM size + >>= fun _ -> Vg_io.sync () end @@ -204,12 +203,7 @@ let connect config name = connected_host.Hostdb.state <- Xenvm_interface.Resending_free_blocks; debug "The Rings.FromLVM queue was already suspended: resending the free blocks" >>= fun () -> - let allocation = Lvm.Lv.to_allocation (Vg_io.Volume.metadata_of freeLVM_id) in - Rings.FromLVM.push fromLVM_q allocation - >>= fun pos -> - Rings.FromLVM.p_advance fromLVM_q pos - >>= fun () -> - debug "Free blocks pushed" + Freepool.send_allocation_to connected_host end else begin debug "The Rings.FromLVM queue was running: no need to resend the free blocks" end ) From a2ac634e93f10a42c59168da3978feb6cb1b2901 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 23 Oct 2015 14:58:08 +0100 Subject: [PATCH 16/18] Append, don't truncate existing logs Signed-off-by: Jon Ludlam --- xenvm-local-allocator/local_allocator.ml | 2 +- xenvmd/xenvmd.ml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/xenvm-local-allocator/local_allocator.ml b/xenvm-local-allocator/local_allocator.ml index c287cf5..c1a16ba 100644 --- a/xenvm-local-allocator/local_allocator.ml +++ b/xenvm-local-allocator/local_allocator.ml @@ -248,7 +248,7 @@ let main mock_dm config daemon socket journal fromLVM toLVM log = let t = (match log_filename with | Some f -> - Lwt_log.file ~mode:`Truncate ~file_name:f () >>= fun logger -> + Lwt_log.file ~mode:`Append ~file_name:f () >>= fun logger -> Lwt_log.default := logger; Lwt.return () | None -> diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index 7d0c753..f79e311 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -128,7 +128,7 @@ let run port sock_path config log_filename = let t = (match log_filename with | Some f -> - Lwt_log.file ~mode:`Truncate ~file_name:f () >>= fun logger -> + Lwt_log.file ~mode:`Append ~file_name:f () >>= fun logger -> Lwt_log.default := logger; Lwt.return () | None -> Lwt.return ()) From 816a3b70032148cb6d7fd867991aa5985ec5b7cd Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 23 Oct 2015 17:23:25 +0100 Subject: [PATCH 17/18] test: make sure the LV is activated on the correct host Signed-off-by: Jon Ludlam --- test/test.ml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/test.ml b/test/test.ml index 1731a97..fda1fe8 100644 --- a/test/test.ml +++ b/test/test.ml @@ -127,6 +127,8 @@ let la_has_started host = Lwt_io.close oc >>= fun () -> Lwt.return true) (fun _ -> + Lwt_unix.close s + >>= fun () -> Lwt.return false) let start_local_allocator host devices = @@ -611,7 +613,10 @@ let la_extend_multi_fist device = wait_for_local_allocator_to_start 2 >>= fun () -> set_vg_info device vg 2; inparallel [(fun () -> xenvm ["lvcreate"; "-n"; lvname; "-L"; "4"; vg]); - (fun () -> xenvm ["lvcreate"; "-n"; lvname2; "-L"; "4"; vg])] + (fun () -> xenvm ["lvcreate"; "-an"; "-n"; lvname2; "-L"; "4"; vg])] + >>= fun () -> + inparallel [(fun () -> xenvm ~host:1 ["lvchange"; "-ay"; Printf.sprintf "%s/%s" vg lvname]); + (fun () -> xenvm ~host:2 ["lvchange"; "-ay"; Printf.sprintf "%s/%s" vg lvname2])] >>= fun () -> Client.Fist.set Xenvm_interface.FreePool2 true >>= fun () -> From cbee74146a320e68dc7cd4e61e00c5c53e361294 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 23 Oct 2015 17:23:52 +0100 Subject: [PATCH 18/18] Remove redundant file Signed-off-by: Jon Ludlam --- remoteConfig | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 remoteConfig diff --git a/remoteConfig b/remoteConfig deleted file mode 100644 index d18fe16..0000000 --- a/remoteConfig +++ /dev/null @@ -1,7 +0,0 @@ -( - (listenPort 4000) - (host_allocation_quantum 128) - (host_low_water_mark 8) - (vg djstest) - (devices (/dev/loop0)) -)