Permalink
Find file
Fetching contributors…
Cannot retrieve contributors at this time
413 lines (287 sloc) 16.3 KB

Rack Rabbit (v0.5.0)

A forking server for hosting rabbitMQ consumer processes as load balanced rack applications.

$ rack-rabbit --queue myqueue --workers 4 app/config.ru

Contents

Summary

What Unicorn does for HTTP services, RackRabbit can do for hosting AMQP services, and more:

HTTP AMQP
Make a synchronous request/response Unicorn rabbitMQ + RackRabbit
Asynchronous worker queue Redis + Resque rabbitMQ + RackRabbit
Asynchronous publish/subscribe Redis rabbitMQ + RackRabbit

RackRabbit hosts a cluster of worker processes that:

  • Subscribe to a queue/exchange
  • Convert incoming messages into a suitable Rack environment
  • Call your Rack app to handle the message
  • Publish a reply back to the original caller (if reply_to was provided)

RackRabbit supports a variety of messaging patterns:

  • Synchronous Request/Response (e.g. GET/POST/PUT/DELETE)
  • Asynchronous Worker queue (e.g. ENQUEUE)
  • Asynchronous PubSub (e.g. PUBLISH)

Installation

Install a rabbitMQ server if necessary (docs):

$ sudo apt-get install rabbitmq-server

Update your Gemfile to include RackRabbit and your preferred rabbitMQ client library

gem bunny,       "~> 1.4"             # or an alternative such as AMQP or march-hare
gem rack-rabbit, "~> 0.1"

Getting started by example

You can use RackRabbit to host an AMQP service as a rack application in the same way that you might use Unicorn to host an HTTP service.

Imagine a simple rack application in config.ru:

class Service
  def self.call(env)
    request = Rack::Request.new(env)
    method  = request.request_method
    path    = request.path_info
    body    = request.body.read
    message = "#{method} #{path} #{body}"
    [ 200, {}, [ message ]]
  end
end
run Service

You can host and load balance this service using rack-rabbit:

$ rack-rabbit --queue myqueue --workers 4  config.ru

Ensure the worker processes are running:

$ ps xawf | grep rack-rabbit
15714 pts/4    Sl+    0:00  |   \_ ruby rack-rabbit --queue myqueue --workers 4  config.ru
15716 pts/4    Sl+    0:00  |       \_ rack-rabbit -- waiting for request
15718 pts/4    Sl+    0:00  |       \_ rack-rabbit -- waiting for request
15721 pts/4    Sl+    0:00  |       \_ rack-rabbit -- waiting for request
15723 pts/4    Sl+    0:00  |       \_ rack-rabbit -- waiting for request

Connect to the worker from the command line using the rr command:

$ rr -q myqueue /hello                    # synchronous GET request/response
GET /hello

$ rr -q myqueue POST /submit "data"       # synchronous POST request/response
POST /submit data

$ rr -q myqueue PUT /update "data"        # synchronous PUT request/response
PUT /update data

$ rr -q myqueue DELETE /resource          # synchronous DELETE request/response
DELETE /resource

Connect to the worker from your applications using the RR class.

require 'rack-rabbit/client'

RR.get    :myqueue, "/hello"             # returns "GET /hello"
RR.post   :myqueue, "/submit", "data"    # returns "POST /submit data"
RR.put    :myqueue, "/update", "data"    # returns "PUT /update data"
RR.delete :myqueue, "/resource",         # returns "DELETE /resource"

See EXAMPLES.md for more detailed examples, including ENQUEUE and PUBLISH communication patterns, and using Sinatra.

Server usage

Use the rack-rabbit executable to host your Rack app in a forking server that subscribes to either a named queue or an exchange.

$ rack-rabbit --help

A load balanced rack server for hosting rabbitMQ consumer processes.

Usage:   rack-rabbit [options] rack-file

Examples:

  rack-rabbit -h broker -q my.queue                          # subscribe to a named queue
  rack-rabbit -h broker -e my.exchange -t fanout             # subscribe to a fanout exchange
  rack-rabbit -h broker -e my.exchange -t topic -r my.topic  # subscribe to a topic exchange with a routing key
  rack-rabbit -c rack-rabbit.conf                            # subscribe with advanced options provided by a config file

RackRabbit options:
    -c, --config CONFIG              provide options using a rack-rabbit configuration file
    -q, --queue QUEUE                subscribe to a queue for incoming requests
    -e, --exchange EXCHANGE          subscribe to an exchange for incoming requests
    -t, --type TYPE                  subscribe to an exchange for incoming requests - type (e.g. :direct, :fanout, :topic)
    -r, --route ROUTE                subscribe to an exchange for incoming requests - routing key
    -a, --app_id ID                  an app_id for this application server
        --host HOST                  the rabbitMQ broker IP address (default: 127.0.0.1)
        --port PORT                  the rabbitMQ broker port (default: 5672)

