Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions idl/log.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
open Sexplib.Std

type traced_operation = [
| `Set of string * string * [ `Producer | `Consumer | `Suspend | `Suspend_ack ] * [ `Int64 of int64 | `Bool of bool ]
| `Get of string * string * [ `Producer | `Consumer | `Suspend | `Suspend_ack ] * [ `Int64 of int64 | `Bool of bool ]
] with sexp
type traced_operation_list = traced_operation list with sexp

let debug fmt = Printf.ksprintf (fun s -> print_endline s) fmt
let info fmt = Printf.ksprintf (fun s -> print_endline s) fmt
let error fmt = Printf.ksprintf (fun s -> print_endline s) fmt

let trace ts =
let string_of_key = function
| `Producer -> "producer"
| `Consumer -> "consumer"
| `Suspend -> "suspend"
| `Suspend_ack -> "suspend_ack" in
let string_of_value = function
| `Int64 x -> Int64.to_string x
| `Bool b -> string_of_bool b in
let one = function
| `Set (_, queue, key, value) ->
Printf.sprintf "%s.%s := %s" queue (string_of_key key) (string_of_value value)
| `Get (__, queue, key, value) ->
Printf.sprintf "%s.%s == %s" queue (string_of_key key) (string_of_value value) in
info "%s" (String.concat ", " (List.map one ts))
1 change: 1 addition & 0 deletions idl/xenvm_interface.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ external shutdown : unit -> int = ""
type queue = {
lv: string;
suspended: bool;
debug_info: (string * string) list;
}

type connection_state =
Expand Down
6 changes: 0 additions & 6 deletions test/common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ open Lwt
(* Mock kernel devices so we can run as a regular user *)
let use_mock = ref true

module Log = struct
let debug fmt = Printf.ksprintf (fun s -> print_endline s) fmt
let info fmt = Printf.ksprintf (fun s -> print_endline s) fmt
let error fmt = Printf.ksprintf (fun s -> print_endline s) fmt
end

module Time = struct
type 'a io = 'a Lwt.t
let sleep = Lwt_unix.sleep
Expand Down
8 changes: 1 addition & 7 deletions xenvm-local-allocator/local_allocator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ let rec try_forever msg f =
>>= function
| `Ok x -> return (`Ok x)
| `Error `Retry ->
debug "%s: retrying after 5s" msg;
Lwt_unix.sleep 5.
>>= fun () ->
try_forever msg f
Expand Down Expand Up @@ -85,7 +84,6 @@ module FromLVM = struct
>>= function
| `Suspended -> return ()
| `Running ->
debug "FromLVM.suspend got `Running; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait () in
Expand All @@ -99,7 +97,6 @@ module FromLVM = struct
fatal_error "reading state of FromLVM" (R.Consumer.state t)
>>= function
| `Suspended ->
debug "FromLVM.resume got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand All @@ -122,15 +119,13 @@ module ToLVM = struct
>>= function
| `Ok x -> return x
| _ ->
debug "ToLVM.attach got `Error; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
attach ~disk ()
let state t =
fatal_error "querying ToLVM state" (R.Producer.state t)
let rec push t item = R.Producer.push ~t ~item () >>= function
| `Error (`Retry | `Suspended) ->
debug "ToLVM.push got `Error; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
push t item
Expand Down Expand Up @@ -216,7 +211,6 @@ module FreePool = struct
>>= fun (pos, ts) ->
let open FreeAllocation in
( if ts = [] then begin
debug "No free blocks, sleeping for 5s";
Lwt_unix.sleep 5.
end else return ()
) >>= fun () ->
Expand Down Expand Up @@ -390,7 +384,7 @@ let main use_mock config daemon socket journal fromLVM toLVM =
) >>= fun device ->

(* We must replay the journal before resynchronising free blocks *)
J.start device perform
J.start ~client:"xenvm-local-allocator" ~name:"local allocator journal" device perform
>>|= fun j ->

