Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 43 additions & 18 deletions main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,7 @@ let rec diff a b = match a with
| a :: aa ->
if List.mem b a then diff aa b else a :: (diff aa b)

let watch_volume_plugins ~root_dir ~switch_path =
let root_dir = Filename.concat root_dir "volume" in
let watch_volume_plugins ~root_dir ~switch_path ~pipe =
let create switch_path root_dir name =
if Hashtbl.mem servers name
then return ()
Expand Down Expand Up @@ -794,11 +793,8 @@ let watch_volume_plugins ~root_dir ~switch_path =
Deferred.all_ignore (List.map ~f:(create switch_path root_dir) (diff needed got_already))
>>= fun () ->
Deferred.all_ignore (List.map ~f:(destroy switch_path) (diff got_already needed)) in
Async_inotify.create ~recursive:false ~watch_new_dirs:false root_dir
>>= fun (watch, _) ->
sync ~root_dir ~switch_path
>>= fun () ->
let pipe = Async_inotify.pipe watch in
let open Async_inotify.Event in
let rec loop () =
( Pipe.read pipe >>= function
Expand All @@ -823,8 +819,7 @@ let watch_volume_plugins ~root_dir ~switch_path =
loop () in
loop ()

let watch_datapath_plugins ~root_dir =
let root_dir = Filename.concat root_dir "datapath" in
let watch_datapath_plugins ~root_dir ~pipe =
let sync ~root_dir =
Sys.readdir root_dir
>>= fun names ->
Expand All @@ -833,11 +828,8 @@ let watch_datapath_plugins ~root_dir =
Deferred.all_ignore (List.map ~f:(Datapath_plugins.register root_dir) (diff needed got_already))
>>= fun () ->
Deferred.all_ignore (List.map ~f:(Datapath_plugins.unregister root_dir) (diff got_already needed)) in
Async_inotify.create ~recursive:false ~watch_new_dirs:false root_dir
>>= fun (watch, _) ->
sync ~root_dir
>>= fun () ->
let pipe = Async_inotify.pipe watch in
let open Async_inotify.Event in
let rec loop () =
( Pipe.read pipe >>= function
Expand Down Expand Up @@ -865,14 +857,32 @@ let watch_datapath_plugins ~root_dir =
let main ~root_dir ~state_path ~switch_path =
Attached_SRs.reload state_path
>>= fun () ->
Deferred.all_unit [
watch_volume_plugins ~root_dir ~switch_path;
watch_datapath_plugins ~root_dir
]
let datapath_root = Filename.concat root_dir "datapath" in
Async_inotify.create ~recursive:false ~watch_new_dirs:false datapath_root
>>= fun (watch, _) ->
let datapath = Async_inotify.pipe watch in
let volume_root = Filename.concat root_dir "volume" in
Async_inotify.create ~recursive:false ~watch_new_dirs:false volume_root
>>= fun (watch, _) ->
let volume = Async_inotify.pipe watch in

let main ~root_dir ~state_path ~switch_path =
let (_: unit Deferred.t) = main ~root_dir ~state_path ~switch_path in
never_returns (Scheduler.go ())
let rec loop () =
Monitor.try_with
(fun () ->
Deferred.all_unit [
watch_volume_plugins ~root_dir:volume_root ~switch_path ~pipe:volume;
watch_datapath_plugins ~root_dir:datapath_root ~pipe:datapath
]
)
>>= function
| Ok () ->
info "main thread shutdown cleanly";
return ()
| Error x ->
error "main thread failed with %s" (Exn.to_string x);
Clock.after (Time.Span.of_sec 5.) >>= fun () ->
loop () in
loop ()

open Xcp_service

Expand Down Expand Up @@ -917,5 +927,20 @@ let _ =
use_syslog := true;
info "Daemonisation successful.";
end;
main ~root_dir:!root_dir ~state_path:!state_path ~switch_path:!Xcp_client.switch_path
let (_: unit Deferred.t) =
let rec loop () =
Monitor.try_with
(fun () ->
main ~root_dir:!root_dir ~state_path:!state_path ~switch_path:!Xcp_client.switch_path
)
>>= function
| Ok () ->
info "main thread shutdown cleanly";
return ()
| Error x ->
error "main thread failed with %s" (Exn.to_string x);
Clock.after (Time.Span.of_sec 5.) >>= fun () ->
loop () in
loop () in
never_returns (Scheduler.go ())