Considerations for running Kiba jobs programmatically (from Sidekiq, Faktory, Rake, ...)

While the original way to use Kiba is to put your ETL jobs in an .etl file and invoke the kiba command-line tool (see Running Kiba jobs from the command line), it's also possible to run them programmatically, as part of a Sidekiq or Faktory background job, a Rake task, etc.

To run a Kiba job programmatically, you can use the block form of its public API:

job = Kiba.parse do
  source MySource
  transform MyFirstTransform
  transform { |r| r.merge(extra_field: 10) }
  destination MyDestination
end

Kiba.run(job)

If you implement jobs this way instead of using the command line, they also become testable "in-process", which makes tests much faster than shelling out to a command, and also makes mocking much easier.
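For instance, here is a minimal sketch of such an in-process test (using Minitest; ArraySource and ArrayDestination are ad-hoc test doubles following Kiba's source and destination contracts, not classes provided by Kiba itself):

require 'minitest/autorun'
require 'kiba'

# Ad-hoc test doubles implementing Kiba's component contracts
class ArraySource
  def initialize(rows)
    @rows = rows
  end

  def each
    @rows.each { |row| yield(row) }
  end
end

class ArrayDestination
  def initialize(sink)
    @sink = sink
  end

  def write(row)
    @sink << row
  end
end

class JobTest < Minitest::Test
  def test_extra_field_is_merged
    output = []
    job = Kiba.parse do
      source ArraySource, [{ name: 'Jane' }]
      transform { |r| r.merge(extra_field: 10) }
      destination ArrayDestination, output
    end
    Kiba.run(job)
    assert_equal [{ name: 'Jane', extra_field: 10 }], output
  end
end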

A few considerations must be taken into account. Please read below:

Do not call require inside Kiba.parse

My understanding is that require is not thread-safe, so calling require inside Kiba.parse is not recommended in multi-threaded environments.

Do not do this:

job = Kiba.parse do
  require 'dsl_extensions/progress_bar'
  # SNIP
end

You are advised to eager-load all your dependencies instead (e.g. from a Sidekiq initializer, or by calling require at the top of your files).
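For instance (reusing the hypothetical dsl_extensions/progress_bar file from above), move the require out of the block:

require 'dsl_extensions/progress_bar' # eager-loaded, outside the block

job = Kiba.parse do
  # dependencies are already loaded at this point
  # SNIP
end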

You can pass variables to Kiba.parse

It is very common, and definitely allowed, to reference parameters (such as filenames) or live instances (such as Sequel connections) from inside Kiba.parse, in order to control how your job will run.

In the job below, the name of a source file, a live Sequel connection, and a Logger instance are passed as parameters, then used in the job definition:

require 'kiba-pro/destinations/sql_upsert'

module ETL
  module SyncPartners
    module_function

    def setup(source_file, sequel_connection, logger)
      Kiba.parse do
        pre_process do
          logger.info "Starting processing for file #{source_file}"
        end

        source CSVSource, 
          filename: source_file,
          csv_options: { headers: true, col_sep: ',' }

        # SNIP
        
        destination Kiba::Pro::Destinations::SQLUpsert,
          table: :partners,
          unique_key: :crm_partner_id,
          database: sequel_connection
      end
    end
  end
end

You can then call your job programmatically:

job = ETL::SyncPartners.setup(my_source_file, my_sequel_connection, logger)
Kiba.run(job)
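For instance, here is a sketch of how this could be wired into a Sidekiq worker (SEQUEL_DB is an assumed constant holding your Sequel connection; adapt to your own setup):

require 'sidekiq'
require 'kiba'

class SyncPartnersWorker
  include Sidekiq::Worker

  def perform(source_file)
    # build a fresh job for this run, then execute it in-process
    job = ETL::SyncPartners.setup(source_file, SEQUEL_DB, Sidekiq.logger)
    Kiba.run(job)
  end
end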

Avoid re-using the job instance

It is not recommended to re-use the output of Kiba.parse (variable job above) for multiple calls to Kiba.run.

If you do so, you may unknowingly end up sharing some form of state between runs (via the variables passed as parameters as described above, or via the way your ETL components are written), leading to unexpected results.
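Instead, call Kiba.parse once per run, so each run gets a fresh job instance. A sketch, reusing the ETL::SyncPartners module above (source_files, SEQUEL_DB and logger are assumptions standing in for your own setup):

source_files.each do |source_file|
  # parse anew for each run, rather than re-using a single job
  job = ETL::SyncPartners.setup(source_file, SEQUEL_DB, logger)
  Kiba.run(job)
end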

Close resources explicitly

At the time of writing (Kiba 2.0.0), if an error is raised while Kiba.run executes, Kiba does nothing to close the resources you may have opened during processing (such as files, database connections, etc).

For now it is your responsibility to rescue any error that may occur and close the resources your components may have opened, or to use a construct that automatically closes resources on error (such as the block form of CSV.open).
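For instance, here is a sketch of explicit cleanup with begin/ensure (the file handle and its wiring into the components are assumptions for illustration):

file = File.open('output.csv', 'w')
begin
  job = Kiba.parse do
    # SNIP - components writing to `file`
  end
  Kiba.run(job)
ensure
  # runs whether or not Kiba.run raised an error
  file.close
end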

When applicable, you can also wrap the call to Kiba.run with a block-based construct that will automatically close the resources you need, e.g.:

allocate_connection_from_pool do |connection|
  job = Kiba.parse do
    source SQL, connection: connection
    # SNIP
  end
  Kiba.run(job)
end

(here the connection will be returned to the pool automatically).
