diff --git a/xenvm-local-allocator/local_allocator.ml b/xenvm-local-allocator/local_allocator.ml index 6d7c753..1d35975 100644 --- a/xenvm-local-allocator/local_allocator.ml +++ b/xenvm-local-allocator/local_allocator.ml @@ -71,6 +71,8 @@ module FromLVM = struct module R = Shared_block.Ring.Make(Log)(Vg_IO.Volume)(FreeAllocation) let rec attach ~disk () = fatal_error "attaching to FromLVM queue" (R.Consumer.attach ~disk ()) + let state t = + fatal_error "querying FromLVM state" (R.Consumer.state t) let rec suspend t = try_forever "FromLVM.suspend" (fun () -> R.Consumer.suspend t) >>= fun x -> @@ -81,6 +83,7 @@ module FromLVM = struct >>= function | `Suspended -> return () | `Running -> + debug "FromLVM.suspend got `Running; sleeping"; Lwt_unix.sleep 5. >>= fun () -> wait () in @@ -94,6 +97,7 @@ module FromLVM = struct fatal_error "reading state of FromLVM" (R.Consumer.state t) >>= function | `Suspended -> + debug "FromLVM.resume got `Suspended; sleeping"; Lwt_unix.sleep 5. >>= fun () -> wait () @@ -116,6 +120,7 @@ module ToLVM = struct >>= function | `Ok x -> return x | _ -> + debug "ToLVM.attach got `Error; sleeping"; Lwt_unix.sleep 5. >>= fun () -> attach ~disk () @@ -123,6 +128,7 @@ module ToLVM = struct fatal_error "querying ToLVM state" (R.Producer.state t) let rec push t item = R.Producer.push ~t ~item () >>= function | `Error (`Retry | `Suspended) -> + debug "ToLVM.push got `Error; sleeping"; Lwt_unix.sleep 5. >>= fun () -> push t item @@ -173,6 +179,9 @@ module FreePool = struct | `Ok disk -> FromLVM.attach ~disk () >>= fun from_lvm -> + FromLVM.state from_lvm + >>= fun state -> + debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); (* Suspend and resume the queue: the producer will resend us all the free blocks on resume. *) @@ -209,7 +218,7 @@ module FreePool = struct FromLVM.advance from_lvm pos >>= fun () -> loop_forever () in - loop_forever () + return loop_forever end module Op = struct @@ -300,6 +309,9 @@ let main config daemon socket journal fromLVM toLVM = | `Ok disk -> ToLVM.attach ~disk () >>= fun tolvm -> + ToLVM.state tolvm + >>= fun state -> + debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); let extent_size = metadata.Lvm.Vg.extent_size in (* in sectors *) let extent_size_mib = Int64.(div (mul extent_size (of_int sector_size)) (mul 1024L 1024L)) in @@ -314,6 +326,7 @@ let main config daemon socket journal fromLVM toLVM = info "The ToLVM queue has been suspended. We will acknowledge and exit"; exit 0 | `Running -> + debug "The ToLVM queue is still running"; Lwt_unix.sleep 5. >>= fun () -> wait_for_shutdown_forever () in @@ -362,7 +375,9 @@ let main config daemon socket journal fromLVM toLVM = J.start device perform >>|= fun j -> - let (_: unit Lwt.t) = FreePool.start config vg in + FreePool.start config vg + >>= fun forever_fun -> + let (_: unit Lwt.t) = forever_fun () in let (_: unit Lwt.t) = wait_for_shutdown_forever () in (* Called to extend a single device. This function decides what needs to be diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index 50622da..edea72f 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -42,7 +42,7 @@ module ToLVM = struct module R = Shared_block.Ring.Make(Log)(Vg_IO.Volume)(ExpandVolume) let create ~disk () = fatal_error "creating ToLVM queue" (R.Producer.create ~disk ()) - let rec attach ~disk () = + let attach ~disk () = fatal_error "attaching to ToLVM queue" (R.Consumer.attach ~disk ()) let state t = fatal_error "querying ToLVM state" (R.Consumer.state t) @@ -52,6 +52,7 @@ module ToLVM = struct | `Error (`Msg msg) -> fatal_error_t msg | `Error `Suspended -> return () | `Error `Retry -> + debug "ToLVM.suspend got `Retry; sleeping"; Lwt_unix.sleep 5. >>= fun () -> suspend t @@ -61,6 +62,7 @@ module ToLVM = struct >>= function | `Error _ -> fatal_error_t "reading state of ToLVM" | `Ok `Running -> + debug "ToLVM.suspend got `Running; sleeping"; Lwt_unix.sleep 5. >>= fun () -> wait () @@ -71,6 +73,7 @@ module ToLVM = struct >>= function | `Error (`Msg msg) -> fatal_error_t msg | `Error `Retry -> + debug "ToLVM.resume got `Retry; sleeping"; Lwt_unix.sleep 5. >>= fun () -> resume t @@ -81,6 +84,7 @@ module ToLVM = struct >>= function | `Error _ -> fatal_error_t "reading state of ToLVM" | `Ok `Suspended -> + debug "ToLVM.resume got `Suspended; sleeping"; Lwt_unix.sleep 5. >>= fun () -> wait () @@ -99,16 +103,23 @@ module FromLVM = struct module R = Shared_block.Ring.Make(Log)(Vg_IO.Volume)(FreeAllocation) let create ~disk () = fatal_error "FromLVM.create" (R.Producer.create ~disk ()) - let attach ~disk () = - fatal_error "FromLVM.attach" (R.Producer.attach ~disk ()) + let rec attach ~disk () = R.Producer.attach ~disk () >>= function + | `Error `Suspended -> + debug "FromLVM.attach got `Suspended; sleeping"; + Lwt_unix.sleep 5. + >>= fun () -> + attach ~disk () + | x -> fatal_error "FromLVM.attach" (return x) let state t = fatal_error "FromLVM.state" (R.Producer.state t) let rec push t item = R.Producer.push ~t ~item () >>= function | `Error (`Msg x) -> fatal_error_t (Printf.sprintf "Error pushing to the FromLVM queue: %s" x) | `Error `Retry -> + debug "FromLVM.push got `Retry; sleeping"; Lwt_unix.sleep 5. >>= fun () -> push t item | `Error `Suspended -> + debug "FromLVM.push got `Suspended; sleeping"; Lwt_unix.sleep 5. >>= fun () -> push t item @@ -235,6 +246,9 @@ module VolumeManager = struct | `Ok disk -> ToLVM.attach ~disk () >>= fun to_LVM -> + ToLVM.state to_LVM + >>= fun state -> + debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); ToLVM.resume to_LVM >>= fun () -> ( match Vg_IO.find vg fromLVM with @@ -246,6 +260,9 @@ module VolumeManager = struct | `Ok disk -> FromLVM.attach ~disk () >>= fun from_LVM -> + FromLVM.state from_LVM + >>= fun state -> + debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended"); to_LVMs := (name, to_LVM) :: !to_LVMs; from_LVMs := (name, from_LVM) :: !from_LVMs; free_LVs := (name, (freeLVM,freeLVMid)) :: !free_LVs; @@ -433,6 +450,7 @@ module FreePool = struct FromLVM.state from_lvm >>= function | `Suspended -> + debug "FromLVM.state got `Suspended; sleeping"; Lwt_unix.sleep 5. >>= fun () -> wait ()