Browse files

Add support for streaming batch operations

  • Loading branch information...
1 parent 3cfae16 commit faa90fe6766840c01da22012f0166a90f53ae634 @mperham committed Sep 30, 2011
Showing with 58 additions and 7 deletions.
  1. +6 −0 History.md
  2. +31 −7 lib/girl_friday/batch.rb
  3. +21 −0 test/test_batch.rb
View
6 History.md
@@ -1,6 +1,12 @@
Changes
================
+HEAD
+---------
+
+* Add streaming batch support for adding elements to a batch one at a
+ time rather than all at once.
+
0.9.6
---------
View
38 lib/girl_friday/batch.rb
@@ -15,32 +15,56 @@ class Batch
def initialize(enumerable, options, &block)
@queue = GirlFriday::Queue.new(:batch, options, &block)
@complete = 0
- @size = enumerable.count
- @results = Array.new(@size)
+ if enumerable
+ @size = enumerable.count
+ @results = Array.new(@size)
+ else
+ @size = 0
+ @results = []
+ end
@lock = Mutex.new
@condition = ConditionVariable.new
+ @frozen = false
start(enumerable)
end
def results(timeout=nil)
+ @frozen = true
@lock.synchronize do
@condition.wait(@lock, timeout) if @complete != @size
@queue.shutdown
@results
end
end
+ def push(msg)
+ raise ArgumentError, "Batch is frozen, you cannot push more items into it" if @frozen
+ @lock.synchronize do
+ @results << nil
+ @size += 1
+ index = @results.size - 1
+ @queue.push(msg) do |result|
+ completion(result, index)
+ end
+ end
+ end
+ alias_method :<<, :push
+
private
def start(operations)
operations.each_with_index do |packet, index|
@queue.push(packet) do |result|
- @lock.synchronize do
- @complete += 1
- @results[index] = result
- @condition.signal if @complete == @size
- end
+ completion(result, index)
end
+ end if operations
+ end
+
+ def completion(result, index)
+ @lock.synchronize do
+ @complete += 1
+ @results[index] = result
+ @condition.signal if @complete == @size
end
end
View
21 test/test_batch.rb
@@ -39,4 +39,25 @@ def test_batch_timeout
# http://redmine.ruby-lang.org/issues/5342
sleep 0.1
end
+
+ def test_streaming_batch_api
+ batch = GirlFriday::Batch.new(nil, :size => 4) do |msg|
+ sleep msg
+ 'x'
+ end
+ a = Time.now
+ batch << 0.1
+ batch << 0.1
+ batch << 0.1
+ batch << 0.1
+ values = batch.results
+ b = Time.now
+ values.must_be_kind_of Array
+ values.must_equal %w(x x x x)
+ assert_in_delta 0.1, (b - a), 0.1
+
+ assert_raises ArgumentError do
+ batch << 0.1
+ end
+ end
end

0 comments on commit faa90fe

Please sign in to comment.