Skip to content

Commit

Permalink
Code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
mchung committed Oct 28, 2008
1 parent 18d354e commit 8197df0
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 147 deletions.
11 changes: 0 additions & 11 deletions README

This file was deleted.

11 changes: 11 additions & 0 deletions README.markdown
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
== What is Gisting?

Gisting is a zero-configuration, open source implementation of Google's MapReduce programming paradigm. Gisters use Gisting to reduce complex datasets into simple digestable information.

== Author

Marc Chung marc.chung@openrain.com

== Warning

Extremely Alpha Software
2 changes: 1 addition & 1 deletion bin/server
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require 'gisting'
SERVERS = [
["127.0.0.1", 9081, Gisting::MapServer],
["127.0.0.1", 9082, Gisting::MapServer],
# ["127.0.0.1", 9083, Gisting::MapServer],
["127.0.0.1", 9083, Gisting::MapServer],

["127.0.0.1", 9091, Gisting::ReduceServer],
["127.0.0.1", 9092, Gisting::ReduceServer]
Expand Down
6 changes: 3 additions & 3 deletions examples/term_count.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

# TODO: Grab this from ARGV
def args
# args = ["/Users/mchung/Public/datasets/sample.data", "/Users/mchung/Public/datasets/sample.data"]
args = ["/Users/mchung/Public/datasets/sample.data", "/Users/mchung/Public/datasets/sample.data"]
# args = ["/Users/mchung/Public/datasets/sample.data", "/Users/mchung/Public/datasets/sample.data", "/Users/mchung/Public/datasets/sample.data"]
args = ["/Users/mchung/Public/datasets/aoldb_dev.txt", "/Users/mchung/Public/datasets/aoldb_dev.txt"]
# args = ["/Users/mchung/Public/datasets/aoldb_dev.txt", "/Users/mchung/Public/datasets/aoldb_dev.txt"]
# args = ["/Users/mchung/Public/datasets/sample.data", "/Users/mchung/Public/datasets/aoldb_dev.txt"]
args
end
Expand All @@ -26,7 +26,7 @@ def args
end
output = spec.output
output.filebase = "/Users/mchung/Public/datasets/output"
output.num_tasks = 2
output.num_tasks = 1
output.reduce do |reduce_input|
count = 0
reduce_input.each do |value|
Expand Down
13 changes: 0 additions & 13 deletions lib/gisting/map_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,6 @@ def post_init
def receive_data(output_data)
@result.recv_map_data!(output_data)
end

# def unbind
# puts "unbinding"
# if error?
# puts "An error occurred"
# else
# puts "Completed successfully"
# end
# end
#
# def connection_completed
# puts "..connection_completed called"
# end

protected

Expand Down
1 change: 0 additions & 1 deletion lib/gisting/map_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module Gisting
class MapRunner

attr_accessor :data_source, :map_proc
attr_reader :output

def initialize(input)
pp input
Expand Down
4 changes: 3 additions & 1 deletion lib/gisting/reduce_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@ def initialize(*args)
end

def post_init
# puts "post init"
send_task_if_available
end

def receive_data(output_result)
# puts "recv data #{output_result}"
recv_task(output_result)
send_task_if_available
end

protected


def send_task_if_available
next_task = @result.next_available_map_result
if next_task
# pp [@output.filebase, next_task]
send_data([@output, next_task].to_yaml)
@result.sent_reduce_data!
end
Expand Down
103 changes: 7 additions & 96 deletions lib/gisting/reduce_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ class ReduceRunner
attr_accessor :map_data_input, :red_proc

def initialize(output, input)
@map_data_input = input
@histogram = {}
@red_proc = output.reduce_proc
@output_file = output.filebase
@map_data_input = input
setup_emit
end

Expand All @@ -21,7 +21,6 @@ def Emit(count)

def reduce!
begin
puts "reducing"
File.open(@map_data_input).each do |line|
@key, val = line.strip.split(":")
apply([val.strip])
Expand All @@ -45,116 +44,28 @@ def apply(data_item)
end

def setup_emit
# puts "setting up emit"
if File.exists?(@output_file)
# puts "#{@output_file} exists... loading into memory"
File.open(@output_file).each do |line|
key, val = line.strip.split(":")
@histogram[key] = val
@key, val = line.strip.split(":")
apply([val.strip])
end
else
# puts "#{@output_file} does not exist.. creating for first time"
FileUtils.touch(@output_file)
end
end

def reduce_completed!
out = Tempfile.new("tempfile")
pp out.path
# pp out.path
@histogram.each_pair do |key, val|
out.puts("#{key}: #{val}")
end
out.close
FileUtils.mv(out.path, @output_file)
end

# attr_accessor :data_source, :map_proc
# attr_reader :output
#
# def initialize(map_input)
# pp map_input
# @data_source = map_input.file_pattern
# @map_proc = map_input.map_proc
# setup_emit
# end
#
# def Emit(key, value)
# @emit.store(key, value)
# end
#
# def map!
# # TODO Abstract away file data source
# File.read(self.data_source).each do |line|
# apply(line)
# end
# map_completed!
# end
#
# def output
# @intermediate_output
# end
#
# protected
#
# def apply(data_item)
# # pp data_item
# @proc ||= eval(self.map_proc)
# @proc.call(data_item)
# end
#
# def setup_emit
# # TODO Abstract away file data source
# @intermediate_output = make_intermediate_output
# @output = File.new(@intermediate_output, "w")
# @emit = FileEmit.new(@output)
# end
#
# def make_intermediate_output
# # TODO Abstract away file data source
# basedir = File.dirname(@data_source)
# filename = File.basename(@data_source)
# old_ext = File.extname(filename)
# filename_no_ext = File.basename(filename, old_ext)
# new_ext = rand(100).to_s
# intermediate_filename = "#{filename_no_ext}.#{new_ext}#{old_ext}"
#
# File.join(basedir, "results", intermediate_filename)
# end
#
# def map_completed!
# # TODO Abstract away file data source
# @output.flush
# @output.close
# end

# def giest_old(spec, result)
#
# # Map. two data sources.
# EM::run {
# EM::connect "127.0.0.1", 8081, Gisting::Conductor, spec.map_inputs[0], result
# EM::connect "127.0.0.1", 8082, Gisting::Conductor, spec.map_inputs[1], result
# }
#
# # Reduce is hacked for now.
# # pp result
#
# data = String.new
# result.responses.each do |file|
# data += File.read(file)
# end
# # puts data
#
# # data = data.sort{|a, b| a <=> b} # need to sort?
#
# # red_proc ||= eval(spec.output.reduce_proc)
# # data.each do |key|
# # key, val = key.strip.split(":")
# # @key = key
# # red_proc.call(val)
# # end
# # print_term_freq(@histogram)
#
# term_count(data)
#
#
# end

end
end
2 changes: 1 addition & 1 deletion lib/gisting/reduce_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def receive_data(output_data)
output, input = YAML::load(output_data)
runner = ReduceRunner.new(output, input)
runner.reduce!
pp runner.output
# pp ["output", runner.output]
send_data(runner.output)
# rescue => e
# e.backtrace.each do |x|
Expand Down
9 changes: 4 additions & 5 deletions lib/gisting/result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@ def sent_map_data!
@sent_count += 1
if @sent_count == @spec.map_input_count
puts "Maps distributed"
else
puts "More maps to distribute"
# else
# puts "More maps to distribute"
end
end

def recv_reduce_data!(output_result)
puts "asdf"
@reduce_responses << output_result
if @reduce_responses.size == @spec.reduce_output_count
if @reduce_responses.size == @spec.map_input_count
puts "Stopping Reduce phase"
@spec.stop!
else
puts "Got Reduce result data #{output_result}. #{@spec.reduce_output_count - @reduce_responses.size} remaining."
puts "Got Reduce result data #{output_result}. #{@spec.map_input_count - @reduce_responses.size} remaining."
end
end

Expand Down
21 changes: 6 additions & 15 deletions lib/gisting/spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ def add_input
def output
@map_output ||= Output.new
end

def map_input_count
@map_inputs.size
end

def reduce_output_count
@map_output.num_tasks
end

def stop!
EM::stop_event_loop
end

# TODO Round-robin assign available servers with jobs
# TODO Round-robin assign available servers with jobs. Need to map M jobs to N servers
def run_map!(result)
EM::run do
# One for every map input
Expand All @@ -43,24 +43,15 @@ def run_map!(result)
end
end

# TODO Round-robin assign available servers with job
# TODO Round-robin assign available servers with job. Need to map M jobs to N servers
def run_reduce!(result)
reduce = result.setup_reduce_stage(@map_output)
EM::run do
EM::run do
# One for every output#num_task
EM::connect "127.0.0.1", 9091, Gisting::ReduceClient, reduce[0], result
# EM::connect "127.0.0.1", 9092, Gisting::ReduceClient, reduce[1], result
end
end

## should have a spec.output_task which when called, marks the object for queue and sends it to the reduce server.
## reduce conductor should keep doing this until we're done with the data sets

# Reduce should initiate a call for every num_task there is, and creating a file name that's predictable based on filebase
# Should also modify "result" object to sync against the connects. for instance, creating the files from the filebase, but for each map file, dispatch against them. then unblock in similar fashion
# Should be able to dispatch multiple Reduce tasks round robin to only a fixed number of machines
# probably need a reduce conductor.. delgates calls to reduceserver. uses results to share items across #connects


end
end

0 comments on commit 8197df0

Please sign in to comment.