Skip to content
Browse files

Switch to Sqlite-based persistence module.

  • Loading branch information...
1 parent 35e701b commit 351eaa12c2f621f9a0d4856f058d2d59dd1b3190 @mfp committed
Showing with 215 additions and 240 deletions.
  1. +3 −0 .gitmodules
  2. +12 −10 OMakefile
  3. +0 −21 generate_prepared_statements.rb
  4. +0 −146 mq_pg_persistence.ml
  5. +0 −7 mq_pg_persistence.mli
  6. +185 −0 mq_sqlite_persistence.ml
  7. +4 −0 mq_sqlite_persistence.mli
  8. +1 −0 ocaml-sqlexpr
  9. +10 −38 ocamlmq.ml
  10. +0 −18 schema.sql
View
3 .gitmodules
@@ -0,0 +1,3 @@
+[submodule "ocaml-sqlexpr"]
+ path = ocaml-sqlexpr
+ url = git://github.com/mfp/ocaml-sqlexpr.git
View
22 OMakefile
@@ -4,35 +4,37 @@ BYTE_ENABLED = true
OCAMLOPTFLAGS = -S -inline 100
USE_OCAMLFIND = true
+.SUBDIRS: ocaml-sqlexpr
+
OCAMLCFLAGS += -g -annot
-OCAMLFLAGS += -syntax camlp4o
-OCAMLDEPFLAGS += -syntax camlp4o
+OCAMLFINDFLAGS += -syntax camlp4o
+OCAMLINCLUDES += ocaml-sqlexpr
OCAMLPACKS[] =
lwt
lwt.unix
lwt.syntax
+ estring
extlib
unix
str
- pgocaml
- pgocaml.syntax
+ sqlite3
camlp4.macro
OBJECTS[] =
extSet
mq_types
mq_stomp
- pGOCaml_lwt
- mq_pg_persistence
mq_server
+ ocaml-sqlexpr/sqlexpr_sqlite
+ mq_sqlite_persistence
ternary
ocamlmq
-mq_schema.ml: schema.sql generate_prepared_statements.rb
- ruby generate_prepared_statements.rb create_db $< > $@
-
-.SCANNER: scan-ocaml-mq_pg_persistence.ml: mq_pg_persistence.ml mq_schema.ml
+section
+ OCAMLFINDFLAGS += -ppopt ocaml-sqlexpr/pa_sql.cmo
+ .SCANNER: scan-ocaml-%.ml: %.ml ocaml-sqlexpr/pa_sql.cmo
+ $(addsuffixes .cmi .cmx .cmo .o, mq_sqlite_persistence):
OCamlProgram(ocamlmq, $(OBJECTS))
View
21 generate_prepared_statements.rb
@@ -1,21 +0,0 @@
-#!/usr/bin/env ruby
-
-funcname = ARGV.shift
-stmts = ARGF.read.split(/;/).map{|x| x.strip}.reject{|x| x.empty?}
-
-temp_stmts = stmts.map do |s|
- s = s.gsub(/create table/i, "create temporary table")
- %[PGSQL(dbh) "execute" "#{s.gsub(/"/, "\\\"")};"]
-end
-
-stmts = stmts.map{ |s| %[PGSQL(dbh) "#{s.gsub(/"/, "\\\"")};"] }
-
-puts <<EOF
-let #{funcname}_temp dbh =
-
-#{temp_stmts.join(" >> \n")}
-
-let #{funcname} dbh =
-
-#{stmts.join(" >> \n")}
-EOF
View
146 mq_pg_persistence.ml
@@ -1,146 +0,0 @@
-(* Copyright (c) 2010 Mauricio Fernández <mfp@acm.org> *)
-open Printf
-open Mq_types
-open Lwt
-
-module PGOCaml = PGOCaml_lwt
-INCLUDE "mq_schema.ml"
-
-type t = {
- dbconns : PGOCaml.pa_pg_data PGOCaml.t Lwt_pool.t;
- dbconns' : PGOCaml.pa_pg_data PGOCaml.t Lwt_pool.t; (* low priority *)
- debug : bool
-}
-
-let initialize t = Lwt_pool.use t.dbconns create_db
-
-let connect
- ?host ?port ?unix_domain_socket_dir ?database ?user ?password
- ?(debug = false) ?(max_conns = 2) () =
- let create_conn () = PGOCaml.connect ?host ?port ?database ?unix_domain_socket_dir
- ?user ?password () in
- let max_conns = max 2 max_conns in
- let pool = Lwt_pool.create (max_conns / 2) create_conn in
- let pool' = Lwt_pool.create (max_conns / 2) create_conn in
- (* try to connect so we raise the exception as early as possible if
- * something's wrong *)
- Lwt_pool.use pool (fun _ -> return ()) >>
- return { dbconns = pool; dbconns' = pool'; debug = debug }
-
-let msg_of_tuple (msg_id, dst, timestamp, priority, ack_timeout, body) =
- {
- msg_id = msg_id;
- msg_destination = Queue dst;
- msg_priority = Int32.to_int priority;
- msg_timestamp = CalendarLib.Calendar.to_unixfloat timestamp;
- msg_ack_timeout = ack_timeout;
- msg_body = body;
- }
-
-DEFINE WithDB(x) = Lwt_pool.use t.dbconns (fun dbh -> x)
-DEFINE WithDB_trans(x) =
- Lwt_pool.use t.dbconns
- (fun dbh ->
- PGOCaml.begin_work dbh >>
- try_lwt
- lwt y = x in
- PGOCaml.commit dbh >> return y
- with e -> PGOCaml.rollback dbh >> fail e)
-
-let do_save t dbh ?(ack_pending = false) msg =
- let body = msg.msg_body in
- let time = CalendarLib.Calendar.from_unixfloat msg.msg_timestamp in
- let msg_id = msg.msg_id in
- let priority = Int32.of_int msg.msg_priority in
- let queue = destination_name msg.msg_destination in
- let ack_timeout = msg.msg_ack_timeout in
- if t.debug then eprintf "Saving message %S.\n%!" msg_id;
- PGSQL(dbh)
- "INSERT INTO ocamlmq_msgs(msg_id, ack_pending, priority, destination,
- timestamp, ack_timeout, body)
- VALUES ($msg_id, $ack_pending, $priority, $queue, $time,
- $ack_timeout, $body)"
-
-let save_msg t ?(low_priority = false) msg = match msg.msg_destination with
- Topic _ | Control _ -> return ()
- | Queue queue ->
- if not low_priority then WithDB(do_save t dbh msg)
- else Lwt_pool.use t.dbconns' (fun dbh -> do_save t dbh msg)
-
-let get_ack_pending_msg t msg_id =
- WithDB begin
- PGSQL(dbh)
- "SELECT msg_id, destination, timestamp, priority, ack_timeout, body
- FROM ocamlmq_msgs as msg
- WHERE msg_id = $msg_id AND ack_pending = true"
- end
- >>= function
- | tuple :: _ -> return (Some (msg_of_tuple tuple))
- | [] -> return None
-
-let register_ack_pending_new_msg t msg =
- WithDB(do_save t dbh ~ack_pending:true msg)
-
-let register_ack_pending_msg t msg_id =
- try_lwt
- WithDB_trans begin
- PGSQL(dbh)
- "SELECT 1 FROM ocamlmq_msgs
- WHERE msg_id = $msg_id AND ack_pending = false
- FOR UPDATE" >>= function
- [] -> return false
- | _ ->
- PGSQL(dbh)
- "UPDATE ocamlmq_msgs SET ack_pending = true WHERE msg_id = $msg_id" >>
- return true
- end
- with _ -> return false
-
-let rec get_msg_for_delivery t queue =
- try_lwt
- WithDB_trans begin
- lwt tuples =
- PGSQL(dbh)
- "SELECT msg_id, destination, timestamp, priority, ack_timeout, body
- FROM ocamlmq_msgs as msg
- WHERE destination = $queue AND ack_pending = false
- ORDER BY priority, timestamp
- LIMIT 1
- FOR UPDATE"
- in match tuples with
- tuple :: _ ->
- let msg = msg_of_tuple tuple in
- let msg_id = msg.msg_id in
- PGSQL(dbh)
- "UPDATE ocamlmq_msgs SET ack_pending = true
- WHERE msg_id = $msg_id" >>
- return (Some msg)
- | [] -> return None
- end
- with _ -> return None (* FIXME: is this OK? *)
-
-let ack_msg t msg_id =
- WithDB(PGSQL(dbh) "DELETE FROM ocamlmq_msgs WHERE msg_id = $msg_id")
-
-let unack_msg t msg_id =
- WithDB(PGSQL(dbh) "UPDATE ocamlmq_msgs SET ack_pending = false WHERE msg_id = $msg_id")
-
-let count_queue_msgs t queue =
- WithDB(PGSQL(dbh) "SELECT COUNT(*) FROM ocamlmq_msgs WHERE destination = $queue")
- >>= function
- Some count :: _ -> return count
- | _ -> return 0L
-
-let crash_recovery t =
- WithDB_trans begin
- if t.debug then eprintf "Recovering from crash...\n%!";
- PGSQL(dbh) "SELECT COUNT(*) FROM ocamlmq_msgs WHERE ack_pending = true" >>= function
- Some n :: _ ->
- eprintf "Recovering %Ld ACK-pending messages: %!" n;
- lwt () = PGSQL(dbh) "UPDATE ocamlmq_msgs SET ack_pending = false
- WHERE ack_pending = true" in
- eprintf "DONE\n%!";
- return ()
- | _ -> eprintf "No ACK-pending messages found.\n%!";
- return ()
- end
View
7 mq_pg_persistence.mli
@@ -1,7 +0,0 @@
-
-include Mq_server.PERSISTENCE
-
-val connect :
- ?host:string -> ?port:int -> ?unix_domain_socket_dir:string ->
- ?database:string -> ?user:string -> ?password:string ->
- ?debug:bool -> ?max_conns:int -> unit -> t Lwt.t
View
185 mq_sqlite_persistence.ml
@@ -0,0 +1,185 @@
+
+open Lwt
+open Printf
+open Mq_types
+open Sqlexpr_sqlite
+
+module MSET = Set.Make(struct type t = int * message let compare = compare end)
+module SSET = Set.Make(String)
+
+type t = {
+ db : db;
+ in_mem : (string, MSET.t) Hashtbl.t;
+ in_mem_msgs : (string, message) Hashtbl.t;
+ mutable ack_pending : SSET.t;
+}
+
+let flush t =
+ transaction t.db begin fun db ->
+ printf "Flushing %d messages to disk (%d ACK pending)\n%!"
+ (Hashtbl.length t.in_mem_msgs) (SSET.cardinal t.ack_pending);
+ Hashtbl.iter
+ (fun _ msg ->
+ execute db
+ sql"INSERT INTO ocamlmq_msgs
+ (msg_id, priority, destination, timestamp,
+ ack_timeout, body)
+ VALUES(%s, %d, %s, %f, %f, %S)"
+ msg.msg_id msg.msg_priority (destination_name msg.msg_destination)
+ msg.msg_timestamp msg.msg_ack_timeout msg.msg_body)
+ t.in_mem_msgs;
+ SSET.iter
+ (execute db sql"INSERT INTO pending_acks(msg_id) VALUES(%s)")
+ t.ack_pending;
+ Hashtbl.clear t.in_mem;
+ Hashtbl.clear t.in_mem_msgs;
+ t.ack_pending <- SSET.empty;
+ end
+
+let make file =
+ let t =
+ { db = open_db file; in_mem = Hashtbl.create 13;
+ in_mem_msgs = Hashtbl.create 13; ack_pending = SSET.empty; } in
+ let rec loop_flush () = Lwt_unix.sleep 1.0 >> (flush t; loop_flush ()) in
+ Lwt.ignore_result
+ (try_lwt loop_flush ()
+ with e -> printf "EXCEPTION IN FLUSHER: %s\n%!" (Printexc.to_string e);
+ return ());
+ t
+
+let initialize t =
+ execute t.db sql"ATTACH \":memory:\" AS mem";
+ execute t.db
+ sql"CREATE TABLE IF NOT EXISTS ocamlmq_msgs(
+ msg_id VARCHAR(255) NOT NULL PRIMARY KEY,
+ priority INT NOT NULL,
+ destination VARCHAR(255) NOT NULL,
+ timestamp DOUBLE NOT NULL,
+ ack_timeout DOUBLE NOT NULL,
+ body BLOB NOT NULL)";
+ execute t.db
+ sql"CREATE INDEX IF NOT EXISTS
+ ocamlmq_msgs_destination_priority_timestamp
+ ON ocamlmq_msgs(destination, priority, timestamp)";
+ execute t.db
+ sql"CREATE TABLE mem.pending_acks(msg_id VARCHAR(255) NOT NULL PRIMARY KEY)";
+ return ()
+
+let save_msg t ?low_priority msg =
+ let dest = destination_name msg.msg_destination in
+ let v = (msg.msg_priority, msg) in
+ begin
+ try Hashtbl.replace t.in_mem dest (MSET.add v (Hashtbl.find t.in_mem dest));
+ with Not_found -> Hashtbl.add t.in_mem dest (MSET.singleton v )
+ end;
+ Hashtbl.add t.in_mem_msgs msg.msg_id msg;
+ return ()
+
+let register_ack_pending_new_msg t msg =
+ t.ack_pending <- SSET.add msg.msg_id t.ack_pending;
+ save_msg t msg
+
+let register_ack_pending_msg t msg_id =
+ if Hashtbl.mem t.in_mem_msgs msg_id then
+ let r = SSET.mem msg_id t.ack_pending in
+ if not r then t.ack_pending <- SSET.add msg_id t.ack_pending;
+ return (not r)
+ else
+ match select t.db sql"SELECT @s{msg_id} FROM mem WHERE msg_id = %s" msg_id with
+ [x] -> return false
+ | _ ->
+ execute t.db sql"INSERT INTO pending_acks(msg_id) VALUES(%s)" msg_id;
+ return true
+
+let msg_of_tuple (msg_id, dst, timestamp, priority, ack_timeout, body) =
+ {
+ msg_id = msg_id;
+ msg_destination = Queue dst;
+ msg_priority = priority;
+ msg_timestamp = timestamp;
+ msg_ack_timeout = ack_timeout;
+ msg_body = body;
+ }
+
+let get_ack_pending_msg t msg_id =
+ try
+ let msg = Hashtbl.find t.in_mem_msgs msg_id in
+ return (if SSET.mem msg_id t.ack_pending then Some msg else None)
+ with Not_found ->
+ match
+ select t.db
+ sql"SELECT @s{msg_id}, @s{destination}, @f{timestamp},
+ @d{priority}, @f{ack_timeout}, @S{body}
+ FROM ocamlmq_msgs AS msg
+ WHERE msg_id = %s
+ AND EXISTS (SELECT 1 FROM pending_acks WHERE msg_id = msg.msg_id)"
+ msg_id
+ with
+ [] -> return None
+ | msg :: _ -> return (Some (msg_of_tuple msg))
+
+let ack_msg t msg_id =
+ if SSET.mem msg_id t.ack_pending then begin
+ let msg = Hashtbl.find t.in_mem_msgs msg_id in
+ Hashtbl.remove t.in_mem_msgs msg_id;
+ t.ack_pending <- SSET.remove msg_id t.ack_pending;
+ let v = (msg.msg_priority, msg) in
+ let dst = destination_name msg.msg_destination in
+ Hashtbl.replace t.in_mem dst (MSET.remove v (Hashtbl.find t.in_mem dst))
+ end else begin
+ execute t.db sql"DELETE FROM ocamlmq_msgs WHERE msg_id = %s" msg_id;
+ execute t.db sql"DELETE FROM pending_acks WHERE msg_id = %s" msg_id;
+ end;
+ return ()
+
+let unack_msg t msg_id =
+ if SSET.mem msg_id t.ack_pending then
+ t.ack_pending <- SSET.remove msg_id t.ack_pending
+ else
+ execute t.db sql"DELETE FROM pending_acks WHERE msg_id = %s" msg_id;
+ return ()
+
+exception Msg of message
+
+let get_msg_for_delivery t dest =
+ try
+ let set = Hashtbl.find t.in_mem dest in
+ try
+ MSET.iter
+ (fun (_, msg) ->
+ if not (SSET.mem msg.msg_id t.ack_pending) then raise (Msg msg))
+ set;
+ raise Not_found
+ with Msg msg ->
+ t.ack_pending <- SSET.add msg.msg_id t.ack_pending;
+ return (Some msg)
+ with Not_found ->
+ let tup =
+ select t.db
+ sql"SELECT @s{msg_id}, @s{destination}, @f{timestamp},
+ @d{priority}, @f{ack_timeout}, @S{body}
+ FROM ocamlmq_msgs as msg
+ WHERE destination = %s
+ AND NOT EXISTS (SELECT 1 FROM pending_acks WHERE msg_id = msg.msg_id)
+ ORDER BY priority, timestamp
+ LIMIT 1 "
+ dest
+ in match tup with
+ [] -> return None
+ | tup :: _ ->
+ let msg = msg_of_tuple tup in
+ execute t.db sql"INSERT INTO pending_acks VALUES(%s)" msg.msg_id;
+ return (Some msg)
+
+let count_queue_msgs t dest =
+ let in_mem =
+ try MSET.cardinal (Hashtbl.find t.in_mem dest) with Not_found -> 0 in
+ let in_db =
+ match
+ select t.db sql"SELECT @L{COUNT(*)} FROM ocamlmq_msgs WHERE destination=%s"
+ dest
+ with [n] -> n
+ | _ -> assert false
+ in return (Int64.add (Int64.of_int in_mem) in_db)
+
+let crash_recovery t = return ()
View
4 mq_sqlite_persistence.mli
@@ -0,0 +1,4 @@
+
+include Mq_server.PERSISTENCE
+
+val make : string -> t
1 ocaml-sqlexpr
@@ -0,0 +1 @@
+Subproject commit 8151e98f68c92bc2ff63326da99fd30d57ae2a69
View
48 ocamlmq.ml
@@ -5,73 +5,45 @@ open Lwt
let set_some_string r = Arg.String (fun s -> r := Some s)
let set_some_int r = Arg.Int (fun n -> r := Some n)
-let db_host = ref None
-let db_port = ref None
-let db_database = ref None
-let db_user = ref None
-let db_password = ref None
-let db_unix_sock_dir = ref None
-let db_max_conns = ref 10
let port = ref 61613
let debug = ref false
-let initdb = ref false
let login = ref None
let passcode = ref None
+let db = ref None
let params =
Arg.align
[
- "-dbhost", set_some_string db_host, "HOST Database server host.";
- "-dbport", set_some_int db_port, "HOST Database server port.";
- "-dbdatabase", set_some_string db_database, "DATABASE Database name.";
- "-dbsockdir", set_some_string db_password, "DIR Database UNIX domain socket dir.";
- "-dbuser", set_some_string db_user, "USER Database user.";
- "-dbpassword", set_some_string db_password, "PASSWORD Database password.";
- "-dbmaxconns", Arg.Set_int db_max_conns, "NUM Maximum size of DB connection pool.";
"-port", Arg.Set_int port, "PORT Port to listen at (default: 61613).";
"-login", set_some_string login, "LOGIN Login expected in CONNECT.";
"-passcode", set_some_string passcode, "PASSCODE Passcode expected in CONNECT.";
- "-initdb", Arg.Set initdb, " Initialize the database (create required tables).";
"-debug", Arg.Set debug, " Write debug info to stderr.";
]
-let usage_message = "Usage: ocamlmq [options]"
+let usage_message = "Usage: ocamlmq [options] [sqlite3 database (default: ocamlmq.db)]"
let _ = Sys.set_signal Sys.sigpipe Sys.Signal_ignore
let _ = Sys.set_signal Sys.sigint (Sys.Signal_handle (fun _ -> exit 0))
-module SERVER = Mq_server.Make(Mq_pg_persistence)
+module SERVER = Mq_server.Make(Mq_sqlite_persistence)
let () =
Arg.parse
params
- (fun s -> eprintf "Unknown argument: %S\n%!" s;
+ (function
+ s when !db = None && s <> "" && s.[0] <> '-' -> db := Some s
+ | s -> eprintf "Unknown argument: %S\n%!" s;
Arg.usage params usage_message;
exit 1)
usage_message;
let addr = Unix.ADDR_INET (Unix.inet_addr_any, !port) in
Lwt_unix.run begin
- lwt msg_store =
- try_lwt
- Mq_pg_persistence.connect
- ?host:!db_host
- ?port:!db_port
- ?database:!db_database
- ?unix_domain_socket_dir:!db_unix_sock_dir
- ?user:!db_user
- ?password:!db_password
- ~debug:!debug
- ~max_conns:!db_max_conns
- ()
- with _ -> (* counldn't connect to DB *)
- eprintf "Could not connect to DB, use the -db* options.\n%!";
- exit 1
+ let msg_store =
+ Mq_sqlite_persistence.make (Option.default "ocamlmq.db" !db)
in
if !debug then eprintf "Connected to database.\n%!";
- (if !initdb then begin
- eprintf "Initializing database.\n%!";
- Mq_pg_persistence.initialize msg_store
- end else return ()) >>
+ eprintf "Initializing database.\n%!";
+ Mq_sqlite_persistence.initialize msg_store >>
lwt broker = SERVER.make_broker
?login:!login ?passcode:!passcode msg_store addr
in SERVER.server_loop ~debug:!debug broker
View
18 schema.sql
@@ -1,18 +0,0 @@
-
-CREATE TABLE ocamlmq_msgs(
- msg_id VARCHAR(255) NOT NULL PRIMARY KEY,
- ack_pending BOOL NOT NULL DEFAULT false,
- priority INT NOT NULL,
- destination VARCHAR(255) NOT NULL,
- timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- ack_timeout FLOAT NOT NULL,
- body BYTEA NOT NULL
-);
-
-CREATE INDEX ocamlmq_msgs_destination_priority_timestamp ON ocamlmq_msgs
- USING BTREE(destination, priority, timestamp);
-
-CREATE INDEX ocamlmq_msgs_ack_pending ON ocamlmq_msgs
- USING BTREE(ack_pending);
-
-

0 comments on commit 351eaa1

Please sign in to comment.
Something went wrong with that request. Please try again.