Skip to content
Thibaut Barrère edited this page May 12, 2020 · 12 revisions

⚠️ This documentation is in draft mode.

Kiba Pro ParallelTransform provides an easy way to process a group of rows at the same time using a pool of threads.

In its current state, it is intended to accelerate ETL transforms doing IO operations such as HTTP requests, by going multithreaded instead of single threaded.

Currently tested against: MRI Ruby 2.4-2.7. Not tested strictly speaking against JRuby and TruffleRuby, yet, but will likely work equally (if it does not, get in touch!).

Requirements: add concurrent-ruby to your Gemfile.

Typical use

require 'kiba-pro/transforms/parallel_transform'

job = Kiba.parse do
  extend Kiba::Pro::Transforms::ParallelTransform::DSLExtension

  # SNIP

  parallel_transform(concurrency: 10) do |r|
    extra_data = get_extra_json_hash_from_http!(r.fetch(:extra_data_url))
    r.merge(extra_data: extra_data)
  end
  
  # SNIP
end

The parallel_transform call is actually a shortcut for:

transform Kiba::Pro::Transforms::ParallelTransform,
  concurrency: 10,
  on_row: -> (r) { ... transform code ... }

Technical notes

Processing

In its current form, the transform will buffer N rows (up to concurrency) then will trigger the processing using N threads. The current implementation relies on the default global thread pool created automatically by concurrent-ruby. Future versions of this component may let you override that to use a custom thread pool instead.

Exception handling

At the moment, exceptions are handled differently if there is a single row leading to an exception vs more than one.

An exception in a given thread does not interrupt the processing for other rows in the same batch. The processing, though, is stopped at the end of the batch, if any row generated an exception.

If an exception is raised in a thread processing a single row (in a given batch), that exception will be caught then re-raised in the thread calling Kiba.run, once the processing for all rows is complete.

If more than one exception occur in the processing of a given batch, these exceptions will be caught in each thread, then wrapped into a Concurrent::MultipleErrors (see doc). That exception will be re-raised in the thread calling Kiba.run, once the processing for all rows is complete.

⚠️ It must be noted that Concurrent::MultipleErrors could at some point contain a non-StandardError exception (such as Exception), depending on what you achieve. If you rescue Concurrent::MultipleErrors, it is currently your responsibility to check out each individual error and make sure to re-raise if such an exception is contained (see Rescue StandardError, Not Exception). Future versions of this component may improve that, but in the meantime be careful about that.

Handling timeouts

Handling process termination

Working with Sidekiq