Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
0.3.0 (24-Apr-2015)
- Update to message-switch.0.11.0

0.2.0 (4-Apr-2015)
- Update to SMAPIv2 with o_direct, o_direct_reason
- Update to using json marshalling for exception backtraces
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.0
0.3.0
4 changes: 2 additions & 2 deletions _oasis
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
OASISFormat: 0.2
Name: xapi-script-storage
Version: 0.1
Version: 0.3
Synopsis: Adapter which allows xapi to call storage scripts
Authors: David Scott
License: LGPL-2.1 with OCaml linking exception
Expand All @@ -13,4 +13,4 @@ Executable xapi_script_storage
MainIs: main.ml
Custom: true
Install: false
BuildDepends: xcp, xcp.storage, async_inotify, threads, message_switch.async, rpclib, xapi-storage, sexplib, sexplib.syntax, rpclib, rpclib.syntax
BuildDepends: xcp, xcp.storage, async_inotify, threads, message_switch.async (>= 0.11.0), rpclib, xapi-storage, sexplib, sexplib.syntax, rpclib, rpclib.syntax
54 changes: 34 additions & 20 deletions main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -450,46 +450,60 @@ let process root_dir name x =
(* Active servers, one per sub-directory of the root_dir *)
let servers = String.Table.create () ~size:4

let create switch_port root_dir name =
(* XXX: need a better error-handling strategy *)
let get_ok = function
| `Ok x -> x
| `Error e ->
let b = Buffer.create 16 in
let fmt = Format.formatter_of_buffer b in
Protocol_unix.Server.pp_error fmt e;
Format.pp_print_flush fmt ();
failwith (Buffer.contents b)

let create switch_path root_dir name =
if Hashtbl.mem servers name
then return ()
else begin
info "Adding %s" name
>>= fun () ->
Protocol_async.M.connect switch_port >>= fun c ->
let server = Protocol_async.Server.listen (process root_dir name) c (Filename.basename name) in
Protocol_async.Server.listen ~process:(process root_dir name) ~switch:switch_path ~queue:(Filename.basename name) ()
>>= fun result ->
let server = get_ok result in
Hashtbl.add_exn servers name server;
return ()
end

let destroy switch_port name =
let destroy switch_path name =
info "Removing %s" name
>>= fun () ->
Protocol_async.M.connect switch_port >>= fun c ->
Hashtbl.remove servers name;
return ()
if Hashtbl.mem servers name then begin
let t = Hashtbl.find_exn servers name in
Protocol_async.Server.shutdown ~t () >>= fun () ->
Hashtbl.remove servers name;
return ()
end else return ()

let rec diff a b = match a with
| [] -> []
| a :: aa ->
if List.mem b a then diff aa b else a :: (diff aa b)

(* Ensure the right servers are started *)
let sync ~root_dir ~switch_port =
let sync ~root_dir ~switch_path =
Sys.readdir root_dir
>>= fun names ->
let needed : string list = Array.to_list names in
let got_already : string list = Hashtbl.keys servers in
Deferred.all_ignore (List.map ~f:(create switch_port root_dir) (diff needed got_already))
Deferred.all_ignore (List.map ~f:(create switch_path root_dir) (diff needed got_already))
>>= fun () ->
Deferred.all_ignore (List.map ~f:(destroy switch_port) (diff got_already needed))
Deferred.all_ignore (List.map ~f:(destroy switch_path) (diff got_already needed))

let main ~root_dir ~switch_port =
let main ~root_dir ~switch_path =
(* We watch and create queues for the Volume plugins only *)
let root_dir = Filename.concat root_dir "volume" in
Async_inotify.create ~recursive:false ~watch_new_dirs:false root_dir
>>= fun (watch, _) ->
sync ~root_dir ~switch_port
sync ~root_dir ~switch_path
>>= fun () ->
let pipe = Async_inotify.pipe watch in
let open Async_inotify.Event in
Expand All @@ -501,24 +515,24 @@ let main ~root_dir ~switch_port =
Shutdown.exit 1
| `Ok (Created path)
| `Ok (Moved (Into path)) ->
create switch_port root_dir (Filename.basename path)
create switch_path root_dir (Filename.basename path)
| `Ok (Unlinked path)
| `Ok (Moved (Away path)) ->
destroy switch_port (Filename.basename path)
destroy switch_path (Filename.basename path)
| `Ok (Modified _) ->
return ()
| `Ok (Moved (Move (path_a, path_b))) ->
destroy switch_port (Filename.basename path_a)
destroy switch_path (Filename.basename path_a)
>>= fun () ->
create switch_port root_dir (Filename.basename path_b)
create switch_path root_dir (Filename.basename path_b)
| `Ok Queue_overflow ->
sync ~root_dir ~switch_port
sync ~root_dir ~switch_path
) >>= fun () ->
loop () in
loop ()

let main ~root_dir ~switch_port =
let (_: unit Deferred.t) = main ~root_dir ~switch_port in
let main ~root_dir ~switch_path =
let (_: unit Deferred.t) = main ~root_dir ~switch_path in
never_returns (Scheduler.go ())

open Xcp_service
Expand Down Expand Up @@ -558,5 +572,5 @@ let _ =
use_syslog := true;
Core.Syslog.openlog ~id:"xapi-storage-script" ~facility:Core.Syslog.Facility.DAEMON ();
end;
main ~root_dir:!root_dir ~switch_port:!Xcp_client.switch_port
main ~root_dir:!root_dir ~switch_path:!Xcp_client.switch_path

12 changes: 7 additions & 5 deletions setup.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(* setup.ml generated for the first time by OASIS v0.4.4 *)

(* OASIS_START *)
(* DO NOT EDIT (digest: 357e519346d191c046f831856e592460) *)
(* DO NOT EDIT (digest: 2cebb0f2ebccebc1740714c72972c587) *)
(*
Regenerated by OASIS v0.4.5
Visit http://oasis.forge.ocamlcore.org for more information and
Expand Down Expand Up @@ -6673,7 +6673,7 @@ let setup_t =
alpha_features = [];
beta_features = [];
name = "xapi-script-storage";
version = "0.1";
version = "0.3";
license =
OASISLicense.DEP5License
(OASISLicense.DEP5Unit
Expand Down Expand Up @@ -6743,7 +6743,9 @@ let setup_t =
FindlibPackage ("xcp.storage", None);
FindlibPackage ("async_inotify", None);
FindlibPackage ("threads", None);
FindlibPackage ("message_switch.async", None);
FindlibPackage
("message_switch.async",
Some (OASISVersion.VGreaterEqual "0.11.0"));
FindlibPackage ("rpclib", None);
FindlibPackage ("xapi-storage", None);
FindlibPackage ("sexplib", None);
Expand All @@ -6770,14 +6772,14 @@ let setup_t =
};
oasis_fn = Some "_oasis";
oasis_version = "0.4.5";
oasis_digest = Some "#8c\151\238J\181\240\231Z\r\141PQ\248h";
oasis_digest = Some "\223w\129*\2508n\224(\131\015\249\232\225\232\031";
oasis_exec = None;
oasis_setup_args = [];
setup_update = false
};;

let setup () = BaseSetup.setup setup_t;;

# 6782 "setup.ml"
# 6784 "setup.ml"
(* OASIS_STOP *)
let () = setup ();;