Skip to content

Really Complex Workflows with Batches

Mike Perham edited this page Jun 13, 2017 · 12 revisions

Sidekiq Pro's Batches feature can handle job workflows of any complexity. This page shows how to implement a complex workflow given to me by one Sidekiq Pro customer.

The workflow looks like this, where jobs are blue circles and purple boxes hold jobs which can execute in parallel. All jobs within a purple box must succeed before the workflow can move "down".

complex workflow

Let's call this workflow the Order workflow. Perhaps it represents the series of steps necessary to ship a customer's order.

First of all, we're going to create an overall Batch to represent the entire workflow and then create a child batch to represent the first step in the workflow. That child batch will use a success callback to schedule step 2 in the workflow:

order = ...
overall = Sidekiq::Batch.new
overall.on(:success, 'FulfillmentCallbacks#shipped', 'oid' => order.id)
overall.description = "Fulfillment for Order #{order.id}"
overall.jobs do
  StartWorkflow.perform_async(order.id)
end

class StartWorkflow
  def perform
    batch.jobs do
      step1 = Sidekiq::Batch.new
      step1.on(:success, 'FulfillmentCallbacks#step1_done', 'oid' => order.id)
      step1.jobs do
        A.perform_async(order.id)
      end
    end
  end
end

Note that we create a StartWorkflow job in the overall batch which creates Step 1 batch. This is because all batches must have one job to be valid, otherwise their behavior is not defined. That one job can create further child batches with their own jobs.

class FulfillmentCallbacks
  def step1_done(status, options)
    oid = options['oid']
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      step2 = Sidekiq::Batch.new
      step2.on(:success, 'FulfillmentCallbacks#step2_done', 'oid' => oid)
      step2.jobs do
        B.perform_async
        C.perform_async
        D.perform_async
        E.perform_async
        F.perform_async
      end
    end
  end

  def step2_done(status, options)
    oid = options['oid']
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      step3 = Sidekiq::Batch.new
      step3.on(:success, 'FulfillmentCallbacks#step3_done', 'oid' => oid)
      step3.jobs do
        G.perform_async(oid)
      end
    end
  end

  def step3_done(status, options)
    oid = options['oid']
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      step4 = Sidekiq::Batch.new
      step4.on(:success, 'FulfillmentCallbacks#step4_done', 'oid' => oid)
      step4.jobs do
        H.perform_async(oid)
        I.perform_async(oid)
      end
    end
  end

  def step4_done(status, options)
    oid = options['oid']
    overall = Sidekiq::Batch.new(status.parent_bid)
    overall.jobs do
      J.perform_async(oid)
      K.perform_async(oid)
      L.perform_async(oid)
    end
  end

  def shipped(status, options)
    # this callback will fire once M has succeeded
    oid = options['oid']
    puts "Order #{oid} has shipped!"
  end
end

class L
  include Sidekiq::Worker

  def perform(oid)
    # do stuff
    if bid
      # if we belong to a batch, assume we're within the fulfillment workflow
      # and need to kick off job M
      batch.jobs do
        M.perform_async(oid)
      end
    end
  end
end

In this manner, we can implement serial steps in the workflow, with the jobs in each step executing in parallel.

Tips

Using the Batch API can be complex, these tips can help you avoid some of the more insideous gotchas:

  • Once a batch is created, you cannot edit it except to add more jobs to it. Jobs can only be dynamically added to a Batch within another job in that Batch.
  • Callbacks should never, ever change the associated batch. Notably you cannot add more jobs to the Batch once its callbacks are running. Callbacks are fired when all jobs have executed, what does it mean to add more jobs now?

Adding more jobs to the current batch:

def perform(...)
  if ...
    # Sidekiq::Worker#batch is a method which gives you access 
    # to the current batch for this job
    batch.jobs do
      AnotherWorker.perform_async(...)
    end
  end
end

Once a step in a workflow is complete, it's idiomatic to reopen the parent batch within a callback and add another step to that workflow. This is how we get sequential execution ("don't execute this job until these 3 jobs are complete").

def step1_done(status, options)
  parent = Sidekiq::Batch.new(status.parent_bid)
  parent.jobs do
    # add a new child batch which represents all the jobs
    # necessary for Step 2, the batch picks up its parent 
    # automatically since it is defined within the parent's `jobs`.
    b = Sidekiq::Batch.new
    b.description = "Step 2"
    b.on(:success, 'SomeClass#step2_done', options)
    b.jobs do
      MoreWork.perform_async(...)
    end
  end
end

Rule of thumb: jobs only open their own batch, callbacks only open their parent batch.

You can’t perform that action at this time.