Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

file 120 lines (103 sloc) 3.421 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
module CollectInject

  # base worker class. you should inherit it and implement methods
  # +map+ and +reduce+. See examples/ dir for possible implementations.
  class WorkerBase
    attr_reader :intermediate_data # mapper outputs here
    attr_reader :output_data # reducer outputs here

    def initialize
      reset
    end

    # runs +map+ on each item in an input chunk and emits its result
    def map_items items
      items.each do |item|
        map_emit(*map(item))
      end
    end

    # map method should receive an item which should be processed
    # returns corresponding (key, value) pair
    # This is just example implementation and should be overridden in inherited class
    def map item
      [:key, item]
    end

    # reduce processes a list generated for each key and returns a single value
    # This is just example implementation and should be overridden in inherited class
    def reduce key, list
      :result
    end

    # stores result of single +map+ operation to intermediate data store
    def map_emit key, value
      @intermediate_data[key] ||= []
      @intermediate_data[key] << value
    end

    # clears all local storage, preparing for be filled with new data
    def reset
      @map_data = nil
      @reduce_data = {}
      @intermediate_data = {}
      @output_data = []
    end

    # Feed worker with initial dataset (usually a chunk of original input data)
    def load_map_data data
      @map_data = data
    end

    # between map and reduce, feed reduce with appropriate data
    def append_reduce_data key, list
      @reduce_data[key] ||= []
      @reduce_data[key] += list
    end

    # run map on initial data chunk
    def run_map
      map_items @map_data
    end

    # run reduce on data processed by map
    def run_reduce
      @reduce_data.each do |key, list|
        result = reduce key, list
        @output_data << result
      end
    end
  end

  # Manager instantiates workers, feeds them with data and puts them to work
  class Manager
    # receives worker class and number of workers to instantiate and run
    def initialize worker_class, worker_count
      @worker_class = worker_class
      @worker_count = worker_count
      @workers = (0...@worker_count).map{
        @worker_class.new
      }
    end

    # based on data key, decide which worker should be fed with data for reduce
    def get_reducer_index key
      key.hash % @worker_count
    end

    # executing collectinject job on given data
    # returns processed data
    def run data
      # splitting data into N chunks:
      data.group_by.with_index{|elem, i| i % @worker_count}.map{ |i, chunk|
        Thread.new do
          # initializing workers and running map
          @workers[i].reset
          @workers[i].load_map_data chunk
          @workers[i].run_map
        end
      }.map(&:join) # waiting for all to finish

      # distributing data for reduce phase
      @workers.each do |worker|
# p worker.intermediate_data
        worker.intermediate_data.each do |k, v|
          @workers[get_reducer_index(k)].append_reduce_data(k, v)
        end
      end

      # running reduce phase
      (0...@worker_count).map{ |i|
        Thread.new do
          @workers[i].run_reduce
        end
      }.map(&:join) # waiting for all to finish

      # collect data from reducers and return it
      @workers.map(&:output_data).inject(&:+)
    end
  end
end
Something went wrong with that request. Please try again.