Implementing ETL destinations

Thibaut Barrère edited this page Feb 10, 2020 · 4 revisions

Like sources, destinations are classes that you provide. Destinations must implement:

  • a constructor (to which Kiba will pass the provided arguments in the DSL)
  • a write(row) method that will be called for each non-dismissed row
  • an optional close method (only called if no error was raised - see notes below on handling resources)

Here is an example destination:

require 'csv'

# simple destination assuming all rows have the same fields
class MyCsvDestination
  attr_reader :output_file

  def initialize(output_file)
    @output_file = output_file
  end

  def write(row)
    @csv ||= CSV.open(output_file, 'w')
    unless @headers_written
      @headers_written = true
      @csv << row.keys
    end
    @csv << row.values
  end

  def close
    # @csv is still nil if no row was ever written
    @csv&.close
  end
end

Note that, unlike with transforms, you do not have to return the row: the return value of write(row) on a destination is currently ignored.
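To make the three-part contract concrete, here is a minimal sketch that drives a destination by hand, in the same order as described above: constructor, one write per row, then close. It runs without Kiba. JsonLinesDestination, the sample rows and the temporary file name are hypothetical and only serve the illustration.

```ruby
require 'json'
require 'tmpdir'

# Hypothetical destination writing one JSON document per line.
class JsonLinesDestination
  def initialize(output_file)
    @output_file = output_file
  end

  def write(row)
    # Open the file lazily, on the first row
    @io ||= File.open(@output_file, 'w')
    @io.puts(JSON.generate(row))
  end

  def close
    # @io is nil if no row was written
    @io&.close
  end
end

# Drive the destination by hand: constructor, write per row, close.
path = File.join(Dir.tmpdir, 'kiba_destination_sketch.jsonl')
destination = JsonLinesDestination.new(path)
rows = [{ id: 1, name: 'Ann' }, { id: 2, name: 'Bob' }]
rows.each { |row| destination.write(row) }
destination.close

lines = File.readlines(path, chomp: true)
```

Exercising a destination this way is also a convenient way to unit-test it in isolation from the rest of a pipeline.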

Handling resources clean-up

If a failure occurs somewhere in your pipeline, the close method of your destinations will not be called.

This is quite different from the way sources work: a source has each as its single method, which makes it easy to use a block form to manage resources.

This can be problematic if you have to clean up resources.

If you need to guarantee some form of tear-down for a resource in a destination, it is recommended that you move that responsibility up into the code that calls Kiba.parse and Kiba.run, e.g.:

open_my_database do |db|
  job = ETL::SyncJob.setup(db: db)
  Kiba.run(job)
end
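The open_my_database helper above is not defined on this page. A minimal sketch of such a block-form helper, assuming a hypothetical FakeConnection class in place of a real database client, could rely on ensure so that the resource is released even when the block raises. The raise below merely stands in for a pipeline failure inside Kiba.run.

```ruby
# Hypothetical stand-in for a real database connection.
class FakeConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Block-form helper: yields the connection and always closes it,
# even if the block raises.
def open_my_database
  db = FakeConnection.new
  yield db
ensure
  db&.close
end

connection = nil
error_message = nil
begin
  open_my_database do |db|
    connection = db
    # Stand-in for Kiba.run(job) failing midway
    raise 'simulated pipeline failure'
  end
rescue RuntimeError => e
  error_message = e.message
end
```

With this shape the error still propagates to the caller, while the tear-down no longer depends on the destination's close method being called.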

Next: Implementing pre and post processors