Permalink
Browse files

ocamlmq: implemented -max-prefetch option.

  • Loading branch information...
1 parent 645c93a commit 80fa348e5ad4658e019ebe3eb81c8ec758da8fbf @mfp committed Nov 17, 2010
Showing with 12 additions and 4 deletions.
  1. +1 −0 README
  2. +7 −3 mq_server.ml
  3. +4 −1 ocamlmq.ml
View
1 README
@@ -100,6 +100,7 @@ Usage: ocamlmq [options] [sqlite3 database (default: ocamlmq.db)]
-port PORT Port to listen at (default: 61613)
-login LOGIN Login expected in CONNECT
-passcode PASSCODE Passcode expected in CONNECT
+ -max-prefetch N Maximum allowed prefetch limit (default: 100)
-maxmsgs N Keep at most N msgs in mem before hard flush (default: 100000)
-flush-period DT Hard flush period in seconds (default: 1.0)
-binlog FILE Use FILE as the binlog for msgs in mem (default: none)
View
@@ -78,6 +78,7 @@ type broker = {
mutable b_async_usedmem : int;
b_login : string option;
b_passcode : string option;
+ b_max_prefetch : int;
}
DEFINE DEBUG(what_to_print) = if broker.b_debug then what_to_print
@@ -319,12 +320,14 @@ let cmd_subscribe broker conn frame =
end
| Queue name -> begin
DEBUG(show "Conn %d subscribed to queue %S." conn.conn_id name);
+ let max_prefetch = broker.b_max_prefetch in
let subscription =
{
qs_prefetch =
(try
- int_of_string (STOMP.get_header frame "prefetch")
- with _ -> -1);
+ let n = int_of_string (STOMP.get_header frame "prefetch") in
+ if n <= 0 then max_prefetch else min n max_prefetch
+ with _ -> max_prefetch);
qs_pending_acks = 0;
}
in H.replace conn.conn_queues name subscription;
@@ -547,7 +550,7 @@ and do_establish_connection broker addr ich och =
let make_broker
?(frame_eol = true) ?(force_send_async = false)
?(send_async_max_mem = 32 * 1024 * 1024)
- ?login ?passcode
+ ?(max_prefetch=10) ?login ?passcode
msg_store address =
let sock = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true;
@@ -567,6 +570,7 @@ let make_broker
b_debug = false;
b_login = login;
b_passcode = passcode;
+ b_max_prefetch = max_prefetch;
}
let server_loop ?(debug = false) broker =
View
@@ -14,13 +14,16 @@ let max_in_mem = ref 100000
let flush_period = ref 1.
let binlog = ref ""
let sync_binlog = ref false
+let max_prefetch = ref 100
let params =
Arg.align
[
"-port", Arg.Set_int port, "PORT Port to listen at (default: 61613)";
"-login", set_some_string login, "LOGIN Login expected in CONNECT";
"-passcode", set_some_string passcode, "PASSCODE Passcode expected in CONNECT";
+ "-max-prefetch", Arg.Set_int max_prefetch,
+ "N Maximum allowed prefetch limit (default: 100)";
"-maxmsgs", Arg.Set_int max_in_mem,
"N Keep at most N msgs in mem before hard flush (default: 100000)";
"-flush-period", Arg.Set_float flush_period,
@@ -62,7 +65,7 @@ let () =
eprintf "Initializing database... %!";
Mq_sqlite_persistence.initialize msg_store >>
let () = eprintf "DONE\n%!" in
- lwt broker = SERVER.make_broker
+ lwt broker = SERVER.make_broker ~max_prefetch:!max_prefetch
?login:!login ?passcode:!passcode msg_store addr
in SERVER.server_loop ~debug:!debug broker
end

0 comments on commit 80fa348

Please sign in to comment.