Permalink
Browse files

Mq_server: don't recur in send_saved_messages (much lower load).

  • Loading branch information...
1 parent 69bf069 commit 35e701bc3d19e3034f292c2fdc852fe68080b234 @mfp committed Jul 7, 2010
Showing with 7 additions and 5 deletions.
  1. +7 −5 mq_server.ml
View
@@ -228,21 +228,23 @@ let rec send_to_recipient ~kind broker listeners conn subs queue msg =
return ())
end
-and send_saved_messages ?(only_once = false) broker queue =
+and send_saved_messages broker queue =
if not (have_recipient broker queue) then return () else
P.get_msg_for_delivery broker.b_msg_store queue >>= function
None -> return ()
| Some msg ->
let msg_id = msg.msg_id in
match find_recipient broker queue with
None -> P.unack_msg broker.b_msg_store msg_id >>
- send_saved_messages ~only_once:true broker queue
+ (* we try to send again because one might have become
+ * available while we were unacking *)
+ send_saved_messages broker queue
| Some (listeners, (conn, subs)) ->
ignore_result
~exn_handler:(handle_send_msg_exn broker conn ~queue ~msg_id)
(send_to_recipient ~kind:Ack_pending broker listeners conn subs queue)
msg;
- if only_once then return () else send_saved_messages broker queue
+ return ()
and handle_send_msg_exn broker ~queue conn ~msg_id = function
| Lwt_unix.Timeout | Lwt.Canceled ->
@@ -254,7 +256,7 @@ and handle_send_msg_exn broker ~queue conn ~msg_id = function
and enqueue_after_timeout broker ~queue ~msg_id =
if not (have_recipient broker queue) then
P.unack_msg broker.b_msg_store msg_id >>
- send_saved_messages ~only_once:true broker queue else
+ send_saved_messages broker queue else
P.get_ack_pending_msg broker.b_msg_store msg_id >>= function
None -> return ()
| Some msg ->
@@ -263,7 +265,7 @@ and enqueue_after_timeout broker ~queue ~msg_id =
| None -> begin (* move to main table *)
DEBUG(show "No recipient for unACKed message %S, saving." msg_id);
P.unack_msg broker.b_msg_store msg_id >>
- send_saved_messages ~only_once:true broker queue
+ send_saved_messages broker queue
end
| Some (listeners, (conn, subs)) ->
DEBUG(show "Found a recipient for unACKed message %S." msg_id);

0 comments on commit 35e701b

Please sign in to comment.