Skip to content

Commit

Permalink
EmrEtlRunner: move max attempts configuration to EMR cluster configuration (closes #3246)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Jun 27, 2017
1 parent 5714d5e commit 3666917
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ def initialize(debug, enrich, shred, elasticsearch, s3distcp, archive_raw, confi
enrich_final_output
end

spark_configurations = [{
"Classification" => "yarn-site",
"Properties" => {
"yarn.resourcemanager.am.max-attempts" => "1"
}
}]

if enrich

# 1. Compaction to HDFS (if applicable)
Expand Down Expand Up @@ -283,6 +290,7 @@ def initialize(debug, enrich, shred, elasticsearch, s3distcp, archive_raw, confi
enrich_step =
if self.class.is_spark_enrich(config[:enrich][:versions][:spark_enrich]) then
@jobflow.add_application("Spark")
spark_configurations.each { |config| @jobflow.add_configuration(config) }
build_spark_step(
"Enrich Raw Events",
assets[:enrich],
Expand Down Expand Up @@ -376,6 +384,7 @@ def initialize(debug, enrich, shred, elasticsearch, s3distcp, archive_raw, confi
if self.class.is_rdb_shredder(config[:storage][:versions][:rdb_shredder]) then
duplicate_storage_config = self.class.build_duplicate_storage_json(targets[:DUPLICATE_TRACKING], false)
@jobflow.add_application("Spark")
spark_configurations.each { |config| @jobflow.add_configuration(config) }
build_spark_step(
"Shred Enriched Events",
assets[:shred],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ object EnrichJob extends SparkJob {
/**
 * Builds the SparkConf used by this job.
 *
 * The application name is the simple name of this class, "spark.master"
 * falls back to "local[*]" only when it has not been set already, the
 * serializer is Kryo, and `classesToRegister` (defined outside this hunk)
 * is registered with Kryo.
 *
 * NOTE(review): the hunk header (7 lines -> 6) and the commit title suggest
 * the "spark.yarn.maxAppAttempts" line below is the one this commit deletes,
 * the attempt limit being moved to the EMR cluster's yarn-site
 * configuration -- confirm against the raw diff.
 */
override def sparkConfig(): SparkConf = new SparkConf()
.setAppName(getClass().getSimpleName())
.setIfMissing("spark.master", "local[*]")
.set("spark.yarn.maxAppAttempts", "1")
.set("spark.serializer", classOf[KryoSerializer].getName())
.registerKryoClasses(classesToRegister)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ object ShredJob extends SparkJob {
/**
 * Builds the SparkConf used by this job.
 *
 * The application name is the simple name of this class, "spark.master"
 * falls back to "local[*]" only when it has not been set already, the
 * serializer is Kryo, and `classesToRegister` (defined outside this hunk)
 * is registered with Kryo.
 *
 * NOTE(review): the hunk header (7 lines -> 6) and the commit title suggest
 * the "spark.yarn.maxAppAttempts" line below is the one this commit deletes,
 * the attempt limit being moved to the EMR cluster's yarn-site
 * configuration -- confirm against the raw diff.
 */
override def sparkConfig(): SparkConf = new SparkConf()
.setAppName(getClass().getSimpleName())
.setIfMissing("spark.master", "local[*]")
.set("spark.yarn.maxAppAttempts", "1")
.set("spark.serializer", classOf[KryoSerializer].getName())
.registerKryoClasses(classesToRegister)

Expand Down

0 comments on commit 3666917

Please sign in to comment.