Skip to content
Pablo Cantero edited this page Dec 30, 2015 · 44 revisions

shoryuken_options(...)

queue

The queue option associates a queue with a worker.

class MyWorker
  include Shoryuken::Worker
  
  shoryuken_options queue: 'my_queue'
  
  def perform(sqs_msg, body); end
end

You can also pass a block to define the queue:

class MyWorker
  include Shoryuken::Worker
  
  shoryuken_options queue: ->{ "#{ENV['APPLICATION_ENV']}_my_queue" }

  # shoryuken_options queue: ->{ "#{Socket.gethostname}_my_queue" }
    
  def perform(sqs_msg, body); end
end

Or an array to associate multiple queues to a single worker:

class MyWorker
  include Shoryuken::Worker
  
  shoryuken_options queue: %w[queue1 queue2 queue3]

  # shoryuken_options queue: ->{ load_queue_names # a method that returns an array }
    
  def perform(sqs_msg, body); end
end

auto_delete

When auto_delete is true, Shoryuken deletes the messages after their consumption, but only in case the worker doesn't raise any exception during the consumption. Default false.

As auto_delete is false by default, remember to set it to true or call sqs_msg.delete before ending the perform method, otherwise the messages will get back to the queue, becoming available to be consumed again.

Note: auto_delete is true by default when using Active Job.

batch

When batch is true, Shoryuken sends the messages in batches to the worker instead of individually. Default false.

One of the advantages of batch is when you use APIs that accept batch, for example Keen IO.

The BatchableKeenIOWorker example below is more efficient than the KeenIOWorker. This better efficiency isn't a general rule, it depends on the use case.

class KeenIOWorker
  include Shoryuken::Worker

  shoryuken_options queue: 'keen_io', auto_delete: true, body_parser: :json

  def perform(sqs_msg, event)    
    Keen.publish 'stats', event
  end
end

class BatchableKeenIOWorker
  include Shoryuken::Worker

  shoryuken_options queue: 'keen_io', auto_delete: true, batch: true, body_parser: :json

  def perform(sqs_msgs, events)
    Keen.publish_batch 'stats' => events
  end
end

Be careful with batchable workers if you are using non default middleware, because the sqs_msg and body will be an array.

body_parser

The body_parser allows to parse the body before calling the perform method. It accepts symbols: :json or :text or a block returning the body or a class that responds to .parse. Default :text.

class MyWorker
  include Shoryuken::Worker

  shoryuken_options queue: 'default', body_parser: :json
  # shoryuken_options queue: 'default', body_parser: ->(sqs_msg){ REXML::Document.new(sqs_msg.body) }
  # shoryuken_options queue: 'default', body_parser: JSON


  def perform(sqs_msg, hash)
    puts hash['name']

    sqs_msg.delete
  end
end

auto_visibility_timeout

Although I strongly recommend to set the visibility timeout to a super pessimistic value, sometimes we can't control it. So, for these cases, you can enable the auto_visibility_timeout by setting its value to true. Default false

When it's enabled, 5 seconds before the default visibility timeout expires, Shoryuken will reset it to its original value again.

Be generous while configuring the default visibility_timeout for a queue. If your worker in the worst case takes 2 minutes to consume a message, set the visibility_timeout to at least 4 minutes. It doesn't hurt and it will be better than having the same message being consumed more than the expected. http://www.pablocantero.com/blog/2014/11/29/sqs-to-the-rescue/

retry_intervals

If a job raises an exception while consuming a message, by default the message will be available to be consumed again after its expiration (visibility) timeout.

But if you want to increase or decrease the next time a failing message will be available to be consumed again, you can use retry_intervals to implement an exponential backoff.

class MyWorker
  include Shoryuken::Worker

  shoryuken_options queue: 'default', retry_intervals: [360, 1200, 3600] # 5.minutes, 20.minutes and 1.hour

  def perform(sqs_msg, body); end
end

Keep in mind that AWS SQS does not officially support exponential backoff, it's something implemented in Shoryuken using the Visibility Timeout, which can extended to a maximum of 12 hours.

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibility.html

You can continue to call ChangeMessageVisibility to extend the visibility timeout to a maximum of 12 hours. If you try to extend beyond 12 hours, the request will be rejected.