midas/rabbit-wq

RabbitWQ

A work queue built on RabbitMQ and Celluloid.

Installation

Add this line to your application's Gemfile:

gem 'rabbit-wq'

And then execute:

$ bundle

Or install it yourself as:

$ gem install rabbit-wq

Usage

Queue Subscriber

Starting the Work Queue Subscriber in Interactive Mode

$ rabbit-wq start --interactive

Starting the Work Queue Subscriber as a Daemon

$ rabbit-wq start

Stopping the Work Queue Subscriber

$ rabbit-wq stop

Restarting the Work Queue Subscriber

$ rabbit-wq restart

Checking the Status of the Work Queue Subscriber

$ rabbit-wq status

Queueing Work

Implementing as a PORO

class SomeWorker < Struct.new( :some_variable )
  def call
    # do some work
  end
end

worker = SomeWorker.new( 1 )
RabbitWQ::Work.enqueue( worker )

Queueing Work in the Future

RabbitWQ::Work.enqueue( worker, delay: 30000 )

Queueing Work with a Retry

RabbitWQ::Work.enqueue( worker, retry: 1 )

Queueing Work with a Retry and a Retry Delay

RabbitWQ::Work.enqueue( worker, retry: 1, retry_delay: 30000 )

Skipping Retry by Forcing a Final Error

You can use RabbitWQ::FinalError to skip retries for an error that will not recover after a time delay.

class SomeWorker < Struct.new( :some_variable )
  def call
    final_error( "Some message" )
  end
end

The #final_error method raises a RabbitWQ::FinalError, which triggers the normal final error functionality (callbacks, logging, error queue queueing). If you would like to treat the final error as something other than an error-level occurrence, you can pass the level to #final_error.

class SomeWorker < Struct.new( :some_variable )
  def call
    final_error( "Some message", :info )
  end
end

When a level other than :error is provided, a log entry is written to the work log at that level, e.g. :info. No error is logged and nothing is placed on the error queue, but the error callbacks are still executed. If you want to skip the logic in an existing callback for these cases, you can test the #level of the error.
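As a rough sketch of testing the level inside a callback: the example below runs without the gem, so it uses a stand-in error object. StubError, LevelAwareWorker and the alerts list are hypothetical names, and the sketch assumes #level returns the symbol that was passed to #final_error.

```ruby
# Stand-in for the error object handed to callbacks. Per the text above,
# the real error responds to #level; this Struct is hypothetical and only
# exists so the sketch runs without the gem.
StubError = Struct.new(:message, :level)

class LevelAwareWorker < Struct.new(:some_variable)
  attr_reader :alerts

  # Error callback: receives the error as its single parameter.
  def on_final_error(error)
    @alerts ||= []
    # Skip the alerting logic for final errors raised at a non-error level.
    return unless error.level == :error

    @alerts << error.message
  end
end

worker = LevelAwareWorker.new(1)
worker.on_final_error(StubError.new('Nothing to do', :info))
worker.on_final_error(StubError.new('Upstream rejected payload', :error))
p worker.alerts # => ["Upstream rejected payload"]
```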

Queueing Work with a Retry and Auto-Scaling Retry Delay

RabbitWQ::Work.enqueue( worker, retry: 1, retry_delay: 'auto-scale' )

Auto-scale will set up retries at the following intervals: 1 min, 5 mins, 15 mins, 30 mins, 1 hr, 6 hrs, 12 hrs, 24 hrs, and 48 hrs.
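To make the schedule concrete, it can be written as a lookup table. This is only an illustration, not the gem's implementation: AUTO_SCALE_INTERVALS and auto_scale_interval are hypothetical names, the values are expressed in seconds for readability, and reusing the last interval for attempts beyond the table is an assumption.

```ruby
# Hypothetical sketch of the auto-scale schedule listed above.
# The constant and method names are illustrative, not part of RabbitWQ.
MINUTE = 60
HOUR   = 60 * MINUTE

AUTO_SCALE_INTERVALS = [
  1 * MINUTE, 5 * MINUTE, 15 * MINUTE, 30 * MINUTE,
  1 * HOUR, 6 * HOUR, 12 * HOUR, 24 * HOUR, 48 * HOUR
].freeze

# Returns the wait in seconds before the given retry attempt (1-based).
# Assumption: attempts past the end of the table reuse the last interval.
def auto_scale_interval(attempt)
  raise ArgumentError, 'attempt must be >= 1' if attempt < 1

  AUTO_SCALE_INTERVALS.fetch(attempt - 1, AUTO_SCALE_INTERVALS.last)
end

puts auto_scale_interval(1) # => 60
puts auto_scale_interval(5) # => 3600
```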

Implementing with the Worker Module

class SomeWorker < Struct.new( :some_variable )
  include RabbitWQ::Worker

  def call
    # do some work
  end
end

worker = SomeWorker.new( 1 )
worker.work # same as RabbitWQ::Work.enqueue( worker )

Success Callback

on_success

Called on success when defined on a worker.

Error Handling

Once a worker has raised an exception and no retry attempts remain, the worker is placed on the error queue with the exception type, message, and backtrace.

Error Callbacks

There are several error callbacks that will be called if defined on a worker. Each error callback will receive a single parameter, the error.

on_error

Called anytime an error is raised, including if a retry will be attempted.

on_final_error

Called when an error is raised and either no retries were requested or none remain.

on_retryable_error

Called when an error is raised and a retry will be attempted.
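The callback rules above can be tried out with a toy, in-process runner. This is a sketch under stated assumptions, not how RabbitWQ dispatches work: CallbackWorker and run_with_callbacks are hypothetical, there are no queues or delays, and calling on_error before the more specific callback is an assumed ordering.

```ruby
# Hypothetical worker that fails a fixed number of times and records
# which callbacks fire.
class CallbackWorker < Struct.new(:fail_times)
  attr_reader :events

  def call
    @attempts = (@attempts || 0) + 1
    raise 'boom' if @attempts <= fail_times
  end

  def on_success;                record(:on_success); end
  def on_error(error);           record(:on_error); end
  def on_retryable_error(error); record(:on_retryable_error); end
  def on_final_error(error);     record(:on_final_error); end

  private

  def record(name)
    (@events ||= []) << name
  end
end

# Toy runner that mimics the documented callback rules in-process.
# NOT the gem's implementation: it only illustrates when each hook applies.
def run_with_callbacks(worker, retries:)
  remaining = retries
  begin
    worker.call
    worker.on_success if worker.respond_to?(:on_success)
  rescue StandardError => e
    worker.on_error(e) if worker.respond_to?(:on_error)
    if remaining > 0
      remaining -= 1
      worker.on_retryable_error(e) if worker.respond_to?(:on_retryable_error)
      retry
    end
    worker.on_final_error(e) if worker.respond_to?(:on_final_error)
  end
end

worker = CallbackWorker.new(1) # fails once, then succeeds
run_with_callbacks(worker, retries: 1)
p worker.events # => [:on_error, :on_retryable_error, :on_success]
```

With a worker that fails twice and a single retry, the same runner ends in on_final_error instead of on_success.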

Disabling a Worker

A worker can be disabled by overriding the #enabled? method.

class SomeWorker
  include RabbitWQ::Worker

  def enabled?
    # some logic to determine enabled
  end
end

By default, when a worker is disabled, a log entry is created, the worker does no work, and it leaves the work queue system. To have the worker sent to the error queue when it is disabled, override the #error_on_disabled? method:

class SomeWorker
  include RabbitWQ::Worker

  def error_on_disabled?
    true
  end
end
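For illustration, one possible #enabled? implementation is an environment-variable toggle. This is a sketch only: ToggleWorker and the DISABLE_TOGGLE_WORKER variable are made-up names, and the RabbitWQ::Worker include is left out so the snippet runs on its own.

```ruby
class ToggleWorker
  # In an application with the gem loaded, this class would
  # `include RabbitWQ::Worker` as in the examples above.

  def call
    # do some work
  end

  # Assumption: DISABLE_TOGGLE_WORKER is a hypothetical variable name
  # chosen for this sketch. The worker is enabled unless it is "true".
  def enabled?
    ENV['DISABLE_TOGGLE_WORKER'] != 'true'
  end

  # Send the worker to the error queue instead of silently dropping it.
  def error_on_disabled?
    true
  end
end

ENV['DISABLE_TOGGLE_WORKER'] = 'true'
puts ToggleWorker.new.enabled? # => false
ENV.delete('DISABLE_TOGGLE_WORKER')
```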

Logging

RabbitWQ provides a work logger that is available within all workers. You must send a reference to self so that the #object_id may be put into the log message.

RabbitWQ.work_logger.info( self, 'Some message' )

When the RabbitWQ::Worker module is mixed into a worker, you can use the logging convenience methods. You do not have to provide a reference to self in this case.

class SomeWorker < Struct.new( :some_variable )
  include RabbitWQ::Worker

  def call
    info( 'Some message' )
    # do some work
  end
end

The RabbitWQ loggers provide the following log levels: debug, info, warn, error and fatal.

Work Publish vs. Subscribe Queue

Quite often the work publish and subscribe queues are the same queue. However, certain use cases require separate work publish and subscribe queues. For instance, when you use the RabbitMQ shovel plugin to effectively create a distributed queue, you may want to publish to a local queue that is shoveled to a central work queue, where the subscriber resides and performs the actual work.

Configuration File

The RabbitWQ configuration file uses JSON syntax. The default location for the configuration file is /etc/rabbit-wq/rabbit-wq.conf.

Here is an example configuration file with each option's default value:

{
  "delayed_exchange_prefix": "work-delay",
  "delayed_queue_prefix": "work-delay",
  "environment_file_path": null,
  "env": "production",
  "error_queue": "work-error",
  "threads": 1,
  "time_zone": "UTC",
  "work_exchange": "work",
  "work_exchange_type": "fanout",
  "work_log_level": "info",
  "work_log_path": "/var/log/rabbit-wq/rabbit-wq-work.log",
  "work_publish_queue": "work",
  "work_subscribe_queue": "work",
  "ignored_workers": {
    "to_error_queue": [
      "Some::Worker"
    ],
    "trash": [
      "Some::OtherWorker"
    ]
  }
}
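Because the file is plain JSON, it can be sanity-checked with Ruby's standard JSON library before the subscriber is started. The sketch below is not the gem's loader: CONFIG_DEFAULTS and load_config are hypothetical names, only a subset of the defaults is copied, and the "work-local" queue name is invented for the example.

```ruby
require 'json'

# A subset of the defaults from the example file above.
CONFIG_DEFAULTS = {
  'env' => 'production',
  'threads' => 1,
  'work_exchange' => 'work',
  'work_publish_queue' => 'work',
  'work_subscribe_queue' => 'work'
}.freeze

# Hypothetical helper: parse JSON text and layer it over the defaults,
# so any option missing from the file keeps its default value.
# JSON.parse raises JSON::ParserError on invalid syntax, such as a
# missing comma between keys.
def load_config(json_text)
  CONFIG_DEFAULTS.merge(JSON.parse(json_text))
end

config = load_config('{ "threads": 4, "work_publish_queue": "work-local" }')
puts config['threads']              # => 4
puts config['work_subscribe_queue'] # => work
```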

Options

delayed_exchange_prefix

The prefix for the delayed exchanges. Defaults to work-delay.

delayed_queue_prefix

The prefix for the delayed queues. Defaults to work-delay.

environment_file_path

The path to the environment file (loads all or some of the application environment). No default.

env

The environment to run in. Defaults to production.

error_queue

The name of the error queue. Defaults to work-error.

ignored_workers/to_error_queue

List of workers to ignore. These workers are added directly to the error_queue without obeying retries, etc.

ignored_workers/trash

List of workers to ignore. Trashes the worker without error or complaining.

threads

The size of the thread pool. Can be overridden with the command line option --threads or -t. Defaults to 1.

time_zone

The time zone to use with ActiveRecord.

work_exchange

The name of the work exchange. Defaults to work.

work_exchange_type

The RabbitMQ exchange type of the work exchange. Defaults to fanout. For more information see RabbitMQ Docs.

work_log_level

The log level of the worker logger. Defaults to info.

work_log_path

The path the worker logger will log to. Defaults to /var/log/rabbit-wq/rabbit-wq-work.log.

work_publish_queue

The name of the work queue to publish to. Defaults to work.

work_subscribe_queue

The name of the work queue to subscribe to. Defaults to work.

Command Line Interface

Commands

start

Starts the subscriber.

stop

Stops the subscriber.

restart

Restarts the subscriber.

status

Reports the status of the subscriber, started or stopped.

Options

config (--config or -c)

The path for the configuration file. Defaults to /etc/rabbit-wq/rabbit-wq.conf.

log_level (--log_level)

The log level for the work subscriber's logger. This does not have an effect on the worker logger. Defaults to info.

log (--log or -l)

The path for the work subscriber's log file. Defaults to /var/log/rabbit-wq/rabbit-wq.log.

pid (--pid)

The path to the PID file. Defaults to /var/run/rabbit-wq/rabbit-wq.pid.

interactive (--interactive or -i)

When used, the work subscriber is executed in interactive (attached) mode as opposed to as a daemon.
