Permalink
Browse files

Mq_sqlite_persistence: write messages to stderr.

  • Loading branch information...
1 parent 5f62008 commit 792515e5fe8f7f18bc1e7e6f34e04e1c35258664 @mfp committed Nov 12, 2010
Showing with 10 additions and 9 deletions.
  1. +10 −9 mq_sqlite_persistence.ml
View
@@ -31,17 +31,18 @@ let count_unmaterialized_pending_acks db =
let count_acked_messages db =
select_one db sqlc"SELECT @L{COUNT(*)} FROM acked_msgs"
+let pr fmt = ksprintf (eprintf "%s%!") fmt
+let puts fmt = ksprintf prerr_endline fmt
+
let flush_acked_msgs ?(verbose = false) db =
- if verbose then
- printf "Flushing %Ld ACKs\n%!" (count_acked_messages db);
+ if verbose then puts "Flushing %Ld ACK" (count_acked_messages db);
execute db
sqlc"DELETE FROM ocamlmq_msgs WHERE msg_id IN (SELECT * FROM acked_msgs)";
execute db sqlc"DELETE FROM acked_msgs"
let materialize_pending_acks ?(verbose = false) db =
if verbose then
- printf "Materializing %Ld pending ACKs in DB\n%!"
- (count_unmaterialized_pending_acks db);
+ puts "Materializing %Ld pending ACKs in DB" (count_unmaterialized_pending_acks db);
execute db sqlc"UPDATE ocamlmq_msgs SET ack_pending = 1
WHERE msg_id IN (SELECT msg_id FROM pending_acks)";
execute db sqlc"DELETE FROM pending_acks"
@@ -58,12 +59,12 @@ let rec flush t =
unmaterialized_ack_pendings <> 0L || acked_msgs <> 0L
then begin
flushed := true;
- printf "Flushing to disk: %d msgs, %d + %Ld pending ACKS, %Ld ACKS%!"
+ pr "Flushing to disk: %d msgs, %d + %Ld pending ACKS, %Ld ACKS"
in_mem_msgs ack_pending unmaterialized_ack_pendings acked_msgs;
do_flush t db;
end
end;
- if !flushed then printf " (%8.5fs)\n%!" (Unix.gettimeofday () -. t0)
+ if !flushed then puts " (%8.5fs)" (Unix.gettimeofday () -. t0)
and do_flush t db =
Hashtbl.iter
@@ -121,18 +122,18 @@ let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) file =
if not (SSET.is_empty t.unacks) then
transaction t.db
(fun db ->
- printf "UnACKing %d messages in DB\n%!" (SSET.cardinal t.unacks);
+ puts "UnACKing %d messages in DB" (SSET.cardinal t.unacks);
SSET.iter (unack db) t.unacks);
t.unacks <- SSET.empty;
loop_flush_unacks ()
in
ignore
(try_lwt loop_flush wait_flush
- with e -> printf "EXCEPTION IN FLUSHER: %s\n%!" (Printexc.to_string e);
+ with e -> puts "EXCEPTION IN FLUSHER: %s" (Printexc.to_string e);
return ());
ignore
(try_lwt loop_flush_unacks ()
- with e -> printf "EXCEPTION IN UNACK FLUSHER: %s\n%!" (Printexc.to_string e);
+ with e -> puts "EXCEPTION IN UNACK FLUSHER: %s" (Printexc.to_string e);
return ());
t

0 comments on commit 792515e

Please sign in to comment.