Permalink
Browse files

Implemented -sync-binlog option.

  • Loading branch information...
1 parent 157b00e commit 8da11a118d70569edcb88d75f1a8e469323f3610 @mfp committed Nov 15, 2010
Showing with 15 additions and 6 deletions.
  1. +3 −2 binlog.ml
  2. +1 −1 binlog.mli
  3. +5 −2 mq_sqlite_persistence.ml
  4. +2 −1 mq_sqlite_persistence.mli
  5. +4 −0 ocamlmq.ml
View
@@ -8,8 +8,9 @@ type record =
| Del of string
| Nothing
-let make file =
- let fd = Unix.openfile file [ Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC ] 0o640 in
+let make ?(sync = false) file =
+ let sync = if sync then [ Unix.O_SYNC ] else [] in
+ let fd = Unix.openfile file ([ Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC ] @ sync) 0o640 in
let och = Lwt_io.of_unix_fd ~mode:Lwt_io.output fd in
{ fd = fd; och = och; }
View
@@ -2,7 +2,7 @@
type t
val read : string -> Mq_types.message list Lwt.t
-val make : string -> t
+val make : ?sync:bool -> string -> t
val truncate : t -> unit
val add : t -> Mq_types.message -> unit Lwt.t
val cancel : t -> Mq_types.message -> unit Lwt.t
View
@@ -25,6 +25,7 @@ type t = {
mutable unacks : SSET.t;
binlog_file : string option;
mutable binlog : Binlog.t option;
+ sync_binlog : bool;
}
let count_unmaterialized_pending_acks db =
@@ -112,7 +113,8 @@ let unack db msg_id =
sqlc"UPDATE ocamlmq_msgs SET ack_pending = 0 WHERE msg_id = %s"
msg_id
-let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) ?binlog file =
+let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0)
+ ?binlog ?(sync_binlog = false) file =
let wait_flush, awaken_flush = Lwt.wait () in
let t =
{ db = Sqlexpr_sqlite.open_db file; in_mem = Hashtbl.create 13;
@@ -121,6 +123,7 @@ let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) ?binlog file =
max_msgs_in_mem = max_msgs_in_mem;
unacks = SSET.empty;
binlog_file = binlog; binlog = None;
+ sync_binlog = sync_binlog;
} in
let flush_period = max flush_period 0.005 in
let rec loop_flush wait_flush =
@@ -338,7 +341,7 @@ let crash_recovery t =
None -> return ()
| Some f ->
lwt msgs = Binlog.read f in
- let binlog = Binlog.make f in
+ let binlog = Binlog.make ~sync:t.sync_binlog f in
t.binlog <- Some binlog;
eprintf "(binlog: %d msgs) %!" (List.length msgs);
Lwt_list.iter_s (save_msg t) msgs
@@ -1,7 +1,8 @@
include Mq_server.PERSISTENCE
-val make : ?max_msgs_in_mem:int -> ?flush_period:float -> ?binlog:string -> string -> t
+val make : ?max_msgs_in_mem:int -> ?flush_period:float ->
+ ?binlog:string -> ?sync_binlog:bool -> string -> t
(* Used for testing *)
val auto_check_db : Format.formatter -> bool
View
@@ -13,6 +13,7 @@ let db = ref None
let max_in_mem = ref 100000
let flush_period = ref 1.
let binlog = ref ""
+let sync_binlog = ref false
let params =
Arg.align
@@ -26,6 +27,8 @@ let params =
"DT Flush period in seconds (default: 1.0)";
"-binlog", Arg.Set_string binlog,
"FILE Use FILE as the binlog for msgs in mem (default: none).";
+ "-sync-binlog", Arg.Set sync_binlog,
+ " fsync the binlog on each write (default: no)";
"-debug", Arg.Set debug, " Write debug info to stderr.";
]
@@ -52,6 +55,7 @@ let () =
~max_msgs_in_mem:!max_in_mem
~flush_period:!flush_period
?binlog:(match !binlog with "" -> None | s -> Some s)
+ ~sync_binlog:!sync_binlog
(Option.default "ocamlmq.db" !db)
in
if !debug then eprintf "Connected to database.\n%!";

0 comments on commit 8da11a1

Please sign in to comment.