Small issue in the implementation of submit job #1

Merged (6 commits)

2 participants

@fbrubacher

Hey Marc, please review this patch. I'm also working on adding tests for this functionality and will commit them soon.

@mlimotte
Owner

Looks good. Add the test case, and I'll merge it.

@fbrubacher

Hi Marc, I added a test case and a bunch of other things it needed. Some test cases are failing due to the new add-steps; I will try to make them work. Also, one of the commits is an upgrade to lein2, which I found myself needing for nrepl etc. That commit is of course optional, but I think it is good to have. Best.

project.clj
((8 lines not shown))
:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.2"]
+ [org.clojure/tools.trace "0.7.3"]
@mlimotte Owner

Maybe this should be a dev dependency?

src/main/clj/com/climate/services/aws/emr.clj
((7 lines not shown))
- (.setMainClass main-class)
- (.setArgs (vec cli-args)) ;collection of strings
- (.setProperties (kv-props properties))
- (.setActionOnFailure (str (or action-on-failure
- (and alive? ActionOnFailure/CANCEL_AND_WAIT)
- ActionOnFailure/TERMINATE_JOB_FLOW)))))]
- sc))
+ (doto
+ (HadoopJarStepConfig.)
+ (.setJar jar-path)
+ (.setMainClass main-class)
+ (.setArgs (vec cli-args)) ;collection of strings
+ (.setProperties (kv-props properties))))]
+ (.setActionOnFailure sc (str (or action-on-failure
+ (and alive? ActionOnFailure/CANCEL_AND_WAIT)
+ ActionOnFailure/TERMINATE_JOB_FLOW)))))
@mlimotte Owner

fix indent of these two lines.

src/test/clj/com/climate/services/aws/emr_test.clj
((12 lines not shown))
java.util.Date))
+
@mlimotte Owner

extra blank line

src/test/clj/com/climate/services/aws/emr_test.clj
@@ -101,46 +108,21 @@
(is= "m1.xlarge" (.getMasterInstanceType instances))
(is= "m1.xlarge" (.getSlaveInstanceType instances)))))
-(deftest ^{:manual true} test-job-flow-detail
@mlimotte Owner

Why did you remove these 3 deftests and the 2 deftests below?

Brubacher, F... added some commits
@fbrubacher

Hi Marc, I just rebased everything and fixed the issues you pointed out. Sorry about the delay; I was on vacation for the last couple of weeks. I will continue looking into this today, especially checking that all tests pass with your modifications and mine. Happy NY.

@mlimotte

I think add-steps should be changed to accept a seq and create the ArrayList inside the function. The exposed API function shouldn't require an ArrayList; that's not Clojure-esque.
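
A minimal sketch of what this suggestion could look like, assuming a hypothetical helper `steps->array-list` (not part of the patch) that copies any seq into a `java.util.ArrayList`. The real `add-steps` would hand the result to `AddJobFlowStepsRequest`; that call appears only as a comment because it needs the AWS SDK and a live client.

```clojure
(import 'java.util.ArrayList)

(defn steps->array-list
  "Hypothetical helper: copy any seq of steps (vector, list, lazy seq, or nil)
  into a mutable java.util.ArrayList, so callers never build one themselves."
  [steps]
  (ArrayList. ^java.util.Collection (vec steps)))

;; Assumed shape of the public function, not runnable without the AWS SDK:
;; (defn add-steps [jobflow-id steps]
;;   (.addJobFlowSteps *emr*
;;     (AddJobFlowStepsRequest. jobflow-id (steps->array-list steps))))
```

With a helper like this, callers can pass an ordinary vector such as `[step-a step-b]`, and the Java collection stays an implementation detail.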

@mlimotte
Owner

Moving to lein2 is a good idea. We should also add:

:min-lein-version "2.0.0"

as a reminder.

@fbrubacher

Hey Marc, some of the tests in the emr_test ns don't pass. I spent some time investigating and will keep at it; I'll let you know what I find out. Best

@mlimotte
Owner

I found the issue with the emr-test. As far as I know, I can't make a change to your pull request before merging it, so you need to put the fix in. Just do this:

diff --git a/src/main/clj/com/climate/services/aws/emr.clj b/src/main/clj/com/climate/services/aws/emr.clj
index c9300c1..fb0a9ff 100644
--- a/src/main/clj/com/climate/services/aws/emr.clj
+++ b/src/main/clj/com/climate/services/aws/emr.clj
@@ -247,7 +247,8 @@
                           (.setProperties (kv-props properties))))]
     (.setActionOnFailure sc (str (or action-on-failure
                                      (and alive? ActionOnFailure/CANCEL_AND_WAIT)
-                                     ActionOnFailure/TERMINATE_JOB_FLOW)))))
+                                     ActionOnFailure/TERMINATE_JOB_FLOW)))
+    sc))

 (defn add-steps
   "Add a step to a running jobflow. Steps is a seq of StepConfig objects.
@mlimotte
Owner

Should also fix the project.clj like this:

diff --git a/project.clj b/project.clj
index 12f548a..a8cba5a 100644
--- a/project.clj
+++ b/project.clj
@@ -1,4 +1,4 @@
-(defproject lemur "1.2.0"
+(defproject lemur "1.2.2-SNAPSHOT"

   :description "Lemur is a tool to launch hadoop jobs locally or on EMR
                 based on a configuration file, referred to as a jobdef."
@@ -32,14 +32,11 @@
                  ; Other
                  [log4j/log4j "1.2.16"]]

-  :dev-dependencies [[robert/hooke "1.1.2"] ;for leiningen test-selectors
-                     [org.clojure/tools.trace "0.7.1"]
-                     [com.offbytwo.iclojure/iclojure "1.1.0"]
-                     [clojure-source "1.3.0"]
-                     [org.clojure/tools.trace "0.7.3"]]
-
   :profiles {:dev {:plugins [[lein-midje "2.0.4"]]
-                   :dependencies [[midje "1.4.0"]]}} 
+                   :dependencies [[midje "1.4.0"] 
+                                  [org.clojure/tools.trace "0.7.3"]
+                                  [clojure-source "1.3.0"]]}}
+
   :repl-init lemur.repl
   :main ^:skip-aot lemur.repl
   :min-lein-version "2.0.0"
@mlimotte
Owner

This is good work, Federico. I'm going to merge this pull request now into my repository and apply the changes I commented above directly.
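
For readers following the `sc` fix in the emr.clj diff above: a `let` body returns the value of its last form, so ending on a Java setter call makes the function return that call's result instead of the object. The sketch below illustrates this with `StringBuilder` as a stand-in, assuming (as the failing tests suggest) that `.setActionOnFailure` returns nothing useful. `broken-builder` and `fixed-builder` are hypothetical names, not code from the patch.

```clojure
(defn broken-builder
  "Mirrors the bug: the last form is a void Java call, so the fn returns nil."
  []
  (let [sb (StringBuilder. "step-config")]
    (.setLength sb 4)))

(defn fixed-builder
  "Mirrors the fix: return the object explicitly as the final form."
  []
  (let [sb (StringBuilder. "step-config")]
    (.setLength sb 4)
    sb))
```

Calling `(broken-builder)` yields `nil`, while `(fixed-builder)` yields the configured object, which is the same change the trailing `sc` makes in `step-config`.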

@mlimotte mlimotte merged commit 47ee6f6 into mlimotte:master
Commits on Nov 19, 2012
  1. SetActionOnFailure should be called for the StepConfig object

    Brubacher, Federico authored
Commits on Jan 7, 2013
  1. Test for add-action step

    Brubacher, Federico authored
  2. update project.clj to be compatible with lein2

    Brubacher, Federico authored
  3. Require commons from lemur.core.test

    Brubacher, Federico authored
Commits on Jan 25, 2013
  1. the method add-steps now recieves a seq

    Brubacher, Federico authored
  2. Final support for lein 2 (correct versions of bultitude , midje, lein midje)

    Brubacher, Federico authored
23 project.clj
@@ -8,12 +8,13 @@
["-Dlog4j.configuration=file:resources/log4j.properties"]
[])
- :source-path "src/main/clj"
- :test-path "src/test/clj"
+ :source-paths ["src/main/clj"]
+ :test-paths ["src/test/clj"]
:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.2"]
+ [bultitude "0.2.0"]
; aws-java-sdk-1.3.3 does not specify the correct httpclient, so we do it explicitly
[org.apache.httpcomponents/httpclient "4.1.1"]
@@ -33,22 +34,24 @@
:dev-dependencies [[robert/hooke "1.1.2"] ;for leiningen test-selectors
[org.clojure/tools.trace "0.7.1"]
- [midje "1.3.1"]
- [lein-midje "1.0.8"]
[com.offbytwo.iclojure/iclojure "1.1.0"]
- [clojure-source "1.3.0"]]
-
- :test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
- :integration :integration
- :manual :manual
- :all (fn [v] (not (:manual v)))}
+ [clojure-source "1.3.0"]
+ [org.clojure/tools.trace "0.7.3"]]
+ :profiles {:dev {:plugins [[lein-midje "2.0.4"]]
+ :dependencies [[midje "1.4.0"]]}}
:repl-init lemur.repl
:main ^:skip-aot lemur.repl
+ :min-lein-version "2.0.0"
:run-aliases {:lemur lemur.core}
; Launch irepl:
;java -cp lib/*:lib/dev/* com.offbytwo.iclojure.Main
+ :test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
+ :integration :integration
+ :manual :manual
+ :all (fn [v] (not (:manual v)))}
+
:aot [lemur.core])
30 src/main/clj/com/climate/services/aws/emr.clj
@@ -9,6 +9,7 @@
(:import
java.io.File
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient
+ com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest
com.amazonaws.services.elasticmapreduce.util.StepFactory
com.amazonaws.auth.BasicAWSCredentials
[com.amazonaws.services.elasticmapreduce.model
@@ -224,6 +225,13 @@
(.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
(.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))
+(defn terminate-flow-id
+ ([jobflow-id]
+ (terminate-flow-id jobflow-id *emr*))
+ ([jobflow-id emr]
+ (.terminateJobFlows emr
+ (TerminateJobFlowsRequest. (java.util.ArrayList. [jobflow-id])))))
+
(defn step-config [name alive? jar-path main-class cli-args & {:keys [action-on-failure properties]}]
"Create a step to be submitted to EMR.
jar-path is the hadoop job jar, usually an s3:// path.
@@ -231,22 +239,22 @@
action-on-failure is a String or enum com.amazonaws.services.elasticmapreduce.model.ActionOnFailure.
properties is a map of Java properties that are set when the step runs."
(let [sc (StepConfig. name
- (doto
- (HadoopJarStepConfig.)
- (.setJar jar-path)
- (.setMainClass main-class)
- (.setArgs (vec cli-args)) ;collection of strings
- (.setProperties (kv-props properties))
- (.setActionOnFailure (str (or action-on-failure
- (and alive? ActionOnFailure/CANCEL_AND_WAIT)
- ActionOnFailure/TERMINATE_JOB_FLOW)))))]
- sc))
+ (doto
+ (HadoopJarStepConfig.)
+ (.setJar jar-path)
+ (.setMainClass main-class)
+ (.setArgs (vec cli-args)) ;collection of strings
+ (.setProperties (kv-props properties))))]
+ (.setActionOnFailure sc (str (or action-on-failure
+ (and alive? ActionOnFailure/CANCEL_AND_WAIT)
+ ActionOnFailure/TERMINATE_JOB_FLOW)))))
(defn add-steps
"Add a step to a running jobflow. Steps is a seq of StepConfig objects.
Use (step-config) to create StepConfig objects."
[jobflow-id steps]
- (.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps)))
+ (let [steps-array (to-array steps)]
+ (.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps))))
(defn start-job-flow [name steps {:keys [log-uri bootstrap-actions ami-version supported-products]
:or {bootstrap-actions [] supported-products []}
56 src/test/clj/com/climate/services/aws/emr_test.clj
@@ -24,9 +24,13 @@
[ec2 :as ec2]
[common :as awscommon]])
(:import
+ com.amazonaws.services.elasticmapreduce.util.StepFactory
[com.amazonaws.services.elasticmapreduce.model
JobFlowDetail
- JobFlowExecutionStatusDetail]
+ JobFlowExecutionStatusDetail
+ HadoopJarStepConfig
+ ActionOnFailure
+ StepConfig]
java.util.Date))
;; Some tests are labelled as :manual, rather than :integration, because
@@ -37,6 +41,23 @@
(def aws-creds (awscommon/aws-credential-discovery))
+(def ^:dynamic *flow-args*
+ {:bootstrap-actions
+ ; Only publicly available script, so we don't have to upload the others.
+ [(bootstrap "Hadoop Config"
+ "s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
+ ["-m" "mapred.map.tasks.speculative.execution=false"])]
+ :log-uri (str "s3://" bucket)
+ :keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
+ :ami-version "latest"
+ :num-instances 2
+ :master-type "m1.xlarge"
+ :slave-type "m1.xlarge"
+ :spot-task-type "m1.xlarge"
+ :spot-task-bid "1.00"
+ :spot-task-num 1
+ :keep-alive false})
+
(use-fixtures :once
(fn [f]
(binding [s3/*s3* (s3/s3 aws-creds)
@@ -70,22 +91,7 @@
["-input" (format "s3://%s/data/simple.txt" bucket)
"-output" "/out"
"-mapper" (format "s3://%s/scripts/wc.sh" bucket)])]
- {:bootstrap-actions
- ; Only publicly available script, so we don't have to upload the others.
- [(bootstrap "Hadoop Config"
- "s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
- ["-m" "mapred.map.tasks.speculative.execution=false"])]
- :log-uri (str "s3://" bucket)
- :keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
- :ami-version "latest"
- :num-instances 2
- :master-type "m1.xlarge"
- :slave-type "m1.xlarge"
- :spot-task-type "m1.xlarge"
- :spot-task-bid "1.00"
- :spot-task-num 1
- ;too dangerous to use keep-alive in tests, so tests are limited-- no (emr/add-step) for example
- :keep-alive false}))))
+ *flow-args*))))
; Specified as a fn rather than a test. This is a hack to force it to run before
; test-wait-on-step. It will fail if the cluster has already COMPLETED.
@@ -159,3 +165,19 @@
(is= "m1.xlarge" spot-task-type)
(is= 20 spot-task-num)
(is= (format "%.3f" expected-bid) spot-task-bid)))
+
+(defn make-dummy-step []
+ (doto (StepConfig.)
+ (.setName "Dummy")
+ (.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
+ (.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))
+
+(deftest ^{:manual true} test-add-steps-to-existing-flow
+ (testing "emr/add-step"
+ (binding [*flow-args* (conj *flow-args* {:keep-alive true})]
+ (let [jf-id (setup)
+ dummy-steps (make-dummy-step)]
+ (is= 0 (.size (steps-for-jobflow jf-id)))
+ (add-steps jf-id [dummy-steps])
+ (is= 1 (.size (steps-for-jobflow jf-id)))
+ (terminate-flow-id jf-id)))))
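
The new test relies on rebinding the dynamic var `*flow-args*` so that only this test runs with `:keep-alive true`. A minimal sketch of that mechanism, using a reduced, illustrative set of keys and two hypothetical helper functions that are not part of the patch:

```clojure
;; Stand-in for the test's *flow-args*; the keys are an illustrative subset.
(def ^:dynamic *flow-args*
  {:num-instances 2
   :keep-alive false})

(defn keep-alive?
  "Reads the current (possibly rebound) value of *flow-args*."
  []
  (:keep-alive *flow-args*))

(defn keep-alive-when-overridden
  "Rebinds *flow-args* for the dynamic extent of the body only.
  conj-ing a map onto a map merges its entries, like merge."
  []
  (binding [*flow-args* (conj *flow-args* {:keep-alive true})]
    (keep-alive?)))
```

Outside the `binding` form the root value is untouched, so other tests that read `*flow-args*` still see `:keep-alive false`.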
1  src/test/clj/lemur/core_test.clj
@@ -17,6 +17,7 @@
lemur.core
[lemur.command-line :only [quit]]
[lemur.evaluating-map :only [evaluating-map]]
+ [lemur.common :only [eoval]]
midje.sweet
clojure.test
lemur.test)