Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TAMPA] rrdd and squeezed: use new xenstore-watching, not polling. #1621

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ocaml/rrdd/OMakefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ OCAMLINCLUDES = \
interface
# ocaml/xapi only needed for xapi_fist : should move xapi_first to libs
OCAMLPACKS = oclock xml-light2 stunnel http-svr xenctrl xenctrlext xenstore
OCAML_LIBS = $(ROOT)/ocaml/fhs ../idl/ocaml_backend/xapi_client
OCAML_LIBS = $(ROOT)/ocaml/fhs ../idl/ocaml_backend/xapi_client ../xenops/xenstore_watch
# ../xenops/xenops_client

UseCamlp4(rpc-light.syntax, rrdd_server)
Expand Down
48 changes: 46 additions & 2 deletions ocaml/rrdd/rrdd_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,48 @@ let uuid_of_domid domains domid =
with Not_found ->
failwith (Printf.sprintf "Failed to find uuid corresponding to domid: %d" domid)

(*****************************************************)
(* xenstore related code *)
(*****************************************************)

open Xenstore_watch

(* Map from domid to the latest seen meminfo_free value *)
let current_meminfofree_values = ref IntMap.empty

let meminfo_path domid = Printf.sprintf "/local/domain/%d/data/meminfo_free" domid

module Meminfo = struct
let interesting_paths_for_domain domid uuid = [ meminfo_path domid ]

let domain_appeared _ _ _ = ()
let domain_disappeared _ _ _ = ()
let unmanaged_domain domid id = false
let found_running_domain domid id = ()

let fire_event_on_vm xs domid domains =
let d = int_of_string domid in
if not(IntMap.mem d domains)
then debug "Ignoring watch on shutdown domain %d" d
else
let path = meminfo_path d in
try
let meminfo_free = Int64.of_string (xs.Xs.read path) in
debug "memfree has changed to %Ld in domain %d" meminfo_free d;
current_meminfofree_values := IntMap.add d meminfo_free !current_meminfofree_values
with Xenbus.Xb.Noent ->
debug "Couldn't read path %s; forgetting last known memfree value for domain %d" path d;
current_meminfofree_values := IntMap.remove d !current_meminfofree_values

let watch_fired xc xs path domains _ =
match List.filter (fun x -> x <> "") (Stringext.String.split '/' path) with
| "local" :: "domain" :: domid :: "data" :: "meminfo_free" :: [] ->
fire_event_on_vm xs domid domains
| _ -> debug "Ignoring unexpected watch: %s" path
end

module Watcher = WatchXenstore(Meminfo)

