Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion idl/xenvm_interface.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ external flush : name:string -> unit = ""
(** [flush lv] processes all pending allocations for this LV, such that
future calls to [get_lv] will return accurate metadata. *)

external shutdown : unit -> unit = ""
(** [shutdown ()] will cause xenvmd to exit shortly after returning.
The returned value is the pid of the process to enable the caller
to wait until the process has actually exitted. *)
external shutdown : unit -> int = ""

type queue = {
lv: string;
Expand Down
7 changes: 6 additions & 1 deletion setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ fi
cat test.xenvmd.conf.in | sed -r "s|@BIGDISK@|$LOOP|g" > test.xenvmd.conf
mkdir -p /tmp/xenvm.d
./xenvm.native format $LOOP --vg djstest --configdir /tmp/xenvm.d $MOCK_ARG
./xenvmd.native --config ./test.xenvmd.conf --daemon
./xenvmd.native --config ./test.xenvmd.conf > xenvmd.log &

sleep 2

./xenvm.native set-vg-info --pvpath $LOOP -S /tmp/xenvmd djstest --local-allocator-path /tmp/host1-socket --uri file://local/services/xenvmd/djstest --configdir /tmp/xenvm.d $MOCK_ARG

Expand Down Expand Up @@ -82,3 +84,6 @@ sleep 30
#echo Run 'sudo ./xenvm.native host-connect /dev/djstest host1' to connect to the local allocator'
#echo Run 'sudo ./local_allocator.native' and type 'djstest-live' to request an allocation
echo Run './cleanup.sh' to remove all volumes and devices

sleep 10

4 changes: 3 additions & 1 deletion test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ fi
cat test.xenvmd.conf.in | sed -r "s|@BIGDISK@|$LOOP|g" > test.xenvmd.conf
mkdir -p /etc/xenvm.d
BISECT_FILE=_build/xenvm.coverage ./xenvm.native format $LOOP --vg djstest $MOCK_ARG
BISECT_FILE=_build/xenvmd.coverage ./xenvmd.native --config ./test.xenvmd.conf --daemon
BISECT_FILE=_build/xenvmd.coverage ./xenvmd.native --config ./test.xenvmd.conf > xenvmd.log &
export BISECT_FILE=_build/xenvm.coverage

sleep 2

./xenvm.native set-vg-info --pvpath $LOOP -S /tmp/xenvmd djstest --local-allocator-path /tmp/xenvm-local-allocator --uri file://local/services/xenvmd/djstest $MOCK_ARG

#./xenvm.native benchmark /dev/djstest $MOCK_ARG
Expand Down
30 changes: 11 additions & 19 deletions xenvm/xenvm.ml
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,20 @@ let host_list copts (vg_name,_) =
Lwt_main.run t

let shutdown copts (vg_name,_) =
let is_alive pid =
try Unix.kill pid 0; true with _ -> false
in
let lwt_while guard body =
let rec inner () =
if guard () then body () >>= inner else Lwt.return ()
in inner ()
in
let t =
get_vg_info_t copts vg_name >>= fun info ->
set_uri copts info;
Client.shutdown ()
>>= fun () ->
(* wait for the daemon to disappear *)
let finished = ref false in
let rec wait () =
Lwt.catch
(fun () ->
Client.Host.all ()
>>= fun _ ->
stderr "Xenvmd is still alive: will sleep 5s and try again"
>>= fun () ->
Lwt_unix.sleep 5.
) (fun _ ->
finished := true;
return ())
>>= fun () ->
if !finished then return () else wait () in
wait () in
Lwt_main.run t
Client.shutdown () >>= fun pid ->
lwt_while (fun () -> is_alive pid) (fun () -> Lwt_unix.sleep 1.0)
in Lwt_main.run t

let benchmark copts (vg_name,_) =
let t =
Expand Down
147 changes: 86 additions & 61 deletions xenvmd/xenvmd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,9 @@ module Impl = struct
let fail = Lwt.fail
let handle_failure = Lwt.catch

type context = unit
type context = {
stoppers : (unit Lwt.u) list
}

let get context () =
fatal_error "get" (VolumeManager.read (fun x -> return (`Ok x)))
Expand Down Expand Up @@ -650,6 +652,7 @@ module Impl = struct
VolumeManager.flush_all ()

let shutdown context () =
List.iter (fun u -> Lwt.wakeup u ()) context.stoppers;
VolumeManager.shutdown ()
>>= fun () ->
FreePool.shutdown ()
Expand All @@ -658,7 +661,7 @@ module Impl = struct
Lwt_unix.sleep 1.
>>= fun () ->
exit 0 in
return ()
return (Unix.getpid ())

module Host = struct
let create context ~name = VolumeManager.Host.create name
Expand All @@ -674,64 +677,23 @@ module XenvmServer = Xenvm_interface.ServerM(Impl)

open Cohttp_lwt_unix

let handler ~info (ch,conn) req body =
let handler ~info stoppers (ch,conn) req body =
Cohttp_lwt_body.to_string body >>= fun bodystr ->
XenvmServer.process () (Jsonrpc.call_of_string bodystr) >>= fun result ->
XenvmServer.process {Impl.stoppers} (Jsonrpc.call_of_string bodystr) >>= fun result ->
Server.respond_string ~status:`OK ~body:(Jsonrpc.string_of_response result) ()

let run port sock_path config daemon =
let config = Config.t_of_sexp (Sexplib.Sexp.load_sexp config) in
let config = { config with Config.listenPort = match port with None -> config.Config.listenPort | Some x -> Some x } in
let config = { config with Config.listenPath = match sock_path with None -> config.Config.listenPath | Some x -> Some x } in

(* Ideally we would bind our sockets before daemonizing to avoid racing
with the next command, but the conduit API doesn't let us pass a socket
in. Instead we daemonize in a fork()ed child, and in the parent we wait
for a connect() to succeed. *)
if Unix.fork () <> 0 then begin
let started = ref false in
let rec wait remaining =
if remaining = 0 then begin
Printf.fprintf stderr "Failed to communicate with xenvmd: check the configuration and try again.\n%!";
exit 1;
end;
begin match config.Config.listenPort with
| Some port ->
let s = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
(try
Unix.connect s (Unix.ADDR_INET(Unix.inet_addr_of_string "127.0.0.1", port));
Unix.close s;
started := true
with _ ->
Unix.close s)
| None -> ()
end;
begin match config.Config.listenPath with
| Some path ->
let s = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
(try
Unix.connect s (Unix.ADDR_UNIX path);
Unix.close s;
started := true
with e ->
Unix.close s)
| None -> ()
end;
if not !started then begin
Unix.sleep 1;
wait (remaining - 1)
end in
wait 30;
exit 0
end;
if daemon then Lwt_daemon.daemonize ();
( match config.Config.listenPath with
| None ->
let maybe_write_pid config =
match config.Config.listenPath with
| None ->
(* don't need a lock file because we'll fail to bind to the port *)
()
| Some path ->
info "Writing pidfile to %s" path;
Pidfile.write_pid (path ^ ".lock") );
()
| Some path ->
info "Writing pidfile to %s" path;
Pidfile.write_pid (path ^ ".lock")

let run port sock_path config =
maybe_write_pid config;

let t =
info "Started with configuration: %s" (Sexplib.Sexp.to_string_hum (Config.sexp_of_t config));
VolumeManager.vgopen ~devices:config.Config.devices
Expand All @@ -756,7 +718,8 @@ let run port sock_path config daemon =
>>= fun () ->
service_queues () in

let service_http mode =
(* See below for a description of 'stoppers' and 'stop' *)
let service_http stoppers mode stop =
let ty = match mode with
| `TCP (`Port x) -> Printf.sprintf "TCP port %d" x
| `Unix_domain_socket (`File p) -> Printf.sprintf "Unix domain socket '%s'" p
Expand All @@ -765,10 +728,10 @@ let run port sock_path config daemon =
Printf.printf "Listening for HTTP request on: %s\n" ty;
let info = Printf.sprintf "Served by Cohttp/Lwt listening on %s" ty in
let conn_closed (ch,conn) = () in
let callback = handler ~info in
let callback = handler ~info stoppers in
let c = Server.make ~callback ~conn_closed () in
(* Listen for regular API calls *)
Server.create ~mode c in
Server.create ~mode ~stop c in


let tcp_mode =
Expand All @@ -787,12 +750,74 @@ let run port sock_path config daemon =
Lwt.return []
end >>= fun unix_mode ->

let threads = List.map service_http (tcp_mode @ unix_mode) in
let services = tcp_mode @ unix_mode in

(* stoppers here is a list of type (unit Lwt.u) list, and 'stops'
is a list of type (unit Lwt.t). Each of the listening Cohttp
servers is given one of the 'stop' threads, and the whole
'stoppers' list is passed to every handler. When a 'shutdown'
is issued, whichever server received the call to shutdown can
use the 'stoppers' list to shutdown each of the listeners so
they no longer react to API calls. *)
let stops,stoppers = List.map (fun _ -> Lwt.wait ()) services |> List.split in
let threads = List.map2 (service_http stoppers) (tcp_mode @ unix_mode) stops in

Lwt.join ((service_queues ())::threads) in

Lwt_main.run t

let daemonize config =
(* Ideally we would bind our sockets before daemonizing to avoid racing
with the next command, but the conduit API doesn't let us pass a socket
in. Instead we daemonize in a fork()ed child, and in the parent we wait
for a connect() to succeed. *)
if Unix.fork () <> 0 then begin
let started = ref false in
let rec wait remaining =
if remaining = 0 then begin
Printf.fprintf stderr "Failed to communicate with xenvmd: check the configuration and try again.\n%!";
exit 1;
end;
begin match config.Config.listenPort with
| Some port ->
let s = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
(try
Unix.connect s (Unix.ADDR_INET(Unix.inet_addr_of_string "127.0.0.1", port));
Unix.close s;
started := true
with _ ->
Unix.close s)
| None -> ()
end;
begin match config.Config.listenPath with
| Some path ->
let s = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
(try
Unix.connect s (Unix.ADDR_UNIX path);
Unix.close s;
started := true
with e ->
Unix.close s)
| None -> ()
end;
if not !started then begin
Unix.sleep 1;
wait (remaining - 1)
end in
wait 30;
exit 0
end;
Lwt_daemon.daemonize ()

let main port sock_path config daemon =
let config = Config.t_of_sexp (Sexplib.Sexp.load_sexp config) in
let config = { config with Config.listenPort = match port with None -> config.Config.listenPort | Some x -> Some x } in
let config = { config with Config.listenPath = match sock_path with None -> config.Config.listenPath | Some x -> Some x } in

if daemon then daemonize config;

run port sock_path config

open Cmdliner

let info =
Expand Down Expand Up @@ -826,7 +851,7 @@ let cmd =
`S "EXAMPLES";
`P "TODO";
] in
Term.(pure run $ port $ sock_path $ config $ daemon),
Term.(pure main $ port $ sock_path $ config $ daemon),
Term.info "xenvmd" ~version:"0.1" ~doc ~man

let _ =
Expand Down