Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'sqlite'

  • Loading branch information...
commit 225f236907deacfff66f6946f7762e76e05e6678 2 parents fef36b6 + be34d96
@mfp authored
View
3  .gitmodules
@@ -0,0 +1,3 @@
+[submodule "ocaml-sqlexpr"]
+ path = ocaml-sqlexpr
+ url = git://github.com/mfp/ocaml-sqlexpr.git
View
31 OMakefile
@@ -4,37 +4,46 @@ BYTE_ENABLED = true
OCAMLOPTFLAGS = -S -inline 100
USE_OCAMLFIND = true
+.SUBDIRS: ocaml-sqlexpr
+
OCAMLCFLAGS += -g -annot
-OCAMLFLAGS += -syntax camlp4o
-OCAMLDEPFLAGS += -syntax camlp4o
+OCAMLFINDFLAGS += -syntax camlp4o
+OCAMLINCLUDES += ocaml-sqlexpr
+OCAML_LIBS[] += ocaml-sqlexpr/sqlexpr
OCAMLPACKS[] =
+ csv
lwt
lwt.unix
lwt.syntax
+ estring
extlib
unix
str
- pgocaml
- pgocaml.syntax
+ sqlite3
camlp4.macro
OBJECTS[] =
+ binlog
extSet
mq_types
mq_stomp
- pGOCaml_lwt
- mq_pg_persistence
mq_server
+ mq_sqlite_persistence
ternary
- ocamlmq
-mq_schema.ml: schema.sql generate_prepared_statements.rb
- ruby generate_prepared_statements.rb create_db $< > $@
+section
+ OCAMLFINDFLAGS += -ppopt ocaml-sqlexpr/pa_sql.cmo
+ .SCANNER: scan-ocaml-%.ml: %.ml ocaml-sqlexpr/pa_sql.cmo
+ $(addsuffixes .cmi .cmx .cmo .o, mq_sqlite_persistence):
-.SCANNER: scan-ocaml-mq_pg_persistence.ml: mq_pg_persistence.ml mq_schema.ml
+OCamlProgram(ocamlmq, $(OBJECTS) ocamlmq)
-OCamlProgram(ocamlmq, $(OBJECTS))
+section
+ OCAMLPACKS[] += oUnit
+ TEST_FILES[] = $(removesuffix $(ls test*ml))
+ OCamlProgram(test, $(OBJECTS) $(TEST_FILES) test)
+ $(addsuffixes .cmi .cmo .cmx .o, $(TEST_FILES) test):
.DEFAULT: ocamlmq$(EXE)
View
121 README
@@ -4,8 +4,9 @@ suitable for implementing task queues and communication between subsystems:
* persistent queues, scaling over arbitrarily large numbers of queues with
constant memory usage (i.e. supports millions of queues)
-* strong durability guarantees: messages are guaranteed to have been saved to
- disk by the time the sender gets a message receipt
+* queued messages need not fit in memory
+* strong durability guarantees: messages can be guaranteed to have been saved
+ to disk (and fsync'ed) by the time the sender gets a message receipt
* message priorities
* per-subscription prefetch limit for queue messages
* error handling and ACK timeout: if a subscriber doesn't ACK a message
@@ -19,9 +20,11 @@ suitable for implementing task queues and communication between subsystems:
* simple extensions within the STOMP protocol to report the number of messages
in a queue and the number of subscribers to a queue or topic
-ocamlmq is written in OCaml, in less than 1200 lines of code. It is easy to
-extend and fairly efficient. The server is abstracted over a storage backend;
-currently only PostgreSQL's is implemented (< 150 lines of code).
+ocamlmq is written in OCaml, in ~1500 lines of code. It is easy to extend and
+fairly efficient. The server is abstracted over a storage backend; there are
+currently two backends:
+* PostgreSQL's (150 LoC)
+* sqlite with in-mem caching (350 LoC)
Scalability
===========
@@ -53,11 +56,8 @@ some limitations which preclude its use in other domains:
* there is no flow control for topic messages (in the intended use case, topic
messages are assumed to be relatively small and processed fast)
* messages are limited to 16 MB on 32-bit platforms
-* the PostgreSQL storage backend can only persist a few thousand
- messages per second (note that ocamlmq allows >50K/s persistent message
- bursts in async mode)
* ocamlmq does not support very high message rates (ocamlmq delivers only
- ~40K messages/second on a 3GHz AMD64 box)
+ ~60K messages/second on a 3GHz AMD64 box)
If you need complex routing rules, scalability to many thousand simultaneous
connections or other _enterprise_ messaging features, you'll be better served
@@ -71,75 +71,78 @@ Building
You'll need a working OCaml environment plus the following libraries:
* Lwt
* extlib
-* PGOCaml
+* ocaml-sqlite3
+* csv
+* estring
Additionally, ocamlmq requires PostgreSQL both at compile- and run-time.
-If you have omake, just do
+If you have omake, Just do
$ omake
-Otherwise, run
+otherwise, run
$ sh build.sh
-If the compilation fails with the following error
+Installing
+==========
- File "./mq_schema.ml", line 3, characters 21-327 (end at line 11, character 3):
- Camlp4: Uncaught exception: Unix.Unix_error (20 | CstTag21, "connect", "")
+The ocamlmq executable is self-contained and can be copied to any directory in
+your PATH if desired.
-you will have to provide some information using env. variables so that PGOCaml
-can connect to the PostgreSQL server (PGOCaml does this to check statically
-that all the SQL statements are valid), e.g.:
-
- PGHOST=localhost PGUSER=myself PGPASSWORD=myself omake --verbose
+Running
+=======
- or
+ocamlmq's configuration is given via the command line:
- PGHOST=localhost PGUSER=myself PGPASSWORD=myself sh build.sh
+Usage: ocamlmq [options] [sqlite3 database (default: ocamlmq.db)]
+ -port PORT Port to listen at (default: 61613)
+ -login LOGIN Login expected in CONNECT
+ -passcode PASSCODE Passcode expected in CONNECT
+ -maxmsgs N Keep at most N msgs in mem before hard flush (default: 100000)
+ -flush-period DT Hard flush period in seconds (default: 1.0)
+ -binlog FILE Use FILE as the binlog for msgs in mem (default: none)
+ -sync-binlog fsync the binlog on each write (default: no)
+ -debug Write debug info to stderr
+ -help Display this list of options
+ --help Display this list of options
+
+ocamlmq stores the messages in memory (and optionally in the binlog) and
+flushes to the permanent store (with full fsync) when either:
+* more than [maxmsgs] messages have been received since the last flush or
+* it's been [flush-period] seconds since the last flush
+
+With the -binlog option, the messages kept in mem will also be written to a
+binary log before they are flushed (and fsync'ed) to the permanent store.
+This way, in-mem (not yet flushed) messages can be recovered if ocamlmq were
+killed or crashed. You can moreover ensure that the binlog is fsync'ed after
+each write with -sync-binlog. This brings you the strongest durability
+guarantees, for every single message is fsync'ed before the broker
+acknowledges its reception.
+
+Performance
+===========
-This is the full list of options recognized by PGOcaml:
+Here follow some figures, taken on an oldish dual core AMD Athlon 64 X2 with a
+7200 RPM SATA disk.
- PGHOST
- PGPORT
- PGUSER
- PGPASSWORD
- PGDATABASE
- UNIX_DOMAIN_SOCKET_DIR
+Queueing (synchronously, with receipt)
-A temporary table will be created in the database while ocamlmq is compiled;
-no permanent changes will be made.
+ maxmsgs throughput throughput (-binlog)
+ ----------------------------------------------------
+ 1000 >11000 msg/s >9000 msg/s
+ 5000 >12000 msg/s >9000 msg/s
+ "infty" >17000 msg/s >14000 msg/s
-Running
-=======
+ If the binlog is enabled and -sync-binlog in use, the maximum throughput is
+ around 4000 msg/s.
-ocamlmq's configuration is given via the command line:
+Delivering:
- $ ./ocamlmq -help
- Usage: ocamlmq [options]
- -dbhost HOST Database server host.
- -dbport HOST Database server port.
- -dbdatabase DATABASE Database name.
- -dbsockdir DIR Database UNIX domain socket dir.
- -dbuser USER Database user.
- -dbpassword PASSWORD Database password.
- -dbmaxconns NUM Maximum size of DB connection pool.
- -port PORT Port to listen at (default: 61613).
- -login LOGIN Login expected in CONNECT.
- -passcode PASSCODE Passcode expected in CONNECT.
- -initdb Initialize the database (create required tables).
- -debug Write debug info to stderr.
- -help Display this list of options
- --help Display this list of options
-
-ocamlmq can create the required tables (currently, only one, named
-"ocamlmq_msgs") in the PostgreSQL table indicated with -dbdatabase (defaulting
-to one with the same name as the user) with the -initdb option, so you can try
-it with
-
- $ ./ocamlmq -dbdatabase mydatabase -initdb
-
-which will listen on port 61613 by default.
+Topic messages: >40000 msg/s.
+Queue messages in memory: >12000 msg/s
+Queue messages on disk: >2000 msg/s
STOMP protocol specifics
========================
View
114 binlog.ml
@@ -0,0 +1,114 @@
+open Lwt
+open Mq_types
+open Printf
+
+type t = { fd : Unix.file_descr; och : Lwt_io.output_channel }
+
+type record =
+ Add of message
+ | Del of string
+ | Nothing
+
+let truncate t =
+ Unix.ftruncate t.fd 0;
+ ignore (Unix.lseek t.fd 0 Unix.SEEK_SET)
+
+module LE = Lwt_io.LE
+
+let read_exactly ch n =
+ if n < 0 then fail End_of_file else
+ let s = String.create n in
+ Lwt_io.read_into_exactly ch s 0 n >>
+ return s
+
+let read_string ich =
+ lwt len = LE.read_int ich in
+ read_exactly ich len
+
+let read_record ich =
+ try_lwt
+ lwt kind = Lwt_io.read_char ich in
+ begin match kind with
+ 'A' ->
+ lwt id = read_string ich in
+ lwt dest = read_string ich in
+ lwt prio = LE.read_int ich in
+ lwt timestamp = LE.read_float64 ich in
+ lwt body = read_string ich in
+ lwt timeout = LE.read_float64 ich in
+ let r =
+ Add
+ {
+ msg_id = id; msg_destination = Queue dest;
+ msg_priority = prio; msg_timestamp = timestamp;
+ msg_body = body; msg_ack_timeout = timeout
+ }
+ in return r
+ | 'B' ->
+ lwt id = read_string ich in return (Del id)
+ | _ -> raise End_of_file
+ end
+ with End_of_file -> return Nothing
+
+let write_string och s =
+ LE.write_int och (String.length s) >>
+ Lwt_io.write och s
+
+let write_record och = function
+ Nothing -> return ()
+ | Del msg_id ->
+ Lwt_io.write_char och 'D' >> write_string och msg_id
+ | Add msg ->
+ Lwt_io.write_char och 'A' >>
+ write_string och msg.msg_id >>
+ write_string och (destination_name msg.msg_destination) >>
+ LE.write_int och msg.msg_priority >>
+ LE.write_float64 och msg.msg_timestamp >>
+ write_string och msg.msg_body >>
+ LE.write_float64 och msg.msg_ack_timeout
+
+let write_record ?(flush = true) och r =
+ Lwt_io.atomic (fun och -> write_record och r) och >>
+ if flush then Lwt_io.flush och else return ()
+
+let cancel t msg = write_record t.och (Del msg.msg_id)
+let add t msg = write_record t.och (Add msg)
+
+(* Creates [dst] even if [src] cannot be read *)
+let copy src dst =
+ Lwt_io.with_file
+ ~flags:[Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC]
+ ~mode:Lwt_io.output dst (fun _ -> return ()) >>
+ Lwt_io.with_file ~mode:Lwt_io.input src
+ (fun ich ->
+ Lwt_io.with_file
+ ~flags:[Unix.O_SYNC; Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC]
+ ~buffer_size:(1024 * 1024)
+ ~mode:Lwt_io.output dst
+ (fun och ->
+ let h = Hashtbl.create 13 in
+ let rec copy_loop () =
+ read_record ich >>= function
+ Add msg as x ->
+ Hashtbl.add h msg.msg_id msg;
+ write_record ~flush:false och x >> copy_loop ()
+ | Del msg_id as x ->
+ Hashtbl.remove h msg_id;
+ write_record ~flush:false och x >> copy_loop ()
+ | Nothing -> return ()
+ in copy_loop () >>
+ return (Hashtbl.fold (fun _ msg l -> msg :: l) h [])))
+
+let copy src dst =
+ try_lwt
+ copy src dst
+ with Unix.Unix_error (Unix.ENOENT, _, _) -> return []
+
+let make ?(sync = false) file =
+ let tmp = sprintf "%s.%d.%d" file (Unix.getpid ()) (Random.int 0x3FFFFFFF) in
+ lwt msgs = copy file tmp in
+ Sys.rename tmp file;
+ let sync = if sync then [ Unix.O_SYNC ] else [] in
+ let fd = Unix.openfile file ([ Unix.O_WRONLY; ] @ sync) 0o640 in
+ let och = Lwt_io.of_unix_fd ~mode:Lwt_io.output fd in
+ return ({ fd = fd; och = och; }, msgs)
View
8 binlog.mli
@@ -0,0 +1,8 @@
+
+type t
+
+val make : ?sync:bool -> string -> (t * Mq_types.message list) Lwt.t
+val truncate : t -> unit
+val add : t -> Mq_types.message -> unit Lwt.t
+val cancel : t -> Mq_types.message -> unit Lwt.t
+
View
51 build.sh
@@ -1,22 +1,37 @@
#!/bin/sh
set -e
+set -x
+
+cd ocaml-sqlexpr
+
+ocamlfind ocamlc -syntax camlp4o -package estring,camlp4.quotations -warn-error A -g -I . -c pa_sql.ml
+ocamlfind ocamlc -syntax camlp4o -syntax camlp4o -ppopt pa_sql.cmo -package csv,extlib,sqlite3,estring,lwt,lwt.syntax -warn-error A -g -I . -c sqlexpr_concurrency.ml
+ocamlfind ocamlopt -syntax camlp4o -syntax camlp4o -ppopt pa_sql.cmo -package csv,extlib,sqlite3,estring,lwt,lwt.syntax -warn-error A -S -inline 100 -I . -c sqlexpr_concurrency.ml
+ocamlfind ocamlc -syntax camlp4o -syntax camlp4o -ppopt pa_sql.cmo -package csv,extlib,sqlite3,estring,lwt,lwt.syntax -warn-error A -g -I . -c sqlexpr_sqlite.mli
+ocamlfind ocamlopt -syntax camlp4o -syntax camlp4o -ppopt pa_sql.cmo -package csv,extlib,sqlite3,estring,lwt,lwt.syntax -warn-error A -S -inline 100 -I . -c sqlexpr_sqlite.ml
+ocamlfind ocamlc -syntax camlp4o -syntax camlp4o -ppopt pa_sql.cmo -package csv,extlib,sqlite3,estring,lwt,lwt.syntax -warn-error A -g -I . -c sqlexpr_sqlite.ml
+ocamlfind ocamlopt -syntax camlp4o -syntax camlp4o -ppopt pa_sql.cmo -package csv,extlib,sqlite3,estring,lwt,lwt.syntax -warn-error A -S -inline 100 -a -o sqlexpr.cmxa sqlexpr_concurrency.cmx sqlexpr_sqlite.cmx
+ocamlfind ocamlc -syntax camlp4o -syntax camlp4o -ppopt pa_sql.cmo -package csv,extlib,sqlite3,estring,lwt,lwt.syntax -warn-error A -g -a -o sqlexpr.cma sqlexpr_concurrency.cmo sqlexpr_sqlite.cmo
+
+cd ..
+
+ocamlfind ocamlc -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c mq_types.ml
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c mq_types.ml
+ocamlfind ocamlc -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c binlog.mli
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c binlog.ml
+ocamlfind ocamlc -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c extSet.mli
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c extSet.ml
+ocamlfind ocamlc -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c ternary.mli
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c ternary.ml
+ocamlfind ocamlc -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c mq_stomp.ml
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c mq_stomp.ml
+ocamlfind ocamlc -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c mq_server.ml
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c mq_server.ml
+ocamlfind ocamlc -syntax camlp4o -ppopt ocaml-sqlexpr/pa_sql.cmo -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c mq_sqlite_persistence.mli
+ocamlfind ocamlopt -syntax camlp4o -ppopt ocaml-sqlexpr/pa_sql.cmo -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c mq_sqlite_persistence.ml
+ocamlfind ocamlopt -syntax camlp4o -ppopt ocaml-sqlexpr/pa_sql.cmo -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c mq_sqlite_persistence.ml
+ocamlfind ocamlc -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -g -g -annot -I . -I ocaml-sqlexpr -c ocamlmq.ml
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -c ocamlmq.ml
+ocamlfind ocamlopt -syntax camlp4o -package csv,lwt,lwt.unix,lwt.syntax,estring,extlib,unix,str,sqlite3,camlp4.macro -warn-error A -S -inline 100 -I . -I ocaml-sqlexpr -o ocamlmq ocaml-sqlexpr/sqlexpr.cmxa mq_types.cmx binlog.cmx extSet.cmx mq_stomp.cmx ternary.cmx mq_server.cmx mq_sqlite_persistence.cmx ocamlmq.cmx -linkpkg
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c mq_types.ml
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c mq_types.ml
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c pGOCaml_lwt.ml
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c pGOCaml_lwt.ml
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c extSet.mli
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c extSet.ml
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c ternary.mli
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c ternary.ml
-ruby generate_prepared_statements.rb create_db schema.sql > mq_schema.ml
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c mq_stomp.ml
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c mq_stomp.ml
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c mq_server.ml
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c mq_server.ml
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c mq_pg_persistence.mli
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c mq_pg_persistence.ml
-ocamlfind ocamlc -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -g -g -annot -I . -c ocamlmq.ml
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -c ocamlmq.ml
-ocamlfind ocamlopt -package lwt,lwt.unix,lwt.syntax,extlib,unix,str,pgocaml,pgocaml.syntax,camlp4.macro -warn-error A -syntax camlp4o -S -inline 100 -I . -o ocamlmq extSet.cmx mq_types.cmx mq_stomp.cmx pGOCaml_lwt.cmx mq_pg_persistence.cmx ternary.cmx mq_server.cmx ocamlmq.cmx -linkpkg
View
21 generate_prepared_statements.rb
@@ -1,21 +0,0 @@
-#!/usr/bin/env ruby
-
-funcname = ARGV.shift
-stmts = ARGF.read.split(/;/).map{|x| x.strip}.reject{|x| x.empty?}
-
-temp_stmts = stmts.map do |s|
- s = s.gsub(/create table/i, "create temporary table")
- %[PGSQL(dbh) "execute" "#{s.gsub(/"/, "\\\"")};"]
-end
-
-stmts = stmts.map{ |s| %[PGSQL(dbh) "#{s.gsub(/"/, "\\\"")};"] }
-
-puts <<EOF
-let #{funcname}_temp dbh =
-
-#{temp_stmts.join(" >> \n")}
-
-let #{funcname} dbh =
-
-#{stmts.join(" >> \n")}
-EOF
View
146 mq_pg_persistence.ml
@@ -1,146 +0,0 @@
-(* Copyright (c) 2010 Mauricio Fernández <mfp@acm.org> *)
-open Printf
-open Mq_types
-open Lwt
-
-module PGOCaml = PGOCaml_lwt
-INCLUDE "mq_schema.ml"
-
-type t = {
- dbconns : PGOCaml.pa_pg_data PGOCaml.t Lwt_pool.t;
- dbconns' : PGOCaml.pa_pg_data PGOCaml.t Lwt_pool.t; (* low priority *)
- debug : bool
-}
-
-let initialize t = Lwt_pool.use t.dbconns create_db
-
-let connect
- ?host ?port ?unix_domain_socket_dir ?database ?user ?password
- ?(debug = false) ?(max_conns = 2) () =
- let create_conn () = PGOCaml.connect ?host ?port ?database ?unix_domain_socket_dir
- ?user ?password () in
- let max_conns = max 2 max_conns in
- let pool = Lwt_pool.create (max_conns / 2) create_conn in
- let pool' = Lwt_pool.create (max_conns / 2) create_conn in
- (* try to connect so we raise the exception as early as possible if
- * something's wrong *)
- Lwt_pool.use pool (fun _ -> return ()) >>
- return { dbconns = pool; dbconns' = pool'; debug = debug }
-
-let msg_of_tuple (msg_id, dst, timestamp, priority, ack_timeout, body) =
- {
- msg_id = msg_id;
- msg_destination = Queue dst;
- msg_priority = Int32.to_int priority;
- msg_timestamp = CalendarLib.Calendar.to_unixfloat timestamp;
- msg_ack_timeout = ack_timeout;
- msg_body = body;
- }
-
-DEFINE WithDB(x) = Lwt_pool.use t.dbconns (fun dbh -> x)
-DEFINE WithDB_trans(x) =
- Lwt_pool.use t.dbconns
- (fun dbh ->
- PGOCaml.begin_work dbh >>
- try_lwt
- lwt y = x in
- PGOCaml.commit dbh >> return y
- with e -> PGOCaml.rollback dbh >> fail e)
-
-let do_save t dbh ?(ack_pending = false) msg =
- let body = msg.msg_body in
- let time = CalendarLib.Calendar.from_unixfloat msg.msg_timestamp in
- let msg_id = msg.msg_id in
- let priority = Int32.of_int msg.msg_priority in
- let queue = destination_name msg.msg_destination in
- let ack_timeout = msg.msg_ack_timeout in
- if t.debug then eprintf "Saving message %S.\n%!" msg_id;
- PGSQL(dbh)
- "INSERT INTO ocamlmq_msgs(msg_id, ack_pending, priority, destination,
- timestamp, ack_timeout, body)
- VALUES ($msg_id, $ack_pending, $priority, $queue, $time,
- $ack_timeout, $body)"
-
-let save_msg t ?(low_priority = false) msg = match msg.msg_destination with
- Topic _ | Control _ -> return ()
- | Queue queue ->
- if not low_priority then WithDB(do_save t dbh msg)
- else Lwt_pool.use t.dbconns' (fun dbh -> do_save t dbh msg)
-
-let get_ack_pending_msg t msg_id =
- WithDB begin
- PGSQL(dbh)
- "SELECT msg_id, destination, timestamp, priority, ack_timeout, body
- FROM ocamlmq_msgs as msg
- WHERE msg_id = $msg_id AND ack_pending = true"
- end
- >>= function
- | tuple :: _ -> return (Some (msg_of_tuple tuple))
- | [] -> return None
-
-let register_ack_pending_new_msg t msg =
- WithDB(do_save t dbh ~ack_pending:true msg)
-
-let register_ack_pending_msg t msg_id =
- try_lwt
- WithDB_trans begin
- PGSQL(dbh)
- "SELECT 1 FROM ocamlmq_msgs
- WHERE msg_id = $msg_id AND ack_pending = false
- FOR UPDATE" >>= function
- [] -> return false
- | _ ->
- PGSQL(dbh)
- "UPDATE ocamlmq_msgs SET ack_pending = true WHERE msg_id = $msg_id" >>
- return true
- end
- with _ -> return false
-
-let rec get_msg_for_delivery t queue =
- try_lwt
- WithDB_trans begin
- lwt tuples =
- PGSQL(dbh)
- "SELECT msg_id, destination, timestamp, priority, ack_timeout, body
- FROM ocamlmq_msgs as msg
- WHERE destination = $queue AND ack_pending = false
- ORDER BY priority, timestamp
- LIMIT 1
- FOR UPDATE"
- in match tuples with
- tuple :: _ ->
- let msg = msg_of_tuple tuple in
- let msg_id = msg.msg_id in
- PGSQL(dbh)
- "UPDATE ocamlmq_msgs SET ack_pending = true
- WHERE msg_id = $msg_id" >>
- return (Some msg)
- | [] -> return None
- end
- with _ -> return None (* FIXME: is this OK? *)
-
-let ack_msg t msg_id =
- WithDB(PGSQL(dbh) "DELETE FROM ocamlmq_msgs WHERE msg_id = $msg_id")
-
-let unack_msg t msg_id =
- WithDB(PGSQL(dbh) "UPDATE ocamlmq_msgs SET ack_pending = false WHERE msg_id = $msg_id")
-
-let count_queue_msgs t queue =
- WithDB(PGSQL(dbh) "SELECT COUNT(*) FROM ocamlmq_msgs WHERE destination = $queue")
- >>= function
- Some count :: _ -> return count
- | _ -> return 0L
-
-let crash_recovery t =
- WithDB_trans begin
- if t.debug then eprintf "Recovering from crash...\n%!";
- PGSQL(dbh) "SELECT COUNT(*) FROM ocamlmq_msgs WHERE ack_pending = true" >>= function
- Some n :: _ ->
- eprintf "Recovering %Ld ACK-pending messages: %!" n;
- lwt () = PGSQL(dbh) "UPDATE ocamlmq_msgs SET ack_pending = false
- WHERE ack_pending = true" in
- eprintf "DONE\n%!";
- return ()
- | _ -> eprintf "No ACK-pending messages found.\n%!";
- return ()
- end
View
7 mq_pg_persistence.mli
@@ -1,7 +0,0 @@
-
-include Mq_server.PERSISTENCE
-
-val connect :
- ?host:string -> ?port:int -> ?unix_domain_socket_dir:string ->
- ?database:string -> ?user:string -> ?password:string ->
- ?debug:bool -> ?max_conns:int -> unit -> t Lwt.t
View
10 mq_server.ml
@@ -9,7 +9,6 @@ sig
val initialize : t -> unit Lwt.t
val save_msg : t -> ?low_priority:bool -> Mq_types.message -> unit Lwt.t
- val register_ack_pending_new_msg : t -> Mq_types.message -> unit Lwt.t
(** Returns [false] if the msg was already in the ACK-pending set. *)
val register_ack_pending_msg : t -> string -> bool Lwt.t
val get_ack_pending_msg : t -> string -> Mq_types.message option Lwt.t
@@ -582,7 +581,10 @@ let server_loop ?(debug = false) broker =
eprintf "Got toplevel exception: %s\n%!" (Printexc.to_string e);
Printexc.print_backtrace stderr;
Lwt_unix.sleep 0.01) >>
- loop ()
- in
- P.crash_recovery broker.b_msg_store >> loop ()
+ loop () in
+ let t0 = Unix.gettimeofday () in
+ eprintf "Performing crash recovery... %!";
+ lwt () = P.crash_recovery broker.b_msg_store in
+ eprintf "DONE (%8.5fs)\n%!" (Unix.gettimeofday () -. t0);
+ loop ()
end (* Make functor *)
View
349 mq_sqlite_persistence.ml
@@ -0,0 +1,349 @@
+
+open Lwt
+open Printf
+open Mq_types
+
+module Sqlexpr = Sqlexpr_sqlite.Make(Sqlexpr_concurrency.Id)
+open Sqlexpr
+
+module MSET = Set.Make(struct
+ type t = int * message
+ let compare ((p1, m1) : t) (p2, m2) =
+ if p2 = p1 then
+ String.compare m1.msg_id m2.msg_id
+ else p2 - p1
+ end)
+module SSET = Set.Make(String)
+
+type t = {
+ db : Sqlexpr_sqlite.db;
+ in_mem : (string, MSET.t * SSET.t) Hashtbl.t;
+ in_mem_msgs : (string, message) Hashtbl.t;
+ mutable ack_pending : SSET.t;
+ mutable flush_alarm : unit Lwt.u;
+ max_msgs_in_mem : int;
+ mutable unacks : SSET.t;
+ binlog_file : string option;
+ mutable binlog : Binlog.t option;
+ sync_binlog : bool;
+}
+
+let count_unmaterialized_pending_acks db =
+ select_one db sqlc"SELECT @L{COUNT(*)} FROM pending_acks"
+
+let count_acked_messages db =
+ select_one db sqlc"SELECT @L{COUNT(*)} FROM acked_msgs"
+
+let pr fmt = ksprintf (eprintf "%s%!") fmt
+let puts fmt = ksprintf prerr_endline fmt
+
+let timed verbose gen_msg f =
+ let t0 = Unix.gettimeofday () in
+ if verbose then pr "%s" (gen_msg ());
+ let y = f () in
+ if verbose then puts " (%8.5fs)" (Unix.gettimeofday () -. t0);
+ y
+
+let flush_acked_msgs ?(verbose = false) db =
+ timed verbose (fun () -> sprintf "Flushing %Ld ACKs" (count_acked_messages db))
+ (fun () ->
+ execute db
+ sqlc"DELETE FROM ocamlmq_msgs WHERE msg_id IN (SELECT * FROM acked_msgs)";
+ execute db sqlc"DELETE FROM acked_msgs")
+
+let materialize_pending_acks ?(verbose = false) db =
+ timed verbose
+ (fun () -> sprintf "Materializing %Ld pending ACKs in DB"
+ (count_unmaterialized_pending_acks db))
+ (fun () ->
+ execute db sqlc"UPDATE ocamlmq_msgs SET ack_pending = 1
+ WHERE msg_id IN (SELECT msg_id FROM pending_acks)";
+ execute db sqlc"DELETE FROM pending_acks")
+
+let rec flush t =
+ let t0 = Unix.gettimeofday () in
+ let flushed = ref false in
+ transaction t.db begin fun db ->
+ let in_mem_msgs = Hashtbl.length t.in_mem_msgs
+ and ack_pending = SSET.cardinal t.ack_pending
+ and unmaterialized_ack_pendings = count_unmaterialized_pending_acks db
+ and acked_msgs = count_acked_messages db in
+ if in_mem_msgs <> 0 || ack_pending <> 0 ||
+ unmaterialized_ack_pendings <> 0L || acked_msgs <> 0L
+ then begin
+ flushed := true;
+ pr "Flushing to disk: %d msgs, %d + %Ld pending ACKS, %Ld ACKS"
+ in_mem_msgs ack_pending unmaterialized_ack_pendings acked_msgs;
+ do_flush t db;
+ end
+ end;
+ Option.may Binlog.truncate t.binlog;
+ if !flushed then puts " (%8.5fs)" (Unix.gettimeofday () -. t0)
+
+and do_flush t db =
+ Hashtbl.iter
+ (fun _ msg ->
+ execute db
+ sqlc"INSERT INTO ocamlmq_msgs
+ (ack_pending, msg_id, priority, destination, timestamp,
+ ack_timeout, body)
+ VALUES(0, %s, %d, %s, %f, %f, %S)"
+ msg.msg_id msg.msg_priority (destination_name msg.msg_destination)
+ msg.msg_timestamp msg.msg_ack_timeout msg.msg_body)
+ t.in_mem_msgs;
+ SSET.iter
+ (execute db sqlc"INSERT INTO pending_acks(msg_id) VALUES(%s)")
+ t.ack_pending;
+ flush_acked_msgs db;
+ materialize_pending_acks db;
+ Hashtbl.clear t.in_mem;
+ Hashtbl.clear t.in_mem_msgs;
+ t.ack_pending <- SSET.empty
+
+let msg_materialized_as_ack_pending db msg_id =
+ select_one db
+ sqlc"SELECT @b{EXISTS (SELECT 1 FROM ocamlmq_msgs
+ WHERE msg_id = %s AND ack_pending)}"
+ msg_id
+
+let unack db msg_id =
+ execute db sqlc"DELETE FROM pending_acks WHERE msg_id = %s" msg_id;
+ if msg_materialized_as_ack_pending db msg_id then
+ execute db
+ sqlc"UPDATE ocamlmq_msgs SET ack_pending = 0 WHERE msg_id = %s"
+ msg_id
+
+let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0)
+ ?binlog ?(sync_binlog = false) file =
+ let wait_flush, awaken_flush = Lwt.wait () in
+ let t =
+ { db = Sqlexpr_sqlite.open_db file; in_mem = Hashtbl.create 13;
+ in_mem_msgs = Hashtbl.create 13; ack_pending = SSET.empty;
+ flush_alarm = awaken_flush;
+ max_msgs_in_mem = max_msgs_in_mem;
+ unacks = SSET.empty;
+ binlog_file = binlog; binlog = None;
+ sync_binlog = sync_binlog;
+ } in
+ let flush_period = max flush_period 0.005 in
+ let rec loop_flush wait_flush =
+ Lwt.pick [Lwt_unix.sleep flush_period; wait_flush] >>
+ begin
+ let wait, awaken = Lwt.wait () in
+ flush t;
+ t.flush_alarm <- awaken;
+ loop_flush wait
+ end in
+ let rec loop_flush_unacks () =
+ lwt () = Lwt_unix.sleep 0.1 in
+ if not (SSET.is_empty t.unacks) then
+ transaction t.db
+ (fun db ->
+ puts "UnACKing %d messages in DB" (SSET.cardinal t.unacks);
+ SSET.iter (unack db) t.unacks);
+ t.unacks <- SSET.empty;
+ loop_flush_unacks ()
+ in
+ ignore
+ (try_lwt loop_flush wait_flush
+ with e -> puts "EXCEPTION IN FLUSHER: %s" (Printexc.to_string e);
+ return ());
+ ignore
+ (try_lwt loop_flush_unacks ()
+ with e -> puts "EXCEPTION IN UNACK FLUSHER: %s" (Printexc.to_string e);
+ return ());
+ t
+
+let initialize t =
+ execute t.db sqlinit"ATTACH \":memory:\" AS mem";
+ execute t.db
+ sqlinit"CREATE TABLE IF NOT EXISTS ocamlmq_msgs(
+ ack_pending BOOL NOT NULL,
+ msg_id VARCHAR(255) NOT NULL PRIMARY KEY,
+ priority INT NOT NULL,
+ destination VARCHAR(255) NOT NULL,
+ timestamp DOUBLE NOT NULL,
+ ack_timeout DOUBLE NOT NULL,
+ body BLOB NOT NULL
+ )";
+ execute t.db
+ sqlinit"CREATE INDEX IF NOT EXISTS
+ ocamlmq_msgs_destination_priority_timestamp
+ ON ocamlmq_msgs(destination, ack_pending, priority, timestamp)";
+ execute t.db
+ sqlinit"CREATE INDEX IF NOT EXISTS
+ ocamlmq_msgs_ack_pending
+ ON ocamlmq_msgs(ack_pending)";
+ execute t.db
+ sqlinit"CREATE TABLE mem.pending_acks(msg_id VARCHAR(255) NOT NULL PRIMARY KEY)";
+ execute t.db
+ sqlinit"CREATE TABLE mem.acked_msgs(msg_id VARCHAR(255) NOT NULL PRIMARY KEY)";
+ return ()
+
+let do_save_msg ?(can_flush = true) t sent msg =
+ let dest = destination_name msg.msg_destination in
+ let v = (msg.msg_priority, msg) in
+ begin
+ try
+ let unsent, want_ack = Hashtbl.find t.in_mem dest in
+ let p =
+ if sent then (unsent, SSET.add msg.msg_id want_ack)
+ else (MSET.add v unsent, want_ack)
+ in Hashtbl.replace t.in_mem dest p
+ with Not_found ->
+ let p =
+ if sent then (MSET.empty, SSET.singleton msg.msg_id)
+ else (MSET.singleton v, SSET.empty)
+ in Hashtbl.add t.in_mem dest p
+ end;
+ Hashtbl.add t.in_mem_msgs msg.msg_id msg;
+ if can_flush && Hashtbl.length t.in_mem_msgs > t.max_msgs_in_mem then
+ Lwt.wakeup t.flush_alarm ();
+ return ()
+
+let save_msg t ?low_priority msg =
+ Option.map_default (fun log -> Binlog.add log msg) (return ()) t.binlog >>
+ do_save_msg t false msg
+
+let register_ack_pending_msg t msg_id =
+ if Hashtbl.mem t.in_mem_msgs msg_id then
+ let r = SSET.mem msg_id t.ack_pending in
+ if not r then begin
+ let msg = Hashtbl.find t.in_mem_msgs msg_id in
+ let dest = destination_name msg.msg_destination in
+ let unsent, want_ack = Hashtbl.find t.in_mem dest in
+ let v = (msg.msg_priority, msg) in
+ t.ack_pending <- SSET.add msg_id t.ack_pending;
+ Hashtbl.replace t.in_mem dest (MSET.remove v unsent, SSET.add msg_id want_ack)
+ end;
+ return (not r)
+ else
+ match
+ select_one_maybe t.db
+ sqlc"SELECT @b{1} FROM pending_acks WHERE msg_id = %s" msg_id
+ with
+ Some _ -> return false
+ | None ->
+ if msg_materialized_as_ack_pending t.db msg_id then
+ return false
+ else begin
+ execute t.db sqlc"INSERT INTO pending_acks(msg_id) VALUES(%s)" msg_id;
+ if count_unmaterialized_pending_acks t.db > 100L then
+ transaction t.db (materialize_pending_acks ~verbose:true);
+ return true
+ end
+
+let msg_of_tuple (msg_id, dst, timestamp, priority, ack_timeout, body) =
+ {
+ msg_id = msg_id;
+ msg_destination = Queue dst;
+ msg_priority = priority;
+ msg_timestamp = timestamp;
+ msg_ack_timeout = ack_timeout;
+ msg_body = body;
+ }
+
+let get_ack_pending_msg t msg_id =
+ return begin try
+ let msg = Hashtbl.find t.in_mem_msgs msg_id in
+ if SSET.mem msg_id t.ack_pending then Some msg else None
+ with Not_found ->
+ select_one_f_maybe t.db msg_of_tuple
+ sqlc"SELECT @s{msg_id}, @s{destination}, @f{timestamp},
+ @d{priority}, @f{ack_timeout}, @S{body}
+ FROM ocamlmq_msgs AS msg
+ WHERE msg_id = %s
+ AND (msg.ack_pending OR
+ EXISTS (SELECT 1 FROM pending_acks WHERE msg_id = msg.msg_id))
+ AND NOT EXISTS (SELECT 1 FROM acked_msgs WHERE msg_id = msg.msg_id)"
+ msg_id
+ end
+
+let ack_msg t msg_id =
+ if SSET.mem msg_id t.ack_pending then begin
+ let msg = Hashtbl.find t.in_mem_msgs msg_id in
+ Hashtbl.remove t.in_mem_msgs msg_id;
+ t.ack_pending <- SSET.remove msg_id t.ack_pending;
+ let dst = destination_name msg.msg_destination in
+ let unsent, want_ack = Hashtbl.find t.in_mem dst in
+ Hashtbl.replace t.in_mem dst (unsent, SSET.remove msg_id want_ack);
+ Option.map_default (fun log -> Binlog.cancel log msg) (return ()) t.binlog
+ end else begin
+ execute t.db sqlc"INSERT INTO acked_msgs(msg_id) VALUES(%s)" msg_id;
+ t.ack_pending <- SSET.remove msg_id t.ack_pending;
+ if count_acked_messages t.db > 100L then
+ transaction t.db (flush_acked_msgs ~verbose:true);
+ return ()
+ end
+
+let unack_msg t msg_id =
+ if SSET.mem msg_id t.ack_pending then begin
+ t.ack_pending <- SSET.remove msg_id t.ack_pending;
+ let msg = Hashtbl.find t.in_mem_msgs msg_id in
+ let dst = destination_name msg.msg_destination in
+ let msg = Hashtbl.find t.in_mem_msgs msg_id in
+ let unsent, want_ack = Hashtbl.find t.in_mem dst in
+ let v = (msg.msg_priority, msg) in
+ Hashtbl.replace t.in_mem dst (MSET.add v unsent, SSET.remove msg_id want_ack)
+ end else begin
+ t.unacks <- SSET.add msg_id t.unacks;
+ end;
+ return ()
+
+exception Msg of message
+
+let get_msg_for_delivery t dest =
+ try
+ let unsent, want_ack = Hashtbl.find t.in_mem dest in
+ let ((prio, msg) as v) = MSET.min_elt unsent in
+ t.ack_pending <- SSET.add msg.msg_id t.ack_pending;
+ Hashtbl.replace t.in_mem dest
+ (MSET.remove v unsent, SSET.add msg.msg_id want_ack);
+ return (Some msg)
+ with Not_found ->
+ match
+ select_one_f_maybe t.db msg_of_tuple
+ sqlc"SELECT @s{msg_id}, @s{destination}, @f{timestamp},
+ @d{priority}, @f{ack_timeout}, @S{body}
+ FROM ocamlmq_msgs as msg
+ WHERE destination = %s
+ AND msg.ack_pending = 0
+ AND NOT EXISTS (SELECT 1 FROM pending_acks WHERE msg_id = msg.msg_id)
+ AND NOT EXISTS (SELECT 1 FROM acked_msgs WHERE msg_id = msg.msg_id)
+ ORDER BY priority, timestamp"
+ dest
+ with
+ None -> return None
+ | Some msg ->
+ execute t.db sqlc"INSERT INTO pending_acks VALUES(%s)" msg.msg_id;
+ if count_unmaterialized_pending_acks t.db > 100L then
+ transaction t.db (materialize_pending_acks ~verbose:true);
+ return (Some msg)
+
+let count_queue_msgs t dst =
+ let in_mem =
+ try
+ let unsent, want_ack = Hashtbl.find t.in_mem dst in
+ MSET.cardinal unsent + SSET.cardinal want_ack
+ with Not_found -> 0 in
+ let in_db =
+ select_one t.db
+ sqlc"SELECT @L{COUNT(*)} FROM ocamlmq_msgs as msg
+ WHERE destination=%s
+ AND NOT EXISTS (SELECT 1 FROM acked_msgs WHERE msg_id = msg.msg_id)"
+ dst
+ in
+ return (Int64.add (Int64.of_int in_mem) in_db)
+
+let crash_recovery t =
+ execute t.db sqlc"UPDATE ocamlmq_msgs SET ack_pending = 0 WHERE ack_pending = 1";
+ begin match t.binlog_file with
+ None -> return ()
+ | Some f ->
+ lwt binlog, msgs = Binlog.make ~sync:t.sync_binlog f in
+ t.binlog <- Some binlog;
+ eprintf "(binlog: %d msgs) %!" (List.length msgs);
+ Lwt_list.iter_s (do_save_msg ~can_flush:false t false) msgs
+ end
+
+let init_db, check_db, auto_check_db = sql_check"sqlite"
View
9 mq_sqlite_persistence.mli
@@ -0,0 +1,9 @@
+
+include Mq_server.PERSISTENCE
+
+val make : ?max_msgs_in_mem:int -> ?flush_period:float ->
+ ?binlog:string -> ?sync_binlog:bool -> string -> t
+
+(* Used for testing *)
+val auto_check_db : Format.formatter -> bool
+
1  ocaml-sqlexpr
@@ -0,0 +1 @@
+Subproject commit 3948bec5a1008a3bacfd8e403e07ec6d24e651af
View
74 ocamlmq.ml
@@ -5,73 +5,63 @@ open Lwt
let set_some_string r = Arg.String (fun s -> r := Some s)
let set_some_int r = Arg.Int (fun n -> r := Some n)
-let db_host = ref None
-let db_port = ref None
-let db_database = ref None
-let db_user = ref None
-let db_password = ref None
-let db_unix_sock_dir = ref None
-let db_max_conns = ref 10
let port = ref 61613
let debug = ref false
-let initdb = ref false
let login = ref None
let passcode = ref None
+let db = ref None
+let max_in_mem = ref 100000
+let flush_period = ref 1.
+let binlog = ref ""
+let sync_binlog = ref false
let params =
Arg.align
[
- "-dbhost", set_some_string db_host, "HOST Database server host.";
- "-dbport", set_some_int db_port, "HOST Database server port.";
- "-dbdatabase", set_some_string db_database, "DATABASE Database name.";
- "-dbsockdir", set_some_string db_password, "DIR Database UNIX domain socket dir.";
- "-dbuser", set_some_string db_user, "USER Database user.";
- "-dbpassword", set_some_string db_password, "PASSWORD Database password.";
- "-dbmaxconns", Arg.Set_int db_max_conns, "NUM Maximum size of DB connection pool.";
- "-port", Arg.Set_int port, "PORT Port to listen at (default: 61613).";
- "-login", set_some_string login, "LOGIN Login expected in CONNECT.";
- "-passcode", set_some_string passcode, "PASSCODE Passcode expected in CONNECT.";
- "-initdb", Arg.Set initdb, " Initialize the database (create required tables).";
- "-debug", Arg.Set debug, " Write debug info to stderr.";
+ "-port", Arg.Set_int port, "PORT Port to listen at (default: 61613)";
+ "-login", set_some_string login, "LOGIN Login expected in CONNECT";
+ "-passcode", set_some_string passcode, "PASSCODE Passcode expected in CONNECT";
+ "-maxmsgs", Arg.Set_int max_in_mem,
+ "N Keep at most N msgs in mem before hard flush (default: 100000)";
+ "-flush-period", Arg.Set_float flush_period,
+ "DT Hard flush period in seconds (default: 1.0)";
+ "-binlog", Arg.Set_string binlog,
+ "FILE Use FILE as the binlog for msgs in mem (default: none)";
+ "-sync-binlog", Arg.Set sync_binlog,
+ " fsync the binlog on each write (default: no)";
+ "-debug", Arg.Set debug, " Write debug info to stderr";
]
-let usage_message = "Usage: ocamlmq [options]"
+let usage_message = "Usage: ocamlmq [options] [sqlite3 database (default: ocamlmq.db)]"
let _ = Sys.set_signal Sys.sigpipe Sys.Signal_ignore
let _ = Sys.set_signal Sys.sigint (Sys.Signal_handle (fun _ -> exit 0))
-module SERVER = Mq_server.Make(Mq_pg_persistence)
+module SERVER = Mq_server.Make(Mq_sqlite_persistence)
let () =
Arg.parse
params
- (fun s -> eprintf "Unknown argument: %S\n%!" s;
+ (function
+ s when !db = None && s <> "" && s.[0] <> '-' -> db := Some s
+ | s -> eprintf "Unknown argument: %S\n%!" s;
Arg.usage params usage_message;
exit 1)
usage_message;
let addr = Unix.ADDR_INET (Unix.inet_addr_any, !port) in
Lwt_unix.run begin
- lwt msg_store =
- try_lwt
- Mq_pg_persistence.connect
- ?host:!db_host
- ?port:!db_port
- ?database:!db_database
- ?unix_domain_socket_dir:!db_unix_sock_dir
- ?user:!db_user
- ?password:!db_password
- ~debug:!debug
- ~max_conns:!db_max_conns
- ()
- with _ -> (* counldn't connect to DB *)
- eprintf "Could not connect to DB, use the -db* options.\n%!";
- exit 1
+ let msg_store =
+ Mq_sqlite_persistence.make
+ ~max_msgs_in_mem:!max_in_mem
+ ~flush_period:!flush_period
+ ?binlog:(match !binlog with "" -> None | s -> Some s)
+ ~sync_binlog:!sync_binlog
+ (Option.default "ocamlmq.db" !db)
in
if !debug then eprintf "Connected to database.\n%!";
- (if !initdb then begin
- eprintf "Initializing database.\n%!";
- Mq_pg_persistence.initialize msg_store
- end else return ()) >>
+ eprintf "Initializing database... %!";
+ Mq_sqlite_persistence.initialize msg_store >>
+ let () = eprintf "DONE\n%!" in
lwt broker = SERVER.make_broker
?login:!login ?passcode:!passcode msg_store addr
in SERVER.server_loop ~debug:!debug broker
View
23 pGOCaml_lwt.ml
@@ -1,23 +0,0 @@
-(* Copyright (c) 2010 Mauricio Fernández <mfp@acm.org> *)
-include PGOCaml_generic.Make(struct include Lwt include Lwt_chan end)
-
-let buf = Buffer.create 64000
-
-let string_of_bytea b =
- Buffer.clear buf;
- let len = String.length b in
- for i = 0 to len - 1 do
- let c = String.unsafe_get b i in
- let cc = Char.code c in
- if cc < 0x20 || cc > 0x7e then begin
- Buffer.add_char buf '\\';
- Buffer.add_char buf (Char.unsafe_chr (48 + ((cc lsr 6) land 0x7))); (* '0' + higher bits *)
- Buffer.add_char buf (Char.unsafe_chr (48 + ((cc lsr 3) land 0x7)));
- Buffer.add_char buf (Char.unsafe_chr (48 + (cc land 0x7)));
- (* Buffer.add_string buf (sprintf "\\%03o" cc) [> non-print -> \ooo <] *)
- end else if c = '\\' then
- Buffer.add_string buf "\\\\" (* \ -> \\ *)
- else
- Buffer.add_char buf c
- done;
- Buffer.contents buf
View
18 schema.sql
@@ -1,18 +0,0 @@
-
-CREATE TABLE ocamlmq_msgs(
- msg_id VARCHAR(255) NOT NULL PRIMARY KEY,
- ack_pending BOOL NOT NULL DEFAULT false,
- priority INT NOT NULL,
- destination VARCHAR(255) NOT NULL,
- timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- ack_timeout FLOAT NOT NULL,
- body BYTEA NOT NULL
-);
-
-CREATE INDEX ocamlmq_msgs_destination_priority_timestamp ON ocamlmq_msgs
- USING BTREE(destination, priority, timestamp);
-
-CREATE INDEX ocamlmq_msgs_ack_pending ON ocamlmq_msgs
- USING BTREE(ack_pending);
-
-
View
22 test.ml
@@ -0,0 +1,22 @@
+open OUnit
+
+let test_sql f () =
+ let b = Buffer.create 13 in
+ let fmt = Format.formatter_of_buffer b in
+ let contents b = Format.fprintf fmt "@?"; Buffer.contents b in
+ if not (f fmt) then
+ assert_failure (contents b)
+
+let db_tests =
+ [
+ "Mq_sqlite_persistence", Mq_sqlite_persistence.auto_check_db;
+ ]
+
+let all_tests =
+ List.map (fun (n, f) -> n >:: test_sql f) db_tests @
+ [
+ ]
+
+
+let _ =
+ run_test_tt_main ("All" >::: all_tests)
Please sign in to comment.
Something went wrong with that request. Please try again.