Skip to content
Browse files

ocamlmq: implemented -binlog option.

  • Loading branch information...
1 parent cb8b4aa commit ad7ae2164ba6bfd298781d99687542682e2f41f4 @mfp committed Nov 14, 2010
Showing with 125 additions and 6 deletions.
  1. +1 −0 OMakefile
  2. +91 −0 binlog.ml
  3. +9 −0 binlog.mli
  4. +19 −5 mq_sqlite_persistence.ml
  5. +1 −1 mq_sqlite_persistence.mli
  6. +4 −0 ocamlmq.ml
View
1 OMakefile
@@ -24,6 +24,7 @@ OCAMLPACKS[] =
camlp4.macro
OBJECTS[] =
+ binlog
extSet
mq_types
mq_stomp
View
91 binlog.ml
@@ -0,0 +1,91 @@
+open Lwt
+open Mq_types
+
+type t = { fd : Unix.file_descr; och : Lwt_io.output_channel }
+
+type record =
+ Add of message
+ | Del of string
+ | Nothing
+
+let make file =
+ let fd = Unix.openfile file [ Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC ] 0o640 in
+ let och = Lwt_io.of_unix_fd ~mode:Lwt_io.output fd in
+ { fd = fd; och = och; }
+
+let truncate t =
+ Unix.ftruncate t.fd 0;
+ ignore (Unix.lseek t.fd 0 Unix.SEEK_SET)
+
+module LE = Lwt_io.LE
+
+let read_exactly ch n =
+ if n < 0 then fail End_of_file else
+ let s = String.create n in
+ Lwt_io.read_into_exactly ch s 0 n >>
+ return s
+
+let read_string ich =
+ lwt len = LE.read_int ich in
+ read_exactly ich len
+
+let read_record ich =
+ try_lwt
+ lwt kind = Lwt_io.read_char ich in
+ begin match kind with
+ 'A' ->
+ lwt id = read_string ich in
+ lwt dest = read_string ich in
+ lwt prio = LE.read_int ich in
+ lwt timestamp = LE.read_float64 ich in
+ lwt body = read_string ich in
+ lwt timeout = LE.read_float64 ich in
+ let r =
+ Add
+ {
+ msg_id = id; msg_destination = Queue dest;
+ msg_priority = prio; msg_timestamp = timestamp;
+ msg_body = body; msg_ack_timeout = timeout
+ }
+ in return r
+ | 'B' ->
+ lwt id = read_string ich in return (Del id)
+ | _ -> raise End_of_file
+ end
+ with End_of_file -> return Nothing
+
+let read file =
+ try_lwt
+ Lwt_io.with_file ~mode:Lwt_io.input file
+ (fun ich ->
+ let h = Hashtbl.create 13 in
+ let rec loop () = read_record ich >>= function
+ Nothing -> return ()
+ | Add msg -> Hashtbl.add h msg.msg_id msg; loop ()
+ | Del msg_id -> Hashtbl.remove h msg_id; loop ()
+ in loop () >>
+ return (Hashtbl.fold (fun _ msg l -> msg :: l) h []))
+ with Unix.Unix_error _ -> return []
+
+let write_string och s =
+ LE.write_int och (String.length s) >>
+ Lwt_io.write och s
+
+let write_record och = function
+ Nothing -> return ()
+ | Del msg_id ->
+ Lwt_io.write_char och 'D' >> write_string och msg_id
+ | Add msg ->
+ Lwt_io.write_char och 'A' >>
+ write_string och msg.msg_id >>
+ write_string och (destination_name msg.msg_destination) >>
+ LE.write_int och msg.msg_priority >>
+ LE.write_float64 och msg.msg_timestamp >>
+ write_string och msg.msg_body >>
+ LE.write_float64 och msg.msg_ack_timeout
+
+let write_record och r =
+ Lwt_io.atomic (fun och -> write_record och r) och >> Lwt_io.flush och
+
+let cancel t msg = write_record t.och (Del msg.msg_id)
+let add t msg = write_record t.och (Add msg)
View
9 binlog.mli
@@ -0,0 +1,9 @@
+
+type t
+
+val read : string -> Mq_types.message list Lwt.t
+val make : string -> t
+val truncate : t -> unit
+val add : t -> Mq_types.message -> unit Lwt.t
+val cancel : t -> Mq_types.message -> unit Lwt.t
+
View
24 mq_sqlite_persistence.ml
@@ -23,6 +23,8 @@ type t = {
mutable flush_alarm : unit Lwt.u;
max_msgs_in_mem : int;
mutable unacks : SSET.t;
+ binlog_file : string option;
+ mutable binlog : Binlog.t option;
}
let count_unmaterialized_pending_acks db =
@@ -74,6 +76,7 @@ let rec flush t =
do_flush t db;
end
end;
+ Option.may Binlog.truncate t.binlog;
if !flushed then puts " (%8.5fs)" (Unix.gettimeofday () -. t0)
and do_flush t db =
@@ -109,14 +112,15 @@ let unack db msg_id =
sqlc"UPDATE ocamlmq_msgs SET ack_pending = 0 WHERE msg_id = %s"
msg_id
-let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) file =
+let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) ?binlog file =
let wait_flush, awaken_flush = Lwt.wait () in
let t =
{ db = Sqlexpr_sqlite.open_db file; in_mem = Hashtbl.create 13;
in_mem_msgs = Hashtbl.create 13; ack_pending = SSET.empty;
flush_alarm = awaken_flush;
max_msgs_in_mem = max_msgs_in_mem;
unacks = SSET.empty;
+ binlog_file = binlog; binlog = None;
} in
let flush_period = max flush_period 0.005 in
let rec loop_flush wait_flush =
@@ -195,6 +199,7 @@ let do_save_msg t sent msg =
return ()
let save_msg t ?low_priority msg =
+ Option.map_default (fun log -> Binlog.add log msg) (return ()) t.binlog >>
do_save_msg t false msg
let register_ack_pending_msg t msg_id =
@@ -258,14 +263,15 @@ let ack_msg t msg_id =
t.ack_pending <- SSET.remove msg_id t.ack_pending;
let dst = destination_name msg.msg_destination in
let unsent, want_ack = Hashtbl.find t.in_mem dst in
- Hashtbl.replace t.in_mem dst (unsent, SSET.remove msg_id want_ack)
+ Hashtbl.replace t.in_mem dst (unsent, SSET.remove msg_id want_ack);
+ Option.map_default (fun log -> Binlog.cancel log msg) (return ()) t.binlog
end else begin
execute t.db sqlc"INSERT INTO acked_msgs(msg_id) VALUES(%s)" msg_id;
t.ack_pending <- SSET.remove msg_id t.ack_pending;
if count_acked_messages t.db > 100L then
transaction t.db (flush_acked_msgs ~verbose:true);
- end;
- return ()
+ return ()
+ end
let unack_msg t msg_id =
if SSET.mem msg_id t.ack_pending then begin
@@ -328,6 +334,14 @@ let count_queue_msgs t dst =
let crash_recovery t =
execute t.db sqlc"UPDATE ocamlmq_msgs SET ack_pending = 0 WHERE ack_pending = 1";
- return ()
+ begin match t.binlog_file with
+ None -> return ()
+ | Some f ->
+ lwt msgs = Binlog.read f in
+ let binlog = Binlog.make f in
+ t.binlog <- Some binlog;
+ eprintf "(binlog: %d msgs) %!" (List.length msgs);
+ Lwt_list.iter_s (save_msg t) msgs
+ end
let init_db, check_db, auto_check_db = sql_check"sqlite"
View
2 mq_sqlite_persistence.mli
@@ -1,7 +1,7 @@
include Mq_server.PERSISTENCE
-val make : ?max_msgs_in_mem:int -> ?flush_period:float -> string -> t
+val make : ?max_msgs_in_mem:int -> ?flush_period:float -> ?binlog:string -> string -> t
(* Used for testing *)
val auto_check_db : Format.formatter -> bool
View
4 ocamlmq.ml
@@ -12,6 +12,7 @@ let passcode = ref None
let db = ref None
let max_in_mem = ref 100000
let flush_period = ref 1.
+let binlog = ref ""
let params =
Arg.align
@@ -23,6 +24,8 @@ let params =
"N Flush to disk when there are more than N msgs in mem (default: 100000)";
"-flush-period", Arg.Set_float flush_period,
"DT Flush period in seconds (default: 1.0)";
+ "-binlog", Arg.Set_string binlog,
+ "FILE Use FILE as the binlog for msgs in mem (default: none).";
"-debug", Arg.Set debug, " Write debug info to stderr.";
]
@@ -48,6 +51,7 @@ let () =
Mq_sqlite_persistence.make
~max_msgs_in_mem:!max_in_mem
~flush_period:!flush_period
+ ?binlog:(match !binlog with "" -> None | s -> Some s)
(Option.default "ocamlmq.db" !db)
in
if !debug then eprintf "Connected to database.\n%!";

0 comments on commit ad7ae21

Please sign in to comment.
Something went wrong with that request. Please try again.