Process options:
    -w, --workers COUNT              the number of worker processes (default: 1)
    -d, --daemonize                  run daemonized in the background (default: false)
    -p, --pid PIDFILE                the pid filename (default when daemonized: /var/run/<app_id>.pid)
    -l, --log LOGFILE                the log filename (default when daemonized: /var/log/<app_id>.log)
        --log-level LEVEL            the log level for rack rabbit output (default: info)
        --preload                    preload the rack app before forking worker processes (default: false)

Ruby options:
    -I, --include PATH               an additional $LOAD_PATH (may be used more than once)
        --debug                      set $DEBUG to true
        --warn                       enable warnings

Common options:
    -h, --help
    -v, --version

Server configuration

Detailed configuration can be provided by an external config file using the --config option

# set the Rack application to be used to handle messages (default 'config.ru'):
rack_file 'app/config.ru'

# set the rabbitMQ connection:
rabbit :host    => '10.0.0.42',  # default '127.0.0.1'
       :port    => '1234'        # default '5672'
       :adapter => :amqp         # default :bunny

# subscribe to a queue:
queue 'my.queue'

# ... or, subscribe to an exchange:
exchange      'my.exchange'
exchange_type :topic
routing_key   'my.topic'

# set the app_id used to identify your application in response messages
app_id 'my-application'

# enable rabbitMQ acknowledgements (default: false):
ack true

# set the initial number of worker processes (default: 1):
workers 8

# set the minimum number of worker processes (default: 1):
min_workers 1

# set the maximum number of worker processes (default: 100):
max_workers 16

# preload the Rack app in the server for faster worker forking (default: false):
preload_app true

# daemonize the process (default: false)
daemonize true

# set the path to the logfile
logfile "/var/log/my-application.log"

# set the path to the pidfile
pidfile "/var/run/my-application.pid"

# set the log level for the Rack Rabbit logger (default: info)
log_level 'debug'

# set the Logger to used by the Rack Rabbit server and the worker Rack applications (default: Logger)
logger MyLogger.new

Signals

Signals should be sent to the master process

  • HUP - reload the RackRabbit config file and gracefully restart all workers
  • QUIT - graceful shutdown, waits for workers to complete handling of their current message before finishing
  • TERM - quick shutdown kills all workers immediately
  • INT - quick shutdown kills all workers immediately
  • TTIN - increase the number of worker processes by one
  • TTOU - decrease the number of worker processes by one

Forking worker processes

If you are using the preload_app directive, your app will be loaded into the master server process before any workers have forked. Therefore, you may need to re-initialize resources after each worker process forks, e.g if using ActiveRecord:

before_fork do |server|
  # no need for connection in the server process
  defined?(ActiveRecord::Base) and ActiveRecord::Base.connection.disconnect!
end

after_fork do |server, worker|
  # reestablish connection in each worker process
  defined?(ActiveRecord::Base) and ActiveRecord::Base.establish_connection
end

This should NOT be needed when the preload_app directive is false.

this is an issue with any preforking style server (e.g. Unicorn)

RabbitMQ acknowledgements

By default, an AMQP broker removes a message from the queue immediately after sending it to the consumer. If the consumer dies before processing the message completely then the message is lost. Users who need more control can configure the broker to use explicit acknowledgements (learn more) by setting the RackRabbit ack configuration option to true.

With explicit acknowledgements enabled...

  • If your rack handler succeeds (returns a 2xx status code) then RackRabbit will automatically send an acknowledgement to rabbitMQ.

  • If your rack handler fails (throws an exception or returns a non-2xx status code) then RackRabbit will automatically send a rejection to rabbitMQ - you might want to setup a dead-letter queue for these rejections.

  • If your rack handler process crashes then rabbitMQ will hand the message off to the next available worker process.

If your service action is idempotent then nothing more is needed.

However, if you need more fine-grained controls, then RackRabbit exposes the underlying message to your application in the rack environment as env['rabbit.message']. You can use this object to explicitly acknowledge or reject the message at any time during your rack handler, e.g:

  post "/work" do

      message = request.env['rabbit.message']

      # ... do some preliminary work (idempotent)

      if everything_looks_good

        message.ack     # take responsibility
        ...             # and do some more work (that might not be idempotent)

      else

        message.reject  # reject the message
        ...             # and (maybe) do some more work

      end

  end

