From bf6250f69e0cd8f16e05cefc6bdaf60b987ff581 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 27 Jul 2012 13:11:40 +0100 Subject: [PATCH 1/4] CA-85142: Add a function to remove messages from the in_memory_cache Signed-off-by: Jon Ludlam --- ocaml/xapi/xapi_message.ml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ocaml/xapi/xapi_message.ml b/ocaml/xapi/xapi_message.ml index eca0436ba8..ee85f61cdb 100644 --- a/ocaml/xapi/xapi_message.ml +++ b/ocaml/xapi/xapi_message.ml @@ -285,6 +285,15 @@ let cache_insert _ref message = (List.length !in_memory_cache) end) +let cache_remove _ref = + Mutex.execute in_memory_cache_mutex (fun () -> + let (to_delete,to_keep) = List.partition (function | _ , _ref', _ -> _ref' = _ref) !in_memory_cache in + if List.length to_delete > 1 then + error "Internal error: Repeated reference in messages in_memory_cache"; + in_memory_cache := to_keep; + in_memory_cache_length := List.length to_keep) + + (** Write: write message to disk. Returns boolean indicating whether message was written *) let write ~_ref ~message = From 4a9db6f1f25b4546a3fff1d5dbdd4e819ab7d508 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 27 Jul 2012 13:23:22 +0100 Subject: [PATCH 2/4] CA-85142: Remove messages from the in_memory_cache on deletion Signed-off-by: Jon Ludlam --- ocaml/xapi/xapi_message.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/ocaml/xapi/xapi_message.ml b/ocaml/xapi/xapi_message.ml index ee85f61cdb..d96cef6d8d 100644 --- a/ocaml/xapi/xapi_message.ml +++ b/ocaml/xapi/xapi_message.ml @@ -423,6 +423,7 @@ let destroy_real basefilename = (deleted := Listext.List.take 512 !deleted; ndeleted := 512) ); + cache_remove _ref; Xapi_event.event_add ~snapshot:xml "message" "del" (Ref.string_of _ref) let destroy ~__context ~self = From 4a47d4fafd9846aeae778aa1b48bb7f89474ea8c Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Fri, 27 Jul 2012 14:04:32 +0100 Subject: [PATCH 3/4] CA-85142: Change the types returned by Xapi_message.get_since_for_events This is so we can now return delete events too, though we aren't yet. Signed-off-by: Jon Ludlam --- ocaml/xapi/xapi_event.ml | 19 ++++++++----------- ocaml/xapi/xapi_message.ml | 13 +++++++------ 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index 6e8714c50c..c73c5dc9c9 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -41,7 +41,8 @@ module Token = struct end -let message_get_since_for_events : (__context:Context.t -> float -> (float * (API.ref_message * API.message_t) list)) ref = ref ( fun ~__context _ -> ignore __context; (0.0, [])) +type message_event = MCreate of (API.ref_message * API.message_t) | MDel of API.ref_message +let message_get_since_for_events : (__context:Context.t -> float -> (float * message_event list)) ref = ref ( fun ~__context _ -> ignore __context; (0.0, [])) (** Limit the event queue to this many events: *) let max_stored_events = 500 @@ -399,16 +400,12 @@ let from ~__context ~classes ~token ~timeout = if event_matches sub.subs ev then ev::acc else acc ) modevs creates in - let message_events = List.fold_left (fun acc (_ref,message) -> - let objref = Ref.string_of _ref in - let xml = API.To.message_t message in - let ev = { id=0L; - ts=0.0; - ty="message"; - op=Add; - reference=objref; - snapshot=Some xml } in - ev::acc) createevs messages in + let message_events = List.fold_left (fun acc mev -> + let event = match mev with + | MCreate (_ref,message) -> event_of Add ?snapshot:(Some (API.To.message_t message)) ("message", Ref.string_of _ref, 0L) + | MDel _ref -> event_of Del ("message",Ref.string_of _ref, 0L) + in + event::acc) createevs messages in let valid_ref_counts = Db_cache_types.TableSet.fold diff --git a/ocaml/xapi/xapi_message.ml b/ocaml/xapi/xapi_message.ml index d96cef6d8d..886372e4e9 100644 --- a/ocaml/xapi/xapi_message.ml +++ b/ocaml/xapi/xapi_message.ml @@ -500,19 +500,20 @@ let get_since ~__context ~since = let get_since_for_events ~__context since = let now = Mutex.execute event_mutex (fun () -> Unix.gettimeofday ()) in - let result = Mutex.execute in_memory_cache_mutex + let cached_result = Mutex.execute in_memory_cache_mutex (fun () -> match !in_memory_cache with | (last_in_memory, _, _) :: _ when last_in_memory > since -> Some (List.filter_map (fun (timestamp,_ref,msg) -> - if timestamp > since then Some (_ref, msg) else None) + if timestamp > since then Some (Xapi_event.MCreate (_ref, msg)) else None) !in_memory_cache) | _ -> None) in - match result with - | Some x -> (now,x) - | None -> - (now, get_real message_dir (fun _ -> true) since) + let result = match cached_result with + | Some x -> x + | None -> List.map (fun x -> Xapi_event.MCreate x) (get_real message_dir (fun _ -> true) since) + in + (now, result) let get_by_uuid ~__context ~uuid = try From 4d1d182f897e9e56d91f1077ab8463af91b2aef0 Mon Sep 17 00:00:00 2001 From: Jon Ludlam Date: Wed, 1 Aug 2012 09:45:10 +0100 Subject: [PATCH 4/4] CA-85142: Use generation counts for message events. Retain the distinction between message generation count and standard db generation count for the purposes of event.from - we can't atomically get both, so if we combined them we'd risk losing events. Also add a quicktest test for message events. Signed-off-by: Jon Ludlam --- ocaml/xapi/quicktest.ml | 74 +++++ ocaml/xapi/xapi_event.ml | 35 ++- ocaml/xapi/xapi_message.ml | 551 ++++++++++++++++++++----------------- 3 files changed, 386 insertions(+), 274 deletions(-) diff --git a/ocaml/xapi/quicktest.ml b/ocaml/xapi/quicktest.ml index 6a5a199143..0c4e16effe 100644 --- a/ocaml/xapi/quicktest.ml +++ b/ocaml/xapi/quicktest.ml @@ -170,6 +170,79 @@ let object_level_event_test session_id = end ) +let event_message_test session_id = + let test = make_test "Message creation event test" 1 in + start test; + let events = Client.Event.from !rpc session_id [ "message" ] "" 1.0 |> event_from_of_xmlrpc in + let token = events.token in + let pool = List.hd (Client.Pool.get_all !rpc session_id) in + let obj_uuid = Client.Pool.get_uuid !rpc session_id pool in + debug test "Creating message"; + let cls = `Pool in + let message = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls + ~obj_uuid ~body:"Hello" in + debug test (Printf.sprintf "Created message: %s" (Ref.string_of message)); + let events = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc + in + debug test (Printf.sprintf "Got some events: %d %s" (List.length events.events) (String.concat "," (List.map (fun ev -> ev.reference) events.events))); + let token = events.token in + if List.exists (fun ev -> ev.reference = (Ref.string_of message) && ev.op = Add) events.events + then success test + else failed test "Failed to receive an event with the message"; + + let test = make_test "Message deletion event test" 1 in + start test; + debug test "Destroying message"; + Client.Message.destroy !rpc session_id message; + let events = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc in + debug test "Got some events"; + if List.exists (fun ev -> ev.reference = (Ref.string_of message) && ev.op = Del) events.events + then success test + else failed test "Failed to receive a delete event"; + + let test = make_test "Message deletion from cache test" 1 in + start test; + let events = Client.Event.from !rpc session_id [ "message" ] "" 1.0 |> event_from_of_xmlrpc in + debug test "Got lots of events"; + if List.exists (fun ev -> ev.reference = (Ref.string_of message) && ev.op <> Del) events.events + then failed test "Got told about a deleted message" + else success test; + + let test = make_test "Multi message test" 1 in + start test; + let message1 = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls + ~obj_uuid ~body:"Hello" in + let message2 = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls + ~obj_uuid ~body:"Hello" in + let events = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc in + let token = events.token in + let message3 = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls + ~obj_uuid ~body:"Hello" in + let events2 = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc in + debug test (Printf.sprintf "message1=%s" (Ref.string_of message1)); + debug test (Printf.sprintf "message2=%s" (Ref.string_of message2)); + debug test (Printf.sprintf "message3=%s" (Ref.string_of message3)); + List.iter (fun ev -> debug test (Printf.sprintf "events1: ev.ref=%s" ev.reference)) events.events; + List.iter (fun ev -> debug test (Printf.sprintf "events2: ev.ref=%s" ev.reference)) events2.events; + let ok1 = + List.exists (fun ev -> ev.reference = (Ref.string_of message1) && ev.op = Add) events.events && + List.exists (fun ev -> ev.reference = (Ref.string_of message2) && ev.op = Add) events.events in + let ok2 = + List.exists (fun ev -> ev.reference = (Ref.string_of message3) && ev.op = Add) events2.events in + let ok3 = + not (List.exists (fun ev -> ev.reference = (Ref.string_of message1) && ev.op = Add) events2.events) && + not (List.exists (fun ev -> ev.reference = (Ref.string_of message2) && ev.op = Add) events2.events) + in + if ok1 && ok2 && ok3 then success test else failed test (Printf.sprintf "ok1=%b ok2=%b ok3=%b" ok1 ok2 ok3); + + let test = make_test "Object messages test" 1 in + start test; + debug test (Printf.sprintf "Finding messages for object: %s" (Client.Pool.get_uuid !rpc session_id pool)); + let messages = Client.Message.get ~rpc:!rpc ~session_id ~cls ~obj_uuid ~since:(Date.never) in + let has_msg m = List.exists (fun (r,_) -> r=m) messages in + let ok = has_msg message1 && has_msg message2 && has_msg message3 in + if ok then success test else failed test "Failed to get messages for object" + let all_srs_with_vdi_create session_id = let all_srs : API.ref_SR list = Quicktest_storage.list_srs session_id in (* Filter out those which support the vdi_create capability *) @@ -747,6 +820,7 @@ let _ = maybe_run_test "event" (fun () -> event_next_test s); maybe_run_test "event" (fun () -> event_from_test s); (* maybe_run_test "event" (fun () -> object_level_event_test s);*) + maybe_run_test "event" (fun () -> event_message_test s); maybe_run_test "vdi" (fun () -> vdi_test s); maybe_run_test "async" (fun () -> async_test s); maybe_run_test "import" (fun () -> import_export_test s); diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index c73c5dc9c9..1a49cfb301 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -22,27 +22,27 @@ open D module Token = struct - type t = int64 * float (* last id, timestamp *) + type t = int64 * int64 (* last id, message id *) exception Failed_to_parse of string let of_string token = match String.split ',' token with | [from;from_t] -> - (Int64.of_string from, float_of_string from_t) - | [""] -> (0L, 0.1) + (Int64.of_string from, Int64.of_string from_t) + | [""] -> (0L, 0L) | _ -> raise (Failed_to_parse token) - let to_string (last, timestamp) = + let to_string (last,last_t) = (* We prefix with zeroes so tokens which differ only in the generation can be compared lexicographically as strings. *) - Printf.sprintf "%020Ld,%f" last timestamp + Printf.sprintf "%020Ld,%020Ld" last last_t end type message_event = MCreate of (API.ref_message * API.message_t) | MDel of API.ref_message -let message_get_since_for_events : (__context:Context.t -> float -> (float * message_event list)) ref = ref ( fun ~__context _ -> ignore __context; (0.0, [])) +let message_get_since_for_events : (__context:Context.t -> int64 -> (int64 * message_event list)) ref = ref ( fun ~__context _ -> ignore __context; (0L, [])) (** Limit the event queue to this many events: *) let max_stored_events = 500 @@ -89,7 +89,7 @@ let event_matches subs ev = (** Every session that calls 'register' gets a subscription*) type subscription_record = { mutable last_id: int64; (** last event ID to sent to this client *) - mutable last_timestamp : float; (** Time at which the last event was sent (for messages) *) + mutable last_msg_gen : int64; (** last generation count from the messages *) mutable last_generation : int64; (** Generation count of the last event *) mutable cur_id: int64; (** Most current generation count relevant to the client - only used in new events mechanism *) mutable subs: subscription list; (** list of all the subscriptions *) @@ -177,7 +177,7 @@ let get_subscription ~__context = (fun () -> if Hashtbl.mem subscriptions session then Hashtbl.find subscriptions session else - let subscription = { last_id = !id; last_timestamp=(Unix.gettimeofday ()); last_generation=0L; cur_id = 0L; subs = []; m = Mutex.create(); session = session; session_invalid = false; timeout=0.0; } in + let subscription = { last_id = !id; last_msg_gen = 0L; last_generation=0L; cur_id = 0L; subs = []; m = Mutex.create(); session = session; session_invalid = false; timeout=0.0; } in Hashtbl.replace subscriptions session subscription; subscription) @@ -313,8 +313,8 @@ let from ~__context ~classes ~token ~timeout = sub.timeout <- Unix.gettimeofday () +. timeout; - sub.last_timestamp <- from_t; sub.last_generation <- from; + sub.last_msg_gen <- from_t; Mutex.execute sub.m (fun () -> sub.subs <- subs @ sub.subs); @@ -331,9 +331,9 @@ let from ~__context ~classes ~token ~timeout = let grab_range t = let tableset = Db_cache_types.Database.tableset (Db_ref.get_database t) in - let (timestamp,messages) = - if table_matches all_subs "message" then (!message_get_since_for_events) ~__context sub.last_timestamp else (0.0, []) in - (timestamp, messages, tableset, List.fold_left + let (msg_gen,messages) = + if table_matches all_subs "message" then (!message_get_since_for_events) ~__context sub.last_msg_gen else (0L, []) in + (msg_gen, messages, tableset, List.fold_left (fun acc table -> Db_cache_types.Table.fold_over_recent sub.last_generation (fun ctime mtime dtime objref (creates,mods,deletes,last) -> @@ -353,13 +353,13 @@ let from ~__context ~classes ~token ~timeout = in let rec grab_nonempty_range () = - let (timestamp, messages, tableset, (creates,mods,deletes,last)) as result = Db_lock.with_lock (fun () -> grab_range (Db_backend.make ())) in + let (msg_gen, messages, tableset, (creates,mods,deletes,last)) as result = Db_lock.with_lock (fun () -> grab_range (Db_backend.make ())) in if List.length creates = 0 && List.length mods = 0 && List.length deletes = 0 && List.length messages = 0 && Unix.gettimeofday () < sub.timeout then ( sub.last_generation <- last; (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *) - sub.last_timestamp <- timestamp; sub.cur_id <- last; (* last id the client got is equivalent to the current one *) + sub.last_msg_gen <- msg_gen; wait2 sub last; Thread.delay 0.05; grab_nonempty_range ()) @@ -367,10 +367,9 @@ let from ~__context ~classes ~token ~timeout = result in - let (timestamp, messages, tableset, (creates,mods,deletes,last)) = grab_nonempty_range () in + let (msg_gen, messages, tableset, (creates,mods,deletes,last)) = grab_nonempty_range () in sub.last_generation <- last; - sub.last_timestamp <- timestamp; let event_of op ?snapshot (table, objref, time) = { @@ -422,7 +421,7 @@ let from ~__context ~classes ~token ~timeout = let result = { events = message_events; valid_ref_counts = valid_ref_counts; - token = Token.to_string (last, timestamp); + token = Token.to_string (last,msg_gen); } in xmlrpc_of_event_from result @@ -437,7 +436,7 @@ let inject ~__context ~_class ~ref = Db_cache_impl.refresh_row db_ref _class ref; (* consumes this generation *) g ) in - let token = (Int64.sub generation 1L), 0. in + let token = Int64.sub generation 1L, 0L in Token.to_string token (** Inject an unnecessary update as a heartbeat. This will: diff --git a/ocaml/xapi/xapi_message.ml b/ocaml/xapi/xapi_message.ml index 886372e4e9..29ab657d97 100644 --- a/ocaml/xapi/xapi_message.ml +++ b/ocaml/xapi/xapi_message.ml @@ -69,7 +69,7 @@ let timestamp_to_string f = (************* Marshalling/unmarshalling functions ************) -let to_xml output _ref message = +let to_xml output _ref gen message = let tag n next () = Xmlm.output output (`El_start (("",n),[])); List.iter (fun x -> x ()) next; @@ -78,16 +78,26 @@ let to_xml output _ref message = let data dat () = Xmlm.output output (`Data dat) in Xmlm.output output (`Dtd None); - tag "message" [ - tag "ref" [ data (Ref.string_of _ref) ]; - tag "name" [ data message.API.message_name ]; - tag "priority" [ data (Int64.to_string message.API.message_priority) ]; - tag "cls" [data (class_to_string message.API.message_cls) ]; - tag "obj_uuid" [data message.API.message_obj_uuid ]; - tag "timestamp" [data (Date.to_string message.API.message_timestamp) ]; - tag "uuid" [data message.API.message_uuid]; - tag "body" [data message.API.message_body] - ] () + + let message_subtags = [ + tag "ref" [ data (Ref.string_of _ref) ]; + tag "name" [ data message.API.message_name ]; + tag "priority" [ data (Int64.to_string message.API.message_priority) ]; + tag "cls" [data (class_to_string message.API.message_cls) ]; + tag "obj_uuid" [data message.API.message_obj_uuid ]; + tag "timestamp" [data (Date.to_string message.API.message_timestamp) ]; + tag "uuid" [data message.API.message_uuid]; + tag "body" [data message.API.message_body] + ] in + + let message_subtags = match gen with + | Some g -> + (tag "generation" [data (Int64.to_string g) ])::message_subtags + | None -> + message_subtags + in + + tag "message" message_subtags () let of_xml input = let current_elt = ref "" in @@ -101,34 +111,36 @@ let of_xml input = API.message_uuid = ""} in let _ref = ref "" in + let gen = ref 0L in let rec f () = match Xmlm.input input with | `El_start ((ns,tag),attr) -> current_elt := tag; f () | `El_end -> current_elt := ""; if Xmlm.eoi input then () else f () - | `Data dat -> - begin match !current_elt with - | "name" -> message := {!message with API.message_name=dat} - | "priority" -> message := {!message with API.message_priority=Int64.of_string dat} - | "cls" -> message := {!message with API.message_cls=string_to_class dat} - | "obj_uuid" -> message := {!message with API.message_obj_uuid=dat} - | "timestamp" -> message := {!message with API.message_timestamp=Date.of_string dat} - | "uuid" -> message := {!message with API.message_uuid=dat} - | "body" -> message := {!message with API.message_body=dat} - | "ref" -> _ref := dat - | _ -> failwith "Bad XML!" - end; - f () - | `Dtd _ -> f () + | `Data dat -> + begin match !current_elt with + | "name" -> message := {!message with API.message_name=dat} + | "priority" -> message := {!message with API.message_priority=Int64.of_string dat} + | "cls" -> message := {!message with API.message_cls=string_to_class dat} + | "obj_uuid" -> message := {!message with API.message_obj_uuid=dat} + | "timestamp" -> message := {!message with API.message_timestamp=Date.of_string dat} + | "uuid" -> message := {!message with API.message_uuid=dat} + | "body" -> message := {!message with API.message_body=dat} + | "generation" -> gen := Int64.of_string dat; + | "ref" -> _ref := dat + | _ -> failwith "Bad XML!" + end; + f () + | `Dtd _ -> f () in - try + try f (); - (Ref.of_string !_ref,!message) + (!gen,Ref.of_string !_ref,!message) with e -> log_backtrace (); debug "Caught exception: %s" (Printexc.to_string e); raise e let export_xml messages = let size = 500 * (List.length messages) in let buf = Buffer.create size in let output = Xmlm.make_output (`Buffer buf) in - List.iter (function | r,m -> to_xml output r m) messages ; + List.iter (function | r,m -> to_xml output r None m) messages ; Buffer.contents buf let import_xml xml_in = @@ -163,43 +175,54 @@ let import_xml xml_in = (********** Symlink functions *************) -let class_symlink cls obj_uuid = +let class_symlink cls obj_uuid = let strcls = class_to_string cls in Printf.sprintf "%s/%s/%s" message_dir strcls obj_uuid -let uuid_symlink () = +let uuid_symlink () = Printf.sprintf "%s/uuids" message_dir let ref_symlink () = Printf.sprintf "%s/refs" message_dir -(** Returns a list of tuples - (directory, filename) *) -let symlinks _ref message basefilename = - List.map (fun (dir,fnameopt) -> - let newfname = match fnameopt with - | None -> basefilename - | Some f -> f - in - (dir,dir ^ "/" ^ newfname)) - [(class_symlink message.API.message_cls message.API.message_obj_uuid, None); - (uuid_symlink (), Some message.API.message_uuid); - (ref_symlink (), Some (Ref.string_of _ref));] +let gen_symlink () = + Printf.sprintf "%s/gen" message_dir +(** Returns a list of tuples - (directory, filename) *) +let symlinks _ref gen message basefilename = + let symlinks = + [(class_symlink message.API.message_cls message.API.message_obj_uuid, None); + (uuid_symlink (), Some message.API.message_uuid); + (ref_symlink (), Some (Ref.string_of _ref))] in + let symlinks = + match gen with + | Some gen -> + (gen_symlink (), Some (Int64.to_string gen)) :: symlinks + | None -> + symlinks + in + List.map (fun (dir,fnameopt) -> + let newfname = match fnameopt with + | None -> basefilename + | Some f -> f + in + (dir,dir ^ "/" ^ newfname)) + symlinks (** Check to see if the UUID is valid. This should not use get_by_uuid as - this causes spurious exceptions to be logged... *) + this causes spurious exceptions to be logged... *) let check_uuid ~__context ~cls ~uuid = try - (match cls with - | `VM -> ignore(Db.VM.get_by_uuid ~__context ~uuid) - | `Host -> ignore(Db.Host.get_by_uuid ~__context ~uuid) - | `SR -> ignore(Db.SR.get_by_uuid ~__context ~uuid) - | `Pool -> ignore(Db.Pool.get_by_uuid ~__context ~uuid) - | `VMPP -> ignore(Db.VMPP.get_by_uuid ~__context ~uuid) - ); - true - with _ -> - false + (match cls with + | `VM -> ignore(Db.VM.get_by_uuid ~__context ~uuid) + | `Host -> ignore(Db.Host.get_by_uuid ~__context ~uuid) + | `SR -> ignore(Db.SR.get_by_uuid ~__context ~uuid) + | `Pool -> ignore(Db.Pool.get_by_uuid ~__context ~uuid) + | `VMPP -> ignore(Db.VMPP.get_by_uuid ~__context ~uuid) + ); + true + with _ -> + false (*********** Thread_queue to exec the message script hook ***********) @@ -208,20 +231,20 @@ let queue_push = ref (fun (description: string) (m : string) -> false) let message_to_string (_ref,message) = let buffer = Buffer.create 10 in let output = Xmlm.make_output (`Buffer buffer) in - to_xml output _ref message; + to_xml output _ref None message; Buffer.contents buffer -let handle_message ~__context message = +let handle_message ~__context message = try - if not (Pool_features.is_enabled ~__context Features.Email) - then info "Email alerting is restricted by current license: not generating email" - else begin - let output, log = Forkhelpers.execute_command_get_output Xapi_globs.xapi_message_script [message] in - debug "Executed message hook: output='%s' log='%s'" output log - end + if not (Pool_features.is_enabled ~__context Features.Email) + then info "Email alerting is restricted by current license: not generating email" + else begin + let output, log = Forkhelpers.execute_command_get_output Xapi_globs.xapi_message_script [message] in + debug "Executed message hook: output='%s' log='%s'" output log + end with e -> - error "Unexpected exception in message hook. Exception='%s'" (ExnHelper.string_of_exn e); - log_backtrace () + error "Unexpected exception in message hook. Exception='%s'" (ExnHelper.string_of_exn e); + log_backtrace () let start_message_hook_thread ~__context () = queue_push := (Thread_queue.make ~name:"email message queue" ~max_q_length:100 (handle_message ~__context)).Thread_queue.push_fn @@ -229,48 +252,10 @@ let start_message_hook_thread ~__context () = (********************************************************************) -let get_last_n dir n = - let cmp a b = compare - (float_of_string a) - (float_of_string b) (* oldest first, reverse later *) - and files = List.filter - (fun m -> try float_of_string m > 0.0 with _ -> false) - (Array.to_list (Sys.readdir dir)) - and msg_of_file mf = - let fn = message_dir ^ "/" ^ mf in - let ic = open_in fn in - let xi = Xmlm.make_input (`Channel ic) in - let (_ref,msg) = Pervasiveext.finally - (fun () -> of_xml xi) - (fun () -> close_in ic) in - let ts = Date.to_float msg.API.message_timestamp in - (ts, _ref, msg) - in - List.rev_map msg_of_file - (Listext.List.take n - (List.fast_sort cmp files)) - -let repopulate_cache () = try - let msgs = get_last_n - message_dir - in_memory_cache_length_default in - let len = List.length msgs in - let n = Mutex.execute in_memory_cache_mutex - (fun () -> - in_memory_cache := msgs ; - in_memory_cache_length := len ; - !in_memory_cache_length - ) in - debug "Repopulating in-memory cache of messages: Length=%d" n - with e -> - error "Exception '%s' in Xapi_message.repopulate_cache; cache left unchanged" - (ExnHelper.string_of_exn e) - -let cache_insert _ref message = - let timestamp = Date.to_float message.API.message_timestamp in +let cache_insert _ref message gen = Mutex.execute in_memory_cache_mutex (fun () -> in_memory_cache := - (timestamp,_ref,message) :: !in_memory_cache ; + (gen,_ref,message) :: !in_memory_cache ; in_memory_cache_length := !in_memory_cache_length + 1 ; @@ -292,64 +277,80 @@ let cache_remove _ref = error "Internal error: Repeated reference in messages in_memory_cache"; in_memory_cache := to_keep; in_memory_cache_length := List.length to_keep) - + (** Write: write message to disk. Returns boolean indicating whether message was written *) -let write ~_ref ~message = +let write ~__context ~_ref ~message = (* Check if a message with _ref has already been written *) let message_exists () = let file = (ref_symlink ()) ^ "/" ^ (Ref.string_of _ref) in try Unix.access file [Unix.F_OK] ; true with _ -> false in - (* Make sure the directory is there *) - if not (Helpers.local_storage_exists ()) - then false (* couldn't write message *) - else begin - Unixext.mkdir_safe message_dir 0o700; - let timestamp = ref (Date.to_float (message.API.message_timestamp)) in - - if message_exists () then false (* Don't overwrite message with same ref *) - else try Mutex.execute event_mutex (fun () -> - let fd, basefilename, filename = - (* Try 10, no wait, 11 times to create message file *) - let rec doit n = - if n>10 then failwith "Couldn't create a file" else begin - let basefilename = timestamp_to_string !timestamp in - let filename = message_dir ^ "/" ^ basefilename in - try - let fd = Unix.openfile filename - [Unix.O_RDWR; Unix.O_CREAT; Unix.O_EXCL] 0o600 in - (* Set file's timestamp to message timestamp *) - Unix.utimes filename !timestamp !timestamp ; - fd, basefilename, filename - with _ -> begin - (* We may be copying messages from another - pool, in which case we may have - filename collision (unlikely, but - possible). So increment the filename - and try again, but leave the original - timestamp in the message untouched. *) - timestamp := !timestamp +. 0.00001 ; - doit (n+1) - end + + let message_gen () = + let fn = (ref_symlink ()) ^ "/" ^ (Ref.string_of _ref) in + let ic = open_in fn in + let xi = Xmlm.make_input (`Channel ic) in + let (gen,_,_) = Pervasiveext.finally + (fun () -> of_xml xi) + (fun () -> close_in ic) in + gen + in + + let gen = ref 0L in + + Db_lock.with_lock (fun () -> + let t = Context.database_of __context in + Db_ref.update_database t (fun db -> + gen := Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest db); + Db_cache_types.Database.increment db)); + + Unixext.mkdir_safe message_dir 0o700; + let timestamp = ref (Date.to_float (message.API.message_timestamp)) in + + if message_exists () then (Some (message_gen ())) + else try Mutex.execute event_mutex (fun () -> + let fd, basefilename, filename = + (* Try 10, no wait, 11 times to create message file *) + let rec doit n = + if n>10 then failwith "Couldn't create a file" else begin + let basefilename = timestamp_to_string !timestamp in + let filename = message_dir ^ "/" ^ basefilename in + try + let fd = Unix.openfile filename + [Unix.O_RDWR; Unix.O_CREAT; Unix.O_EXCL] 0o600 in + (* Set file's timestamp to message timestamp *) + Unix.utimes filename !timestamp !timestamp ; + fd, basefilename, filename + with _ -> begin + (* We may be copying messages from another + pool, in which case we may have + filename collision (unlikely, but + possible). So increment the filename + and try again, but leave the original + timestamp in the message untouched. *) + timestamp := !timestamp +. 0.00001 ; + doit (n+1) end - in doit 0 - in - - (* Write the message to file *) - let oc = Unix.out_channel_of_descr fd in - let output = Xmlm.make_output (`Channel oc) in - to_xml output _ref message; - close_out oc; - - (* Message now written, let's symlink it in various places *) - let symlinks = symlinks _ref message basefilename in - List.iter (fun (dir,newpath) -> - Unixext.mkdir_rec dir 0o700; - Unix.symlink filename newpath) symlinks - ) ; true (* End of event_mutex *) - with _ -> false - end + end + in doit 0 + in + + (* Write the message to file *) + let oc = Unix.out_channel_of_descr fd in + let output = Xmlm.make_output (`Channel oc) in + to_xml output _ref (Some !gen) message; + close_out oc; + + (* Message now written, let's symlink it in various places *) + let symlinks = symlinks _ref (Some !gen) message basefilename in + List.iter (fun (dir,newpath) -> + Unixext.mkdir_rec dir 0o700; + Unix.symlink filename newpath) symlinks; + Some !gen + ) + with _ -> None + (** create: Create a new message, and write to disk. Returns null ref if write failed, or message ref otherwise. *) @@ -357,6 +358,7 @@ let create ~__context ~name ~priority ~cls ~obj_uuid ~body = debug "Message.create %s %Ld %s %s" name priority (class_to_string cls) obj_uuid; + (if not (Encodings.UTF8_XML.is_valid body) then raise (Api_errors.Server_error (Api_errors.invalid_value, ["UTF8 expected"]))) ; @@ -380,10 +382,10 @@ let create ~__context ~name ~priority ~cls ~obj_uuid ~body = in (* Write the message to disk *) - let was_written = write ~_ref ~message in + let gen = write ~__context ~_ref ~message in (* Insert a written message into in_memory_cache *) - (if was_written then cache_insert _ref message) ; + Opt.iter (cache_insert _ref message) gen; (* Emit a create event (with the old event API). If the message hasn't been written, we may want to also emit a del even, for @@ -395,152 +397,186 @@ let create ~__context ~name ~priority ~cls ~obj_uuid ~body = (*Xapi_event.event_add ~snapshot:xml "message" "del" (Ref.string_of _ref);*) (* Return the message ref, or Ref.null if the message wasn't written *) - if was_written then _ref else Ref.null + match gen with + | Some _ -> _ref + | None -> Ref.null + - -let deleted : (Date.iso8601 * API.ref_message) list ref = ref [Date.never, Ref.null] +let deleted : (Generation.t * API.ref_message) list ref = ref [0L, Ref.null] let ndeleted = ref 1 let deleted_mutex = Mutex.create () - -let destroy_real basefilename = + +let destroy_real __context basefilename = let filename = message_dir ^ "/" ^ basefilename in let ic = open_in filename in - let (_ref,message) = Pervasiveext.finally - (fun () -> of_xml (Xmlm.make_input (`Channel ic))) - (fun () -> close_in ic) + let (gen,_ref,message) = Pervasiveext.finally + (fun () -> of_xml (Xmlm.make_input (`Channel ic))) + (fun () -> close_in ic) in - let symlinks = symlinks _ref message basefilename in + let symlinks = symlinks _ref (Some gen) message basefilename in List.iter (fun (dir,newpath) -> - Unixext.unlink_safe newpath) symlinks; + Unixext.unlink_safe newpath) symlinks; Unixext.unlink_safe filename; let xml = API.To.message_t message in - Mutex.execute event_mutex + + let gen = ref 0L in + + Db_lock.with_lock (fun () -> + let t = Context.database_of __context in + Db_ref.update_database t (fun db -> + gen := Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest db); + Db_cache_types.Database.increment db)); + + Mutex.execute event_mutex (fun () -> - deleted := (Date.of_float (Unix.gettimeofday ()), _ref) :: !deleted; + deleted := (!gen, _ref) :: !deleted; ndeleted := !ndeleted + 1; if !ndeleted > 1024 then (deleted := Listext.List.take 512 !deleted; - ndeleted := 512) + ndeleted := 512) ); cache_remove _ref; Xapi_event.event_add ~snapshot:xml "message" "del" (Ref.string_of _ref) -let destroy ~__context ~self = +let destroy ~__context ~self = (* Find the original message so we know where the symlinks will be *) let symlinkfname = (ref_symlink ()) ^ "/" ^ (Ref.string_of self) in - let fullpath = - try Unix.readlink symlinkfname - with _ -> raise (Api_errors.Server_error (Api_errors.handle_invalid, [Datamodel._message; Ref.string_of self])) + let fullpath = + try Unix.readlink symlinkfname + with _ -> raise (Api_errors.Server_error (Api_errors.handle_invalid, [Datamodel._message; Ref.string_of self])) in let basefilename = List.hd (List.rev (String.split '/' fullpath)) in - destroy_real basefilename + destroy_real __context basefilename (* Gc the messages - leave only the number of messages defined in 'Xapi_globs.message_limit' *) let gc ~__context = if (try (Unix.access message_dir [Unix.F_OK]; true) with _ -> false) then - begin - let allmsg = List.filter_map - (fun msg -> - try - Some (float_of_string msg, msg) - with _ -> - None) - (Array.to_list (Sys.readdir message_dir)) - in - if List.length allmsg > Xapi_globs.message_limit then + begin + let allmsg = List.filter_map + (fun msg -> + try + Some (float_of_string msg, msg) + with _ -> + None) + (Array.to_list (Sys.readdir message_dir)) + in + if List.length allmsg > Xapi_globs.message_limit then begin let sorted = List.sort (fun (t1,_) (t2,_) -> compare t1 t2) allmsg in let n = List.length sorted in let to_reap = n - Xapi_globs.message_limit in let rec reap_one i msgs = - if i=to_reap then () else - begin + if i=to_reap then () else + begin begin - try destroy_real (snd (List.hd msgs)) - with e -> - debug "Failed to destroy message %s" (snd (List.hd msgs)); - debug "Caught exception %s" (Printexc.to_string e) + try destroy_real __context (snd (List.hd msgs)) + with e -> + debug "Failed to destroy message %s" (snd (List.hd msgs)); + debug "Caught exception %s" (Printexc.to_string e) end; reap_one (i+1) (List.tl msgs) - end + end in reap_one 0 sorted end - end + end -let get_real_inner dir filter since = +let get_real_inner dir filter name_filter = try - let allmsgs = Array.to_list (Sys.readdir dir) in - let since_f = since in - let messages = List.filter (fun msg -> try float_of_string msg > since_f with _ -> false) allmsgs in + let allmsgs = Array.to_list (Sys.readdir dir) in + let messages = List.filter name_filter allmsgs in let messages = List.filter_map (fun msg_fname -> let filename = dir ^ "/" ^ msg_fname in - try - let ic = open_in filename in - let (_ref,msg) = Pervasiveext.finally (fun () -> of_xml (Xmlm.make_input (`Channel ic))) (fun () -> close_in ic) in - if filter msg then Some (float_of_string msg_fname,_ref,msg) else None - with _ -> None) messages - in - List.sort (fun (t1,r1,m1) (t2,r2,m2) -> compare t2 t1) messages + try + let ic = open_in filename in + let (gen,_ref,msg) = Pervasiveext.finally (fun () -> of_xml (Xmlm.make_input (`Channel ic))) (fun () -> close_in ic) in + if filter msg then Some (gen,_ref,msg) else None + with _ -> None) messages + in + List.sort (fun (t1,r1,m1) (t2,r2,m2) -> + let r = compare t2 t1 in + if r <> 0 then r else compare (Date.to_float m2.API.message_timestamp) (Date.to_float m1.API.message_timestamp)) messages with _ -> [] (* Message directory missing *) +let since_name_filter since name = + try + float_of_string name > since + with _ -> false + +let get_from_generation gen = + if gen > 0L + then get_real_inner (gen_symlink ()) (fun x -> true) (fun n -> try Int64.of_string n > gen with _ -> false) + else get_real_inner message_dir (fun _ -> true) (fun n -> try ignore(float_of_string n); true with _ -> false) + let get_real dir filter since = - List.map (fun (_,r,m) -> (r,m)) (get_real_inner dir filter since) + List.map (fun (_,r,m) -> (r,m)) (get_real_inner dir filter (since_name_filter since)) let get ~__context ~cls ~obj_uuid ~since = (* Read in all the messages for a particular object *) let class_symlink = class_symlink cls obj_uuid in (if not (check_uuid ~__context ~cls ~uuid:obj_uuid) then raise (Api_errors.Server_error (Api_errors.uuid_invalid, []))); - get_real class_symlink (fun _ -> true) (Date.to_float since) + let msg = get_real_inner class_symlink (fun msg -> (Date.to_float msg.API.message_timestamp) > (Date.to_float since)) (fun _ -> true) in + List.map (fun (_,b,c) -> (b,c)) msg let get_since ~__context ~since = get_real message_dir (fun _ -> true) (Date.to_float since) let get_since_for_events ~__context since = - let now = Mutex.execute event_mutex (fun () -> Unix.gettimeofday ()) in let cached_result = Mutex.execute in_memory_cache_mutex (fun () -> - match !in_memory_cache with - | (last_in_memory, _, _) :: _ when last_in_memory > since -> + match List.rev !in_memory_cache with + | (last_in_memory, _, _) :: _ when last_in_memory < since -> Some (List.filter_map - (fun (timestamp,_ref,msg) -> - if timestamp > since then Some (Xapi_event.MCreate (_ref, msg)) else None) - !in_memory_cache) - | _ -> None) in + (fun (gen,_ref,msg) -> + if gen > since then Some (gen, Xapi_event.MCreate (_ref, msg)) else None) + !in_memory_cache) + | (last_in_memory, _, _) :: _ -> + debug "get_since_for_events: last_in_memory (%Ld) > since (%Ld): Using slow message lookup" last_in_memory since; + None + | _ -> + warn "get_since_for_events: no in_memory_cache!"; + None) + in let result = match cached_result with | Some x -> x - | None -> List.map (fun x -> Xapi_event.MCreate x) (get_real message_dir (fun _ -> true) since) + | None -> + List.map (fun (ts,x,y) -> (ts, Xapi_event.MCreate (x,y))) (get_from_generation since) in - (now, result) + let delete_results = Mutex.execute deleted_mutex (fun () -> + let deleted = List.filter (fun (deltime,_ref) -> deltime > since) !deleted in + List.map (fun (ts , _ref) -> (ts,Xapi_event.MDel _ref)) deleted) in + let all_results = result @ delete_results in + let newsince = List.fold_left (fun acc (ts,m) -> max ts acc) since all_results in + (newsince, List.map snd all_results) let get_by_uuid ~__context ~uuid = try - let message_filename = (uuid_symlink ()) ^ "/" ^ uuid in - let ic = open_in message_filename in - let (_ref,_) = Pervasiveext.finally (fun () -> of_xml (Xmlm.make_input (`Channel ic))) (fun () -> close_in ic) in - _ref + let message_filename = (uuid_symlink ()) ^ "/" ^ uuid in + let ic = open_in message_filename in + let (_,_ref,_) = Pervasiveext.finally (fun () -> of_xml (Xmlm.make_input (`Channel ic))) (fun () -> close_in ic) in + _ref with - _ -> raise (Api_errors.Server_error (Api_errors.uuid_invalid, [ uuid ])) + _ -> raise (Api_errors.Server_error (Api_errors.uuid_invalid, [ uuid ])) let get_all ~__context = try - let allmsgs = Array.to_list (Sys.readdir (ref_symlink ())) in - List.map (fun r -> Ref.of_string r) allmsgs + let allmsgs = Array.to_list (Sys.readdir (ref_symlink ())) in + List.map (fun r -> Ref.of_string r) allmsgs with _ -> [] let get_record ~__context ~self = try - let symlinkfname = (ref_symlink ()) ^ "/" ^ (Ref.string_of self) in - let fullpath = Unix.readlink symlinkfname in - let ic = open_in fullpath in - let (_ref,message) = Pervasiveext.finally - (fun () -> of_xml (Xmlm.make_input (`Channel ic))) - (fun () -> close_in ic) - in message + let symlinkfname = (ref_symlink ()) ^ "/" ^ (Ref.string_of self) in + let fullpath = Unix.readlink symlinkfname in + let ic = open_in fullpath in + let (_,_ref,message) = Pervasiveext.finally + (fun () -> of_xml (Xmlm.make_input (`Channel ic))) + (fun () -> close_in ic) + in message with _ -> - raise (Api_errors.Server_error (Api_errors.handle_invalid, ["message";(Ref.string_of self)])) + raise (Api_errors.Server_error (Api_errors.handle_invalid, ["message";(Ref.string_of self)])) let get_all_records ~__context = get_real message_dir (fun _ -> true) (0.0) @@ -548,19 +584,22 @@ let get_all_records ~__context = let get_all_records_where ~__context ~expr = get_real message_dir (fun _ -> true) (0.0) -let register_event_hook () = +let repopulate_cache () = Mutex.execute in_memory_cache_mutex (fun () -> - let messages = get_real_inner message_dir (fun _ -> true) 0.0 in + let messages = get_real_inner message_dir (fun _ -> true) (fun n -> try ignore(float_of_string n); true with _ -> false) in let last_256 = List.take 256 messages in in_memory_cache := last_256; - let get_ts (ts,_,_) = ts in + let get_ts (ts,_,m) = Printf.sprintf "%Ld (%s)" ts (Date.to_string m.API.message_timestamp) in debug "Constructing in-memory-cache: most length=%d" (List.length last_256); - (try debug "newest=%f oldest=%f" (get_ts (List.hd last_256)) (get_ts (List.hd (List.rev last_256))) with _ -> ()); - in_memory_cache_length := List.length !in_memory_cache); + (try debug "newest=%s oldest=%s" (get_ts (List.hd last_256)) (get_ts (List.hd (List.rev last_256))) with _ -> ()); + in_memory_cache_length := List.length !in_memory_cache) + +let register_event_hook () = + repopulate_cache (); Xapi_event.message_get_since_for_events := get_since_for_events (* Query params: cls=VM etc, obj_uuid=<..>, min_priority. Returns the last - days worth of messages as an RSS feed. *) + days worth of messages as an RSS feed. *) let rss_handler (req: Http.Request.t) (bio: Buf_io.t) _ = let query = req.Http.Request.query in req.Http.Request.close <- true; @@ -568,48 +607,48 @@ let rss_handler (req: Http.Request.t) (bio: Buf_io.t) _ = let s = Buf_io.fd_of bio in Buf_io.assert_buffer_empty bio; Xapi_http.with_context ~dummy:true "Obtaining the RSS message feed" req s - (fun __context -> - let now = Unix.gettimeofday () in - let since = + (fun __context -> + let now = Unix.gettimeofday () in + let since = if List.mem_assoc "since" query then Date.of_float (float_of_string (List.assoc "since" query)) else let since=now -. (3600.0 *. 24.0) in Date.of_float since - in - let messages = + in + let messages = if List.mem_assoc "cls" query then let cls = string_to_class (List.assoc "cls" query) in let obj_uuid = List.assoc "obj_uuid" query in get ~__context ~cls ~obj_uuid ~since else - get_since ~__context ~since - in - let items = List.map (fun (_ref,message) -> - let body = Printf.sprintf "

%s: %s

%s

" - (class_to_string message.API.message_cls) + get_since ~__context ~since + in + let items = List.map (fun (_ref,message) -> + let body = Printf.sprintf "

%s: %s

%s

" + (class_to_string message.API.message_cls) message.API.message_obj_uuid - message.API.message_body + message.API.message_body in {Rss.item_title=message.API.message_name; Rss.item_link=None; Rss.item_description=body; - Rss.item_pubdate=Date.rfc822_to_string - (Date.rfc822_of_float (Date.to_float message.API.message_timestamp))}) messages - in - let channel = + Rss.item_pubdate=Date.rfc822_to_string + (Date.rfc822_of_float (Date.to_float message.API.message_timestamp))}) messages + in + let channel = {Rss.chan_title="XenServer Messages"; Rss.chan_description="Message from the XenServer"; Rss.chan_language="en"; Rss.chan_pubdate=Date.to_string (Date.of_float now); Rss.chan_items=items;} - in - let body = Xml.to_string (Rss.to_xml [channel]) in - let body = "" ^ body in - Http_svr.headers s ((Http.http_200_ok_with_content - (Int64.of_int (String.length body)) - ~version:"1.1" ~keep_alive:false ())@[Http.Hdr.content_type ^": application/rss+xml"]); - ignore(Unix.write s body 0 (String.length body))) + in + let body = Xml.to_string (Rss.to_xml [channel]) in + let body = "" ^ body in + Http_svr.headers s ((Http.http_200_ok_with_content + (Int64.of_int (String.length body)) + ~version:"1.1" ~keep_alive:false ())@[Http.Hdr.content_type ^": application/rss+xml"]); + ignore(Unix.write s body 0 (String.length body))) (** Handler for PUTing messages to a host. Query params: { cls=, uuid= } *) @@ -656,7 +695,7 @@ let handler (req: Http.Request.t) fd _ = let xml_in = Xmlm.make_input (`Channel (Unix.in_channel_of_descr fd)) in let messages = import_xml xml_in in - List.iter (function (r,m) -> ignore (write r m)) messages ; + List.iter (function (_,r,m) -> ignore (write ~__context ~_ref:r ~message:m)) messages ; (* Flush cache and reload *) repopulate_cache () ;