Permalink
Browse files

Store the queue bindings/subscriptions in a Hashtbl -> StringSet

  • Loading branch information...
1 parent 337d044 commit a4dda03a1b6d0e30a3d0e121e0c403df72da6054 David Scott committed Oct 16, 2012
Showing with 23 additions and 8 deletions.
  1. +23 −8 broker/switch.ml
View
@@ -52,6 +52,8 @@ let make_unique_id =
let make_fresh_name () = Printf.sprintf "queue-%Ld" (make_unique_id ())
module IntMap = Map.Make(struct type t = int64 let compare = Int64.compare end)
+module StringSet = Set.Make(struct type t = string let compare = String.compare end)
+
type origin =
| Anonymous of int (** An un-named connection, probably a temporary client connection *)
@@ -70,6 +72,8 @@ module Entry = struct
{ origin; time; message }
end
+let bindings : (string, StringSet.t) Hashtbl.t = Hashtbl.create 128
+
let queues : (string, Entry.t IntMap.t) Hashtbl.t = Hashtbl.create 128
let message_id_to_queue : string IntMap.t ref = ref IntMap.empty
let queues_c = Hashtbl.create 128
@@ -142,8 +146,11 @@ let make_server () =
end;
let conn_id = string_of_int conn_id in
lwt token = read xs (Printf.sprintf "/id/%s" conn_id) in
- lwt () = write xs (Printf.sprintf "/by_name/%s/%s" name token) "" in
- lwt () = write xs (Printf.sprintf "/by_session/%s/%s" token name) "" in
+ let existing =
+ if Hashtbl.mem bindings token
+ then Hashtbl.find bindings token
+ else StringSet.empty in
+ Hashtbl.replace bindings token (StringSet.add name existing);
Printf.fprintf stderr "Responding\n%!";
Out.to_response (Out.Bind name)
| In.Ack id ->
@@ -156,19 +163,27 @@ let make_server () =
| In.Transfer(from, timeout) ->
let conn_id = string_of_int conn_id in
lwt token = read xs (Printf.sprintf "/id/%s" conn_id) in
- lwt names = directory xs (Printf.sprintf "/by_session/%s" token) in
- let name = List.hd names (* XXX *) in
+ let names =
+ if Hashtbl.mem bindings token
+ then Hashtbl.find bindings token
+ else StringSet.empty in
+
let start = Unix.gettimeofday () in
let rec wait () =
- let q = find_or_create_queue name in
- let _, _, not_seen = IntMap.split from q in
+ let not_seen = StringSet.fold (fun name map ->
+ let q = find_or_create_queue name in
+ let _, _, not_seen = IntMap.split from q in
+ IntMap.fold IntMap.add map not_seen
+ ) names IntMap.empty in
if not_seen <> IntMap.empty
then return not_seen
else
let remaining_timeout = max 0. (start +. timeout -. (Unix.gettimeofday ())) in
let timeout = Lwt.map (fun () -> `Timeout) (Lwt_unix.sleep remaining_timeout) in
- let more = Lwt.map (fun () -> `Data) (queue_wait name) in
- match_lwt Lwt.pick [ timeout; more ] with
+ let more = StringSet.fold (fun name acc ->
+ Lwt.map (fun () -> `Data) (queue_wait name) :: acc
+ ) names [] in
+ match_lwt Lwt.pick (timeout :: more) with
| `Timeout -> return IntMap.empty
| `Data -> wait () in
lwt messages = wait () in

0 comments on commit a4dda03

Please sign in to comment.