Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge commit '4d1d182f897e9e56d91f1077ab8463af91b2aef0'
  • Loading branch information
xen-git committed Aug 6, 2012
2 parents d0d5ccd + 4d1d182 commit 70c578f
Show file tree
Hide file tree
Showing 3 changed files with 405 additions and 285 deletions.
74 changes: 74 additions & 0 deletions ocaml/xapi/quicktest.ml
Expand Up @@ -170,6 +170,79 @@ let object_level_event_test session_id =
end
)

let event_message_test session_id =
let test = make_test "Message creation event test" 1 in
start test;
let events = Client.Event.from !rpc session_id [ "message" ] "" 1.0 |> event_from_of_xmlrpc in
let token = events.token in
let pool = List.hd (Client.Pool.get_all !rpc session_id) in
let obj_uuid = Client.Pool.get_uuid !rpc session_id pool in
debug test "Creating message";
let cls = `Pool in
let message = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls
~obj_uuid ~body:"Hello" in
debug test (Printf.sprintf "Created message: %s" (Ref.string_of message));
let events = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc
in
debug test (Printf.sprintf "Got some events: %d %s" (List.length events.events) (String.concat "," (List.map (fun ev -> ev.reference) events.events)));
let token = events.token in
if List.exists (fun ev -> ev.reference = (Ref.string_of message) && ev.op = Add) events.events
then success test
else failed test "Failed to receive an event with the message";

let test = make_test "Message deletion event test" 1 in
start test;
debug test "Destroying message";
Client.Message.destroy !rpc session_id message;
let events = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc in
debug test "Got some events";
if List.exists (fun ev -> ev.reference = (Ref.string_of message) && ev.op = Del) events.events
then success test
else failed test "Failed to receive a delete event";

let test = make_test "Message deletion from cache test" 1 in
start test;
let events = Client.Event.from !rpc session_id [ "message" ] "" 1.0 |> event_from_of_xmlrpc in
debug test "Got lots of events";
if List.exists (fun ev -> ev.reference = (Ref.string_of message) && ev.op <> Del) events.events
then failed test "Got told about a deleted message"
else success test;

let test = make_test "Multi message test" 1 in
start test;
let message1 = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls
~obj_uuid ~body:"Hello" in
let message2 = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls
~obj_uuid ~body:"Hello" in
let events = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc in
let token = events.token in
let message3 = Client.Message.create ~rpc:!rpc ~session_id ~name:"test" ~priority:1L ~cls
~obj_uuid ~body:"Hello" in
let events2 = Client.Event.from !rpc session_id [ "message" ] token 1.0 |> event_from_of_xmlrpc in
debug test (Printf.sprintf "message1=%s" (Ref.string_of message1));
debug test (Printf.sprintf "message2=%s" (Ref.string_of message2));
debug test (Printf.sprintf "message3=%s" (Ref.string_of message3));
List.iter (fun ev -> debug test (Printf.sprintf "events1: ev.ref=%s" ev.reference)) events.events;
List.iter (fun ev -> debug test (Printf.sprintf "events2: ev.ref=%s" ev.reference)) events2.events;
let ok1 =
List.exists (fun ev -> ev.reference = (Ref.string_of message1) && ev.op = Add) events.events &&
List.exists (fun ev -> ev.reference = (Ref.string_of message2) && ev.op = Add) events.events in
let ok2 =
List.exists (fun ev -> ev.reference = (Ref.string_of message3) && ev.op = Add) events2.events in
let ok3 =
not (List.exists (fun ev -> ev.reference = (Ref.string_of message1) && ev.op = Add) events2.events) &&
not (List.exists (fun ev -> ev.reference = (Ref.string_of message2) && ev.op = Add) events2.events)
in
if ok1 && ok2 && ok3 then success test else failed test (Printf.sprintf "ok1=%b ok2=%b ok3=%b" ok1 ok2 ok3);

let test = make_test "Object messages test" 1 in
start test;
debug test (Printf.sprintf "Finding messages for object: %s" (Client.Pool.get_uuid !rpc session_id pool));
let messages = Client.Message.get ~rpc:!rpc ~session_id ~cls ~obj_uuid ~since:(Date.never) in
let has_msg m = List.exists (fun (r,_) -> r=m) messages in
let ok = has_msg message1 && has_msg message2 && has_msg message3 in
if ok then success test else failed test "Failed to get messages for object"

let all_srs_with_vdi_create session_id =
let all_srs : API.ref_SR list = Quicktest_storage.list_srs session_id in
(* Filter out those which support the vdi_create capability *)
Expand Down Expand Up @@ -747,6 +820,7 @@ let _ =
maybe_run_test "event" (fun () -> event_next_test s);
maybe_run_test "event" (fun () -> event_from_test s);
(* maybe_run_test "event" (fun () -> object_level_event_test s);*)
maybe_run_test "event" (fun () -> event_message_test s);
maybe_run_test "vdi" (fun () -> vdi_test s);
maybe_run_test "async" (fun () -> async_test s);
maybe_run_test "import" (fun () -> import_export_test s);
Expand Down
52 changes: 24 additions & 28 deletions ocaml/xapi/xapi_event.ml
Expand Up @@ -22,26 +22,27 @@ open D

module Token = struct

type t = int64 * float (* last id, timestamp *)
type t = int64 * int64 (* last id, message id *)

exception Failed_to_parse of string

let of_string token =
match String.split ',' token with
| [from;from_t] ->
(Int64.of_string from, float_of_string from_t)
| [""] -> (0L, 0.1)
(Int64.of_string from, Int64.of_string from_t)
| [""] -> (0L, 0L)
| _ ->
raise (Failed_to_parse token)

let to_string (last, timestamp) =
let to_string (last,last_t) =
(* We prefix with zeroes so tokens which differ only in the generation
can be compared lexicographically as strings. *)
Printf.sprintf "%020Ld,%f" last timestamp
Printf.sprintf "%020Ld,%020Ld" last last_t
end


let message_get_since_for_events : (__context:Context.t -> float -> (float * (API.ref_message * API.message_t) list)) ref = ref ( fun ~__context _ -> ignore __context; (0.0, []))
type message_event = MCreate of (API.ref_message * API.message_t) | MDel of API.ref_message
let message_get_since_for_events : (__context:Context.t -> int64 -> (int64 * message_event list)) ref = ref ( fun ~__context _ -> ignore __context; (0L, []))

(** Limit the event queue to this many events: *)
let max_stored_events = 500
Expand Down Expand Up @@ -88,7 +89,7 @@ let event_matches subs ev =
(** Every session that calls 'register' gets a subscription*)
type subscription_record = {
mutable last_id: int64; (** last event ID to sent to this client *)
mutable last_timestamp : float; (** Time at which the last event was sent (for messages) *)
mutable last_msg_gen : int64; (** last generation count from the messages *)
mutable last_generation : int64; (** Generation count of the last event *)
mutable cur_id: int64; (** Most current generation count relevant to the client - only used in new events mechanism *)
mutable subs: subscription list; (** list of all the subscriptions *)
Expand Down Expand Up @@ -176,7 +177,7 @@ let get_subscription ~__context =
(fun () ->
if Hashtbl.mem subscriptions session then Hashtbl.find subscriptions session
else
let subscription = { last_id = !id; last_timestamp=(Unix.gettimeofday ()); last_generation=0L; cur_id = 0L; subs = []; m = Mutex.create(); session = session; session_invalid = false; timeout=0.0; } in
let subscription = { last_id = !id; last_msg_gen = 0L; last_generation=0L; cur_id = 0L; subs = []; m = Mutex.create(); session = session; session_invalid = false; timeout=0.0; } in
Hashtbl.replace subscriptions session subscription;
subscription)

Expand Down Expand Up @@ -312,8 +313,8 @@ let from ~__context ~classes ~token ~timeout =

sub.timeout <- Unix.gettimeofday () +. timeout;

sub.last_timestamp <- from_t;
sub.last_generation <- from;
sub.last_msg_gen <- from_t;

Mutex.execute sub.m (fun () -> sub.subs <- subs @ sub.subs);

Expand All @@ -330,9 +331,9 @@ let from ~__context ~classes ~token ~timeout =

let grab_range t =
let tableset = Db_cache_types.Database.tableset (Db_ref.get_database t) in
let (timestamp,messages) =
if table_matches all_subs "message" then (!message_get_since_for_events) ~__context sub.last_timestamp else (0.0, []) in
(timestamp, messages, tableset, List.fold_left
let (msg_gen,messages) =
if table_matches all_subs "message" then (!message_get_since_for_events) ~__context sub.last_msg_gen else (0L, []) in
(msg_gen, messages, tableset, List.fold_left
(fun acc table ->
Db_cache_types.Table.fold_over_recent sub.last_generation
(fun ctime mtime dtime objref (creates,mods,deletes,last) ->
Expand All @@ -352,24 +353,23 @@ let from ~__context ~classes ~token ~timeout =
in

let rec grab_nonempty_range () =
let (timestamp, messages, tableset, (creates,mods,deletes,last)) as result = Db_lock.with_lock (fun () -> grab_range (Db_backend.make ())) in
let (msg_gen, messages, tableset, (creates,mods,deletes,last)) as result = Db_lock.with_lock (fun () -> grab_range (Db_backend.make ())) in
if List.length creates = 0 && List.length mods = 0 && List.length deletes = 0 && List.length messages = 0 && Unix.gettimeofday () < sub.timeout
then
(
sub.last_generation <- last; (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
sub.last_timestamp <- timestamp;
sub.cur_id <- last; (* last id the client got is equivalent to the current one *)
sub.last_msg_gen <- msg_gen;
wait2 sub last;
Thread.delay 0.05;
grab_nonempty_range ())
else
result
in

let (timestamp, messages, tableset, (creates,mods,deletes,last)) = grab_nonempty_range () in
let (msg_gen, messages, tableset, (creates,mods,deletes,last)) = grab_nonempty_range () in

sub.last_generation <- last;
sub.last_timestamp <- timestamp;

let event_of op ?snapshot (table, objref, time) =
{
Expand Down Expand Up @@ -399,16 +399,12 @@ let from ~__context ~classes ~token ~timeout =
if event_matches sub.subs ev then ev::acc else acc
) modevs creates in

let message_events = List.fold_left (fun acc (_ref,message) ->
let objref = Ref.string_of _ref in
let xml = API.To.message_t message in
let ev = { id=0L;
ts=0.0;
ty="message";
op=Add;
reference=objref;
snapshot=Some xml } in
ev::acc) createevs messages in
let message_events = List.fold_left (fun acc mev ->
let event = match mev with
| MCreate (_ref,message) -> event_of Add ?snapshot:(Some (API.To.message_t message)) ("message", Ref.string_of _ref, 0L)
| MDel _ref -> event_of Del ("message",Ref.string_of _ref, 0L)
in
event::acc) createevs messages in

let valid_ref_counts =
Db_cache_types.TableSet.fold
Expand All @@ -425,7 +421,7 @@ let from ~__context ~classes ~token ~timeout =
let result = {
events = message_events;
valid_ref_counts = valid_ref_counts;
token = Token.to_string (last, timestamp);
token = Token.to_string (last,msg_gen);
} in
xmlrpc_of_event_from result

Expand All @@ -440,7 +436,7 @@ let inject ~__context ~_class ~ref =
Db_cache_impl.refresh_row db_ref _class ref; (* consumes this generation *)
g
) in
let token = (Int64.sub generation 1L), 0. in
let token = Int64.sub generation 1L, 0L in
Token.to_string token

(** Inject an unnecessary update as a heartbeat. This will:
Expand Down

0 comments on commit 70c578f

Please sign in to comment.