Chore: Job processing... for the future!

About

Chore is a pluggable, multi-backend job processor. It was built from the ground up to be extremely flexible. We hope that you will find integrating and using Chore to be as pleasant as we do.

The full docs for Chore can always be found at http://tapjoy.github.io/chore.

Configuration

Chore can be integrated with any Ruby-based project. To get started, add the gem to your Gemfile:

gem 'chore-core', '~> 1.6.0'

If you also plan on using SQS, you must also add the dalli gem, which Chore uses to talk to memcached:

gem 'dalli'

Create a Chorefile file in the root of your project directory. While you can configure Chore itself from this file, it's primarily used to direct the Chore binary toward the root of your application, so that it can locate all of the dependencies and required code.

--require=./<FILE_TO_LOAD>

Make sure that --require points to the main entry point for your app. If integrating with a Rails app, just point it to the directory of your application and it will handle loading the correct files on its own.

Other options include:

--concurrency 16 # number of concurrent worker processes, if using forked worker strategy
--worker-strategy Chore::Strategy::ForkedWorkerStrategy # which worker strategy class to use
--consumer Chore::Queues::SQS::Consumer # which consumer class to use. Options are SQS::Consumer and Filesystem::Consumer. Filesystem is recommended for local development and testing purposes only.
--consumer-strategy Chore::Queues::Strategies::Consumer::ThreadedConsumerStrategy # which consuming strategy to use. Options are SingleConsumerStrategy and ThreadedConsumerStrategy. Threaded is recommended, as it gives you finer control over your consuming profile.
--consumer-sleep-interval 1.0 # The amount of time in seconds to sleep when a consumer doesn't receive any messages. Sub-second values are accepted. The default varies by consumer implementation. This is a weak form of backoff for when there is no work to do.
--threads-per-queue 4 # number of threads per queue for consuming from a given queue.
--dedupe-servers # if using SQS or a similar queue with at-least-once delivery and your memcached is running on something other than localhost
--batch-size 50 # how many messages are batched together before handing them to a worker
--batch-timeout 20 # maximum number of seconds to wait until handing a message over to a worker
--queue_prefix prefixy # A prefix to prepend to queue names, mainly for development and qa testing purposes
--max-attempts 100 # The maximum number of times a job can be attempted
--dupe-on-cache-failure # Determines the deduping behavior when a cache connection error occurs. When set to `false`, the message is assumed not to be a duplicate. Defaults to `false`.
--queue-polling-size 10 # If your particular queueing system supports responding with messages in batches of a certain size, you can control that with this flag. SQS has a built in upper-limit of 10, but other systems will vary.
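Putting several of these options together, a minimal Chorefile might look like the following (the require path and values are illustrative, not prescriptive):

```text
--require=./config/environment
--concurrency 16
--worker-strategy Chore::Strategy::ForkedWorkerStrategy
--consumer Chore::Queues::SQS::Consumer
--consumer-strategy Chore::Queues::Strategies::Consumer::ThreadedConsumerStrategy
--threads-per-queue 4
--batch-size 50
--batch-timeout 20
```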

If you're using SQS, you'll want to add AWS keys so that Chore can authenticate with AWS.

--aws-access-key=<AWS KEY>
--aws-secret-key=<AWS SECRET>

By default, Chore will run over all queues it detects among the required files. If you wish to change this behavior, you can use:

--queues QUEUE1,QUEUE2... # a list of queues to process
--except-queues QUEUE1,QUEUE2... # a list of queues _not_ to process

Note that you can use one or the other but not both. Chore will quit and make fun of you if both options are specified.

Tips for configuring Chore

For Rails, it can be necessary to add the directory where you store your jobs to the eager load paths, found in application.rb. You will likely need a similar approach for most apps that use jobs, unless you place them in a directory that is already eager loaded by default. One example of this might be:

config.eager_load_paths << File.join(config.root, "app", "jobs")

However, due to the way eager_load_paths works in Rails, this may only solve the issue in your production environment. You might also find it useful for other environments to put something like this in a config/initializers/chore.rb file, although you can choose to load the job files in any way you like:

Dir["#{Rails.root}/app/jobs/**/*"].each do |file|
  require file unless File.directory?(file)
end unless Rails.env.production?

Producing and Consuming Jobs

When it comes to configuring Chore, you have two main use cases - as a producer of messages, or as a consumer of messages (the consumer is also able to produce messages if need be, but it runs as its own isolated instance of your application).

For producers, you must do all of your Chore configuration in an initializer.

For consumers, you need to either use a Chorefile or Chorefile + an initializer.

Because you are likely to use the same app as the basis for both producing and consuming messages, you'll already have a considerable amount of configuration in your Producer - it makes sense to use Chorefile to simply provide the require option, and stick to the initializer for the rest of the configuration to keep things DRY.

However, like many aspects of Chore, it is ultimately up to the developer to decide which use case fits their needs best. Chore is happy to let you configure it in almost any way you want.

An example of how to configure chore via an initializer:

Chore.configure do |c|
  c.concurrency = 16
  c.worker_strategy = Chore::Strategy::ForkedWorkerStrategy
  c.max_attempts = 100
  ...
  c.batch_size = 50
  c.batch_timeout = 20
end

Integration

Add an appropriate line to your Procfile:

jobs: bundle exec chore -c config/chore.config

If your queues do not exist, you must create them before you run the application:

require 'aws-sdk'
sqs = AWS::SQS.new
sqs.queues.create("test_queue")

Finally, start foreman as usual

bundle exec foreman start

Chore::Job

A Chore::Job is any class that includes Chore::Job and implements perform(*args). Here is an example job class:

class TestJob
  include Chore::Job
  queue_options :name => 'test_queue'

  def perform(args={})
    Chore.logger.debug "My first async job"
  end

end

This job declares that the name of the queue it uses is test_queue, set in the queue_options method.

Chore::Job and perform signatures

The perform method signature can have explicit argument names, but in practice this makes changing the signature more difficult later on. Once a Job is in production and is being used at a constant rate, it becomes problematic to begin mixing versions of jobs which have non-matching signatures.

While this is able to be overcome with a number of techniques, such as versioning your jobs/queues, it increases the complexity of making changes.

The simplest way to structure job signatures is to treat the arguments as a hash. This will allow you to maintain forwards and backwards compatibility between signature changes with the same job class.

However, Chore is ultimately agnostic to your particular needs in this regard, and will let you use explicit arguments in your signatures as easily as you can use a simple hash - the choice is left to you, the developer.
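As a sketch of the hash-style signature described above (ResizeImageJob and its fields are hypothetical; a real job would also include Chore::Job and set queue_options):

```ruby
class ResizeImageJob
  def perform(args = {})
    image_id = args.fetch('image_id')
    # 'quality' was added in a later version of this job; older messages
    # that omit it fall back to a default, so both versions coexist safely.
    quality = args.fetch('quality', 85)
    "resizing image #{image_id} at quality #{quality}"
  end
end
```

An old-style payload ({'image_id' => 1}) and a new-style payload ({'image_id' => 1, 'quality' => 60}) are both handled by the same class, which is what keeps a rolling deploy safe.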

Chore::Job and publishing Jobs

Now that you've got a test job, if you wanted to publish to that job it's as simple as:

TestJob.perform_async({"message"=>"YES, DO THAT THING."})

It's advisable to specify the publisher Chore uses to send messages globally, so that you can change it easily for local and test environments. To do this, you can add a configuration block to an initializer like so:

Chore.configure do |c|
  c.publisher = Some::Other::Publisher
end

It is worth noting that any option that can be set via config file or command-line args can also be set in a configure block.

If a global publisher is set, it can be overridden on a per-job basis by specifying the publisher in queue_options.
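For example, a sketch of a per-job override (Some::Other::Publisher is a placeholder, and the :publisher key in queue_options is an assumption based on the above):

```ruby
class TestJob
  include Chore::Job
  # Assumption: the per-job override is supplied via the :publisher key.
  queue_options :name => 'test_queue',
                :publisher => Some::Other::Publisher

  def perform(args={})
    # ...
  end
end
```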

Retry Backoff Strategy

Chore has basic support for delaying retries of a failed job using a step function. Currently, SQS is the only queue that supports this functionality; all others will simply ignore the delay setting.

Setup

The :backoff option for a queue expects a lambda that takes a single UnitOfWork argument. The return should be a number of seconds to delay the next attempt.

queue_options :name => 'nameOfQueue',
  :backoff => lambda { |work| work.current_attempt ** 2 } # Exponential backoff

Using the Backoff

If there is a :backoff option supplied, any failures will delay the next attempt by the result of that lambda.
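The exponential backoff lambda shown above can be exercised on its own; in this sketch OpenStruct stands in for Chore's UnitOfWork, since only current_attempt is read:

```ruby
require 'ostruct'

# The same exponential backoff lambda as in the queue_options example.
backoff = lambda { |work| work.current_attempt ** 2 }

# Delay (in seconds) applied after each of the first five failed attempts.
delays = (1..5).map { |n| backoff.call(OpenStruct.new(:current_attempt => n)) }
```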

Notes on SQS and Delays

Read more details about SQS and delays [here](docs/Delayed Jobs.md).

Hooks

A number of hooks, both global and per-job, exist in Chore for your convenience.

Global Hooks:

  • before_start
  • before_first_fork
  • before_fork
  • after_fork
  • around_fork
  • within_fork
  • before_shutdown

("within_fork" behaves similarly to around_fork, except that it is called after the worker process has been forked. In contrast, around_fork is called by the parent process.)

Filesystem Consumer/Publisher

  • on_fetch(job_file, job_json)

SQS Consumer

  • on_fetch(handle, body)

Per Job:

  • before_publish
  • after_publish
  • before_perform(message)
  • after_perform(message)
  • on_rejected(message)
  • on_failure(message, error)
  • on_permanent_failure(queue_name, message, error)

All per-job hooks can also be global hooks.

Hooks can be added to a job class as so:

class TestJob
  include Chore::Job
  queue_options :name => 'test_queue'

  def perform(args={})
    # Do something cool
  end

  def before_perform_log(message)
    Chore.logger.debug "About to do something cool with: #{message.inspect}"
  end
end

Global hooks can also be registered like so:

Chore.add_hook :after_publish do
  # your special handler here
end

Signals

Signal handling can get complicated when you have multiple threads, process forks, and both signal handlers and application code making use of mutexes.

To simplify the complexities around this, Chore introduces some additional behaviors on top of Ruby's default Signal.trap implementation. This functionality is primarily inspired by Sidekiq's signal handling at https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/cli.rb.

In particular, Chore handles signals in a separate thread, and does so sequentially rather than in an interrupt-driven fashion. See Chore::Signal for more details on the differences between Ruby's Signal.trap and Chore's Chore::Signal.trap.

Chore will respond to the following Signals:

  • INT , TERM, QUIT - Chore will begin shutting down, taking steps to safely terminate workers and not interrupt jobs in progress unless it believes they may be hung
  • USR1 - Re-opens logfiles, useful for handling log rotations

Timeouts

When using the forked worker strategy for processing jobs, inevitably there are cases in which child processes become stuck. This could result from deadlocks, hung network calls, tight loops, etc. When these jobs hang, they consume resources and can affect throughput.

To mitigate this, Chore has built-in monitoring of forked child processes. When a fork is created to process a batch of work, that fork is assigned an expiration time -- if it doesn't complete by that time, the process is sent a KILL signal.

Fork expiration times are determined from one of two places:

  1. The timeout associated with the queue. For SQS, this is the visibility timeout.
  2. The default queue timeout configured for Chore. For Filesystem queues, this is the value used.

For example, if a worker is processing a batch of 5 jobs and each job's queue has a timeout of 60s, then the expiration time will be 5 minutes for the worker.
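The arithmetic in that example (the formula expiration = batch size × per-job timeout is an assumption drawn directly from it) can be sketched as:

```ruby
# Worked example: a batch of 5 jobs, each from a queue with a
# 60-second timeout (e.g. the SQS visibility timeout).
queue_timeout_seconds = 60
batch_size = 5

expiration_seconds = batch_size * queue_timeout_seconds
expiration_minutes = expiration_seconds / 60
```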

To change the default queue timeout (when one can't be inferred), you can do the following:

Chore.configure do |c|
  c.default_queue_timeout = 3600
end

A reasonable timeout would be based on the maximum amount of time you expect any job in your system to run. Keep in mind that the process running the job may get killed if the job is running for too long.

Plugins

Chore has several plugin gems available, which extend its core functionality.

New Relic - Integrating Chore with New Relic

Airbrake - Integrating Chore with Airbrake

Managing Chore processes

Sample Upstart

There are lots of ways to create upstart scripts, so it's difficult to give a prescriptive example of the "right" way to do it. However, here are some ideas from how we run it in production at Tapjoy:

You should have a specific user that the process runs under, for security reasons. Swap to this user at the beginning of your exec line:

su - $USER --command '...'

For the command that runs Chore itself, keep all of the necessary environment variables in an env file that your upstart script can source on its exec line. This prevents having to mix environment variable changes with changes to the upstart script itself:

source $PATHTOENVVARS ;

After that, you'll want to make sure you're running Chore under the right ruby version. Additionally, we like to redirect STDOUT and STDERR to logger, with an app name. This makes it easy to find information in syslog later on. Putting that all together looks like:

rvm use $RUBYVERSION do bundle exec chore -c Chorefile 2>&1 | logger -t $APPNAME

There are many other ways you could manage the Upstart file, but these are a few of the ways we prefer to do it. Putting it all together, it looks something like:

exec su - special_user --command 'source /the/path/to/env ; rvm use ruby-1.9.3-p484 do bundle exec chore -c Chorefile 2>&1 | logger -t chore-app'

Locating Processes

As Chore does not keep a PIDfile, and has both a master and a potential number of workers, you may find it difficult to isolate the exact PID for the master process.

To find Chore master processes via ps, you can run the following:

ps aux | grep bin/chore

or

pgrep -f bin/chore

To find a list of only Chore worker processes:

ps aux | grep chore-

or

pgrep -f chore-

Copyright

Copyright (c) 2013 - 2015 Tapjoy. See LICENSE.txt for further details.