Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Binlog, Mq_sqlite_persistence: change recovery scheme.

  • Loading branch information...
commit c65f14108d2e36cdd91e17e2a5eeb636f8926acf 1 parent 8da11a1
@mfp authored
Showing with 48 additions and 28 deletions.
  1. +43 −21 binlog.ml
  2. +1 −2  binlog.mli
  3. +4 −5 mq_sqlite_persistence.ml
View
64 binlog.ml
@@ -1,5 +1,6 @@
open Lwt
open Mq_types
+open Printf
type t = { fd : Unix.file_descr; och : Lwt_io.output_channel }
@@ -8,12 +9,6 @@ type record =
| Del of string
| Nothing
-let make ?(sync = false) file =
- let sync = if sync then [ Unix.O_SYNC ] else [] in
- let fd = Unix.openfile file ([ Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC ] @ sync) 0o640 in
- let och = Lwt_io.of_unix_fd ~mode:Lwt_io.output fd in
- { fd = fd; och = och; }
-
let truncate t =
Unix.ftruncate t.fd 0;
ignore (Unix.lseek t.fd 0 Unix.SEEK_SET)
@@ -55,19 +50,6 @@ let read_record ich =
end
with End_of_file -> return Nothing
-let read file =
- try_lwt
- Lwt_io.with_file ~mode:Lwt_io.input file
- (fun ich ->
- let h = Hashtbl.create 13 in
- let rec loop () = read_record ich >>= function
- Nothing -> return ()
- | Add msg -> Hashtbl.add h msg.msg_id msg; loop ()
- | Del msg_id -> Hashtbl.remove h msg_id; loop ()
- in loop () >>
- return (Hashtbl.fold (fun _ msg l -> msg :: l) h []))
- with Unix.Unix_error _ -> return []
-
let write_string och s =
LE.write_int och (String.length s) >>
Lwt_io.write och s
@@ -85,8 +67,48 @@ let write_record och = function
write_string och msg.msg_body >>
LE.write_float64 och msg.msg_ack_timeout
-let write_record och r =
- Lwt_io.atomic (fun och -> write_record och r) och >> Lwt_io.flush och
+let write_record ?(flush = true) och r =
+ Lwt_io.atomic (fun och -> write_record och r) och >>
+ if flush then Lwt_io.flush och else return ()
let cancel t msg = write_record t.och (Del msg.msg_id)
let add t msg = write_record t.och (Add msg)
+
+(* Creates [dst] even if [src] cannot be read *)
+let copy src dst =
+ Lwt_io.with_file
+ ~flags:[Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC]
+ ~mode:Lwt_io.output dst (fun _ -> return ()) >>
+ Lwt_io.with_file ~mode:Lwt_io.input src
+ (fun ich ->
+ Lwt_io.with_file
+ ~flags:[Unix.O_SYNC; Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC]
+ ~buffer_size:(1024 * 1024)
+ ~mode:Lwt_io.output dst
+ (fun och ->
+ let h = Hashtbl.create 13 in
+ let rec copy_loop () =
+ read_record ich >>= function
+ Add msg as x ->
+ Hashtbl.add h msg.msg_id msg;
+ write_record ~flush:false och x >> copy_loop ()
+ | Del msg_id as x ->
+ Hashtbl.remove h msg_id;
+ write_record ~flush:false och x >> copy_loop ()
+ | Nothing -> return ()
+ in copy_loop () >>
+ return (Hashtbl.fold (fun _ msg l -> msg :: l) h [])))
+
+let copy src dst =
+ try_lwt
+ copy src dst
+ with Unix.Unix_error (Unix.ENOENT, _, _) -> return []
+
+let make ?(sync = false) file =
+ let tmp = sprintf "%s.%d.%d" file (Unix.getpid ()) (Random.int 0x3FFFFFFF) in
+ lwt msgs = copy file tmp in
+ Sys.rename tmp file;
+ let sync = if sync then [ Unix.O_SYNC ] else [] in
+ let fd = Unix.openfile file ([ Unix.O_WRONLY; ] @ sync) 0o640 in
+ let och = Lwt_io.of_unix_fd ~mode:Lwt_io.output fd in
+ return ({ fd = fd; och = och; }, msgs)
View
3  binlog.mli
@@ -1,8 +1,7 @@
type t
-val read : string -> Mq_types.message list Lwt.t
-val make : ?sync:bool -> string -> t
+val make : ?sync:bool -> string -> (t * Mq_types.message list) Lwt.t
val truncate : t -> unit
val add : t -> Mq_types.message -> unit Lwt.t
val cancel : t -> Mq_types.message -> unit Lwt.t
View
9 mq_sqlite_persistence.ml
@@ -180,7 +180,7 @@ let initialize t =
sqlinit"CREATE TABLE mem.acked_msgs(msg_id VARCHAR(255) NOT NULL PRIMARY KEY)";
return ()
-let do_save_msg t sent msg =
+let do_save_msg ?(can_flush = true) t sent msg =
let dest = destination_name msg.msg_destination in
let v = (msg.msg_priority, msg) in
begin
@@ -197,7 +197,7 @@ let do_save_msg t sent msg =
in Hashtbl.add t.in_mem dest p
end;
Hashtbl.add t.in_mem_msgs msg.msg_id msg;
- if Hashtbl.length t.in_mem_msgs > t.max_msgs_in_mem then
+ if can_flush && Hashtbl.length t.in_mem_msgs > t.max_msgs_in_mem then
Lwt.wakeup t.flush_alarm ();
return ()
@@ -340,11 +340,10 @@ let crash_recovery t =
begin match t.binlog_file with
None -> return ()
| Some f ->
- lwt msgs = Binlog.read f in
- let binlog = Binlog.make ~sync:t.sync_binlog f in
+ lwt binlog, msgs = Binlog.make ~sync:t.sync_binlog f in
t.binlog <- Some binlog;
eprintf "(binlog: %d msgs) %!" (List.length msgs);
- Lwt_list.iter_s (save_msg t) msgs
+ Lwt_list.iter_s (do_save_msg ~can_flush:false t false) msgs
end
let init_db, check_db, auto_check_db = sql_check"sqlite"
Please sign in to comment.
Something went wrong with that request. Please try again.