FreePool.start config vg
Expand Down
2 changes: 1 addition & 1 deletion xenvm/xenvm.ml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ let host_list copts (vg_name,_) =
let table_of_queue q = [
[ "lv"; q.lv ];
[ "suspended"; string_of_bool q.suspended ]
] in
] @ (List.map (fun (k, v) -> [ k; v ]) q.debug_info) in
let table_of_host h =
let connection_state = [ "state"; match h.connection_state with Some x -> Jsonrpc.to_string (rpc_of_connection_state x) | None -> "None" ] in
let fromLVM = add_prefix "fromLVM" (table_of_queue h.fromLVM) in
Expand Down
23 changes: 11 additions & 12 deletions xenvmd/xenvmd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ module ToLVM = struct
fatal_error "attaching to ToLVM queue" (R.Consumer.attach ~queue:(name ^ " ToLVM Consumer") ~client:"xenvmd" ~disk ())
let state t =
fatal_error "querying ToLVM state" (R.Consumer.state t)
let debug_info t =
fatal_error "querying ToLVM debug_info" (R.Consumer.debug_info t)
let rec suspend t =
R.Consumer.suspend t
>>= function
| `Error (`Msg msg) -> fatal_error_t msg
| `Error `Suspended -> return ()
| `Error `Retry ->
debug "ToLVM.suspend got `Retry; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
suspend t
Expand All @@ -63,7 +64,6 @@ module ToLVM = struct
>>= function
| `Error _ -> fatal_error_t "reading state of ToLVM"
| `Ok `Running ->
debug "ToLVM.suspend got `Running; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand All @@ -74,7 +74,6 @@ module ToLVM = struct
>>= function
| `Error (`Msg msg) -> fatal_error_t msg
| `Error `Retry ->
debug "ToLVM.resume got `Retry; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
resume t
Expand All @@ -85,7 +84,6 @@ module ToLVM = struct
>>= function
| `Error _ -> fatal_error_t "reading state of ToLVM"
| `Ok `Suspended ->
debug "ToLVM.resume got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand All @@ -108,7 +106,6 @@ module FromLVM = struct
let initial_state = ref `Running in
let rec loop () = R.Producer.attach ~queue:(name ^ " FromLVM Producer") ~client:"xenvmd" ~disk () >>= function
| `Error `Suspended ->
debug "FromLVM.attach got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
initial_state := `Suspended;
Expand All @@ -118,15 +115,15 @@ module FromLVM = struct
>>= fun x ->
return (!initial_state, x)
let state t = fatal_error "FromLVM.state" (R.Producer.state t)
let debug_info t =
fatal_error "querying FromLVM debug_info" (R.Producer.debug_info t)
let rec push t item = R.Producer.push ~t ~item () >>= function
| `Error (`Msg x) -> fatal_error_t (Printf.sprintf "Error pushing to the FromLVM queue: %s" x)
| `Error `Retry ->
debug "FromLVM.push got `Retry; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
push t item
| `Error `Suspended ->
debug "FromLVM.push got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
push t item
Expand Down Expand Up @@ -435,13 +432,17 @@ module VolumeManager = struct
( ToLVM.state t >>= function
| `Suspended -> return true
| `Running -> return false ) >>= fun suspended ->
let toLVM = { Xenvm_interface.lv; suspended } in
ToLVM.debug_info t
>>= fun debug_info ->
let toLVM = { Xenvm_interface.lv; suspended; debug_info } in
let lv = fromLVM name in
let t = List.assoc name !from_LVMs in
( FromLVM.state t >>= function
| `Suspended -> return true
| `Running -> return false ) >>= fun suspended ->
let fromLVM = { Xenvm_interface.lv; suspended } in
FromLVM.debug_info t
>>= fun debug_info ->
let fromLVM = { Xenvm_interface.lv; suspended; debug_info } in
read (fun vg ->
try
let lv = Lvm.Vg.LVs.find_by_name (freeLVM name) vg.Lvm.Vg.lvs in
Expand Down Expand Up @@ -537,7 +538,7 @@ module FreePool = struct
| `Error _ -> fatal_error_t ("open " ^ name)
| `Ok x -> return x )
>>= fun device ->
J.start device perform
J.start ~client:"xenvmd" ~name:"allocation journal" device perform
>>|= fun j' ->
journal := Some j';
return ()
Expand Down Expand Up @@ -570,7 +571,6 @@ module FreePool = struct
FromLVM.state from_lvm
>>= function
| `Suspended ->
debug "FromLVM.state got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
wait ()
Expand Down Expand Up @@ -753,7 +753,6 @@ let run port sock_path config =
VolumeManager.flush_all ()
>>= fun () ->

debug "sleeping for 5s";
Lwt_unix.sleep 5.
>>= fun () ->
service_queues () in
Expand Down