(*****************************************************)
(* cpu related code *)
(*****************************************************)
Expand Down Expand Up @@ -241,8 +283,7 @@ let update_memory xc doms =
in
let other_ds =
try
let memfree_xs_key = Printf.sprintf "/local/domain/%d/data/meminfo_free" domid in
let mem_free = with_xs (fun xs -> Int64.of_string (xs.Xs.read memfree_xs_key)) in
let mem_free = IntMap.find domid !current_meminfofree_values in
Some (
VM uuid,
ds_make ~name:"memory_internal_free" ~units:"B"
Expand Down Expand Up @@ -653,6 +694,9 @@ let _ =
debug "Starting the HTTP server ..";
start (Rrdd_interface.xmlrpc_path, Rrdd_interface.http_fwd_path) Server.process;

debug "Starting xenstore-watching thread ..";
let (_: Thread.t) = Watcher.create_watcher_thread () in

debug "Creating monitoring loop thread ..";
Debug.with_thread_associated "main" monitor_loop ();

Expand Down
145 changes: 50 additions & 95 deletions ocaml/xenops/squeeze_xen.ml
Original file line number Diff line number Diff line change
Expand Up @@ -87,102 +87,57 @@ module Domain = struct
Hashtbl.remove cache domid;
None

let _introduceDomain = "@introduceDomain"
let _releaseDomain = "@releaseDomain"
let watch_xenstore () =
with_xc_and_xs
(fun xc xs ->
let interesting_paths = [
[ "memory"; "initial-reservation" ];
[ "memory"; "target" ];
[ "control"; "feature-balloon" ];
[ "data"; "updated" ];
[ "memory"; "memory-offset" ];
[ "memory"; "uncooperative" ];
[ "memory"; "dynamic-min" ];
[ "memory"; "dynamic-max" ];
] in
let watches domid =
List.map (fun p -> Printf.sprintf "/local/domain/%d/%s" domid (String.concat "/" p)) interesting_paths in

let module IntSet = Set.Make(struct type t = int let compare = compare end) in
let watching_domids = ref IntSet.empty in

let look_for_different_domains () =
let list_domains xc =
let open Xenctrl.Domain_info in
let dis = Xenctrl.domain_getinfolist xc 0 in
List.fold_left (fun set x -> if not x.shutdown then IntSet.add x.domid set else set) IntSet.empty dis in
let existing = list_domains xc in
let arrived = IntSet.diff existing !watching_domids in
IntSet.iter
(fun domid ->
debug "Adding watches for domid: %d" domid;
List.iter (fun x ->
try
xs.Xs.watch x x
with e ->
error "watch %s: %s" x (Printexc.to_string e)
) (watches domid);
) arrived;
let gone = IntSet.diff !watching_domids existing in
IntSet.iter
(fun domid ->
debug "Removing watches for domid: %d" domid;
List.iter (fun x ->
try
xs.Xs.unwatch x x
with e ->
error "unwatch %s: %s" x (Printexc.to_string e)
) (watches domid);
) gone;
watching_domids := existing;
Mutex.execute m
(fun () ->
IntSet.iter (Hashtbl.remove cache) gone;
IntSet.iter (fun domid -> try ignore(get_per_domain (xc, xs) domid) with _ -> ()) arrived
) in

xs.Xs.watch _introduceDomain "";
xs.Xs.watch _releaseDomain "";
look_for_different_domains ();

while true do
let path, _ =
if Xs.has_watchevents xs
then Xs.get_watchevent xs
else Xs.read_watchevent xs in
if path = _introduceDomain || path = _releaseDomain
then look_for_different_domains ()
else match List.filter (fun x -> x <> "") (String.split '/' path) with
| "local" :: "domain" :: domid :: rest when List.mem rest interesting_paths ->
let value = try Some (xs.Xs.read path) with _ -> None in
let domid = int_of_string domid in
Mutex.execute m
(fun () ->
match get_per_domain (xc, xs) domid with
| None -> ()
| Some per_domain ->
let key = "/" ^ (String.concat "/" rest) in
debug "watch %s <- %s" key (Opt.default "None" value);
Hashtbl.replace per_domain.keys key value
)
| _ -> debug "Ignoring unexpected watch: %s" path
done
)
module MemoryActions = struct
let interesting_paths = [
[ "memory"; "initial-reservation" ];
[ "memory"; "target" ];
[ "control"; "feature-balloon" ];
[ "data"; "updated" ];
[ "memory"; "memory-offset" ];
[ "memory"; "uncooperative" ];
[ "memory"; "dynamic-min" ];
[ "memory"; "dynamic-max" ];
]

let interesting_paths_for_domain domid uuid =
List.map (fun p -> Printf.sprintf "/local/domain/%d/%s" domid (String.concat "/" p)) interesting_paths

let unmanaged_domain _ _ = false

let found_running_domain _ _ = ()

let domain_disappeared xc xs domid =
Mutex.execute m
(fun () ->
Hashtbl.remove cache domid
)

let start_watch_xenstore_thread () =
let (_: Thread.t) = Thread.create
(fun () ->
while true do
try
watch_xenstore ()
with e ->
error "watch_xenstore: %s" (Printexc.to_string e);
Thread.delay 1.
done
) () in
()
let domain_appeared xc xs domid =
Mutex.execute m
(fun () ->
try ignore(get_per_domain (xc, xs) domid) with _ -> ()
)

let watch_fired xc xs path domains watches =
match List.filter (fun x -> x <> "") (String.split '/' path) with
| "local" :: "domain" :: domid :: rest when List.mem rest interesting_paths ->
let value = try Some (xs.Xs.read path) with _ -> None in
let domid = int_of_string domid in
Mutex.execute m
(fun () ->
match get_per_domain (xc, xs) domid with
| None -> ()
| Some per_domain ->
let key = "/" ^ (String.concat "/" rest) in
debug "watch %s <- %s" key (Opt.default "None" value);
Hashtbl.replace per_domain.keys key value
)
| _ -> debug "Ignoring unexpected watch: %s" path
end

module Watcher = Xenstore_watch.WatchXenstore(MemoryActions)

let start_watch_xenstore_thread () = ignore (Watcher.create_watcher_thread ())

let get_hvm cnx domid =
Mutex.execute m (fun () ->
Expand Down