Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 2 commits
  • 1 file changed
  • 0 commit comments
  • 1 contributor
Showing with 37 additions and 23 deletions.
  1. +37 −23 mq_server.ml
View
60 mq_server.ml
@@ -95,6 +95,7 @@ let is_prefix_topic topic =
let remove_topic_subs broker topic conn =
try
if not (is_prefix_topic topic) then begin
+ H.remove conn.conn_topics topic;
let conns = H.find broker.b_topics topic in
match CONNS.remove conn conns with
s when CONNS.is_empty s -> H.remove broker.b_topics topic
@@ -207,31 +208,34 @@ let rec send_to_recipient ~kind broker listeners conn subs queue msg =
(* if kind is Saved, the msg is believed not to be in the ACK-pending set;
* if it actually is, this means it was already sent to some other conn,
* so we don't try to send it again *)
- lwt must_send = (match kind with
- Ack_pending -> (* the message was already in ACK-pending set *) return true
- | Saved -> (* just move to ACK *)
- P.register_ack_pending_msg broker.b_msg_store msg_id) in
- if not must_send then return () else
-
- STOMP.send_message ~eol:broker.b_frame_eol conn.conn_och msg >>
- begin try_lwt
- match msg.msg_ack_timeout with
- dt when dt > 0. -> Lwt_unix.with_timeout dt (fun () -> sleep)
- | _ -> sleep
- finally
- (* either ACKed or Timeout/Cancel, at any rate, no longer want the ACK *)
- H.remove conn.conn_pending_acks msg_id;
- subs.qs_pending_acks <- subs.qs_pending_acks - 1;
- return ()
- end >>
- begin
- DEBUG(show "Conn %d ACKed %S." conn.conn_id msg_id);
- P.ack_msg broker.b_msg_store msg_id >>
+ lwt must_send =
+ (match kind with
+ Ack_pending -> (* the message was already in ACK-pending set *) return true
+ | Saved -> (* just move to ACK *)
+ P.register_ack_pending_msg broker.b_msg_store msg_id)
+ in
+ if not must_send then
+ return ()
+ else
+ STOMP.send_message ~eol:broker.b_frame_eol conn.conn_och msg >>
+ begin try_lwt
+ match msg.msg_ack_timeout with
+ dt when dt > 0. -> Lwt_unix.with_timeout dt (fun () -> sleep)
+ | _ -> sleep
+ finally
+ (* either ACKed or Timeout/Cancel, at any rate, no longer want the ACK *)
+ H.remove conn.conn_pending_acks msg_id;
+ subs.qs_pending_acks <- subs.qs_pending_acks - 1;
+ return ()
+ end >>
+ begin
+ DEBUG(show "Conn %d ACKed %S." conn.conn_id msg_id);
+ P.ack_msg broker.b_msg_store msg_id >>
(* try to send older messages for the subscription whose message
* we just ACKed *)
- (ignore_result (send_saved_messages broker) queue;
- return ())
- end
+ (ignore_result (send_saved_messages broker) queue;
+ return ())
+ end
and send_saved_messages ?(only_once = false) broker queue =
if not (have_recipient broker queue) then return () else
@@ -455,6 +459,16 @@ let handle_control_message broker dst conn frame =
Gc.set { Gc.get () with Gc.verbose = int_of_string frame.STOMP.fr_body };
return []
end
+ else if dst = "gc-max-overhead" then begin
+ DEBUG(show "GC max-overhead %S" frame.STOMP.fr_body);
+ Gc.set { Gc.get () with Gc.max_overhead = int_of_string frame.STOMP.fr_body };
+ return []
+ end
+ else if dst = "gc-space-overhead" then begin
+ DEBUG(show "GC space-overhead %S" frame.STOMP.fr_body);
+ Gc.set { Gc.get () with Gc.space_overhead = int_of_string frame.STOMP.fr_body };
+ return []
+ end
else if dst = "toggle-debug" then begin
broker.b_debug <- not (broker.b_debug);
return []

No commit comments for this range

Something went wrong with that request. Please try again.