Add support for "close" on transforms #53
Comments
I'm not sure this is really needed, because you can always "cache" rows inside an instance variable (or some other cache store), return nil (or call next if inside a block) while aggregating, and flush by returning the contents of the cache. So it can easily be accomplished with the current interface.
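To make the workaround described above concrete, here is a minimal sketch in plain Ruby. `BatchingTransform` and `batch_size` are hypothetical names (not part of Kiba), and the sketch assumes that whatever `process` returns is passed downstream while `nil` dismisses the row.

```ruby
# Hypothetical transform (not part of Kiba): buffers rows in an instance
# variable and releases them as one array once `batch_size` rows are in.
class BatchingTransform
  attr_reader :buffer

  def initialize(batch_size:)
    @batch_size = batch_size
    @buffer = []
  end

  # Returns nil (row dismissed) until the buffer is full, then
  # returns the whole batch and starts a fresh buffer.
  def process(row)
    @buffer << row
    return nil if @buffer.size < @batch_size

    batch = @buffer
    @buffer = []
    batch
  end
end

# Driving the transform by hand, outside of any Kiba job:
transform = BatchingTransform.new(batch_size: 2)
rows = [{ id: 1 }, { id: 2 }, { id: 3 }]
batches = rows.map { |row| transform.process(row) }.compact
# `batches` holds one full batch; the third row is still sitting in
# `transform.buffer` because nothing triggers a final flush.
```

The leftover row is the weak spot of this approach: without a hook that runs after the last row, the final partial batch has to be flushed from somewhere else in the job.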
@sirfilip thanks for your input! I'm still thinking through this, and to be sure (since I'm always quite aggressively removing features), I'll wait to have an actual production requirement that shows it's really needed before implementing this :-)
A first experimental implementation is available in #57. I'll give myself some time to play around with it and see how necessary it is for various scenarios.
It just occurred to me (while working on another ETL) that #57 also makes sorting quite easy to achieve (here in memory):

```ruby
class SortTransform
  # sort_by: a callable (e.g. a lambda) returning the sort key for a row
  def initialize(sort_by:)
    @sort_by = sort_by
    @buffer = []
  end

  # Buffer every row and emit nothing until close
  def process(row)
    @buffer << row
    nil
  end

  # Emit the buffered rows in sorted order
  def close
    @buffer.sort_by(&@sort_by).each do |row|
      yield row
    end
  end
end
```

I will experiment more, but this looks promising.
* Add support for aggregating transforms (see #53)
* Fix spec for MRI 2.0
* Add support for JRuby 1.7
#57 has been merged into master. This will be released at next round (Kiba 2.5.0).
@thbar in addition to sorting, this is going to help me solve the following problem: A set of snapshot data might include multiple fact values from the same product. Some of my exports expect me to pick a specific value for each product, such as the min, avg, max, or most recent price. This will allow me to bake that into the middle of the pipeline, still using other transforms, and actual destinations, instead of the ole' instance-var-in-the-pipeline-doing--its-thing-in-the-post-process trick. Cool addition, thanks.
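As a rough illustration of that use case, here is a sketch of a transform that keeps the highest price per product. `MaxPricePerProduct` and the `:product` / `:price` keys are hypothetical, and the sketch assumes the same contract as the `SortTransform` example earlier in the thread: `process` returns nil to hold rows back, and `close` yields the rows to emit.

```ruby
# Hypothetical aggregating transform: keeps only the highest-priced
# row seen for each product, then emits one row per product on close.
class MaxPricePerProduct
  def initialize
    @best = {} # product => row with the highest price seen so far
  end

  def process(row)
    current = @best[row[:product]]
    @best[row[:product]] = row if current.nil? || row[:price] > current[:price]
    nil # hold everything back until close
  end

  def close
    @best.each_value { |row| yield row }
  end
end

# Driving the transform by hand, outside of any Kiba job:
transform = MaxPricePerProduct.new
[
  { product: "A", price: 10 },
  { product: "B", price: 7 },
  { product: "A", price: 12 }
].each { |row| transform.process(row) }

result = []
transform.close { |row| result << row }
```

Changing the comparison in `process` (for instance to `<`, or to a comparison on a timestamp) would give the min or the most recent value instead.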
@ttilberg you made me laugh with this one 😄 it's such a common thing that it's worth having a long word like this! Thanks for the feedback, and I can say I definitely share your findings. I've been using this construct in various places now, and this is useful for a wide range of topics (e.g. data profiling that you can plug conditionally on/off without touching the rest of the pipeline). And on occasion, you can easily plug stuff in (still relying on instance variables though), like the upcoming:

```ruby
transform LambdaTransform,
  on_init: -> { @count = 0 },
  on_process: -> (r) { @count += 1; r },
  on_close: -> { logger.info "Total #{@count} rows read from source" }
```

This is very convenient for quick profiling while iterating on the data at development time. In all cases, thanks for taking the time to comment & provide feedback, appreciated!
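For readers wondering how such a class could work, here is one possible sketch. It is a guess and not the Kiba implementation: it assumes each lambda is run with `instance_exec` so that `@count` lives on the transform instance, and it replaces `logger` with a local `messages` array so the example stays self-contained.

```ruby
# Hypothetical sketch, NOT the Kiba implementation: each lambda is run
# with instance_exec, so instance variables such as @count are stored
# on the transform instance itself.
class LambdaTransform
  def initialize(on_init: nil, on_process: nil, on_close: nil)
    @on_process = on_process
    @on_close = on_close
    instance_exec(&on_init) if on_init
  end

  # Without an on_process lambda, rows pass through unchanged.
  def process(row)
    @on_process ? instance_exec(row, &@on_process) : row
  end

  def close
    instance_exec(&@on_close) if @on_close
  end
end

# `messages` stands in for a logger in this sketch.
messages = []
counter = LambdaTransform.new(
  on_init: -> { @count = 0 },
  on_process: ->(r) { @count += 1; r },
  on_close: -> { messages << "Total #{@count} rows read from source" }
)

3.times { |i| counter.process({ id: i }) }
counter.close
```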
Sources and destinations both support some form of "close":
- sources, through `#each` (so one can close as part of this)
- destinations, through `#close`

In contrast, transforms cannot currently close.

This will be interesting in particular with the Kiba v2 `StreamingRunner`, to implement some forms of buffering in the transform itself (e.g. keeping N records before doing a grouped query, aggregating N records), in which case `close` is a way to ensure we flush out the buffer.

See https://stackoverflow.com/questions/49422860/transforming-a-table-into-a-hash-of-sets-using-kiba-etl for an example of use.
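The buffering scenario described above could look roughly like the following sketch. `GroupRows` and `run_transform` are hypothetical names: the latter is a simplified stand-in for a runner, written only to show when `process` and `close` would be called, and is not Kiba's code.

```ruby
# Hypothetical transform: groups incoming rows into arrays of `size` rows.
class GroupRows
  def initialize(size:)
    @size = size
    @buffer = []
  end

  # Returns nil until `size` rows are buffered, then returns the group.
  def process(row)
    @buffer << row
    return nil if @buffer.size < @size

    flush
  end

  # Called once after the last row: emits the final, partial group.
  def close
    yield flush unless @buffer.empty?
  end

  private

  def flush
    group = @buffer
    @buffer = []
    group
  end
end

# Simplified stand-in for a runner (an assumption, not Kiba's code):
# feeds each row to the transform, then calls close at the end.
def run_transform(rows, transform)
  output = []
  rows.each do |row|
    result = transform.process(row)
    output << result unless result.nil?
  end
  transform.close { |row| output << row } if transform.respond_to?(:close)
  output
end

groups = run_transform((1..5).map { |i| { id: i } }, GroupRows.new(size: 2))
# `groups` holds two full groups, then the leftover row flushed by close.
```

Without the `close` call, the fifth row would never leave the buffer, which is the gap this issue is about.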