diff --git a/idl/errors.ml b/idl/errors.ml index f13ecfd..4503b50 100644 --- a/idl/errors.ml +++ b/idl/errors.ml @@ -33,15 +33,20 @@ let fatal_error msg m = m >>= function | `Error `Retry -> fatal_error_t (msg ^ ": queue temporarily unavailable") | `Ok x -> return x +let delayfn n = + if n>10 then 5.0 else (float_of_int n *. 0.5) + let rec retry_forever f = - f () - >>= function - | `Ok x -> return (`Ok x) - | `Error `Retry -> - Lwt_unix.sleep 5. - >>= fun () -> - retry_forever f - | `Error x -> return (`Error x) + let rec inner n = + f () + >>= function + | `Ok x -> return (`Ok x) + | `Error `Retry -> + Lwt_unix.sleep (delayfn n) + >>= fun () -> + inner (n+1) + | `Error x -> return (`Error x) + in inner 0 let wait_for f result = let new_f () = diff --git a/idl/freeAllocation.ml b/idl/freeAllocation.ml index aa22215..501b2c0 100644 --- a/idl/freeAllocation.ml +++ b/idl/freeAllocation.ml @@ -1,7 +1,10 @@ open Sexplib.Std module T = struct - type t = Lvm.Pv.Allocator.t with sexp + type t = { + blocks : Lvm.Pv.Allocator.t; + generation : int + } with sexp (** Physical blocks which should be included in the free pool *) end diff --git a/idl/log.ml b/idl/log.ml index 96453ec..f9fb54a 100644 --- a/idl/log.ml +++ b/idl/log.ml @@ -11,6 +11,8 @@ let info fmt = Lwt_log.info_f fmt let warn fmt = Lwt_log.warning_f fmt let error fmt = Lwt_log.error_f fmt +let trace_section = Lwt_log.Section.make "trace" + let trace ts = let string_of_key = function | `Producer -> "producer" @@ -25,4 +27,4 @@ let trace ts = Printf.sprintf "%s.%s := %s" queue (string_of_key key) (string_of_value value) | `Get (__, queue, key, value) -> Printf.sprintf "%s.%s == %s" queue (string_of_key key) (string_of_value value) in - info "%s" (String.concat ", " (List.map one ts)) + Lwt_log.info_f ~section:trace_section "%s" (String.concat ", " (List.map one ts)) diff --git a/idl/xenvm_interface.ml b/idl/xenvm_interface.ml index 756b88a..7e6d967 100644 --- a/idl/xenvm_interface.ml +++ b/idl/xenvm_interface.ml @@ -1,8 +1,8 @@ (* XenVM LVM type thing *) exception HostNotCreated - exception HostStillConnecting of string +exception UnknownFistPoint of string let _journal_name = "xenvm_journal" @@ -59,6 +59,11 @@ type host = { freeExtents: int64; } +type fist = + | FreePool0 + | FreePool1 + | FreePool2 + module Host = struct external create: name:string -> unit = "" @@ -79,3 +84,8 @@ module Host = struct external all: unit -> host list = "" end + +module Fist = struct + external set : fist -> bool -> unit = "" + external list : unit -> (fist * bool) list = "" +end diff --git a/remoteConfig b/remoteConfig deleted file mode 100644 index d18fe16..0000000 --- a/remoteConfig +++ /dev/null @@ -1,7 +0,0 @@ -( - (listenPort 4000) - (host_allocation_quantum 128) - (host_low_water_mark 8) - (vg djstest) - (devices (/dev/loop0)) -) diff --git a/test/test.ml b/test/test.ml index 7ea7b69..fda1fe8 100644 --- a/test/test.ml +++ b/test/test.ml @@ -91,7 +91,7 @@ let with_xenvmd ?existing_vg ?(cleanup_vg=true) (f : string -> string -> 'a) = } in Sexplib.Sexp.to_string_hum (Config.Xenvmd.sexp_of_t config) |> file_of_string "test.xenvmd.conf"; - let _ = Lwt_preemptive.detach (fun () -> xenvmd [ "--config"; "./test.xenvmd.conf" ]) () in + let _ = Lwt_preemptive.detach (fun () -> xenvmd [ "--config"; "./test.xenvmd.conf"; "--log"; "xenvmd.log"]) () in wait_for_xenvmd_to_start vg; Xenvm_client.Rpc.uri := "file://local/services/xenvmd/" ^ vg; Xenvm_client.unix_domain_socket_path := "/tmp/xenvmd"; @@ -127,6 +127,8 @@ let la_has_started host = Lwt_io.close oc >>= fun () -> Lwt.return true) (fun _ -> + Lwt_unix.close s + >>= fun () -> Lwt.return false) let start_local_allocator host devices = @@ -144,7 +146,7 @@ let start_local_allocator host devices = Sexplib.Sexp.to_string_hum (Config.Local_allocator.sexp_of_t config) |> file_of_string config_file; ignore(xenvm [ "host-connect"; vg; hostname]); - let la_thread = Lwt_preemptive.detach (fun () -> local_allocator ~host [ "--config"; config_file ]) () in + let la_thread = Lwt_preemptive.detach (fun () -> local_allocator ~host [ "--config"; config_file; "--log"; Printf.sprintf "la.%d.log" host ]) () in la_thread let wait_for_local_allocator_to_start host = @@ -156,6 +158,12 @@ let wait_for_local_allocator_to_start host = else (Lwt_unix.sleep 0.1 >>= fun () -> retry ()) in retry () +let write_to_file thread filename = + thread + >>= fun log -> + Lwt_io.(with_file output filename + (fun chan -> write chan log)) + let lvchange_offline = "lvchange vg/lv --offline: check that we can activate volumes offline" >:: fun () -> @@ -551,7 +559,7 @@ let inparallel fns = Lwt_preemptive.detach (fun () -> ignore(fn ())) ()) fns) let la_extend_multi device = - "Extend an LV with the local allocator" >:: + "Extend 2 LVs on different hosts with the local allocator" >:: (fun () -> ignore(Lwt_main.run ( let lvname = "test2" in @@ -592,10 +600,71 @@ let la_extend_multi device = Lwt.choose [la_dead; (Lwt_unix.sleep 30.0 >>= fun () -> Lwt.fail Timeout)] ))) + +let la_extend_multi_fist device = + "Extend 2 LVs on different hosts with the local allocator" >:: + (fun () -> + ignore(Lwt_main.run ( + let lvname = "test4" in + let lvname2 = "test5" in + let la_thread_1 = start_local_allocator 1 [device] in + let la_thread_2 = start_local_allocator 2 [device] in + wait_for_local_allocator_to_start 1 >>= fun () -> + wait_for_local_allocator_to_start 2 >>= fun () -> + set_vg_info device vg 2; + inparallel [(fun () -> xenvm ["lvcreate"; "-n"; lvname; "-L"; "4"; vg]); + (fun () -> xenvm ["lvcreate"; "-an"; "-n"; lvname2; "-L"; "4"; vg])] + >>= fun () -> + inparallel [(fun () -> xenvm ~host:1 ["lvchange"; "-ay"; Printf.sprintf "%s/%s" vg lvname]); + (fun () -> xenvm ~host:2 ["lvchange"; "-ay"; Printf.sprintf "%s/%s" vg lvname2])] + >>= fun () -> + Client.Fist.set Xenvm_interface.FreePool2 true + >>= fun () -> + Client.Fist.list () + >>= fun list -> + List.iter (fun (x,b) -> Printf.printf "%s: %b\n%!" (Rpc.to_string (Xenvm_interface.rpc_of_fist x)) b) list; + inparallel [(fun () -> xenvm ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname]); + (fun () -> xenvm ~host:2 ["lvextend"; "-L"; "132"; "--live"; Printf.sprintf "%s/%s" vg lvname2])] + >>= fun () -> + Lwt_unix.sleep 10.0 + >>= fun () -> + Client.Fist.set Xenvm_interface.FreePool2 false + >>= fun () -> + inparallel [(fun () -> xenvm ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname]); + (fun () -> xenvm ~host:2 ["lvextend"; "-L"; "1000"; "--live"; Printf.sprintf "%s/%s" vg lvname2])] + >>= fun () -> + inparallel [(fun () -> xenvm ["host-disconnect"; vg; "host1"] |> ignore); + (fun () -> xenvm ["host-disconnect"; vg; "host2"] |> ignore)] + >>= fun () -> + Client.get_lv lvname >>= fun (myvg, lv) -> + Client.get_lv lvname2 >>= fun (_, lv2) -> + ignore(myvg,lv,lv2); + let size = Lvm.Lv.size_in_extents lv in + let size2 = Lvm.Lv.size_in_extents lv2 in + ignore(xenvm ["lvchange"; "-an"; Printf.sprintf "%s/%s" vg lvname]); + ignore(xenvm ["lvchange"; "-an"; Printf.sprintf "%s/%s" vg lvname2]); + Printf.printf "Sanity checking VG\n%!"; + Client.get () >>= fun myvg -> + Common.sanity_check myvg; + Printf.printf "final size=%Ld final_size2=%Ld\n%!" size size2; + assert_equal ~msg:"Unexpected final size" + ~printer:Int64.to_string 250L size; + assert_equal ~msg:"Unexpected final size" + ~printer:Int64.to_string 250L size2; + + let la_dead = la_thread_1 >>= fun _ -> la_thread_2 >>= fun _ -> Lwt.return () in + Lwt.choose [la_dead; (Lwt_unix.sleep 30.0 >>= fun () -> Lwt.fail Timeout)] + >>= fun () -> + write_to_file la_thread_1 "local_allocator.1.log" + >>= fun () -> + write_to_file la_thread_2 "local_allocator.2.log" + ))) + let local_allocator_suite device = "Commands which require the local allocator" >::: [ (* la_start device; la_extend device;*) la_extend_multi device; + la_extend_multi_fist device; ] let _ = diff --git a/xenvm-local-allocator/local_allocator.ml b/xenvm-local-allocator/local_allocator.ml index abf4eec..c1a16ba 100644 --- a/xenvm-local-allocator/local_allocator.ml +++ b/xenvm-local-allocator/local_allocator.ml @@ -40,13 +40,23 @@ module FreePool = struct let m = Lwt_mutex.create () let c = Lwt_condition.create () let free = ref [] + let generation = ref (-1) - let add extents = + let add gen extents = Lwt_mutex.with_lock m (fun () -> - free := Lvm.Pv.Allocator.merge !free extents; - Lwt_condition.broadcast c (); - return () + begin + if gen <= !generation + then + (* Ignore updates we've already seen *) + () + else begin + free := Lvm.Pv.Allocator.merge !free extents; + generation := gen + end + end; + Lwt_condition.broadcast c (); + return () ) (* Allocate up to [nr_extents]. Blocks if there is no space free. Can return @@ -113,24 +123,28 @@ module FreePool = struct FromLVM.resume from_lvm >>= fun () -> - let rec loop_forever () = + (* n here is the number of times we've been + around the loop without activity. we use + it to calculate the delay until next + poll. *) + let rec loop_forever n = FromLVM.pop from_lvm >>= fun (pos, ts) -> let open FreeAllocation in ( if ts = [] then begin - Lwt_unix.sleep 5. + Lwt_unix.sleep (delayfn n) end else return () ) >>= fun () -> Lwt_list.iter_s (fun t -> sexp_of_t t |> Sexplib.Sexp.to_string_hum |> debug "FreePool: received new allocation: %s" >>= fun () -> - add t + add t.FreeAllocation.generation t.FreeAllocation.blocks ) ts >>= fun () -> FromLVM.c_advance from_lvm pos >>= fun () -> - loop_forever () in + loop_forever (if ts = [] then n+1 else 0) in return loop_forever end @@ -193,7 +207,7 @@ let targets_of x = >>= fun () -> return (`Error `Retry) -let main mock_dm config daemon socket journal fromLVM toLVM = +let main mock_dm config daemon socket journal fromLVM toLVM log = let open Config.Local_allocator in let config = t_of_sexp (Sexplib.Sexp.load_sexp config) in let config = { config with @@ -203,6 +217,12 @@ let main mock_dm config daemon socket journal fromLVM toLVM = fromLVM = (match fromLVM with None -> config.fromLVM | Some x -> x); } in + let log_filename = + match log with + | Some f -> if Filename.is_relative f then (Some (Filename.concat (Unix.getcwd ()) f)) else (Some f) + | None -> None + in + Lwt_log.add_rule "*" Lwt_log.Debug; Lwt_log.default := Lwt_log.channel ~close_mode:`Keep ~channel:Lwt_io.stdout (); begin match mock_dm with @@ -226,6 +246,15 @@ let main mock_dm config daemon socket journal fromLVM toLVM = in let t = + (match log_filename with + | Some f -> + Lwt_log.file ~mode:`Append ~file_name:f () >>= fun logger -> + Lwt_log.default := logger; + Lwt.return () + | None -> + Lwt.return () + ) + >>= fun () -> info "Starting local allocator thread" >>= fun () -> debug "Loaded configuration: %s" (Sexplib.Sexp.to_string_hum (sexp_of_t config)) >>= fun () -> @@ -341,7 +370,7 @@ let main mock_dm config daemon socket journal fromLVM toLVM = FreePool.start config vg >>= fun forever_fun -> - let (_: unit Lwt.t) = forever_fun () in + let (_: unit Lwt.t) = forever_fun 0 in let (_: unit Lwt.t) = wait_for_shutdown_forever () in (* Called to extend a single device. This function decides what needs to be @@ -500,11 +529,15 @@ let mock_dm_arg = let doc = "Enable mock interfaces on device mapper." in Arg.(value & opt (some string) None & info ["mock-devmapper"] ~doc) +let log = + let doc = "Log to a file rather than syslog/stdout" in + Arg.(value & opt (some string) None & info [ "log" ] ~docv:"LOGFILE" ~doc) + let () = Sys.(set_signal sigpipe Signal_ignore); Sys.(set_signal sigterm (Signal_handle (fun _ -> exit (128+sigterm)))); - let t = Term.(pure main $ mock_dm_arg $ config $ daemon $ socket $ journal $ fromLVM $ toLVM) in + let t = Term.(pure main $ mock_dm_arg $ config $ daemon $ socket $ journal $ fromLVM $ toLVM $ log) in match Term.eval (t, info) with | `Error _ -> exit 1 | _ -> exit 0 diff --git a/xenvmd/fist.ml b/xenvmd/fist.ml new file mode 100644 index 0000000..f38df0c --- /dev/null +++ b/xenvmd/fist.ml @@ -0,0 +1,21 @@ +(* Fist points for testing *) + +type t = Xenvm_interface.fist + +exception FistPointHit of string + +let string_of_t t = Rpc.to_string (Xenvm_interface.rpc_of_fist t) + +let all : (t * bool) list ref = ref [] + +let get k = try List.assoc k !all with _ -> false +let set k v = all := (k,v) :: (List.filter (fun (k',_) -> k' <> k) !all) +let list () = !all + +let maybe_exn k = if get k then raise (FistPointHit (string_of_t k)) +let maybe_lwt_fail k = + if get k then + let str = string_of_t k in + Lwt.(Log.error "Causing Lwt thread failure due to fist point: %s" str + >>= fun () -> Lwt.fail (FistPointHit str)) + else Lwt.return () diff --git a/xenvmd/fist.mli b/xenvmd/fist.mli new file mode 100644 index 0000000..4a4c498 --- /dev/null +++ b/xenvmd/fist.mli @@ -0,0 +1,8 @@ +type t = Xenvm_interface.fist + +val get : t -> bool +val set : t -> bool -> unit +val list : unit -> (t * bool) list + +val maybe_exn : t -> unit +val maybe_lwt_fail : t -> unit Lwt.t diff --git a/xenvmd/freepool.ml b/xenvmd/freepool.ml index 4835796..bd71c71 100644 --- a/xenvmd/freepool.ml +++ b/xenvmd/freepool.ml @@ -8,7 +8,7 @@ open Errors let xenvmd_generation_tag = "xenvmd_gen" let lvm_op_of_free_allocation vg connected_host allocation = - let freeid = connected_host.Host.free_LV_uuid in + let freeid = connected_host.Hostdb.free_LV_uuid in let lv = Lvm.Vg.LVs.find freeid vg.Lvm.Vg.lvs in let size = Lvm.Lv.size_in_extents lv in let segments = Lvm.Lv.Segment.linear size allocation in @@ -28,6 +28,21 @@ let tag_of_generation n = | `Ok x -> x | `Error (`Msg y) -> failwith y +let generation_of_lv lv = + List.fold_left + (fun acc tag -> + match generation_of_tag tag with + | None -> acc + | x -> x) None lv.Lvm.Lv.tags + +let create name size = + let creation_host = Unix.gethostname () in + let creation_time = Unix.gettimeofday () |> Int64.of_float in + Vg_io.write (fun vg -> + Lvm.Vg.create vg name size ~creation_host ~creation_time + ~tags:[tag_of_generation 1 |> Lvm.Name.Tag.to_string ] + ) + let perform_expand_free ef connected_host = let open Journal.Op in sector_size >>= fun sector_size -> @@ -60,26 +75,21 @@ let perform_expand_free ef connected_host = maybe_write (fun vg -> - let current_allocation = allocation_of_lv vg connected_host.Host.free_LV_uuid in + let current_allocation = allocation_of_lv vg connected_host.Hostdb.free_LV_uuid in let new_space = Lvm.Pv.Allocator.sub current_allocation ef.old_allocation |> Lvm.Pv.Allocator.size in if new_space = 0L then begin try - let lv = Lvm.Vg.LVs.find connected_host.Host.free_LV_uuid vg.Lvm.Vg.lvs in (* Not_found here caught by the try-catch block *) + let lv = Lvm.Vg.LVs.find connected_host.Hostdb.free_LV_uuid vg.Lvm.Vg.lvs in (* Not_found here caught by the try-catch block *) let extent_size = vg.Lvm.Vg.extent_size in (* in sectors *) let extent_size_mib = Int64.(div (mul extent_size (of_int sector_size)) (mul 1024L 1024L)) in - let old_gen = List.fold_left - (fun acc tag -> - match generation_of_tag tag with - | None -> acc - | x -> x) None lv.Lvm.Lv.tags - in + let old_gen = generation_of_lv lv in let allocation = match Lvm.Pv.Allocator.find vg.Lvm.Vg.free_space Int64.(div ef.extra_size extent_size_mib) with | `Ok allocation -> allocation | `Error (`OnlyThisMuchFree (needed_extents, free_extents)) -> msgs := !msgs @ [ Printf.sprintf "LV %s expansion required, but number of free extents (%Ld) is less than needed extents (%Ld)" - connected_host.Host.free_LV free_extents needed_extents; + connected_host.Hostdb.free_LV free_extents needed_extents; "Expanding to use all the available space."]; vg.Lvm.Vg.free_space in @@ -89,29 +99,47 @@ let perform_expand_free ef connected_host = match old_gen with | Some g -> [ - Lvm.Redo.Op.LvRemoveTag (connected_host.Host.free_LV_uuid, tag_of_generation g); - Lvm.Redo.Op.LvAddTag (connected_host.Host.free_LV_uuid, tag_of_generation (g+1))] + Lvm.Redo.Op.LvRemoveTag (connected_host.Hostdb.free_LV_uuid, tag_of_generation g); + Lvm.Redo.Op.LvAddTag (connected_host.Hostdb.free_LV_uuid, tag_of_generation (g+1))] | None -> [ - Lvm.Redo.Op.LvAddTag (connected_host.Host.free_LV_uuid, tag_of_generation 1)] + Lvm.Redo.Op.LvAddTag (connected_host.Hostdb.free_LV_uuid, tag_of_generation 1)] in `Ok (op1::genops) | `Error x -> `Error x with | Not_found -> - `Error (`Msg (Printf.sprintf "Couldn't find the free LV for host: %s" connected_host.Host.free_LV)) + `Error (`Msg (Printf.sprintf "Couldn't find the free LV for host: %s" connected_host.Hostdb.free_LV)) end else `Ok []) >>= fun () -> Lwt_list.iter_s (fun msg -> debug "%s" msg) !msgs >>= fun () -> + Fist.maybe_lwt_fail Xenvm_interface.FreePool1 + >>= fun () -> read (fun vg -> - let current_allocation = allocation_of_lv vg connected_host.Host.free_LV_uuid in + let lv = Lvm.Vg.LVs.find connected_host.Hostdb.free_LV_uuid vg.Lvm.Vg.lvs in + let current_allocation = allocation_of_lv vg connected_host.Hostdb.free_LV_uuid in let old_allocation = ef.old_allocation in let new_extents = Lvm.Pv.Allocator.sub current_allocation old_allocation in - Lwt.return new_extents) - >>= fun allocation -> - Rings.FromLVM.push connected_host.Host.from_LVM allocation + let generation = generation_of_lv lv in + match generation with + | None -> (* This really should never happen *) + error "Expecting a generation count in the tags of LV: %s" connected_host.Hostdb.free_LV + >>= fun () -> + Lwt.fail (Failure "Generation count missing") + | Some g -> + Lwt.return (new_extents, g)) + >>= fun (allocation, generation) -> + let op = { + FreeAllocation.blocks = allocation; + FreeAllocation.generation = generation; + } in + Rings.FromLVM.push connected_host.Hostdb.from_LVM op >>= fun pos -> - Rings.FromLVM.p_advance connected_host.Host.from_LVM pos + Rings.FromLVM.p_advance connected_host.Hostdb.from_LVM pos + >>= fun result -> + Fist.maybe_lwt_fail Xenvm_interface.FreePool2 + >>= fun () -> + Lwt.return result let perform t = debug "%s" (Journal.Op.sexp_of_t t |> Sexplib.Sexp.to_string_hum) @@ -119,10 +147,10 @@ let perform t = match t with | Journal.Op.ExpandFree ef -> begin - match Host.get_connected_host ef.Journal.Op.host with - | Some connected_host -> + match Hostdb.get ef.Journal.Op.host with + | Hostdb.Connected connected_host -> perform_expand_free ef connected_host - | None -> + | _ -> error "Journalled entry exists, but the host does not!" end @@ -156,55 +184,81 @@ let shutdown () = | None -> return () -let resend_free_volumes () = +let send_allocation_to connected_host = fatal_error "resend_free_volumes unable to read LVM metadata" ( read (fun x -> return (`Ok x)) ) - >>= fun lvm -> - - let hosts = Host.get_connected_hosts () in - Lwt_list.iter_s - (fun (host, connected_host) -> - (* XXX: need to lock the host somehow. Ideally we would still service + >>= fun vg -> + (* XXX: need to lock the host somehow. Ideally we would still service other queues while one host is locked. *) - let from_lvm = connected_host.Host.from_LVM in - let freeid = connected_host.Host.free_LV_uuid in - let freename = connected_host.Host.free_LV in - Rings.FromLVM.p_state from_lvm - >>= function - | `Running -> return () - | `Suspended -> - let rec wait () = - Rings.FromLVM.p_state from_lvm - >>= function - | `Suspended -> - Lwt_unix.sleep 5. - >>= fun () -> - wait () - | `Running -> return () in - wait () + + let from_lvm = connected_host.Hostdb.from_LVM in + let freeid = connected_host.Hostdb.free_LV_uuid in + let freename = connected_host.Hostdb.free_LV in + + fatal_error "resend_free_volumes" + ( match try Some(Lvm.Vg.LVs.find freeid vg.Lvm.Vg.lvs) with _ -> None with + | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) + | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) + >>= fun allocation -> + let lv = Lvm.Vg.LVs.find freeid vg.Lvm.Vg.lvs in + (match generation_of_lv lv with + | Some g -> Lwt.return g + | None -> + error "Expecting a generation count in the tags of LV: %s" connected_host.Hostdb.free_LV >>= fun () -> - fatal_error "resend_free_volumes" - ( match try Some(Lvm.Vg.LVs.find freeid lvm.Lvm.Vg.lvs) with _ -> None with - | Some lv -> return (`Ok (Lvm.Lv.to_allocation lv)) - | None -> return (`Error (`Msg (Printf.sprintf "Failed to find LV %s" freename))) ) - >>= fun allocation -> - Rings.FromLVM.push from_lvm allocation - >>= fun pos -> - Rings.FromLVM.p_advance from_lvm pos - ) hosts + Lwt.fail (Failure "Generation count missing") + ) + >>= fun generation -> + let op = { + FreeAllocation.blocks = allocation; + FreeAllocation.generation = generation; + } in + Rings.FromLVM.push from_lvm op + >>= fun pos -> + Rings.FromLVM.p_advance from_lvm pos + + +let resend_free_volume_to connected_host = + let from_lvm = connected_host.Hostdb.from_LVM in + Rings.FromLVM.p_state from_lvm + >>= begin function + | `Running -> return () + | `Suspended -> + let rec wait () = + Rings.FromLVM.p_state from_lvm + >>= function + | `Suspended -> + Lwt_unix.sleep 0.5 + >>= fun () -> + wait () + | `Running -> return () in + wait () + >>= fun () -> + send_allocation_to connected_host + end + +let resend_free_volumes () = + let all = Hostdb.all () in + Lwt_list.iter_s + (function + | host, Hostdb.Connected connected_host -> + resend_free_volume_to connected_host + | host, _ -> + Lwt.return () + ) all let top_up_host config host connected_host = let open Config.Xenvmd in sector_size >>= fun sector_size -> read (Lwt.return) >>= fun vg -> - match try Some(Lvm.Vg.LVs.find connected_host.Host.free_LV_uuid vg.Lvm.Vg.lvs) with _ -> None with + match try Some(Lvm.Vg.LVs.find connected_host.Hostdb.free_LV_uuid vg.Lvm.Vg.lvs) with _ -> None with | Some lv -> let extent_size = vg.Lvm.Vg.extent_size in (* in sectors *) let extent_size_mib = Int64.(div (mul extent_size (of_int sector_size)) (mul 1024L 1024L)) in let size_mib = Int64.mul (Lvm.Lv.size_in_extents lv) extent_size_mib in if size_mib < config.host_low_water_mark then begin info "LV %s is %Ld MiB < low_water_mark %Ld MiB; allocating" - connected_host.Host.free_LV size_mib config.host_low_water_mark + connected_host.Hostdb.free_LV size_mib config.host_low_water_mark >>= fun () -> match !journal with | Some j -> @@ -216,15 +270,25 @@ let top_up_host config host connected_host = extra_size=config.host_allocation_quantum }) >>|= fun wait -> wait.Journal.J.sync () + >>= fun () -> + Lwt.return true | None -> error "No journal configured!" - end else return () + >>= fun () -> + Lwt.return false + end else return false | None -> error "Host has disappeared!" + >>= fun () -> + Lwt.return false let top_up_free_volumes config = - let hosts = Host.get_connected_hosts () in + let hosts = Hostdb.all () in Lwt_list.iter_s - (fun (host, connected_host) -> - top_up_host config host connected_host + (function + | host, Hostdb.Connected connected_host -> + top_up_host config host connected_host + >>= fun _ -> + Lwt.return () + | _ -> Lwt.return () ) hosts diff --git a/xenvmd/freepool.mli b/xenvmd/freepool.mli index b93ab53..52b43e9 100644 --- a/xenvmd/freepool.mli +++ b/xenvmd/freepool.mli @@ -1,4 +1,8 @@ val start : string -> unit Lwt.t val shutdown : unit -> unit Lwt.t +val top_up_host : Config.Xenvmd.t -> string -> Hostdb.connected_host -> bool Lwt.t +val send_allocation_to : Hostdb.connected_host -> unit Lwt.t +val resend_free_volume_to : Hostdb.connected_host -> unit Lwt.t val resend_free_volumes : unit -> unit Lwt.t val top_up_free_volumes : Config.Xenvmd.t -> unit Lwt.t +val create : string -> int64 -> Lvm.Vg.metadata Lwt.t diff --git a/xenvmd/host.ml b/xenvmd/host.ml index 648ba5f..407202f 100644 --- a/xenvmd/host.ml +++ b/xenvmd/host.ml @@ -1,28 +1,66 @@ open Lwt open Log -type connected_host = { - mutable state : Xenvm_interface.connection_state; - to_LVM : Rings.ToLVM.consumer; - from_LVM : Rings.FromLVM.producer; - free_LV : string; - free_LV_uuid : Lvm.Vg.LVs.key; -} +let connected_tag = "xenvm_connected" -module StringSet = Set.Make(String) -let connecting_hosts : StringSet.t ref = ref StringSet.empty -let connected_hosts : (string, connected_host) Hashtbl.t = Hashtbl.create 11 +(* Hold this mutex when actively flushing from the Rings.ToLVM queues *) +let flush_m = Lwt_mutex.create () -let get_connected_hosts () = - Hashtbl.fold (fun k v acc -> (k,v)::acc) connected_hosts [] +let flush_already_locked name = + match Hostdb.get name with + | Hostdb.Disconnected | Hostdb.Connecting -> return false + | Hostdb.Connected connected_host -> + let to_lvm = connected_host.Hostdb.to_LVM in + Rings.ToLVM.pop to_lvm + >>= fun (pos, items) -> + debug "Rings.FromLVM queue %s has %d items" name (List.length items) + >>= fun () -> + Lwt_list.iter_s (function { ExpandVolume.volume; segments } -> + debug "Expanding volume %s" volume + >>= fun () -> + Vg_io.write (fun vg -> + let id = (Lvm.Vg.LVs.find_by_name volume vg.Lvm.Vg.lvs).Lvm.Lv.id in + let free_id = connected_host.Hostdb.free_LV_uuid in + Lvm.Vg.do_op vg (Lvm.Redo.Op.(LvTransfer(free_id, id, segments))) + ) >>= fun _ -> Lwt.return () + ) items + >>= fun () -> + Rings.ToLVM.c_advance to_lvm pos + >>= fun () -> + Lwt.return (List.length items > 0) -let get_connected_host host = - try - Some (Hashtbl.find connected_hosts host) - with Not_found -> - None +let flush_one host = + Lwt_mutex.with_lock flush_m + (fun () -> flush_already_locked host) -let connected_tag = "xenvm_connected" +let flush_all () = + let all = Hostdb.all () in + Lwt_list.iter_s + (fun (h, state) -> + match state with + | Hostdb.Connected _ -> flush_one h >>= fun _ -> Lwt.return () + | _ -> return ()) all + +let service_host config host = + let rec inner n = + match Hostdb.get host with + | Hostdb.Connected connected_host -> + (* 0. Have any local allocators restarted? *) + Freepool.resend_free_volume_to connected_host + >>= fun () -> + (* 1. Do any of the host free LVs need topping up? *) + Freepool.top_up_host config host connected_host + >>= fun topped_up -> + (* 2. Are there any pending LVM updates from hosts? *) + flush_one host + >>= fun la_activity -> + let activity = topped_up || la_activity in + Lwt_unix.sleep (Errors.delayfn n) + >>= fun () -> + inner (if activity then 0 else n+1) + | _ -> + Lwt.return () in + inner 0 (* Conventional names of the metadata volumes *) let toLVM host = host ^ "-toLVM" @@ -89,15 +127,14 @@ let create name = Vg_io.Volume.disconnect disk >>= fun () -> (* Create the freeLVM LV at the end - we can use the existence of this as a flag to show that we've finished host creation *) - Vg_io.write (fun vg -> - Lvm.Vg.create vg freeLVM size ~creation_host ~creation_time - ) >>= fun _ -> + Freepool.create freeLVM size + >>= fun _ -> Vg_io.sync () end let sexp_of_exn e = Sexplib.Sexp.Atom (Printexc.to_string e) -let connect name = +let connect config name = Vg_io.myvg >>= fun vg -> info "Registering host %s" name >>= fun () -> @@ -105,26 +142,27 @@ let connect name = let fromLVM = fromLVM name in let freeLVM = freeLVM name in - let is_connecting = StringSet.mem name (!connecting_hosts) in - - begin if Hashtbl.mem connected_hosts name then begin - let connected_host = Hashtbl.find connected_hosts name in - match connected_host.state with - | Xenvm_interface.Failed msg -> - info "Connection to host %s has failed with %s: retrying" name msg - >>= fun () -> - Lwt.return true - | x -> - info "Connction to host %s in state %s" name (Jsonrpc.to_string (Xenvm_interface.rpc_of_connection_state x)) - >>= fun () -> - Lwt.return false - end else Lwt.return true end + begin match Hostdb.get name with + | Hostdb.Connected connected_host -> begin + match connected_host.Hostdb.state with + | Xenvm_interface.Failed msg -> + info "Connection to host %s has failed with %s: retrying" name msg + >>= fun () -> + Lwt.return true + | x -> + info "Connction to host %s in state %s" name (Jsonrpc.to_string (Xenvm_interface.rpc_of_connection_state x)) + >>= fun () -> + Lwt.return false + end + | Hostdb.Disconnected -> return true (* We should try again *) + | Hostdb.Connecting -> return false (* If we're already connecting, no need to do anything *) + end >>= fun try_again -> - if is_connecting || (not try_again) then begin + if (not try_again) then begin return () end else begin - connecting_hosts := StringSet.add name !connecting_hosts; + Hostdb.set name Hostdb.Connecting; match Vg_io.find vg toLVM, Vg_io.find vg fromLVM, Vg_io.find vg freeLVM with | Some toLVM_id, Some fromLVM_id, Some freeLVM_id -> (* Persist at this point that we're going to connect this host *) @@ -152,26 +190,20 @@ let connect name = | `Ok disk -> Rings.FromLVM.attach_as_producer ~name ~disk () >>= fun (initial_state, fromLVM_q) -> - let connected_host = { + let connected_host = Hostdb.({ state = Xenvm_interface.Resuming_to_LVM; from_LVM = fromLVM_q; to_LVM = toLVM_q; free_LV = freeLVM; free_LV_uuid = (Vg_io.Volume.metadata_of freeLVM_id).Lvm.Lv.id - } in - Hashtbl.replace connected_hosts name connected_host; - connecting_hosts := StringSet.remove name !connecting_hosts; + }) in + Hostdb.set name (Hostdb.Connected connected_host); ( if initial_state = `Suspended then begin - connected_host.state <- Xenvm_interface.Resending_free_blocks; + connected_host.Hostdb.state <- Xenvm_interface.Resending_free_blocks; debug "The Rings.FromLVM queue was already suspended: resending the free blocks" >>= fun () -> - let allocation = Lvm.Lv.to_allocation (Vg_io.Volume.metadata_of freeLVM_id) in - Rings.FromLVM.push fromLVM_q allocation - >>= fun pos -> - Rings.FromLVM.p_advance fromLVM_q pos - >>= fun () -> - debug "Free blocks pushed" + Freepool.send_allocation_to connected_host end else begin debug "The Rings.FromLVM queue was running: no need to resend the free blocks" end ) @@ -191,17 +223,17 @@ let connect name = (fun () -> background_t () >>= fun connected_host -> - connected_host.state <- Xenvm_interface.Connected; + connected_host.Hostdb.state <- Xenvm_interface.Connected; + let _ = service_host config name in return () ) (fun e -> let msg = Printexc.to_string e in error "Connecting to %s failed with: %s" name msg >>= fun () -> begin - try - let connected_host = Hashtbl.find connected_hosts name in - connected_host.state <- Xenvm_interface.Failed msg; - with Not_found -> () + match Hostdb.get name with + | Hostdb.Connected ch -> ch.Hostdb.state <- Xenvm_interface.Failed msg + | _ -> () end; return ()) ); @@ -209,65 +241,40 @@ let connect name = | _, _, _ -> info "At least one of host %s's volumes does not exist" name >>= fun () -> - connecting_hosts := StringSet.remove name !connecting_hosts; + Hostdb.set name Hostdb.Disconnected; fail Xenvm_interface.HostNotCreated end -(* Hold this mutex when actively flushing from the Rings.ToLVM queues *) -let flush_m = Lwt_mutex.create () - -let flush_already_locked name = - if not (Hashtbl.mem connected_hosts name) - then return () - else begin - let connected_host = Hashtbl.find connected_hosts name in - let to_lvm = connected_host.to_LVM in - Rings.ToLVM.pop to_lvm - >>= fun (pos, items) -> - debug "Rings.FromLVM queue %s has %d items" name (List.length items) - >>= fun () -> - Lwt_list.iter_s (function { ExpandVolume.volume; segments } -> - debug "Expanding volume %s" volume - >>= fun () -> - Vg_io.write (fun vg -> - let id = (Lvm.Vg.LVs.find_by_name volume vg.Lvm.Vg.lvs).Lvm.Lv.id in - let free_id = connected_host.free_LV_uuid in - Lvm.Vg.do_op vg (Lvm.Redo.Op.(LvTransfer(free_id, id, segments))) - ) >>= fun _ -> Lwt.return () - ) items - >>= fun () -> - Rings.ToLVM.c_advance to_lvm pos - end - let disconnect ~cooperative name = - if StringSet.mem name !connecting_hosts then + match Hostdb.get name with + | Hostdb.Disconnected -> return () + | Hostdb.Connecting -> fail (Xenvm_interface.(HostStillConnecting (Jsonrpc.to_string (rpc_of_connection_state Xenvm_interface.Resuming_to_LVM)))) - else - if Hashtbl.mem connected_hosts name then begin - let connected_host = Hashtbl.find connected_hosts name in - match connected_host.state with - | Xenvm_interface.Connected -> - let to_lvm = connected_host.to_LVM in - ( if cooperative then begin - debug "Cooperative disconnect: suspending Rings.ToLVM queue for %s" name - >>= fun () -> - Rings.ToLVM.suspend to_lvm - end else return () - ) >>= fun () -> - (* There may still be updates in the Rings.ToLVM queue *) - debug "Flushing Rings.ToLVM queue for %s" name - >>= fun () -> - Lwt_mutex.with_lock flush_m (fun () -> flush_already_locked name) - >>= fun () -> - let toLVM = toLVM name in - Vg_io.write (fun vg -> - Lvm.Vg.remove_tag vg toLVM connected_tag - ) >>= fun _ -> - Hashtbl.remove connected_hosts name; - return () - | x -> - fail (Xenvm_interface.(HostStillConnecting (Jsonrpc.to_string (rpc_of_connection_state x)))) - end else return () + | Hostdb.Connected connected_host -> + begin + match connected_host.Hostdb.state with + | Xenvm_interface.Connected -> + let to_lvm = connected_host.Hostdb.to_LVM in + ( if cooperative then begin + debug "Cooperative disconnect: suspending Rings.ToLVM queue for %s" name + >>= fun () -> + Rings.ToLVM.suspend to_lvm + end else return () + ) >>= fun () -> + (* There may still be updates in the Rings.ToLVM queue *) + debug "Flushing Rings.ToLVM queue for %s" name + >>= fun () -> + Lwt_mutex.with_lock flush_m (fun () -> flush_already_locked name) + >>= fun _ -> + let toLVM = toLVM name in + Vg_io.write (fun vg -> + Lvm.Vg.remove_tag vg toLVM connected_tag + ) >>= fun _ -> + Hostdb.set name Hostdb.Disconnected; + return () + | x -> + fail (Xenvm_interface.(HostStillConnecting (Jsonrpc.to_string (rpc_of_connection_state x)))) + end let destroy name = disconnect ~cooperative:false name @@ -287,36 +294,42 @@ let destroy name = Lwt.return () let all () = - let list = Hashtbl.fold (fun n c acc -> (n,c)::acc) connected_hosts [] in + let all = Hostdb.all () in Lwt_list.map_s - (fun (name, connected_host) -> - let lv = toLVM name in - let t = connected_host.to_LVM in - ( Rings.ToLVM.c_state t >>= function - | `Suspended -> return true - | `Running -> return false ) >>= fun suspended -> - Rings.ToLVM.c_debug_info t - >>= fun debug_info -> - let toLVM = { Xenvm_interface.lv; suspended; debug_info } in - let lv = fromLVM name in - let t = connected_host.from_LVM in - ( Rings.FromLVM.p_state t >>= function - | `Suspended -> return true - | `Running -> return false ) >>= fun suspended -> - Rings.FromLVM.p_debug_info t - >>= fun debug_info -> - let fromLVM = { Xenvm_interface.lv; suspended; debug_info } in - Vg_io.read (fun vg -> - try - let lv = Lvm.Vg.LVs.find_by_name (freeLVM name) vg.Lvm.Vg.lvs in - return (Lvm.Lv.size_in_extents lv) - with Not_found -> return 0L - ) >>= fun freeExtents -> - let connection_state = Some connected_host.state in - return { Xenvm_interface.name; connection_state; fromLVM; toLVM; freeExtents } - ) list + (fun (name,state) -> + match state with + | Hostdb.Connected connected_host -> + let lv = toLVM name in + let t = connected_host.Hostdb.to_LVM in + ( Rings.ToLVM.c_state t >>= function + | `Suspended -> return true + | `Running -> return false ) >>= fun suspended -> + Rings.ToLVM.c_debug_info t + >>= fun debug_info -> + let toLVM = { Xenvm_interface.lv; suspended; debug_info } in + let lv = fromLVM name in + let t = connected_host.Hostdb.from_LVM in + ( Rings.FromLVM.p_state t >>= function + | `Suspended -> return true + | `Running -> return false ) >>= fun suspended -> + Rings.FromLVM.p_debug_info t + >>= fun debug_info -> + let fromLVM = { Xenvm_interface.lv; suspended; debug_info } in + Vg_io.read (fun vg -> + try + let lv = Lvm.Vg.LVs.find_by_name (freeLVM name) vg.Lvm.Vg.lvs in + return (Lvm.Lv.size_in_extents lv) + with Not_found -> return 0L + ) >>= fun freeExtents -> + let connection_state = Some connected_host.Hostdb.state in + return (Some ({ Xenvm_interface.name; connection_state; fromLVM; toLVM; freeExtents })) + | _ -> return None + ) all + >>= fun l -> + Lwt.return (List.rev (List.fold_left (fun acc entry -> match entry with Some e -> e::acc | None -> acc) [] l)) -let reconnect_all () = + +let reconnect_all config = Vg_io.read (fun vg -> debug "Reconnecting" >>= fun () -> @@ -334,18 +347,16 @@ let reconnect_all () = vg.Lvm.Vg.lvs [] |> Lwt.return ) >>= fun host_states -> Lwt_list.iter_s (fun (host, was_connected) -> - if was_connected then connect host else disconnect ~cooperative:false host) host_states - -let flush_one host = - Lwt_mutex.with_lock flush_m - (fun () -> flush_already_locked host) - -let flush_all () = - let hosts = Hashtbl.fold (fun h _ acc -> h::acc) connected_hosts [] in - Lwt_list.iter_s flush_one hosts + if was_connected then connect config host else disconnect ~cooperative:false host) host_states let shutdown () = - let hosts = Hashtbl.fold (fun h _ acc -> h::acc) connected_hosts [] in - Lwt_list.iter_s (disconnect ~cooperative:true) hosts + let all = Hostdb.all () in + Lwt_list.iter_s + (fun (h, state) -> + match state with + | Hostdb.Connected _ -> disconnect ~cooperative:true h + | _ -> return ()) all >>= fun () -> Vg_io.sync () + + diff --git a/xenvmd/host.mli b/xenvmd/host.mli index 0bd5be7..9117d93 100644 --- a/xenvmd/host.mli +++ b/xenvmd/host.mli @@ -1,17 +1,8 @@ -type connected_host = { - mutable state : Xenvm_interface.connection_state; - to_LVM : Rings.ToLVM.consumer; - from_LVM : Rings.FromLVM.producer; - free_LV : string; - free_LV_uuid : Lvm.Vg.LVs.key; -} -val get_connected_hosts : unit -> (string * connected_host) list -val get_connected_host : string -> connected_host option val create : string -> unit Lwt.t -val connect : string -> unit Lwt.t +val connect : Config.Xenvmd.t -> string -> unit Lwt.t val disconnect : cooperative:bool -> string -> unit Lwt.t val destroy : string -> unit Lwt.t val all : unit -> Xenvm_interface.host list Lwt.t -val reconnect_all : unit -> unit Lwt.t +val reconnect_all : Config.Xenvmd.t -> unit Lwt.t val flush_all : unit -> unit Lwt.t val shutdown : unit -> unit Lwt.t diff --git a/xenvmd/hostdb.ml b/xenvmd/hostdb.ml new file mode 100644 index 0000000..a085fd0 --- /dev/null +++ b/xenvmd/hostdb.ml @@ -0,0 +1,21 @@ +type connected_host = { + mutable state : Xenvm_interface.connection_state; + to_LVM : Rings.ToLVM.consumer; + from_LVM : Rings.FromLVM.producer; + free_LV : string; + free_LV_uuid : Lvm.Vg.LVs.key; +} + +type state = + | Disconnected + | Connecting + | Connected of connected_host + +let state : (string, state) Hashtbl.t = Hashtbl.create 11 + +let get host = try Hashtbl.find state host with _ -> Disconnected +let set host st = + if st=Disconnected + then Hashtbl.remove state host + else Hashtbl.replace state host st +let all () = Hashtbl.fold (fun k v acc -> (k,v)::acc) state [] diff --git a/xenvmd/hostdb.mli b/xenvmd/hostdb.mli new file mode 100644 index 0000000..9b364a5 --- /dev/null +++ b/xenvmd/hostdb.mli @@ -0,0 +1,13 @@ +type connected_host = { + mutable state : Xenvm_interface.connection_state; + to_LVM : Rings.ToLVM.consumer; + from_LVM : Rings.FromLVM.producer; + free_LV : string; + free_LV_uuid : Lvm.Vg.LVs.key; +} +type state = Disconnected | Connecting | Connected of connected_host + +val get : string -> state +val set : string -> state -> unit +val all : unit -> (string * state) list + diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index d3d003a..f79e311 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -12,7 +12,8 @@ module Impl = struct let ignore_result _ = Lwt.return () type context = { - stoppers : (unit Lwt.u) list + stoppers : (unit Lwt.u) list; + config : Config.Xenvmd.t } let get context () = @@ -82,20 +83,25 @@ module Impl = struct module Host = struct let create context ~name = Host.create name - let connect context ~name = Host.connect name + let connect context ~name = Host.connect context.config name let disconnect context ~cooperative ~name = Host.disconnect ~cooperative name let destroy context ~name = Host.destroy name let all context () = Host.all () end + + module Fist = struct + let set context point value = return (Fist.set point value) + let list context () = return (Fist.list ()) + end end module XenvmServer = Xenvm_interface.ServerM(Impl) open Cohttp_lwt_unix -let handler ~info stoppers (ch,conn) req body = +let handler ~info stoppers config (ch,conn) req body = Cohttp_lwt_body.to_string body >>= fun bodystr -> - XenvmServer.process {Impl.stoppers} (Jsonrpc.call_of_string bodystr) >>= fun result -> + XenvmServer.process {Impl.stoppers; config} (Jsonrpc.call_of_string bodystr) >>= fun result -> Server.respond_string ~status:`OK ~body:(Jsonrpc.string_of_response result) () let maybe_write_pid config = @@ -118,8 +124,15 @@ let maybe_write_pid config = exit 1 end -let run port sock_path config = +let run port sock_path config log_filename = let t = + (match log_filename with + | Some f -> + Lwt_log.file ~mode:`Append ~file_name:f () >>= fun logger -> + Lwt_log.default := logger; + Lwt.return () + | None -> Lwt.return ()) + >>= fun () -> maybe_write_pid config >>= fun () -> info "Started with configuration: %s" (Sexplib.Sexp.to_string_hum (Config.Xenvmd.sexp_of_t config)) @@ -128,28 +141,17 @@ let run port sock_path config = >>= fun () -> Freepool.start Xenvm_interface._journal_name >>= fun () -> - Host.reconnect_all () + Host.reconnect_all config >>= fun () -> (* Create a snapshot cache of the metadata for the stats thread *) Vg_io.read return >>= fun vg -> let stats_vg_cache = ref vg in - let rec service_queues () = - (* 0. Have any local allocators restarted? *) - Freepool.resend_free_volumes () - >>= fun () -> - (* 1. Do any of the host free LVs need topping up? *) - Freepool.top_up_free_volumes config - >>= fun () -> - (* 2. Are there any pending LVM updates from hosts? *) - Host.flush_all () - >>= fun () -> - (* 3. Update the metadata snapshot for the stats collection *) + let rec service_stats () = Vg_io.read return >>= fun vg -> stats_vg_cache := vg; - Lwt_unix.sleep 5. >>= fun () -> - service_queues () in + service_stats () in (* See below for a description of 'stoppers' and 'stop' *) let service_http stoppers mode stop = @@ -161,7 +163,7 @@ let run port sock_path config = Lwt.ignore_result (debug "Listening for HTTP request on: %s\n" ty); let info = Printf.sprintf "Served by Cohttp/Lwt listening on %s" ty in let conn_closed (ch,conn) = () in - let callback = handler ~info stoppers in + let callback = handler ~info stoppers config in let c = Server.make ~callback ~conn_closed () in (* Listen for regular API calls *) Server.create ~mode ~stop c in @@ -201,7 +203,7 @@ let run port sock_path config = | None -> () end; - Lwt.join ((service_queues ())::threads) in + Lwt.join ((service_stats ())::threads) in Lwt_main.run t @@ -248,17 +250,24 @@ let daemonize config = end; Lwt_daemon.daemonize () -let main port sock_path config daemon = +let main port sock_path config daemon log = let open Config.Xenvmd in Sys.(set_signal sigterm (Signal_handle (fun _ -> exit (128+sigterm)))); let config = t_of_sexp (Sexplib.Sexp.load_sexp config) in let config = { config with listenPort = match port with None -> config.listenPort | Some x -> Some x } in let config = { config with listenPath = match sock_path with None -> config.listenPath | Some x -> Some x } in + Lwt_log.add_rule "*" Lwt_log.Debug; + let log_filename = + match log with + | Some f -> if Filename.is_relative f then (Some (Filename.concat (Unix.getcwd ()) f)) else (Some f) + | None -> None + in + if daemon then daemonize config; - run port sock_path config + run port sock_path config log_filename open Cmdliner @@ -287,13 +296,17 @@ let daemon = let doc = "Detach from the terminal and run as a daemon" in Arg.(value & flag & info ["daemon"] ~docv:"DAEMON" ~doc) +let log = + let doc = "Log to a file rather than syslog/stdout" in + Arg.(value & opt (some string) None & info [ "log" ] ~docv:"LOGFILE" ~doc) + let cmd = let doc = "Start a XenVM daemon" in let man = [ `S "EXAMPLES"; `P "TODO"; ] in - Term.(pure main $ port $ sock_path $ config $ daemon), + Term.(pure main $ port $ sock_path $ config $ daemon $ log), Term.info "xenvmd" ~version:"0.1" ~doc ~man let _ =