diff --git a/idl/xenvm_interface.ml b/idl/xenvm_interface.ml index ee32b29..084dd2d 100644 --- a/idl/xenvm_interface.ml +++ b/idl/xenvm_interface.ml @@ -32,7 +32,10 @@ external flush : name:string -> unit = "" (** [flush lv] processes all pending allocations for this LV, such that future calls to [get_lv] will return accurate metadata. *) -external shutdown : unit -> unit = "" +(** [shutdown ()] will cause xenvmd to exit shortly after returning. + The returned value is the pid of the process to enable the caller + to wait until the process has actually exitted. *) +external shutdown : unit -> int = "" type queue = { lv: string; diff --git a/setup.sh b/setup.sh index 065cd31..6a52988 100755 --- a/setup.sh +++ b/setup.sh @@ -31,7 +31,9 @@ fi cat test.xenvmd.conf.in | sed -r "s|@BIGDISK@|$LOOP|g" > test.xenvmd.conf mkdir -p /tmp/xenvm.d ./xenvm.native format $LOOP --vg djstest --configdir /tmp/xenvm.d $MOCK_ARG -./xenvmd.native --config ./test.xenvmd.conf --daemon +./xenvmd.native --config ./test.xenvmd.conf > xenvmd.log & + +sleep 2 ./xenvm.native set-vg-info --pvpath $LOOP -S /tmp/xenvmd djstest --local-allocator-path /tmp/host1-socket --uri file://local/services/xenvmd/djstest --configdir /tmp/xenvm.d $MOCK_ARG @@ -82,3 +84,6 @@ sleep 30 #echo Run 'sudo ./xenvm.native host-connect /dev/djstest host1' to connect to the local allocator' #echo Run 'sudo ./local_allocator.native' and type 'djstest-live' to request an allocation echo Run './cleanup.sh' to remove all volumes and devices + +sleep 10 + diff --git a/test.sh b/test.sh index 2da696d..340e3f0 100755 --- a/test.sh +++ b/test.sh @@ -35,9 +35,11 @@ fi cat test.xenvmd.conf.in | sed -r "s|@BIGDISK@|$LOOP|g" > test.xenvmd.conf mkdir -p /etc/xenvm.d BISECT_FILE=_build/xenvm.coverage ./xenvm.native format $LOOP --vg djstest $MOCK_ARG -BISECT_FILE=_build/xenvmd.coverage ./xenvmd.native --config ./test.xenvmd.conf --daemon +BISECT_FILE=_build/xenvmd.coverage ./xenvmd.native --config ./test.xenvmd.conf > xenvmd.log & export BISECT_FILE=_build/xenvm.coverage +sleep 2 + ./xenvm.native set-vg-info --pvpath $LOOP -S /tmp/xenvmd djstest --local-allocator-path /tmp/xenvm-local-allocator --uri file://local/services/xenvmd/djstest $MOCK_ARG #./xenvm.native benchmark /dev/djstest $MOCK_ARG diff --git a/xenvm/xenvm.ml b/xenvm/xenvm.ml index 2c38a93..a749a40 100644 --- a/xenvm/xenvm.ml +++ b/xenvm/xenvm.ml @@ -124,28 +124,20 @@ let host_list copts (vg_name,_) = Lwt_main.run t let shutdown copts (vg_name,_) = + let is_alive pid = + try Unix.kill pid 0; true with _ -> false + in + let lwt_while guard body = + let rec inner () = + if guard () then body () >>= inner else Lwt.return () + in inner () + in let t = get_vg_info_t copts vg_name >>= fun info -> set_uri copts info; - Client.shutdown () - >>= fun () -> - (* wait for the daemon to disappear *) - let finished = ref false in - let rec wait () = - Lwt.catch - (fun () -> - Client.Host.all () - >>= fun _ -> - stderr "Xenvmd is still alive: will sleep 5s and try again" - >>= fun () -> - Lwt_unix.sleep 5. - ) (fun _ -> - finished := true; - return ()) - >>= fun () -> - if !finished then return () else wait () in - wait () in - Lwt_main.run t + Client.shutdown () >>= fun pid -> + lwt_while (fun () -> is_alive pid) (fun () -> Lwt_unix.sleep 1.0) + in Lwt_main.run t let benchmark copts (vg_name,_) = let t = diff --git a/xenvmd/xenvmd.ml b/xenvmd/xenvmd.ml index 31c887c..4719ffc 100644 --- a/xenvmd/xenvmd.ml +++ b/xenvmd/xenvmd.ml @@ -597,7 +597,9 @@ module Impl = struct let fail = Lwt.fail let handle_failure = Lwt.catch - type context = unit + type context = { + stoppers : (unit Lwt.u) list + } let get context () = fatal_error "get" (VolumeManager.read (fun x -> return (`Ok x))) @@ -650,6 +652,7 @@ module Impl = struct VolumeManager.flush_all () let shutdown context () = + List.iter (fun u -> Lwt.wakeup u ()) context.stoppers; VolumeManager.shutdown () >>= fun () -> FreePool.shutdown () @@ -658,7 +661,7 @@ module Impl = struct Lwt_unix.sleep 1. >>= fun () -> exit 0 in - return () + return (Unix.getpid ()) module Host = struct let create context ~name = VolumeManager.Host.create name @@ -674,64 +677,23 @@ module XenvmServer = Xenvm_interface.ServerM(Impl) open Cohttp_lwt_unix -let handler ~info (ch,conn) req body = +let handler ~info stoppers (ch,conn) req body = Cohttp_lwt_body.to_string body >>= fun bodystr -> - XenvmServer.process () (Jsonrpc.call_of_string bodystr) >>= fun result -> + XenvmServer.process {Impl.stoppers} (Jsonrpc.call_of_string bodystr) >>= fun result -> Server.respond_string ~status:`OK ~body:(Jsonrpc.string_of_response result) () -let run port sock_path config daemon = - let config = Config.t_of_sexp (Sexplib.Sexp.load_sexp config) in - let config = { config with Config.listenPort = match port with None -> config.Config.listenPort | Some x -> Some x } in - let config = { config with Config.listenPath = match sock_path with None -> config.Config.listenPath | Some x -> Some x } in - - (* Ideally we would bind our sockets before daemonizing to avoid racing - with the next command, but the conduit API doesn't let us pass a socket - in. Instead we daemonize in a fork()ed child, and in the parent we wait - for a connect() to succeed. *) - if Unix.fork () <> 0 then begin - let started = ref false in - let rec wait remaining = - if remaining = 0 then begin - Printf.fprintf stderr "Failed to communicate with xenvmd: check the configuration and try again.\n%!"; - exit 1; - end; - begin match config.Config.listenPort with - | Some port -> - let s = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - (try - Unix.connect s (Unix.ADDR_INET(Unix.inet_addr_of_string "127.0.0.1", port)); - Unix.close s; - started := true - with _ -> - Unix.close s) - | None -> () - end; - begin match config.Config.listenPath with - | Some path -> - let s = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - (try - Unix.connect s (Unix.ADDR_UNIX path); - Unix.close s; - started := true - with e -> - Unix.close s) - | None -> () - end; - if not !started then begin - Unix.sleep 1; - wait (remaining - 1) - end in - wait 30; - exit 0 - end; - if daemon then Lwt_daemon.daemonize (); - ( match config.Config.listenPath with - | None -> +let maybe_write_pid config = + match config.Config.listenPath with + | None -> (* don't need a lock file because we'll fail to bind to the port *) - () - | Some path -> - info "Writing pidfile to %s" path; - Pidfile.write_pid (path ^ ".lock") ); + () + | Some path -> + info "Writing pidfile to %s" path; + Pidfile.write_pid (path ^ ".lock") + +let run port sock_path config = + maybe_write_pid config; + let t = info "Started with configuration: %s" (Sexplib.Sexp.to_string_hum (Config.sexp_of_t config)); VolumeManager.vgopen ~devices:config.Config.devices @@ -756,7 +718,8 @@ let run port sock_path config daemon = >>= fun () -> service_queues () in - let service_http mode = + (* See below for a description of 'stoppers' and 'stop' *) + let service_http stoppers mode stop = let ty = match mode with | `TCP (`Port x) -> Printf.sprintf "TCP port %d" x | `Unix_domain_socket (`File p) -> Printf.sprintf "Unix domain socket '%s'" p @@ -765,10 +728,10 @@ let run port sock_path config daemon = Printf.printf "Listening for HTTP request on: %s\n" ty; let info = Printf.sprintf "Served by Cohttp/Lwt listening on %s" ty in let conn_closed (ch,conn) = () in - let callback = handler ~info in + let callback = handler ~info stoppers in let c = Server.make ~callback ~conn_closed () in (* Listen for regular API calls *) - Server.create ~mode c in + Server.create ~mode ~stop c in let tcp_mode = @@ -787,12 +750,74 @@ let run port sock_path config daemon = Lwt.return [] end >>= fun unix_mode -> - let threads = List.map service_http (tcp_mode @ unix_mode) in + let services = tcp_mode @ unix_mode in + + (* stoppers here is a list of type (unit Lwt.u) list, and 'stops' + is a list of type (unit Lwt.t). Each of the listening Cohttp + servers is given one of the 'stop' threads, and the whole + 'stoppers' list is passed to every handler. When a 'shutdown' + is issued, whichever server received the call to shutdown can + use the 'stoppers' list to shutdown each of the listeners so + they no longer react to API calls. *) + let stops,stoppers = List.map (fun _ -> Lwt.wait ()) services |> List.split in + let threads = List.map2 (service_http stoppers) (tcp_mode @ unix_mode) stops in Lwt.join ((service_queues ())::threads) in Lwt_main.run t +let daemonize config = + (* Ideally we would bind our sockets before daemonizing to avoid racing + with the next command, but the conduit API doesn't let us pass a socket + in. Instead we daemonize in a fork()ed child, and in the parent we wait + for a connect() to succeed. *) + if Unix.fork () <> 0 then begin + let started = ref false in + let rec wait remaining = + if remaining = 0 then begin + Printf.fprintf stderr "Failed to communicate with xenvmd: check the configuration and try again.\n%!"; + exit 1; + end; + begin match config.Config.listenPort with + | Some port -> + let s = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + (try + Unix.connect s (Unix.ADDR_INET(Unix.inet_addr_of_string "127.0.0.1", port)); + Unix.close s; + started := true + with _ -> + Unix.close s) + | None -> () + end; + begin match config.Config.listenPath with + | Some path -> + let s = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in + (try + Unix.connect s (Unix.ADDR_UNIX path); + Unix.close s; + started := true + with e -> + Unix.close s) + | None -> () + end; + if not !started then begin + Unix.sleep 1; + wait (remaining - 1) + end in + wait 30; + exit 0 + end; + Lwt_daemon.daemonize () + +let main port sock_path config daemon = + let config = Config.t_of_sexp (Sexplib.Sexp.load_sexp config) in + let config = { config with Config.listenPort = match port with None -> config.Config.listenPort | Some x -> Some x } in + let config = { config with Config.listenPath = match sock_path with None -> config.Config.listenPath | Some x -> Some x } in + + if daemon then daemonize config; + + run port sock_path config + open Cmdliner let info = @@ -826,7 +851,7 @@ let cmd = `S "EXAMPLES"; `P "TODO"; ] in - Term.(pure run $ port $ sock_path $ config $ daemon), + Term.(pure main $ port $ sock_path $ config $ daemon), Term.info "xenvmd" ~version:"0.1" ~doc ~man let _ =