diff --git a/lib/kiba/streaming_runner.rb b/lib/kiba/streaming_runner.rb index 2015ef0..943fbc9 100644 --- a/lib/kiba/streaming_runner.rb +++ b/lib/kiba/streaming_runner.rb @@ -11,6 +11,11 @@ def transform_stream(stream, t) end y << returned_row if returned_row end + if t.respond_to?(:close) + t.close do |close_row| + y << close_row + end + end end end diff --git a/test/support/test_aggregate_transform.rb b/test/support/test_aggregate_transform.rb new file mode 100644 index 0000000..1420aa6 --- /dev/null +++ b/test/support/test_aggregate_transform.rb @@ -0,0 +1,19 @@ +class AggregateTransform + def initialize(aggregate_size:) + @aggregate_size = aggregate_size + end + + def process(row) + @buffer ||= [] + @buffer << row + if @buffer.size == @aggregate_size + yield @buffer + @buffer = [] + end + nil + end + + def close + yield @buffer unless @buffer.empty? + end +end diff --git a/test/support/test_non_closing_transform.rb b/test/support/test_non_closing_transform.rb new file mode 100644 index 0000000..485e439 --- /dev/null +++ b/test/support/test_non_closing_transform.rb @@ -0,0 +1,5 @@ +class NonClosingTransform + def process(row) + row + end +end diff --git a/test/test_buffering_transform.rb b/test/test_buffering_transform.rb new file mode 100644 index 0000000..ac725ca --- /dev/null +++ b/test/test_buffering_transform.rb @@ -0,0 +1,26 @@ +require_relative 'helper' +require 'kiba/cli' +require_relative 'support/test_aggregate_transform' +require_relative 'support/test_non_closing_transform' + +class TestBufferingTransform < Kiba::Test + def test_buffering_transform + destination_array = [] + job = Kiba.parse do + extend Kiba::DSLExtensions::Config + config :kiba, runner: Kiba::StreamingRunner + + source TestEnumerableSource, (1..12) + # ensure that a non closing transform won't raise an error + transform NonClosingTransform + transform AggregateTransform, aggregate_size: 5 + destination TestArrayDestination, destination_array + end + Kiba.run(job) + assert_equal [ + [1, 2, 3, 4, 5], + [6, 7, 8, 9, 10], + [11, 12] + ], destination_array + end +end \ No newline at end of file