Implementing ETL transforms
Kiba ETL transforms can be implemented as a Ruby class, or a Ruby block.
A Kiba transform is a Ruby class with:
- a constructor (used for configuration)
- a process(row) method (responsible for preparing output rows based on an input row)
- optional: a close method (useful for "yielding transforms" in particular, see next section)
Here is an example transform. It expects rows to be Hash instances with an index key, and drops every row whose index value is not a multiple of the configured modulo value:
class SamplingTransform
  def initialize(modulo_value)
    @modulo_value = modulo_value
  end

  def process(row)
    row.fetch(:index) % @modulo_value == 0 ? row : nil
  end
end
One can then use the transform this way:
job = Kiba.parse do
  # SNIP
  transform SamplingTransform, 10
  # SNIP
end
Kiba will call the process method for each input row.
The process method must return the modified row, or return nil to indicate that the row should be dropped from the pipeline.
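Because a class transform is a plain Ruby object, this contract can be checked without running a job. The following is a minimal sketch: it repeats the SamplingTransform definition so that it runs on its own, and calls process by hand on made-up rows.

```ruby
# Same transform as in the example above, repeated so this snippet is self-contained.
class SamplingTransform
  def initialize(modulo_value)
    @modulo_value = modulo_value
  end

  def process(row)
    row.fetch(:index) % @modulo_value == 0 ? row : nil
  end
end

transform = SamplingTransform.new(10)

# Made-up input rows, for illustration only.
kept = transform.process({ index: 20 })    # index is a multiple of 10
dropped = transform.process({ index: 7 })  # index is not a multiple of 10

p kept     # the row itself is returned, so it stays in the pipeline
p dropped  # nil is returned, so Kiba would drop the row
```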
Since Kiba v3 (or Kiba v2 with StreamingRunner enabled), you can also yield as many rows as you want for a given input row, using the yield keyword.
For technical reasons, this will only work in class transforms, not in block transforms.
While simple in appearance, this is a powerful feature which you can leverage to build more reusable components (see Kiba v2.0.0 release notes for more information).
class ExplodingTransform
  def process(row)
    2.times do |i|
      yield({ value: row, value_index: i })
    end
    # avoid returning a row (as a normal "process" call)
    # but you could return one if needed
    nil
  end
end
You can then use this transform this way:
Kiba.parse do
  source Kiba::Common::Sources::Enumerable, (1..4)
  transform ExplodingTransform
end
This will generate 2 rows for each of the 4 input rows:
{ value: 1, value_index: 0 }
{ value: 1, value_index: 1 }
{ value: 2, value_index: 0 }
{ value: 2, value_index: 1 }
{ value: 3, value_index: 0 }
{ value: 3, value_index: 1 }
{ value: 4, value_index: 0 }
{ value: 4, value_index: 1 }
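The mechanics can be observed outside of Kiba by passing a block to process by hand. This is only a sketch: the block below is a stand-in (an assumption made for the demonstration) for whatever the Kiba runner does with yielded rows, which this page does not detail.

```ruby
# Same transform as above, repeated so this snippet is self-contained.
class ExplodingTransform
  def process(row)
    2.times do |i|
      yield({ value: row, value_index: i })
    end
    nil
  end
end

yielded = []

# The block stands in for the runner: it simply collects each yielded row.
returned = ExplodingTransform.new.process(1) { |row| yielded << row }

p yielded   # the two rows produced for the single input row
p returned  # nil, so no extra row is added through the return value
```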
Since Kiba v2.5.0, it is possible to call yield from the optional close method.
This feature is very useful for anything that will batch process groups of rows, or work with aggregates of rows in general.
See PR #57 and the kiba-common transforms for more information.
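As an illustration of the batching use case, here is a minimal sketch of a transform that groups rows and uses close to flush the last, incomplete group. BatchingTransform and its batch_size parameter are hypothetical names invented for this example; they are not part of Kiba or kiba-common. The transform is driven by hand with a collecting block, which stands in for the Kiba runner.

```ruby
# Hypothetical transform, written for illustration only.
class BatchingTransform
  def initialize(batch_size)
    @batch_size = batch_size
    @buffer = []
  end

  def process(row)
    @buffer << row
    if @buffer.size == @batch_size
      yield @buffer   # emit a full batch as a single output row
      @buffer = []    # start a fresh buffer, leaving the emitted batch untouched
    end
    nil               # nothing is emitted through the return value
  end

  def close
    # Emit whatever is left once all input rows have been processed.
    yield @buffer unless @buffer.empty?
  end
end

batches = []
collect = proc { |batch| batches << batch }  # stand-in for the runner

transform = BatchingTransform.new(2)
(1..5).each { |row| transform.process(row, &collect) }
transform.close(&collect)

p batches
```

Without the close method, the trailing row (5 in this run) would stay in the buffer and never reach the rest of the pipeline.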
An alternate syntax is available for simple transforms to be written as blocks:
transform do |row|
  row[:this_field] = row.fetch(:that_field) * 10
  # make sure to return the row to keep it in the pipeline
  row
end
Note that you cannot call yield from block transforms.
While you cannot call return from a Ruby block, you can return early from a block by using next:
transform do |row|
  # remove a row with `next`
  if row.fetch(:index) % 2 == 0
    next # the row will be removed from the pipeline
  end
  # return a modified row
  if row.fetch(:index) % 3 == 0
    next {great_index: row.fetch(:index) * 10}
  end
  # otherwise return the row as is
  row
end
This is very useful to avoid nested if statements inside a single block.
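The behaviour of next can be tried in plain Ruby, since a block transform is an ordinary block. In this sketch the same logic is stored in a proc and called by hand on made-up rows; storing it in a proc is an assumption made for the demonstration, as Kiba invokes the block for you inside a job.

```ruby
# Same logic as the block transform above, held in a proc so it can be called directly.
sample = proc do |row|
  # bare `next` makes the block evaluate to nil (row removed)
  next if row.fetch(:index) % 2 == 0

  # `next value` makes the block evaluate to that value (modified row)
  if row.fetch(:index) % 3 == 0
    modified = { great_index: row.fetch(:index) * 10 }
    next modified
  end

  # otherwise the row is returned as is
  row
end

even = sample.call({ index: 4 })
multiple_of_three = sample.call({ index: 9 })
other = sample.call({ index: 5 })

p even               # nil
p multiple_of_three  # the replacement row
p other              # the original row
```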
Like the class form, the block form can return nil to dismiss the row. The class form allows better testability and reusability across your ETL scripts.