diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index ab6bd90..f31f780 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -253,58 +253,63 @@ module VolumeManager = struct let toLVM = toLVM name in let fromLVM = fromLVM name in let freeLVM = freeLVM name in - ( try - Lwt.return (Lvm.Vg.LVs.find_by_name freeLVM (Vg_IO.metadata_of vg).Lvm.Vg.lvs).Lvm.Lv.id - with _ -> - fail Xenvm_interface.HostNotCreated ) >>= fun freeLVMid -> - ( match Vg_IO.find vg toLVM with - | Some lv -> return lv - | None -> assert false ) >>= fun v -> - Vg_IO.Volume.connect v - >>= function - | `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" toLVM)) - | `Ok disk -> - ToLVM.attach ~name ~disk () - >>= fun to_LVM -> - ToLVM.state to_LVM - >>= fun state -> - debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); - ToLVM.resume to_LVM - >>= fun () -> - ( match Vg_IO.find vg fromLVM with - | Some lv -> return lv - | None -> assert false ) >>= fun v -> - Vg_IO.Volume.connect v - >>= function - | `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" fromLVM)) - | `Ok disk -> - FromLVM.attach ~name ~disk () - >>= fun (initial_state, from_LVM) -> - ( if initial_state = `Suspended then begin - debug "The FromLVM queue was already suspended: resending the free blocks"; - ( match Vg_IO.find vg freeLVM with - | Some lv -> return lv - | None -> assert false ) >>= fun lv -> - let allocation = Lvm.Lv.to_allocation (Vg_IO.Volume.metadata_of lv) in - FromLVM.push from_LVM allocation - >>= fun pos -> - FromLVM.advance from_LVM pos - >>= fun () -> - debug "Free blocks pushed"; + if List.mem_assoc name !to_LVMs then begin + info "Host-specific volumes (%s, %s, %s) already connected" toLVM fromLVM freeLVM; return () end else begin - debug "The FromLVM queue was running: no need to resend the free blocks"; + ( try + Lwt.return (Lvm.Vg.LVs.find_by_name freeLVM (Vg_IO.metadata_of vg).Lvm.Vg.lvs).Lvm.Lv.id + with _ -> + fail Xenvm_interface.HostNotCreated ) >>= fun freeLVMid -> + ( match Vg_IO.find vg toLVM with + | Some lv -> return lv + | None -> assert false ) >>= fun v -> + Vg_IO.Volume.connect v + >>= function + | `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" toLVM)) + | `Ok disk -> + ToLVM.attach ~name ~disk () + >>= fun to_LVM -> + ToLVM.state to_LVM + >>= fun state -> + debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); + ToLVM.resume to_LVM + >>= fun () -> + ( match Vg_IO.find vg fromLVM with + | Some lv -> return lv + | None -> assert false ) >>= fun v -> + Vg_IO.Volume.connect v + >>= function + | `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" fromLVM)) + | `Ok disk -> + FromLVM.attach ~name ~disk () + >>= fun (initial_state, from_LVM) -> + ( if initial_state = `Suspended then begin + debug "The FromLVM queue was already suspended: resending the free blocks"; + ( match Vg_IO.find vg freeLVM with + | Some lv -> return lv + | None -> assert false ) >>= fun lv -> + let allocation = Lvm.Lv.to_allocation (Vg_IO.Volume.metadata_of lv) in + FromLVM.push from_LVM allocation + >>= fun pos -> + FromLVM.advance from_LVM pos + >>= fun () -> + debug "Free blocks pushed"; + return () + end else begin + debug "The FromLVM queue was running: no need to resend the free blocks"; + return () + end ) + >>= fun () -> + debug "querying state"; + FromLVM.state from_LVM + >>= fun state -> + debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); + to_LVMs := (name, to_LVM) :: !to_LVMs; + from_LVMs := (name, from_LVM) :: !from_LVMs; + free_LVs := (name, (freeLVM,freeLVMid)) :: !free_LVs; return () - end ) - >>= fun () -> - debug "querying state"; - FromLVM.state from_LVM - >>= fun state -> - debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); - to_LVMs := (name, to_LVM) :: !to_LVMs; - from_LVMs := (name, from_LVM) :: !from_LVMs; - free_LVs := (name, (freeLVM,freeLVMid)) :: !free_LVs; - return () + end (* Hold this mutex when actively flushing from the ToLVM queues *) let flush_m = Lwt_mutex.create ()