Join GitHub today
Fetching latest commit…
Cannot retrieve the latest commit at this time.
|Failed to load latest commit information.|
|ocaml-sqlexpr @ a29d72f|
ocamlmq is a STOMP message broker with features that make it especially suitable for implementing task queues and communication between subsystems: * persistent queues, scaling over arbitrarily large numbers of queues with constant memory usage (i.e. supports millions of queues) * queued messages need not fit in memory * strong durability guarantees: messages can be guaranteed to have been saved to disk (and fsync'ed) by the time the sender gets a message receipt * message priorities * per-subscription prefetch limit for queue messages * error handling and ACK timeout: if a subscriber doesn't ACK a message after a (message-specific) timeout, the message will be sent to another subscriber. Messages are also resent automatically if a subscriber dies and its connection expires. * topic subscriptions (messages broadcast to all subscribers) with prefix matching * support for high numbers of subscriptions: millions of subscriptions pose no problem * simple extensions within the STOMP protocol to report the number of messages in a queue and the number of subscribers to a queue or topic ocamlmq is written in OCaml, in ~1500 lines of code. It is easy to extend and fairly efficient. The server is abstracted over a storage backend; there are currently two backends: * PostgreSQL's (150 LoC) * sqlite with in-mem caching (350 LoC) Scalability =========== ocamlmq has been designed to support millions of queues and topic subscriptions without undue memory usage. This table summarizes the time complexity of some STOMP operations: SEND to queue O(log subscribers) SEND to topic O(subscribers + log (total subs)) SUBSCRIBE to queue O(log (total subs)) SUBSCRIBE to topic O(log subscribers) ACK O(1) ocamlmq needs typically around 150 bytes per subscription, so 1 million subscriptions will not take much more than 150 MB of memory. No extra memory is needed for queues, so you can use lots of them with no concerns for memory usage. Limitations =========== ocamlmq works well in the intended use case (persistent queues and transient topic destinations, with possibly many queues and subscriptions), but it has some limitations which preclude its use in other domains: * ocamlmq is not designed to scale beyond several hundred / a few thousand simultaneous connections (it will work, but performance will be affected) * there is no flow control for topic messages (in the intended use case, topic messages are assumed to be relatively small and processed fast) * messages are limited to 16 MB on 32-bit platforms * ocamlmq does not support very high message rates (ocamlmq delivers only ~60K messages/second on a 3GHz AMD64 box) If you need complex routing rules, scalability to many thousand simultaneous connections or other _enterprise_ messaging features, you'll be better served by AMPQ or JMS brokers. ActiveMQ, in particular, is very configurable, so it'll most likely fit the bill if memory consumption and scaling to many subscriptions are not a concern. Building ======== You'll need a working OCaml environment plus the following libraries: * Lwt * extlib * ocaml-sqlite3 (and sqlite3 >= 3.6.8, for SAVEPOINT support) * csv * estring Additionally, ocamlmq requires PostgreSQL both at compile- and run-time. If you have omake, Just do $ omake otherwise, run $ sh build.sh Installing ========== The ocamlmq executable is self-contained and can be copied to any directory in your PATH if desired. Running ======= ocamlmq's configuration is given via the command line: Usage: ocamlmq [options] [sqlite3 database (default: ocamlmq.db)] -port PORT Port to listen at (default: 61613) -login LOGIN Login expected in CONNECT -passcode PASSCODE Passcode expected in CONNECT -max-prefetch N Maximum allowed prefetch limit (default: 100) -maxmsgs N Keep at most N msgs in mem before hard flush (default: 100000) -flush-period DT Hard flush period in seconds (default: 1.0) -binlog FILE Use FILE as the binlog for msgs in mem (default: none) -sync-binlog fsync the binlog on each write (default: no) -debug Write debug info to stderr -help Display this list of options --help Display this list of options ocamlmq stores the messages in memory (and optionally in the binlog) and flushes to the permanent store (with full fsync) when either: * more than [maxmsgs] messages have been received since the last flush or * it's been [flush-period] seconds since the last flush With the -binlog option, the messages kept in mem will also be written to a binary log before they are flushed (and fsync'ed) to the permanent store. This way, in-mem (not yet flushed) messages can be recovered if ocamlmq were killed or crashed. You can moreover ensure that the binlog is fsync'ed after each write with -sync-binlog. This brings you the strongest durability guarantees, for every single message is fsync'ed before the broker acknowledges its reception. Performance =========== Here follow some figures, taken on an oldish dual core AMD Athlon 64 X2 with a 7200 RPM SATA disk. Queueing (synchronously, with receipt) maxmsgs throughput throughput (-binlog) ---------------------------------------------------- 1000 >11000 msg/s >9000 msg/s 5000 >12000 msg/s >9000 msg/s "infty" >17000 msg/s >14000 msg/s If the binlog is enabled and -sync-binlog in use, the maximum throughput is around 4000 msg/s. Delivering: Topic messages: >40000 msg/s. Queue messages in memory: >12000 msg/s Queue messages on disk: >2000 msg/s STOMP protocol specifics ======================== ocamlmq uses the STOMP protocol as specified in http://stomp.codehaus.org/Protocol and uses a trailing newline (after the NULL byte) to delimit frames. SEND ---- The ACK timeout period, after which queue messages are sent to another subscriber, can be specified in the "ack-timeout" header (as a float in seconds), e.g. SEND destination:/queue/test ack-timeout:3.14 just testing ^@ SUBSCRIBE --------- Clients can subscribe to topics (/topic/xxx) which have broadcast, non-durable semantics, or to queues (/queue/xxx), which are persistent. It is also possible to subscribe to all the topics matching a prefix, by using /topic/someprefix* as the destination. The prefetch limit (max. number of unacknowledged messages allowed by the server) can be specified in the "prefetch" header. BEGIN / COMMIT -------------- They are not implemented, since they are ill-specified. Control messages ================ A client can send messages to special "control" destinations and obtain the response in the message receipt (iow., nothing is returned unless the "receipt" header is set): /control/count-msgs/queue/name-of-the-queue returns the number of messages in the "num-messages" header /control/count-subscribers/queue/name-of-the-queue returns the number of suscribers in the "num-subscribers" header /control/count-subscribers/topic/name-of-the-topic returns the number of suscribers in the "num-subscribers" header