Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions xenvm-local-allocator/local_allocator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ module FromLVM = struct
module R = Shared_block.Ring.Make(Log)(Vg_IO.Volume)(FreeAllocation)
let rec attach ~disk () =
fatal_error "attaching to FromLVM queue" (R.Consumer.attach ~disk ())
let state t =
fatal_error "querying FromLVM state" (R.Consumer.state t)
let rec suspend t =
try_forever "FromLVM.suspend" (fun () -> R.Consumer.suspend t)
>>= fun x ->
Expand All @@ -81,6 +83,7 @@ module FromLVM = struct
>>= function
| `Suspended -> return ()
| `Running ->
debug "FromLVM.suspend got `Running; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait () in
Expand All @@ -94,6 +97,7 @@ module FromLVM = struct
fatal_error "reading state of FromLVM" (R.Consumer.state t)
>>= function
| `Suspended ->
debug "FromLVM.resume got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand All @@ -116,13 +120,15 @@ module ToLVM = struct
>>= function
| `Ok x -> return x
| _ ->
debug "ToLVM.attach got `Error; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
attach ~disk ()
let state t =
fatal_error "querying ToLVM state" (R.Producer.state t)
let rec push t item = R.Producer.push ~t ~item () >>= function
| `Error (`Retry | `Suspended) ->
debug "ToLVM.push got `Error; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
push t item
Expand Down Expand Up @@ -173,6 +179,9 @@ module FreePool = struct
| `Ok disk ->
FromLVM.attach ~disk ()
>>= fun from_lvm ->
FromLVM.state from_lvm
>>= fun state ->
debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");

(* Suspend and resume the queue: the producer will resend us all
the free blocks on resume. *)
Expand Down Expand Up @@ -209,7 +218,7 @@ module FreePool = struct
FromLVM.advance from_lvm pos
>>= fun () ->
loop_forever () in
loop_forever ()
return loop_forever
end

module Op = struct
Expand Down Expand Up @@ -300,6 +309,9 @@ let main config daemon socket journal fromLVM toLVM =
| `Ok disk ->
ToLVM.attach ~disk ()
>>= fun tolvm ->
ToLVM.state tolvm
>>= fun state ->
debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");

let extent_size = metadata.Lvm.Vg.extent_size in (* in sectors *)
let extent_size_mib = Int64.(div (mul extent_size (of_int sector_size)) (mul 1024L 1024L)) in
Expand All @@ -314,6 +326,7 @@ let main config daemon socket journal fromLVM toLVM =
info "The ToLVM queue has been suspended. We will acknowledge and exit";
exit 0
| `Running ->
debug "The ToLVM queue is still running";
Lwt_unix.sleep 5.
>>= fun () ->
wait_for_shutdown_forever () in
Expand Down Expand Up @@ -362,7 +375,9 @@ let main config daemon socket journal fromLVM toLVM =
J.start device perform
>>|= fun j ->

let (_: unit Lwt.t) = FreePool.start config vg in
FreePool.start config vg
>>= fun forever_fun ->
let (_: unit Lwt.t) = forever_fun () in
let (_: unit Lwt.t) = wait_for_shutdown_forever () in

(* Called to extend a single device. This function decides what needs to be
Expand Down
24 changes: 21 additions & 3 deletions xenvmd/xenvmd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ module ToLVM = struct
module R = Shared_block.Ring.Make(Log)(Vg_IO.Volume)(ExpandVolume)
let create ~disk () =
fatal_error "creating ToLVM queue" (R.Producer.create ~disk ())
let rec attach ~disk () =
let attach ~disk () =
fatal_error "attaching to ToLVM queue" (R.Consumer.attach ~disk ())
let state t =
fatal_error "querying ToLVM state" (R.Consumer.state t)
Expand All @@ -52,6 +52,7 @@ module ToLVM = struct
| `Error (`Msg msg) -> fatal_error_t msg
| `Error `Suspended -> return ()
| `Error `Retry ->
debug "ToLVM.suspend got `Retry; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
suspend t
Expand All @@ -61,6 +62,7 @@ module ToLVM = struct
>>= function
| `Error _ -> fatal_error_t "reading state of ToLVM"
| `Ok `Running ->
debug "ToLVM.suspend got `Running; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand All @@ -71,6 +73,7 @@ module ToLVM = struct
>>= function
| `Error (`Msg msg) -> fatal_error_t msg
| `Error `Retry ->
debug "ToLVM.resume got `Retry; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
resume t
Expand All @@ -81,6 +84,7 @@ module ToLVM = struct
>>= function
| `Error _ -> fatal_error_t "reading state of ToLVM"
| `Ok `Suspended ->
debug "ToLVM.resume got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand All @@ -99,16 +103,23 @@ module FromLVM = struct
module R = Shared_block.Ring.Make(Log)(Vg_IO.Volume)(FreeAllocation)
let create ~disk () =
fatal_error "FromLVM.create" (R.Producer.create ~disk ())
let attach ~disk () =
fatal_error "FromLVM.attach" (R.Producer.attach ~disk ())
let rec attach ~disk () = R.Producer.attach ~disk () >>= function
| `Error `Suspended ->
debug "FromLVM.attach got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
attach ~disk ()
| x -> fatal_error "FromLVM.attach" (return x)
let state t = fatal_error "FromLVM.state" (R.Producer.state t)
let rec push t item = R.Producer.push ~t ~item () >>= function
| `Error (`Msg x) -> fatal_error_t (Printf.sprintf "Error pushing to the FromLVM queue: %s" x)
| `Error `Retry ->
debug "FromLVM.push got `Retry; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
push t item
| `Error `Suspended ->
debug "FromLVM.push got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
push t item
Expand Down Expand Up @@ -235,6 +246,9 @@ module VolumeManager = struct
| `Ok disk ->
ToLVM.attach ~disk ()
>>= fun to_LVM ->
ToLVM.state to_LVM
>>= fun state ->
debug "ToLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");
ToLVM.resume to_LVM
>>= fun () ->
( match Vg_IO.find vg fromLVM with
Expand All @@ -246,6 +260,9 @@ module VolumeManager = struct
| `Ok disk ->
FromLVM.attach ~disk ()
>>= fun from_LVM ->
FromLVM.state from_LVM
>>= fun state ->
debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");
to_LVMs := (name, to_LVM) :: !to_LVMs;
from_LVMs := (name, from_LVM) :: !from_LVMs;
free_LVs := (name, (freeLVM,freeLVMid)) :: !free_LVs;
Expand Down Expand Up @@ -433,6 +450,7 @@ module FreePool = struct
FromLVM.state from_lvm
>>= function
| `Suspended ->
debug "FromLVM.state got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand Down