From 3666917737e865b61709d6ade455ef7bf7374abe Mon Sep 17 00:00:00 2001
From: Ben Fradet
Date: Tue, 27 Jun 2017 11:43:26 +0100
Subject: [PATCH] EmrEtlRunner: move max attempts configuration to EMR cluster
 configuration (closes #3246)

---
 .../lib/snowplow-emr-etl-runner/emr_job.rb  | 9 +++++++++
 .../EnrichJob.scala                         | 1 -
 .../spark/ShredJob.scala                    | 1 -
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb b/3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb
index a9f652a22d..94ea461a6b 100755
--- a/3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb
+++ b/3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb
@@ -229,6 +229,13 @@ def initialize(debug, enrich, shred, elasticsearch, s3distcp, archive_raw, confi
           enrich_final_output
         end

+      spark_configurations = [{
+        "Classification" => "yarn-site",
+        "Properties" => {
+          "yarn.resourcemanager.am.max-attempts" => "1"
+        }
+      }]
+
       if enrich

         # 1. Compaction to HDFS (if applicable)
@@ -283,6 +290,7 @@ def initialize(debug, enrich, shred, elasticsearch, s3distcp, archive_raw, confi
         enrich_step =
           if self.class.is_spark_enrich(config[:enrich][:versions][:spark_enrich]) then
             @jobflow.add_application("Spark")
+            spark_configurations.each { |config| @jobflow.add_configuration(config) }
             build_spark_step(
               "Enrich Raw Events",
               assets[:enrich],
@@ -376,6 +384,7 @@ def initialize(debug, enrich, shred, elasticsearch, s3distcp, archive_raw, confi
         if self.class.is_rdb_shredder(config[:storage][:versions][:rdb_shredder]) then
           duplicate_storage_config = self.class.build_duplicate_storage_json(targets[:DUPLICATE_TRACKING], false)
           @jobflow.add_application("Spark")
+          spark_configurations.each { |config| @jobflow.add_configuration(config) }
           build_spark_step(
             "Shred Enriched Events",
             assets[:shred],
diff --git a/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala b/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala
index 7e10ead78f..5a24a69923 100644
--- a/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala
+++ b/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala
@@ -75,7 +75,6 @@ object EnrichJob extends SparkJob {
   override def sparkConfig(): SparkConf = new SparkConf()
     .setAppName(getClass().getSimpleName())
     .setIfMissing("spark.master", "local[*]")
-    .set("spark.yarn.maxAppAttempts", "1")
     .set("spark.serializer", classOf[KryoSerializer].getName())
     .registerKryoClasses(classesToRegister)

diff --git a/4-storage/rdb-shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala b/4-storage/rdb-shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala
index baffcd45f3..9654df3eb9 100644
--- a/4-storage/rdb-shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala
+++ b/4-storage/rdb-shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala
@@ -69,7 +69,6 @@ object ShredJob extends SparkJob {
   override def sparkConfig(): SparkConf = new SparkConf()
     .setAppName(getClass().getSimpleName())
    .setIfMissing("spark.master", "local[*]")
-    .set("spark.yarn.maxAppAttempts", "1")
    .set("spark.serializer", classOf[KryoSerializer].getName())
    .registerKryoClasses(classesToRegister)