Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Remove Assembly registry usage from Flow to preserve Cascade namespaces and support multiple Cascades per app
  • Loading branch information...
commit 0f15f0e3817b20951c35596969cf3ee9645430ae 1 parent 58c4245
@mrwalker authored
View
9 lib/cascading/base.rb
@@ -25,6 +25,15 @@ def add_child(node)
@last_child = node
node
end
+
+ def find_child(name)
+ children.each do |child_name, child|
+ return child if child_name == name
+ result = child.find_child(name)
+ return result if result
+ end
+ return nil
+ end
end
# A module to add auto-registration capability
View
19 lib/cascading/flow.rb
@@ -145,23 +145,22 @@ def build_connect_parameter
end
def make_tap_parameter(taps)
- taps.keys.inject({}) do |map, name|
- assembly = Assembly.get(name)
- raise "Could not find assembly '#{name}' to connect to tap #{taps[name]}" unless assembly
+ taps.inject({}) do |map, (name, tap)|
+ assembly = find_child(name)
+ raise "Could not find assembly '#{name}' to connect to tap: #{tap}" unless assembly
- map[assembly.tail_pipe.name] = taps[name]
+ map[assembly.tail_pipe.name] = tap
map
end
end
def make_pipes
- pipes = []
- @sinks.keys.each do |key|
- assembly = Assembly.get(key)
- raise "Undefined assembly #{key}" unless assembly
+ @sinks.inject([]) do |pipes, (name, sink)|
+ assembly = find_child(name)
+ raise "Could not find assembly '#{name}' to make pipe for sink: #{sink}" unless assembly
pipes << assembly.tail_pipe
- end
- return pipes.to_java(Java::CascadingPipe::Pipe)
+ pipes
+ end.to_java(Java::CascadingPipe::Pipe)
end
end
end
View
46 spec/cascading_spec.rb
@@ -26,4 +26,50 @@
end
end.should raise_error Java::CascadingTuple::TupleException, 'field name already exists: line'
end
+
+ it 'should find branches to sink' do
+ cascade 'branched_pass' do
+ flow 'branched_pass' do
+ source 'input', tap('spec/resource/test_input.txt', :kind => :lfs, :scheme => text_line_scheme)
+ assembly 'input' do
+ branch 'branched_input' do
+ project 'line'
+ end
+ end
+ sink 'branched_input', tap("#{OUTPUT_DIR}/branched_pass_out", :kind => :lfs, :sink_mode => :replace)
+ end
+ end.complete
+
+ ilc = `wc -l spec/resource/test_input.txt`.split(/\s+/).first
+ olc = `wc -l #{OUTPUT_DIR}/branched_pass_out/part-00000`.split(/\s+/).first
+ ilc.should == olc
+ end
+
+ it 'should create an isolated namespace per cascade' do
+ cascade 'double' do
+ flow 'double' do
+ source 'input', tap('spec/resource/test_input.txt', :kind => :lfs, :scheme => text_line_scheme)
+ assembly 'input' do # Dup name
+ insert 'doubled' => expr('line:string + "," + line:string')
+ project 'doubled'
+ end
+ sink 'input', tap("#{OUTPUT_DIR}/double_out", :kind => :lfs, :sink_mode => :replace)
+ end
+ end
+
+ cascade 'pass' do
+ flow 'pass' do
+ source 'input', tap('spec/resource/test_input.txt', :kind => :lfs, :scheme => text_line_scheme)
+ assembly 'input' do # Dup name
+ project 'line'
+ end
+ sink 'input', tap("#{OUTPUT_DIR}/pass_out", :kind => :lfs, :sink_mode => :replace)
+ end
+ end
+
+ Cascade.get('double').complete
+ Cascade.get('pass').complete
+ diff = `diff #{OUTPUT_DIR}/double_out/part-00000 #{OUTPUT_DIR}/pass_out/part-00000`
+ diff.should_not be_empty
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.