Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

OMG 2.2 - Streaming support

  • Loading branch information...
commit 87d38c16fc681d969b34dcbc171f1c410b86e5f4 1 parent ddb1c3c
@rslifka authored
View
4 HISTORY.md
@@ -1,3 +1,7 @@
+## 2.2 - July 23, 2012
+
++ Hadoop streaming jobs are now supported via ```Elasticity::StreamingStep```.
+
## 2.1.1 - July 22, 2012
+ ```JobFlow::from_jobflow_id``` factory method added so that you can operate on running job flows (add steps, shutdown, status, etc.) that you didn't start in the same Ruby instance.
View
11 README.md
@@ -159,7 +159,7 @@ end
## 5 - Adding Steps
-Each type of step has a default name that can be overridden (the :name field). Apart from that, steps are configured differently - exhaustively described below.
+Each type of step has ```#name``` and ```#action_on_failure``` fields that can be overridden. Apart from that, steps are configured differently - exhaustively described below.
### Adding a Pig Step
@@ -215,6 +215,15 @@ hive_step.variables = {
jobflow.add_step(hive_step)
```
+### Adding a Streaming Step
+
+```ruby
+# Input bucket, output bucket, mapper and reducer scripts
+streaming_step = Elasticity::StreamingStep.new('s3n://elasticmapreduce/samples/wordcount/input', 's3n://elasticityoutput/wordcount/output/2012-07-23', 's3n://elasticmapreduce/samples/wordcount/wordSplitter.py', 'aggregate')
+
+jobflow.add_step(streaming_step)
+```
+
### Adding a Custom Jar Step
```
View
1  lib/elasticity.rb
@@ -21,6 +21,7 @@
require 'elasticity/custom_jar_step'
require 'elasticity/hive_step'
require 'elasticity/pig_step'
+require 'elasticity/streaming_step'
module Elasticity
end
View
33 lib/elasticity/streaming_step.rb
@@ -0,0 +1,33 @@
+module Elasticity
+
+ class StreamingStep
+
+ include JobFlowStep
+
+ attr_accessor :name
+ attr_accessor :action_on_failure
+ attr_accessor :input_bucket
+ attr_accessor :output_bucket
+ attr_accessor :mapper
+ attr_accessor :reducer
+
+ def initialize(input_bucket, output_bucket, mapper, reducer)
+ @name = 'Elasticity Streaming Step'
+ @action_on_failure = 'TERMINATE_JOB_FLOW'
+ @input_bucket = input_bucket
+ @output_bucket = output_bucket
+ @mapper = mapper
+ @reducer = reducer
+ end
+
+ def to_aws_step(job_flow)
+ step = Elasticity::CustomJarStep.new('/home/hadoop/contrib/streaming/hadoop-streaming.jar')
+ step.name = @name
+ step.action_on_failure = @action_on_failure
+ step.arguments = ['-input', @input_bucket, '-output', @output_bucket, '-mapper', @mapper, '-reducer', @reducer]
@webdev
webdev added a note

why using ivars if you have attr_accessors set up

@rslifka Owner
rslifka added a note

Wouldn't that go through a method invocation versus accessing the variable directly?

@rslifka Owner
rslifka added a note

And why aren't you submitting new feature pull requests?? :)

@webdev
webdev added a note

yeah, sort of, then the question is why do you even need attr_accessors for these ivars if you are not using it anywhere?

➜ elasticity git:(streaming) find . -name "*.rb" -exec grep -l "input_bucket" {} \;
./lib/elasticity/streaming_step.rb
./spec/lib/elasticity/streaming_step_spec.rb

Am I missing something?

I may actually start using elasticity :) We have some EMR work lining up.

@rslifka Owner
rslifka added a note

There's GB pulling out some bash wizardry! :) It's an API - fields exposed by attr_accessor are used by clients, not by Slif.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ step.to_aws_step(job_flow)
+ end
+
+ end
+
+end
View
2  lib/elasticity/version.rb
@@ -1,3 +1,3 @@
module Elasticity
- VERSION = '2.1.1'
+ VERSION = '2.2'
end
View
37 spec/lib/elasticity/streaming_step_spec.rb
@@ -0,0 +1,37 @@
+describe Elasticity::StreamingStep do
+
+ subject do
+ Elasticity::StreamingStep.new('INPUT_BUCKET', 'OUTPUT_BUCKET', 'MAPPER', 'REDUCER')
+ end
+
+ it { should be_a Elasticity::JobFlowStep }
+
+ its(:name) { should == 'Elasticity Streaming Step' }
+ its(:action_on_failure) { should == 'TERMINATE_JOB_FLOW' }
+ its(:input_bucket) { should == 'INPUT_BUCKET' }
+ its(:output_bucket) { should == 'OUTPUT_BUCKET' }
+ its(:mapper) { should == 'MAPPER' }
+ its(:reducer) { should == 'REDUCER' }
+
+ describe '#to_aws_step' do
+
+ it 'should convert to aws step format' do
+ subject.to_aws_step(Elasticity::JobFlow.new('_', '_')).should == {
+ :name => 'Elasticity Streaming Step',
+ :action_on_failure => 'TERMINATE_JOB_FLOW',
+ :hadoop_jar_step => {
+ :jar => '/home/hadoop/contrib/streaming/hadoop-streaming.jar',
+ :args => %w(-input INPUT_BUCKET -output OUTPUT_BUCKET -mapper MAPPER -reducer REDUCER),
+ },
+ }
+ end
+
+ end
+
+ describe '.requires_installation?' do
+ it 'should not require installation' do
+ Elasticity::StreamingStep.requires_installation?.should be_false
+ end
+ end
+
+end
@webdev

why using ivars if you have attr_accessors set up

@rslifka

Wouldn't that go through a method invocation versus accessing the variable directly?

@rslifka

And why aren't you submitting new feature pull requests?? :)

@webdev

yeah, sort of, then the question is why do you even need attr_accessors for these ivars if you are not using it anywhere?

➜ elasticity git:(streaming) find . -name "*.rb" -exec grep -l "input_bucket" {} \;
./lib/elasticity/streaming_step.rb
./spec/lib/elasticity/streaming_step_spec.rb

Am I missing something?

I may actually start using elasticity :) We have some EMR work lining up.

@rslifka

There's GB pulling out some bash wizardry! :) It's an API - fields exposed by attr_accessor are used by clients, not by Slif.

Please sign in to comment.
Something went wrong with that request. Please try again.