Permalink
Browse files

removed broken class from version_2

  • Loading branch information...
1 parent 4d8a460 commit 7dacf58323d8b9a7495c42ca62b88c3afd3b6b4c Philip (flip) Kromer committed Jan 28, 2011
Showing with 0 additions and 87 deletions.
  1. +0 −87 lib/wukong/streamer/reducer.rb
@@ -1,87 +0,0 @@
- module Wukong
- module Streamer
-
- #
- # AccumulatingReducer makes it easy to apply one operation across all
- # occurrences of each key
- #
- # On each occurrence of a given key, AccumulatingReducer calls
- # accumulate, and at the final occurrence calls finalize.
- #
- # See ListAccumulatingReducer and KeyCountingReducer for examples
- #
- # Make sure you don't have the bad luck, bad judgement or bad approach to
- # accumulate more data than your box can hold before finalizing.
- #
- class Reducer < Wukong::Streamer::Base
- attr_accessor :key
- def initialize options
- super options
- self.key = :__first_pass__
- end
-
- #
- # override for multiple-field keys, etc.
- #
- # Note that get_key is called by +process+ -- so the arguments have
- # already been +recordize+d. In particular, if you are using
- # StructRecordizer (or StructStreamer), you can write this as
- #
- # def get_key(thing) thing.id.to_i ; end
- #
- # or whatever
- def get_key *record
- record.first
- end
-
- #
- # Accumulate all records for a given key.
- #
- # When the last record for the key is seen, finalize processing and adopt the
- # new key.
- #
- def process *args, &block
- this_key = get_key(*args)
- if this_key != self.key # if this is a new key,
- unless self.key == :__first_pass__
- finalize(&block) # process what we've collected so far
- end
- self.key = this_key # adopt the new key
- start! *args # and set up for the next accumulation
- end
- # collect the current record
- accumulate *args, &block
- end
-
- #
- # start! is called on the the first record of the new key
- #
- def start! *args
- raise %Q{start! is the new reset! -- it has args now, namely the first
- record of the new key. It doesn\'t want #super either}
- end
-
- #
- # Override this to accumulate each record for the given key in turn.
- #
- def accumulate *args, &block
- raise "override the accumulate method in your subclass"
- end
-
- #
- #
- # You must override this method.
- #
- def finalize
- raise "override the finalize method in your subclass"
- end
-
- # Finalize the last-seen group.
- def after_stream *args
- finalize(){|record| emit record } unless (self.key == :__first_pass__)
- super *args
- end
- end
-
- end
-end

0 comments on commit 7dacf58

Please sign in to comment.