Permalink
Browse files

Add streaming support to MapReduce. Closes #55.

  • Loading branch information...
1 parent 7eeaf2b commit 772c5be2d0a31cc9f8be629a75e15470c0c793ab @seancribbs seancribbs committed Jan 3, 2011
Showing with 25 additions and 4 deletions.
  1. +13 −4 riak-client/lib/riak/map_reduce.rb
  2. +12 −0 riak-client/spec/riak/http_backend_spec.rb
@@ -171,10 +171,19 @@ def to_json(*a)
end
# Executes this map-reduce job.
- # @return [Array<Array>] similar to link-walking, each element is
- # an array of results from a phase where "keep" is true. If there
- # is only one "keep" phase, only the results from that phase will
- # be returned.
+ # @overload run
+ # Return the entire collection of results.
+ # @return [Array<Array>] similar to link-walking, each element is
+ # an array of results from a phase where "keep" is true. If there
+ # is only one "keep" phase, only the results from that phase will
+ # be returned.
+ # @overload run
+ # Stream the results through the given block without accumulating.
+ # @yield [phase, data] A block to stream results through
+ # @yieldparam [Fixnum] phase the phase from which the results were
+ # generated
+ # @yieldparam [Array] data a list of results from the phase
+ # @return [nil] nothing
def run(&block)
raise MapReduceError.new(t("empty_map_reduce_query")) if @query.empty?
@client.backend.mapred(self, &block)
@@ -227,6 +227,18 @@
@backend.stub!(:post).and_return(response)
@backend.mapred(@mr).should == response
end
+
+ it "should stream results through the block" do
+ data = File.read("spec/fixtures/multipart-mapreduce.txt")
+ @backend.should_receive(:post).with(200, "/mapred", {:chunked => true}, @mr.to_json, hash_including("Content-Type" => "application/json")).and_yield(data)
+ block = mock
+ block.should_receive(:ping).twice.and_return(true)
+ @backend.mapred(@mr) do |phase, data|
+ block.ping
+ phase.should == 0
+ phase.data.should have(1).item
+ end
+ end
end
context "getting statistics" do

0 comments on commit 772c5be

Please sign in to comment.