Thibaut Barrère edited this page May 13, 2020 · 12 revisions

Kiba Pro ParallelTransform provides an easy way to process a group of rows at the same time using a pool of threads. In its current state, it is intended to accelerate ETL transforms doing IO operations such as HTTP requests, by going multithreaded instead of single threaded.

Currently tested against: MRI Ruby 2.4-2.7. Not yet tested against JRuby and TruffleRuby, but it will likely work there as well (if it does not, get in touch!).

Requirements:

  • Add concurrent-ruby to your Gemfile (future implementations may or may not use concurrent-ruby).
  • Kiba's StreamingRunner (the default in v3) must be enabled.
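For reference, the Gemfile entries could look like this (illustrative sketch; the private source for the kiba-pro gem itself is not shown here):

```ruby
# Gemfile (illustrative)
gem 'kiba'            # v3+, so the StreamingRunner is the default
gem 'concurrent-ruby' # required by the current ParallelTransform implementation
# kiba-pro itself is fetched from its private gem source (not shown here)
```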

Typical use

require 'kiba-pro/transforms/parallel_transform'

job = Kiba.parse do
  extend Kiba::Pro::Transforms::ParallelTransform::DSLExtension

  # SNIP

  parallel_transform(max_threads: 10) do |r|
    # this code will run in its own thread
    extra_data = get_extra_json_hash_from_http!(r.fetch(:extra_data_url))
    r.merge(extra_data: extra_data)
  end
  
  # SNIP
end

The parallel_transform call is actually a shortcut for:

transform Kiba::Pro::Transforms::ParallelTransform,
  max_threads: 10,
  on_row: -> (r) { ... transform code ... }

The number of threads (defaulting to 10) can be adjusted with the max_threads: xx option.

How many threads should I use?

It is recommended to experiment with different values on your specific workloads and setups before deciding what to use. The optimal value will vary widely!

For jobs which are running alone, as their own process (e.g. Rake tasks, cron jobs), you can push the envelope a bit further.

For jobs which are part of existing systems (e.g. a Sidekiq background job), you have to keep in mind that each ETL job will use N threads, and that other ETL jobs + regular Sidekiq jobs may run at the same time (see this thread for interesting notes).

In addition to that, your optimal setting will vary depending on which version of Ruby you are using (MRI Ruby, JRuby, TruffleRuby).

As always: measure, measure, measure!

Technical notes

Processing

In its current form, the transform will buffer N rows (up to max_threads) then will trigger the processing using N threads. The current implementation relies on the default global thread pool created automatically by concurrent-ruby. Future versions of this component may let you override that to use a custom thread pool instead.

Rows are processed in batches of max_threads rows, which means the transform will wait for N rows to accumulate before running the threads.

The order of the input rows is kept in the output rows (a characteristic which is often very helpful in ETL).
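The buffer-then-batch behavior described above can be sketched with plain Ruby threads (an illustration of the principle only, not Kiba Pro's actual implementation, which relies on concurrent-ruby's global thread pool):

```ruby
# Illustration only: process rows in batches of `max_threads`,
# one thread per row, while preserving the input order.
def parallel_map_in_batches(rows, max_threads:)
  rows.each_slice(max_threads).flat_map do |batch|
    threads = batch.map do |row|
      Thread.new { yield(row) } # each row is processed in its own thread
    end
    threads.map(&:value) # join + collect results, keeping the batch order
  end
end

result = parallel_map_in_batches([1, 2, 3, 4, 5], max_threads: 2) { |r| r * 10 }
# the order of the input rows is preserved in the output
```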

Exception handling

Exceptions not rescued inside the code block will be intercepted by the parallel transform, then re-raised at the end of the batch (of max_threads rows), hence stopping the processing.

In a given batch, rows that do not raise an exception will still have their code block run to completion (but the job will be halted at the end of the batch if at least one exception has been raised).

Once a given batch is complete, the parallel transform checks if exceptions occurred:

  • If exactly one exception occurred in a thread, the parallel transform will interrupt the job by raising the very same exception.
  • If multiple exceptions occurred during the batch, the parallel transform will also interrupt the job, this time by wrapping all the exceptions in a single Concurrent::MultipleErrors (see doc).

⚠️ If you rescue Concurrent::MultipleErrors in your code, make sure to check whether one of the wrapped errors falls outside the StandardError hierarchy (i.e. is a bare Exception subclass). If that's the case, you may want to re-raise it (see Rescue StandardError, Not Exception).
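That check can be sketched as follows (using a minimal stand-in for Concurrent::MultipleErrors so the snippet is self-contained; the real class, from concurrent-ruby, exposes the wrapped exceptions through an #errors reader):

```ruby
# Minimal stand-in for Concurrent::MultipleErrors (illustration only).
class MultipleErrorsStandIn < StandardError
  attr_reader :errors

  def initialize(errors)
    @errors = errors
    super("#{errors.size} errors")
  end
end

def severe_error(multiple_errors)
  # Find a wrapped error living outside the StandardError hierarchy
  # (e.g. SignalException): such errors should usually be re-raised.
  multiple_errors.errors.find { |e| !e.is_a?(StandardError) }
end

wrapped = MultipleErrorsStandIn.new([
  RuntimeError.new("boom"),      # a regular StandardError
  SignalException.new("TERM")    # outside the StandardError hierarchy
])
severe = severe_error(wrapped)
```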

Handling timeouts

It is recommended to set proper timeouts on your queries (especially if you work with Sidekiq, which is intended to be used with shorter jobs compared to e.g. Rake tasks).

That said: Do not use Ruby's Timeout (doc) inside your code. It is a dangerous API, as has been widely documented.

Instead, use lower-level timeouts, specific to each library.
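For instance, with Ruby's standard Net::HTTP, timeouts can be set explicitly on the client (the values here are illustrative, and no request is actually made):

```ruby
require 'net/http'

http = Net::HTTP.new('example.org', 443)
http.use_ssl = true
http.open_timeout = 5  # seconds allowed to open the connection
http.read_timeout = 10 # seconds allowed to wait for a chunk of the response
```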

You can find a very detailed list in the Ultimate Guide to Ruby Timeouts.