Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions idl/errors.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,20 @@ let fatal_error msg m = m >>= function
| `Error `Retry -> fatal_error_t (msg ^ ": queue temporarily unavailable")
| `Ok x -> return x

let delayfn n =
if n>10 then 5.0 else (float_of_int n *. 0.5)

let rec retry_forever f =
f ()
>>= function
| `Ok x -> return (`Ok x)
| `Error `Retry ->
Lwt_unix.sleep 5.
>>= fun () ->
retry_forever f
| `Error x -> return (`Error x)
let rec inner n =
f ()
>>= function
| `Ok x -> return (`Ok x)
| `Error `Retry ->
Lwt_unix.sleep (delayfn n)
>>= fun () ->
inner (n+1)
| `Error x -> return (`Error x)
in inner 0

let wait_for f result =
let new_f () =
Expand Down
5 changes: 4 additions & 1 deletion idl/freeAllocation.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
open Sexplib.Std

module T = struct
type t = Lvm.Pv.Allocator.t with sexp
type t = {
blocks : Lvm.Pv.Allocator.t;
generation : int
} with sexp
(** Physical blocks which should be included in the free pool *)
end

Expand Down
4 changes: 3 additions & 1 deletion idl/log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ let info fmt = Lwt_log.info_f fmt
let warn fmt = Lwt_log.warning_f fmt
let error fmt = Lwt_log.error_f fmt

let trace_section = Lwt_log.Section.make "trace"

let trace ts =
let string_of_key = function
| `Producer -> "producer"
Expand All @@ -25,4 +27,4 @@ let trace ts =
Printf.sprintf "%s.%s := %s" queue (string_of_key key) (string_of_value value)
| `Get (__, queue, key, value) ->
Printf.sprintf "%s.%s == %s" queue (string_of_key key) (string_of_value value) in
info "%s" (String.concat ", " (List.map one ts))
Lwt_log.info_f ~section:trace_section "%s" (String.concat ", " (List.map one ts))
12 changes: 11 additions & 1 deletion idl/xenvm_interface.ml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
(* XenVM LVM type thing *)

exception HostNotCreated

exception HostStillConnecting of string
exception UnknownFistPoint of string

let _journal_name = "xenvm_journal"

Expand Down Expand Up @@ -59,6 +59,11 @@ type host = {
freeExtents: int64;
}

type fist =
| FreePool0
| FreePool1
| FreePool2

module Host = struct

external create: name:string -> unit = ""
Expand All @@ -79,3 +84,8 @@ module Host = struct

external all: unit -> host list = ""
end

module Fist = struct
external set : fist -> bool -> unit = ""
external list : unit -> (fist * bool) list = ""
end
7 changes: 0 additions & 7 deletions remoteConfig

This file was deleted.

75 changes: 72 additions & 3 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ let with_xenvmd ?existing_vg ?(cleanup_vg=true) (f : string -> string -> 'a) =
} in
Sexplib.Sexp.to_string_hum (Config.Xenvmd.sexp_of_t config)
|> file_of_string "test.xenvmd.conf";
let _ = Lwt_preemptive.detach (fun () -> xenvmd [ "--config"; "./test.xenvmd.conf" ]) () in
let _ = Lwt_preemptive.detach (fun () -> xenvmd [ "--config"; "./test.xenvmd.conf"; "--log"; "xenvmd.log"]) () in
wait_for_xenvmd_to_start vg;
Xenvm_client.Rpc.uri := "file://local/services/xenvmd/" ^ vg;
Xenvm_client.unix_domain_socket_path := "/tmp/xenvmd";
Expand Down Expand Up @@ -127,6 +127,8 @@ let la_has_started host =
Lwt_io.close oc >>= fun () ->
Lwt.return true)
(fun _ ->
Lwt_unix.close s
>>= fun () ->
Lwt.return false)

let start_local_allocator host devices =
Expand All @@ -144,7 +146,7 @@ let start_local_allocator host devices =
Sexplib.Sexp.to_string_hum (Config.Local_allocator.sexp_of_t config)
|> file_of_string config_file;
ignore(xenvm [ "host-connect"; vg; hostname]);
let la_thread = Lwt_preemptive.detach (fun () -> local_allocator ~host [ "--config"; config_file ]) () in
let la_thread = Lwt_preemptive.detach (fun () -> local_allocator ~host [ "--config"; config_file; "--log"; Printf.sprintf "la.%d.log" host ]) () in
la_thread

let wait_for_local_allocator_to_start host =
Expand All @@ -156,6 +158,12 @@ let wait_for_local_allocator_to_start host =
else (Lwt_unix.sleep 0.1 >>= fun () -> retry ())
in retry ()

let write_to_file thread filename =
thread
>>= fun log ->
Lwt_io.(with_file output filename
(fun chan -> write chan log))

let lvchange_offline =
"lvchange vg/lv --offline: check that we can activate volumes offline" >::
fun () ->
Expand Down Expand Up @@ -551,7 +559,7 @@ let inparallel fns =
Lwt_preemptive.detach (fun () -> ignore(fn ())) ()) fns)

let la_extend_multi device =
"Extend an LV with the local allocator" >::
"Extend 2 LVs on different hosts with the local allocator" >::
(fun () ->
ignore(Lwt_main.run (
let lvname = "test2" in
Expand Down Expand Up @@ -592,10 +600,71 @@ let la_extend_multi device =
Lwt.choose [la_dead; (Lwt_unix.sleep 30.0 >>= fun () -> Lwt.fail Timeout)]
)))


let la_extend_multi_fist device =
"Extend 2 LVs on different hosts with the local allocator" >::
(fun () ->
ignore(Lwt_main.run (
let lvname = "test4" in
let lvname2 = "test5" in
let la_thread_1 = start_local_allocator 1 [device] in
let la_thread_2 = start_local_allocator 2 [device] in
wait_for_local_allocator_to_start 1 >>= fun () ->
wait_for_local_allocator_to_start 2 >>= fun () ->
set_vg_info device vg 2;
inparallel [(fun () -> xenvm ["lvcreate"; "-n"; lvname; "-L"; "4"; vg]);
(fun () -> xenvm ["lvcreate"; "-an"; "-n"; lvname2; "-L"; "4"; vg])]
>>= fun () ->
inparallel [(fun () -> xenvm ~host:1 ["lvchange"; "-ay"; Printf.sprintf "%s/%s" vg lvname]);
(fun () -> xenvm ~host:2 ["lvchange"; "-ay"; Printf.sprintf "%s/%s" vg lvname2])]
>>= fun () ->
Client.Fist.set Xenvm_interface.FreePool2 true
>>= fun () ->
Client.Fist.list ()
>>= fun list ->
List.iter (fun (x,b) -> Printf.printf "%s: %b\n%!" (Rpc.to_string (Xenvm_interface.rpc_of_fist x)) b) list;
inparallel [(fun () -> xenvm ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname]);
(fun () -> xenvm ~host:2 ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname2])]
>>= fun () ->
Lwt_unix.sleep 10.0
>>= fun () ->
Client.Fist.set Xenvm_interface.FreePool2 false
>>= fun () ->
inparallel [(fun () -> xenvm ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname]);
(fun () -> xenvm ~host:2 ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname2])]
>>= fun () ->
inparallel [(fun () -> xenvm ["host-disconnect"; vg; "host1"] |> ignore);
(fun () -> xenvm ["host-disconnect"; vg; "host2"] |> ignore)]
>>= fun () ->
Client.get_lv lvname >>= fun (myvg, lv) ->
Client.get_lv lvname2 >>= fun (_, lv2) ->
ignore(myvg,lv,lv2);
let size = Lvm.Lv.size_in_extents lv in
let size2 = Lvm.Lv.size_in_extents lv2 in
ignore(xenvm ["lvchange"; "-an"; Printf.sprintf "%s/%s" vg lvname]);
ignore(xenvm ["lvchange"; "-an"; Printf.sprintf "%s/%s" vg lvname2]);
Printf.printf "Sanity checking VG\n%!";
Client.get () >>= fun myvg ->
Common.sanity_check myvg;
Printf.printf "final size=%Ld final_size2=%Ld\n%!" size size2;
assert_equal ~msg:"Unexpected final size"
~printer:Int64.to_string 250L size;
assert_equal ~msg:"Unexpected final size"
~printer:Int64.to_string 250L size2;

let la_dead = la_thread_1 >>= fun _ -> la_thread_2 >>= fun _ -> Lwt.return () in
Lwt.choose [la_dead; (Lwt_unix.sleep 30.0 >>= fun () -> Lwt.fail Timeout)]
>>= fun () ->
write_to_file la_thread_1 "local_allocator.1.log"
>>= fun () ->
write_to_file la_thread_2 "local_allocator.2.log"
)))

let local_allocator_suite device = "Commands which require the local allocator" >::: [
(* la_start device;
la_extend device;*)
la_extend_multi device;
la_extend_multi_fist device;
]

let _ =
Expand Down
55 changes: 44 additions & 11 deletions xenvm-local-allocator/local_allocator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,23 @@ module FreePool = struct
let m = Lwt_mutex.create ()
let c = Lwt_condition.create ()
let free = ref []
let generation = ref (-1)

let add extents =
let add gen extents =
Lwt_mutex.with_lock m
(fun () ->
free := Lvm.Pv.Allocator.merge !free extents;
Lwt_condition.broadcast c ();
return ()
begin
if gen <= !generation
then
(* Ignore updates we've already seen *)
()
else begin
free := Lvm.Pv.Allocator.merge !free extents;
generation := gen
end
end;
Lwt_condition.broadcast c ();
return ()
)

(* Allocate up to [nr_extents]. Blocks if there is no space free. Can return
Expand Down Expand Up @@ -113,24 +123,28 @@ module FreePool = struct
FromLVM.resume from_lvm
>>= fun () ->

let rec loop_forever () =
(* n here is the number of times we've been
around the loop without activity. we use
it to calculate the delay until next
poll. *)
let rec loop_forever n =
FromLVM.pop from_lvm
>>= fun (pos, ts) ->
let open FreeAllocation in
( if ts = [] then begin
Lwt_unix.sleep 5.
Lwt_unix.sleep (delayfn n)
end else return ()
) >>= fun () ->
Lwt_list.iter_s
(fun t ->
sexp_of_t t |> Sexplib.Sexp.to_string_hum |> debug "FreePool: received new allocation: %s"
>>= fun () ->
add t
add t.FreeAllocation.generation t.FreeAllocation.blocks
) ts
>>= fun () ->
FromLVM.c_advance from_lvm pos
>>= fun () ->
loop_forever () in
loop_forever (if ts = [] then n+1 else 0) in
return loop_forever
end

Expand Down Expand Up @@ -193,7 +207,7 @@ let targets_of x =
>>= fun () ->
return (`Error `Retry)

let main mock_dm config daemon socket journal fromLVM toLVM =
let main mock_dm config daemon socket journal fromLVM toLVM log =
let open Config.Local_allocator in
let config = t_of_sexp (Sexplib.Sexp.load_sexp config) in
let config = { config with
Expand All @@ -203,6 +217,12 @@ let main mock_dm config daemon socket journal fromLVM toLVM =
fromLVM = (match fromLVM with None -> config.fromLVM | Some x -> x);
} in

let log_filename =
match log with
| Some f -> if Filename.is_relative f then (Some (Filename.concat (Unix.getcwd ()) f)) else (Some f)
| None -> None
in

Lwt_log.add_rule "*" Lwt_log.Debug;
Lwt_log.default := Lwt_log.channel ~close_mode:`Keep ~channel:Lwt_io.stdout ();
begin match mock_dm with
Expand All @@ -226,6 +246,15 @@ let main mock_dm config daemon socket journal fromLVM toLVM =
in

let t =
(match log_filename with
| Some f ->
Lwt_log.file ~mode:`Append ~file_name:f () >>= fun logger ->
Lwt_log.default := logger;
Lwt.return ()
| None ->
Lwt.return ()
)
>>= fun () ->
info "Starting local allocator thread" >>= fun () ->
debug "Loaded configuration: %s" (Sexplib.Sexp.to_string_hum (sexp_of_t config))
>>= fun () ->
Expand Down Expand Up @@ -341,7 +370,7 @@ let main mock_dm config daemon socket journal fromLVM toLVM =

FreePool.start config vg
>>= fun forever_fun ->
let (_: unit Lwt.t) = forever_fun () in
let (_: unit Lwt.t) = forever_fun 0 in
let (_: unit Lwt.t) = wait_for_shutdown_forever () in

(* Called to extend a single device. This function decides what needs to be
Expand Down Expand Up @@ -500,11 +529,15 @@ let mock_dm_arg =
let doc = "Enable mock interfaces on device mapper." in
Arg.(value & opt (some string) None & info ["mock-devmapper"] ~doc)

let log =
let doc = "Log to a file rather than syslog/stdout" in
Arg.(value & opt (some string) None & info [ "log" ] ~docv:"LOGFILE" ~doc)

let () =
Sys.(set_signal sigpipe Signal_ignore);
Sys.(set_signal sigterm (Signal_handle (fun _ -> exit (128+sigterm))));

let t = Term.(pure main $ mock_dm_arg $ config $ daemon $ socket $ journal $ fromLVM $ toLVM) in
let t = Term.(pure main $ mock_dm_arg $ config $ daemon $ socket $ journal $ fromLVM $ toLVM $ log) in
match Term.eval (t, info) with
| `Error _ -> exit 1
| _ -> exit 0
21 changes: 21 additions & 0 deletions xenvmd/fist.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
(* Fist points for testing *)

type t = Xenvm_interface.fist

exception FistPointHit of string

let string_of_t t = Rpc.to_string (Xenvm_interface.rpc_of_fist t)

let all : (t * bool) list ref = ref []

let get k = try List.assoc k !all with _ -> false
let set k v = all := (k,v) :: (List.filter (fun (k',_) -> k' <> k) !all)
let list () = !all

let maybe_exn k = if get k then raise (FistPointHit (string_of_t k))
let maybe_lwt_fail k =
if get k then
let str = string_of_t k in
Lwt.(Log.error "Causing Lwt thread failure due to fist point: %s" str
>>= fun () -> Lwt.fail (FistPointHit str))
else Lwt.return ()
8 changes: 8 additions & 0 deletions xenvmd/fist.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type t = Xenvm_interface.fist

val get : t -> bool
val set : t -> bool -> unit
val list : unit -> (t * bool) list

val maybe_exn : t -> unit
val maybe_lwt_fail : t -> unit Lwt.t
Loading