Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge remote branch 'origin/version_2'

  • Loading branch information...
commit 4d8a4601e76ac213b6c914b4469ef97d4722c004 2 parents 86183f3 + 55e0970
Philip (flip) Kromer authored
Showing with 87 additions and 0 deletions.
  1. +87 −0 lib/wukong/streamer/reducer.rb
View
87 lib/wukong/streamer/reducer.rb
@@ -0,0 +1,87 @@
+ module Wukong
+ module Streamer
+
+ #
+ # AccumulatingReducer makes it easy to apply one operation across all
+ # occurrences of each key
+ #
+ # On each occurrence of a given key, AccumulatingReducer calls
+ # accumulate, and at the final occurrence calls finalize.
+ #
+ # See ListAccumulatingReducer and KeyCountingReducer for examples
+ #
+ # Make sure you don't have the bad luck, bad judgement or bad approach to
+ # accumulate more data than your box can hold before finalizing.
+ #
+ class Reducer < Wukong::Streamer::Base
+ attr_accessor :key
+ def initialize options
+ super options
+ self.key = :__first_pass__
+ end
+
+ #
+ # override for multiple-field keys, etc.
+ #
+ # Note that get_key is called by +process+ -- so the arguments have
+ # already been +recordize+d. In particular, if you are using
+ # StructRecordizer (or StructStreamer), you can write this as
+ #
+ # def get_key(thing) thing.id.to_i ; end
+ #
+ # or whatever
+ def get_key *record
+ record.first
+ end
+
+ #
+ # Accumulate all records for a given key.
+ #
+ # When the last record for the key is seen, finalize processing and adopt the
+ # new key.
+ #
+ def process *args, &block
+ this_key = get_key(*args)
+ if this_key != self.key # if this is a new key,
+ unless self.key == :__first_pass__
+ finalize(&block) # process what we've collected so far
+ end
+ self.key = this_key # adopt the new key
+ start! *args # and set up for the next accumulation
+ end
+ # collect the current record
+ accumulate *args, &block
+ end
+
+ #
+ # start! is called on the the first record of the new key
+ #
+ def start! *args
+ raise %Q{start! is the new reset! -- it has args now, namely the first
+ record of the new key. It doesn\'t want #super either}
+ end
+
+ #
+ # Override this to accumulate each record for the given key in turn.
+ #
+ def accumulate *args, &block
+ raise "override the accumulate method in your subclass"
+ end
+
+ #
+ #
+ # You must override this method.
+ #
+ def finalize
+ raise "override the finalize method in your subclass"
+ end
+
+ # Finalize the last-seen group.
+ def after_stream *args
+ finalize(){|record| emit record } unless (self.key == :__first_pass__)
+ super *args
+ end
+ end
+
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.