Client binary

Communicating with a RackRabbit hosted service from the command line can be done using the rr binary:

Make a request to a RackRabbit service.

Usage: rr <command> [options] [METHOD] [PATH] [BODY]

list of commands:

 request      make  a synchronous  request to a rabbitMQ queue and wait for a reply
 enqueue      make an asynchronous request to a rabbitMQ queue and continue
 publish      make an asynchronous request to a rabbitMQ exchange with a routing key
 help         show help for a given topic or a help overview
 version      show version

Examples:

 rr request -q queue              GET    /hello          # submit GET to queue and WAIT for reply
 rr request -q queue              POST   /submit 'data'  # submit POST to queue and WAIT for reply
 rr enqueue -q queue              POST   /submit 'data'  # submit POST to queue and CONTINUE
 rr enqueue -q queue              DELETE /resource       # submit DELETE to queue and CONTINUE
 rr publish -e ex -t fanout       POST   /event          # submit POST to a fanout exchange and CONTINUE
 rr publish -e ex -t topic -r foo POST   /submit 'data'  # submit POST to a topic exchange with routing key and CONTINUE

RackRabbit options:
        --host HOST                  the rabbitMQ broker IP address (default: 127.0.0.1)
        --port PORT                  the rabbitMQ broker port (default: 5672)
    -q, --queue QUEUE                a queue for publishing outgoing requests
    -e, --exchange EXCHANGE          publish to a non-default exchange - name
    -t, --type TYPE                  publish to a non-default exchange - type (e.g. :direct, :fanout, :topic)
    -r, --route ROUTE                a routing key when publishing to a non-default exchange

Ruby options:
    -I, --include PATH               specify an additional $LOAD_PATH (may be used more than once)
        --debug                      set $DEBUG to true
        --warn                       enable warnings

Common options:
    -h, --help
    -v, --version

Client library

Communicating with a RackRabbit hosted service from your application can be done using the RR class:

RR.get(    "myqueue",    "/path/to/resource")
RR.post(   "myqueue",    "/path/to/resource", "content")
RR.put(    "myqueue",    "/path/to/resource", "content")
RR.delete( "myqueue",    "/path/to/resource")
RR.enqueue("myqueue",    "/path/to/resource", "content")
RR.publish("myexchange", "/path/to/resource", "content")

These methods are wrappers around a more detailed RackRabbit::Client class:

client = RackRabbit::Client.new(:host => "127.0.0.1", :port => 5672, :adapter => :bunny)

client.get(   "myqueue",     "/path/to/resource")
client.post(  "myqueue",     "/path/to/resource", "content")
client.put(   "myqueue",     "/path/to/resource", "content")
client.delete("myqueue",     "/path/to/resource")
client.enqueue("myqueue",    "/path/to/resource", "content")
client.publish("myexchange", "/path/to/resource", "content")

client.disconnect

More advanced options can be passed as an (optional) last parameter, e.g:

client.post("myqueue", "/path", content.to_json, {
  :headers          => { "additional" => "header" },   # made available in the service's rack env
  :priority         => 5,                              # specify the rabbitMQ message priority
  :content_type     => "application/json",             # specify the request content_type
  :content_encoding => "utf-8"                         # specify the request content_enoding
})

client.publish("myexchange", "/path", "content", {
  :exchange_type    => :topic,                         # specify the rabbitMQ exchange type
  :routing_key      => "my.custom.topic",              # specify a custom rabbitMQ routing key
})

Supported platforms

  • MRI 2.1.2
  • MRI 1.9.3

TODO

  • FEATURE - allow multiple Client#reqeust in parallel (block until all have replied) - a-la-typheous
  • FEATURE - share a single reply queue across all Client#request
  • FEATURE - automatically deserialize body for known content type (e.g. json)
  • FEATURE - have exception stack trace sent back to client in development/test mode
  • FEATURE - support JRuby
  • FEATURE - support Rubinius
  • BUG - avoid infinte worker spawn loop if worker fails during startup (e.g. connection to rabbit fails)
  • TEST - integration tests for worker, server, adapter/bunny, and adapter/amqp

License

See LICENSE file.

Credits

Thanks to Jesse Storimer for his book Working with Unix Processes

Thanks to the Unicorn Team for providing a great example of a preforking server.

Thanks to the Bunny Team for providing an easy rabbitMQ Ruby client.

Contact

If you have any ideas, feedback, requests or bug reports, you can reach me at jake@codeincomplete.com, or via my website: Code inComplete.