diff --git a/bin/snowplow-emr-etl-runner b/bin/snowplow-emr-etl-runner
index 5f7e0fb..bb9975d 100755
--- a/bin/snowplow-emr-etl-runner
+++ b/bin/snowplow-emr-etl-runner
@@ -93,6 +93,9 @@ rescue runner::LinterError => e
 rescue runner::LockHeldError => e
   puts "#{e.message}"
   exit 17
+rescue runner::EmrClusterStateError => e
+  puts "#{e.message}"
+  exit 18
 # Special retval so rest of pipeline knows not to continue
 rescue runner::NoDataToProcessError => e
   puts "No logs to process: #{e.message}"
diff --git a/bin/snowplow-hadoop-fs-rmr-0.2.0.sh b/bin/snowplow-hadoop-fs-rmr-0.2.0.sh
new file mode 100755
index 0000000..14dd05f
--- /dev/null
+++ b/bin/snowplow-hadoop-fs-rmr-0.2.0.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+
+# Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
+#
+# This program is licensed to you under the Apache License Version 2.0,
+# and you may not use this file except in compliance with the Apache License Version 2.0.
+# You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the Apache License Version 2.0 is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+
+# Author:: Joshua Beemster (mailto:support@snowplowanalytics.com)
+# Copyright:: Copyright (c) 2012-2018 Snowplow Analytics Ltd
+# License:: Apache License Version 2.0
+
+# Recursively removes a list of directories on HDFS
+for i in "${@}"
+do
+  hadoop fs -test -d ${i}
+  if [ $? == 0 ]; then
+    echo "Removing directory ${i} ..."
+    hadoop fs -rm -r -skipTrash ${i}
+  else
+    echo "Directory ${i} does not exist"
+  fi
+done
diff --git a/lib/snowplow-emr-etl-runner.rb b/lib/snowplow-emr-etl-runner.rb
index 6ac8dd5..83f3de1 100755
--- a/lib/snowplow-emr-etl-runner.rb
+++ b/lib/snowplow-emr-etl-runner.rb
@@ -22,6 +22,7 @@
 require_relative 'snowplow-emr-etl-runner/cli'
 require_relative 'snowplow-emr-etl-runner/job_result'
 require_relative 'snowplow-emr-etl-runner/s3'
+require_relative 'snowplow-emr-etl-runner/emr'
 require_relative 'snowplow-emr-etl-runner/emr_job'
 require_relative 'snowplow-emr-etl-runner/lock/lock'
 require_relative 'snowplow-emr-etl-runner/lock/file_lock'
diff --git a/lib/snowplow-emr-etl-runner/cli.rb b/lib/snowplow-emr-etl-runner/cli.rb
index 6d75acc..79ed624 100755
--- a/lib/snowplow-emr-etl-runner/cli.rb
+++ b/lib/snowplow-emr-etl-runner/cli.rb
@@ -58,6 +58,8 @@ def self.get_args_config_enrichments_resolver
       :skip => [],
       :include => [],
       :ignore_lock_on_start => false,
+      :use_persistent_jobflow => false,
+      :persistent_jobflow_duration => "0m",
     }

     commands = {
@@ -75,6 +77,8 @@
         opts.on('-l', '--lock PATH', 'where to store the lock') { |config| options[:lock] = config }
         opts.on('--ignore-lock-on-start', 'ignore the lock if it is set when starting') { |config| options[:ignore_lock_on_start] = true }
         opts.on('--consul ADDRESS', 'address to the Consul server') { |config| options[:consul] = config }
+        opts.on('--use-persistent-jobflow', 'discovers and uses a persistent cluster for steps') { |config| options[:use_persistent_jobflow] = true }
+        opts.on('--persistent-jobflow-duration DURATION', 'the length of time a persistent cluster can run for before being terminated (e.g. 1d12h)') { |config| options[:persistent_jobflow_duration] = config }
       end,
       'generate emr-config' => OptionParser.new do |opts|
         opts.banner = 'Usage: generate emr-config [options]'
@@ -186,7 +190,9 @@ def self.process_options(options, optparse, cmd_name)
        :include => options[:include],
        :lock => options[:lock],
        :ignore_lock_on_start => options[:ignore_lock_on_start],
-       :consul => options[:consul]
+       :consul => options[:consul],
+       :use_persistent_jobflow => options[:use_persistent_jobflow],
+       :persistent_jobflow_duration => options[:persistent_jobflow_duration],
      }

      summary = optparse.to_s
diff --git a/lib/snowplow-emr-etl-runner/contracts.rb b/lib/snowplow-emr-etl-runner/contracts.rb
index d9032b1..d495e26 100755
--- a/lib/snowplow-emr-etl-runner/contracts.rb
+++ b/lib/snowplow-emr-etl-runner/contracts.rb
@@ -43,7 +43,9 @@ module EmrEtlRunner
       :include => Maybe[ArrayOf[String]],
       :lock => Maybe[String],
       :consul => Maybe[String],
-      :ignore_lock_on_start => Maybe[Bool]
+      :ignore_lock_on_start => Maybe[Bool],
+      :use_persistent_jobflow => Maybe[Bool],
+      :persistent_jobflow_duration => Maybe[String]
     })

   # The Hash containing the buckets field from the configuration YAML
diff --git a/lib/snowplow-emr-etl-runner/emr.rb b/lib/snowplow-emr-etl-runner/emr.rb
new file mode 100644
index 0000000..2905d2a
--- /dev/null
+++ b/lib/snowplow-emr-etl-runner/emr.rb
@@ -0,0 +1,69 @@
+# Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
+#
+# This program is licensed to you under the Apache License Version 2.0,
+# and you may not use this file except in compliance with the Apache License Version 2.0.
+# You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the Apache License Version 2.0 is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+
+# Author:: Joshua Beemster (mailto:support@snowplowanalytics.com)
+# Copyright:: Copyright (c) 2012-2018 Snowplow Analytics Ltd
+# License:: Apache License Version 2.0
+
+require 'contracts'
+require 'pathname'
+require 'uri'
+
+module Snowplow
+  module EmrEtlRunner
+    module EMR
+
+      include Contracts
+
+      # Attempts to find an active EMR JobFlow with a given name
+      #
+      # Parameters:
+      # +client+:: EMR client
+      # +name+:: EMR cluster name
+      def get_emr_jobflow_id(client, name)
+        # Marker is used for paginating through all results
+        marker = nil
+        emr_clusters = []
+
+        loop do
+          response = list_clusters(client, marker)
+          emr_clusters = emr_clusters + response['Clusters'].select { |c| c['Name'] == name }
+          marker = response['Marker'] if response.has_key?('Marker')
+          break if marker.nil?
+        end
+
+        case emr_clusters.size
+        when 0
+          return nil
+        when 1
+          emr_cluster = emr_clusters.first
+          if emr_cluster['Status']['State'] == "RUNNING"
+            raise EmrClusterStateError, "EMR Cluster must be in WAITING state before new job steps can be submitted - found #{emr_cluster['Status']['State']}"
+          end
+          return emr_cluster['Id']
+        else
+          raise EmrDiscoveryError, "EMR Cluster name must be unique for safe discovery - found #{emr_clusters.size} with name #{name}"
+        end
+      end
+
+      private
+
+      def list_clusters(client, marker)
+        options = {
+          states: ["WAITING", "RUNNING"],
+        }
+        options[:marker] = marker unless marker.nil?
+        client.list_clusters(options)
+      end
+
+    end
+  end
+end
diff --git a/lib/snowplow-emr-etl-runner/emr_job.rb b/lib/snowplow-emr-etl-runner/emr_job.rb
index 468cf56..1e23196 100755
--- a/lib/snowplow-emr-etl-runner/emr_job.rb
+++ b/lib/snowplow-emr-etl-runner/emr_job.rb
@@ -55,10 +55,11 @@ class EmrJob
     include Monitoring::Logging
     include Snowplow::EmrEtlRunner::Utils
     include Snowplow::EmrEtlRunner::S3
+    include Snowplow::EmrEtlRunner::EMR

     # Initializes our wrapper for the Amazon EMR client.
-    Contract Bool, Bool, Bool, Bool, Bool, Bool, Bool, Bool, ArchiveStep, ArchiveStep, ConfigHash, ArrayOf[String], String, TargetsHash, RdbLoaderSteps => EmrJob
-    def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive_raw, rdb_load, archive_enriched, archive_shredded, config, enrichments_array, resolver, targets, rdbloader_steps)
+    Contract Bool, Bool, Bool, Bool, Bool, Bool, Bool, Bool, ArchiveStep, ArchiveStep, ConfigHash, ArrayOf[String], String, TargetsHash, RdbLoaderSteps, Bool, String => EmrJob
+    def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive_raw, rdb_load, archive_enriched, archive_shredded, config, enrichments_array, resolver, targets, rdbloader_steps, use_persistent_jobflow, persistent_jobflow_duration)

       logger.debug "Initializing EMR jobflow"

@@ -75,12 +76,12 @@
         config[:storage][:versions][:rdb_loader])
       collector_format = config.dig(:collectors, :format)

-      run_tstamp = Time.new
-      run_id = run_tstamp.strftime("%Y-%m-%d-%H-%M-%S")
+      @run_tstamp = Time.new
+      run_id = @run_tstamp.strftime("%Y-%m-%d-%H-%M-%S")
       @run_id = run_id
       @rdb_loader_log_base = config[:aws][:s3][:buckets][:log] + "rdb-loader/#{@run_id}/"
       @rdb_loader_logs = [] # pairs of target name and associated log
-      etl_tstamp = (run_tstamp.to_f * 1000).to_i.to_s
+      etl_tstamp = (@run_tstamp.to_f * 1000).to_i.to_s
       output_codec = output_codec_from_compression_format(config.dig(:enrich, :output_compression))

       encrypted = config[:aws][:s3][:buckets][:encrypted]
@@ -98,7 +99,26 @@
       end

       # Create a job flow
-      @jobflow = Elasticity::JobFlow.new
+      @use_persistent_jobflow = use_persistent_jobflow
+      @persistent_jobflow_duration_s = parse_duration(persistent_jobflow_duration)
+
+      found_persistent_jobflow = false
+      if use_persistent_jobflow
+        emr = Elasticity::EMR.new(:region => config[:aws][:emr][:region])
+        emr_jobflow_id = get_emr_jobflow_id(emr, config[:aws][:emr][:jobflow][:job_name])
+
+        if emr_jobflow_id.nil?
+          @jobflow = Elasticity::JobFlow.new
+        else
+          @jobflow = Elasticity::JobFlow.from_jobflow_id(emr_jobflow_id, config[:aws][:emr][:region])
+          found_persistent_jobflow = true
+        end
+
+        @jobflow.action_on_failure = "CANCEL_AND_WAIT"
+        @jobflow.keep_job_flow_alive_when_no_steps = true
+      else
+        @jobflow = Elasticity::JobFlow.new
+      end

       # Configure
       @jobflow.name = config[:aws][:emr][:jobflow][:job_name]
@@ -140,6 +160,14 @@
       csbe = config[:aws][:s3][:buckets][:enriched]
       csbs = config[:aws][:s3][:buckets][:shredded]

+      @pending_jobflow_steps = []
+
+      # Clear HDFS if persistent jobflow has been found
+      if found_persistent_jobflow
+        submit_jobflow_step(get_rmr_step([ENRICH_STEP_INPUT, ENRICH_STEP_OUTPUT, SHRED_STEP_OUTPUT], standard_assets_bucket, "Empty Snowplow HDFS"), use_persistent_jobflow)
+        submit_jobflow_step(get_hdfs_expunge_step, use_persistent_jobflow)
+      end
+
       # staging
       if staging
         unless empty?(s3, csbr[:processing])
@@ -172,7 +200,7 @@
             staging_step.arguments = staging_step.arguments + [ '--s3ServerSideEncryption' ]
           end
           staging_step.name << ": Raw #{l} -> Raw Staging S3"
-          @jobflow.add_step(staging_step)
+          submit_jobflow_step(staging_step, use_persistent_jobflow)
         }
        end
      end
@@ -198,7 +226,7 @@
        @jobflow.set_core_ebs_configuration(ebs_c)
      end

-     @jobflow.add_application("Hadoop")
+     @jobflow.add_application("Hadoop") unless found_persistent_jobflow

      if collector_format == 'thrift'
        if @legacy
@@ -206,7 +234,7 @@
           Elasticity::HadoopBootstrapAction.new('-c', 'io.file.buffer.size=65536'),
           Elasticity::HadoopBootstrapAction.new('-m', 'mapreduce.user.classpath.first=true')
         ].each do |action|
-          @jobflow.add_bootstrap_action(action)
+          @jobflow.add_bootstrap_action(action) unless found_persistent_jobflow
         end
       else
         [{
@@ -221,7 +249,7 @@
             "mapreduce.user.classpath.first" => "true"
           }
         }].each do |config|
-          @jobflow.add_configuration(config)
+          @jobflow.add_configuration(config) unless found_persistent_jobflow
         end
       end
     end
@@ -230,7 +258,7 @@
     bootstrap_actions = config[:aws][:emr][:bootstrap]
     unless bootstrap_actions.nil?
       bootstrap_actions.each do |bootstrap_action|
-        @jobflow.add_bootstrap_action(Elasticity::BootstrapAction.new(bootstrap_action))
+        @jobflow.add_bootstrap_action(Elasticity::BootstrapAction.new(bootstrap_action)) unless found_persistent_jobflow
       end
     end

@@ -243,32 +271,34 @@
         "#{standard_assets_bucket}common/emr/snowplow-ami5-bootstrap-0.1.0.sh"
       end
     cc_version = get_cc_version(config.dig(:enrich, :versions, :spark_enrich))
-    @jobflow.add_bootstrap_action(Elasticity::BootstrapAction.new(bootstrap_script_location, cc_version))
+    @jobflow.add_bootstrap_action(Elasticity::BootstrapAction.new(bootstrap_script_location, cc_version)) unless found_persistent_jobflow

     # Install and launch HBase
     hbase = config[:aws][:emr][:software][:hbase]
     unless not hbase
       install_hbase_action = Elasticity::BootstrapAction.new("s3://#{config[:aws][:emr][:region]}.elasticmapreduce/bootstrap-actions/setup-hbase")
-      @jobflow.add_bootstrap_action(install_hbase_action)
+      @jobflow.add_bootstrap_action(install_hbase_action) unless found_persistent_jobflow

       start_hbase_step = Elasticity::CustomJarStep.new("/home/hadoop/lib/hbase-#{hbase}.jar")
       start_hbase_step.name = "Start HBase #{hbase}"
       start_hbase_step.arguments = [ 'emr.hbase.backup.Main', '--start-master' ]
-      @jobflow.add_step(start_hbase_step)
+
+      # NOTE: Presumes that HBase will remain available for a persistent cluster
+      submit_jobflow_step(start_hbase_step, use_persistent_jobflow) unless found_persistent_jobflow
     end

     # Install Lingual
     lingual = config[:aws][:emr][:software][:lingual]
     unless not lingual
       install_lingual_action = Elasticity::BootstrapAction.new("s3://files.concurrentinc.com/lingual/#{lingual}/lingual-client/install-lingual-client.sh")
-      @jobflow.add_bootstrap_action(install_lingual_action)
+      @jobflow.add_bootstrap_action(install_lingual_action) unless found_persistent_jobflow
     end

     # EMR configuration: Spark, YARN, etc
     configuration = config[:aws][:emr][:configuration]
     unless configuration.nil?
       configuration.each do |k, h|
-        @jobflow.add_configuration({"Classification" => k, "Properties" => h})
+        @jobflow.add_configuration({"Classification" => k, "Properties" => h}) unless found_persistent_jobflow
       end
     end
@@ -335,7 +365,7 @@
         compact_to_hdfs_step.arguments = compact_to_hdfs_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       compact_to_hdfs_step.name << ": Raw S3 -> Raw HDFS"
-      @jobflow.add_step(compact_to_hdfs_step)
+      submit_jobflow_step(compact_to_hdfs_step, use_persistent_jobflow)

       # 2. Enrichment
       enrich_asset = if assets[:enrich].nil?
@@ -348,7 +378,7 @@

       enrich_step =
         if is_spark_enrich(enrich_version) then
-          @jobflow.add_application("Spark")
+          @jobflow.add_application("Spark") unless found_persistent_jobflow
           build_spark_step(
             "Enrich Raw Events",
             enrich_asset,
@@ -385,7 +415,7 @@
       unless empty?(s3, csbe[:good])
         raise DirectoryNotEmptyError, "Cannot safely add enrichment step to jobflow, #{csbe[:good]} is not empty"
       end
-      @jobflow.add_step(enrich_step)
+      submit_jobflow_step(enrich_step, use_persistent_jobflow)

       # We need to copy our enriched events from HDFS back to S3
       copy_to_s3_step = Elasticity::S3DistCpStep.new(legacy = @legacy)
@@ -399,7 +429,7 @@
         copy_to_s3_step.arguments = copy_to_s3_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       copy_to_s3_step.name << ": Enriched HDFS -> S3"
-      @jobflow.add_step(copy_to_s3_step)
+      submit_jobflow_step(copy_to_s3_step, use_persistent_jobflow)

       copy_success_file_step = Elasticity::S3DistCpStep.new(legacy = @legacy)
       copy_success_file_step.arguments = [
@@ -412,7 +442,7 @@
         copy_success_file_step.arguments = copy_success_file_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       copy_success_file_step.name << ": Enriched HDFS _SUCCESS -> S3"
-      @jobflow.add_step(copy_success_file_step)
+      submit_jobflow_step(copy_success_file_step, use_persistent_jobflow)
     end

     # Staging data produced by Stream Enrich
@@ -438,7 +468,7 @@
         staging_step.arguments = staging_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       staging_step.name << ": Stream Enriched #{csbe[:stream]} -> Enriched Staging S3"
-      @jobflow.add_step(staging_step)
+      submit_jobflow_step(staging_step, use_persistent_jobflow)
     end

     if shred
@@ -462,7 +492,7 @@
       # If we enriched, we free some space on HDFS by deleting the raw events
       # otherwise we need to copy the enriched events back to HDFS
       if enrich
-        @jobflow.add_step(get_rmr_step(ENRICH_STEP_INPUT, standard_assets_bucket))
+        submit_jobflow_step(get_rmr_step([ENRICH_STEP_INPUT], standard_assets_bucket, "Empty Raw HDFS"), use_persistent_jobflow)
       else
         src_pattern = if stream_enrich_mode then STREAM_ENRICH_REGEXP else PARTFILE_REGEXP end

@@ -478,12 +508,12 @@
         copy_to_hdfs_step.arguments = copy_to_hdfs_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       copy_to_hdfs_step.name << ": Enriched S3 -> HDFS"
-      @jobflow.add_step(copy_to_hdfs_step)
+      submit_jobflow_step(copy_to_hdfs_step, use_persistent_jobflow)
     end

     shred_step =
       if is_rdb_shredder(config[:storage][:versions][:rdb_shredder]) then
-        @jobflow.add_application("Spark")
+        @jobflow.add_application("Spark") unless found_persistent_jobflow
         duplicate_storage_config = build_duplicate_storage_json(targets[:DUPLICATE_TRACKING], false)
         build_spark_step(
           "Shred Enriched Events",
@@ -518,7 +548,7 @@
       unless empty?(s3, csbs[:good])
         raise DirectoryNotEmptyError, "Cannot safely add shredding step to jobflow, #{csbs[:good]} is not empty"
       end
-      @jobflow.add_step(shred_step)
+      submit_jobflow_step(shred_step, use_persistent_jobflow)

       # We need to copy our shredded types from HDFS back to S3
       copy_to_s3_step = Elasticity::S3DistCpStep.new(legacy = @legacy)
@@ -532,7 +562,7 @@
         copy_to_s3_step.arguments = copy_to_s3_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       copy_to_s3_step.name << ": Shredded HDFS -> S3"
-      @jobflow.add_step(copy_to_s3_step)
+      submit_jobflow_step(copy_to_s3_step, use_persistent_jobflow)

       copy_success_file_step = Elasticity::S3DistCpStep.new(legacy = @legacy)
       copy_success_file_step.arguments = [
@@ -545,12 +575,12 @@
         copy_success_file_step.arguments = copy_success_file_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       copy_success_file_step.name << ": Shredded HDFS _SUCCESS -> S3"
-      @jobflow.add_step(copy_success_file_step)
+      submit_jobflow_step(copy_success_file_step, use_persistent_jobflow)
     end

     if es
       get_elasticsearch_steps(config, assets, enrich, shred, targets[:FAILED_EVENTS]).each do |step|
-        @jobflow.add_step(step)
+        submit_jobflow_step(step, use_persistent_jobflow)
       end
     end

@@ -567,35 +597,35 @@
         archive_raw_step.arguments = archive_raw_step.arguments + [ '--s3ServerSideEncryption' ]
       end
       archive_raw_step.name << ": Raw Staging S3 -> Raw Archive S3"
-      @jobflow.add_step(archive_raw_step)
+      submit_jobflow_step(archive_raw_step, use_persistent_jobflow)
     end

     if rdb_load
       rdb_loader_version = Gem::Version.new(config[:storage][:versions][:rdb_loader])
       skip_manifest = stream_enrich_mode && rdb_loader_version > RDB_LOADER_WITH_PROCESSING_MANIFEST
       get_rdb_loader_steps(config, targets[:ENRICHED_EVENTS], resolver, assets[:loader], rdbloader_steps, skip_manifest).each do |step|
-        @jobflow.add_step(step)
+        submit_jobflow_step(step, use_persistent_jobflow)
       end
     end

     if archive_enriched == 'pipeline'
       archive_enriched_step = get_archive_step(csbe[:good], csbe[:archive], run_id, s3_endpoint, ": Enriched S3 -> Enriched Archive S3", encrypted)
-      @jobflow.add_step(archive_enriched_step)
+      submit_jobflow_step(archive_enriched_step, use_persistent_jobflow)
     elsif archive_enriched == 'recover'
       latest_run_id = get_latest_run_id(s3, csbe[:good])
       archive_enriched_step = get_archive_step(csbe[:good], csbe[:archive], latest_run_id, s3_endpoint, ': Enriched S3 -> S3 Enriched Archive', encrypted)
-      @jobflow.add_step(archive_enriched_step)
+      submit_jobflow_step(archive_enriched_step, use_persistent_jobflow)
     else    # skip
       nil
     end

     if archive_shredded == 'pipeline'
       archive_shredded_step = get_archive_step(csbs[:good], csbs[:archive], run_id, s3_endpoint, ": Shredded S3 -> Shredded Archive S3", encrypted)
-      @jobflow.add_step(archive_shredded_step)
+      submit_jobflow_step(archive_shredded_step, use_persistent_jobflow)
     elsif archive_shredded == 'recover'
       latest_run_id = get_latest_run_id(s3, csbs[:good], 'atomic-events')
       archive_shredded_step = get_archive_step(csbs[:good], csbs[:archive], latest_run_id, s3_endpoint, ": Shredded S3 -> S3 Shredded Archive", encrypted)
-      @jobflow.add_step(archive_shredded_step)
+      submit_jobflow_step(archive_shredded_step, use_persistent_jobflow)
     else    # skip
       nil
     end
@@ -650,15 +680,22 @@

     snowplow_tracking_enabled = ! config[:monitoring][:snowplow].nil?

-    jobflow_id = @jobflow.run
+    @pending_jobflow_steps.each do |jobflow_step|
+      @jobflow.add_step(jobflow_step)
+    end
+
+    jobflow_id = @jobflow.jobflow_id
+    if jobflow_id.nil?
+      jobflow_id = @jobflow.run
+    end
     logger.debug "EMR jobflow #{jobflow_id} started, waiting for jobflow to complete..."

     if snowplow_tracking_enabled
       Monitoring::Snowplow.parameterize(config)
-      Monitoring::Snowplow.instance.track_job_started(@jobflow)
+      Monitoring::Snowplow.instance.track_job_started(jobflow_id, @jobflow.cluster_status, cluster_step_status_for_run(@jobflow))
     end

-    status = wait_for()
+    status = wait_for

     if status.successful or status.rdb_loader_failure or status.rdb_loader_cancellation
       log_level = if status.successful
@@ -675,22 +712,29 @@
     if status.successful
       logger.debug "EMR jobflow #{jobflow_id} completed successfully."
       if snowplow_tracking_enabled
-        Monitoring::Snowplow.instance.track_job_succeeded(@jobflow)
+        Monitoring::Snowplow.instance.track_job_succeeded(jobflow_id, @jobflow.cluster_status, cluster_step_status_for_run(@jobflow))
       end

     elsif status.bootstrap_failure
       if snowplow_tracking_enabled
-        Monitoring::Snowplow.instance.track_job_failed(@jobflow)
+        Monitoring::Snowplow.instance.track_job_failed(jobflow_id, @jobflow.cluster_status, cluster_step_status_for_run(@jobflow))
       end
       raise BootstrapFailureError, get_failure_details(jobflow_id)

     else
       if snowplow_tracking_enabled
-        Monitoring::Snowplow.instance.track_job_failed(@jobflow)
+        Monitoring::Snowplow.instance.track_job_failed(jobflow_id, @jobflow.cluster_status, cluster_step_status_for_run(@jobflow))
       end
       raise EmrExecutionError, get_failure_details(jobflow_id)
     end

+    if @use_persistent_jobflow and
+       @persistent_jobflow_duration_s > 0 and
+       @jobflow.cluster_status.created_at + @persistent_jobflow_duration_s < @run_tstamp
+      logger.debug "EMR jobflow has expired and will be shutdown."
+      @jobflow.shutdown
+    end
+
     nil
   end
@@ -746,6 +790,18 @@

   private

+  # Adds a step to the jobflow according to whether or not
+  # we are using a persistent cluster.
+  #
+  # Parameters:
+  # +jobflow_step+:: the step to add
+  # +use_persistent_jobflow+:: whether a persistent jobflow should be used
+  def submit_jobflow_step(jobflow_step, use_persistent_jobflow = false)
+    if use_persistent_jobflow
+      jobflow_step.action_on_failure = "CANCEL_AND_WAIT"
+    end
+    @pending_jobflow_steps << jobflow_step
+  end
   # Build an Elasticity RDB Loader step.
   #
@@ -913,7 +969,7 @@ def build_spark_step(step_name, jar, main_class, folders, extra_step_args={})
   # Returns true if the jobflow completed without error,
   # false otherwise.
   Contract None => JobResult
-  def wait_for()
+  def wait_for
     success = false
     bootstrap_failure = false

@@ -924,20 +980,22 @@
       while true do
         begin
           # Count up running tasks and failures
-          statuses = @jobflow.cluster_step_status.map(&:state).inject([0, 0]) do |sum, state|
+          statuses = cluster_step_status_for_run(@jobflow).map(&:state).inject([0, 0]) do |sum, state|
            [ sum[0] + (@@running_states.include?(state) ? 1 : 0),
              sum[1] + (@@failed_states.include?(state) ? 1 : 0) ]
           end

           # If no step is still running, then quit
           if statuses[0] == 0
+            cluster_step_status = cluster_step_status_for_run(@jobflow)
+
             success = statuses[1] == 0 # True if no failures
-            bootstrap_failure = EmrJob.bootstrap_failure?(@jobflow)
-            rdb_loader_failure = EmrJob.rdb_loader_failure?(@jobflow.cluster_step_status)
-            rdb_loader_cancellation = EmrJob.rdb_loader_cancellation?(@jobflow.cluster_step_status)
+            bootstrap_failure = EmrJob.bootstrap_failure?(@jobflow, cluster_step_status)
+            rdb_loader_failure = EmrJob.rdb_loader_failure?(cluster_step_status)
+            rdb_loader_cancellation = EmrJob.rdb_loader_cancellation?(cluster_step_status)
             break
           else
             # Sleep a while before we check again
-            sleep(120)
+            sleep(30)
           end

         rescue SocketError => se
@@ -984,7 +1042,7 @@

     Contract String => String
     def get_failure_details(jobflow_id)
-      cluster_step_status = @jobflow.cluster_step_status
+      cluster_step_status = cluster_step_status_for_run(@jobflow)
       cluster_status = @jobflow.cluster_status

       [
@@ -1071,6 +1129,17 @@
     def deep_copy(o)
       Marshal.load(Marshal.dump(o))
     end

+    # Ensures we only look at the steps submitted in this run
+    # and not within prior persistent runs
+    #
+    # Parameters:
+    # +jobflow+:: The jobflow to extract steps from
+    def cluster_step_status_for_run(jobflow)
+      jobflow.cluster_step_status
+        .select { |a| a.created_at >= @run_tstamp }
+        .sort_by { |a| a.created_at }
+    end
+
     # Returns true if the jobflow failed at a rdb loader step
     Contract ArrayOf[Elasticity::ClusterStepStatus] => Bool
     def self.rdb_loader_failure?(cluster_step_statuses)
@@ -1086,18 +1155,25 @@ def self.rdb_loader_cancellation?(cluster_step_statuses)
     end

     # Returns true if the jobflow seems to have failed due to a bootstrap failure
-    Contract Elasticity::JobFlow => Bool
-    def self.bootstrap_failure?(jobflow)
+    Contract Elasticity::JobFlow, ArrayOf[Elasticity::ClusterStepStatus] => Bool
+    def self.bootstrap_failure?(jobflow, cluster_step_statuses)
       bootstrap_failure_indicator = /BOOTSTRAP_FAILURE|bootstrap action|Master instance startup failed/
-      jobflow.cluster_step_status.all? {|s| s.state == 'CANCELLED'} &&
+      cluster_step_statuses.all? { |s| s.state == 'CANCELLED' } &&
         (!(jobflow.cluster_status.last_state_change_reason =~ bootstrap_failure_indicator).nil?)
     end

-    Contract String, String => Elasticity::CustomJarStep
-    def get_rmr_step(location, bucket)
+    Contract ArrayOf[String], String, String => Elasticity::CustomJarStep
+    def get_rmr_step(locations, bucket, description)
       step = Elasticity::CustomJarStep.new("s3://#{@jobflow.region}.elasticmapreduce/libs/script-runner/script-runner.jar")
-      step.arguments = ["#{bucket}common/emr/snowplow-hadoop-fs-rmr-0.1.0.sh", location]
-      step.name << ": Empty Raw HDFS"
+      step.arguments = ["#{bucket}common/emr/snowplow-hadoop-fs-rmr-0.2.0.sh"] + locations
+      step.name << ": #{description}"
       step
     end
+
+    def get_hdfs_expunge_step
+      step = Elasticity::CustomJarStep.new("command-runner.jar")
+      step.arguments = %W(hdfs dfs -expunge)
+      step.name << ": Empty HDFS trash"
+      step
+    end
diff --git a/lib/snowplow-emr-etl-runner/errors.rb b/lib/snowplow-emr-etl-runner/errors.rb
index 920758d..63e8da0 100644
--- a/lib/snowplow-emr-etl-runner/errors.rb
+++ b/lib/snowplow-emr-etl-runner/errors.rb
@@ -29,6 +29,14 @@ class ConfigError < Error
   class EmrExecutionError < Error
   end

+  # Problem when discovering Amazon EMR JobFlow (e.g. cluster is RUNNING rather than WAITING)
+  class EmrClusterStateError < Error
+  end
+
+  # Problem when discovering Amazon EMR JobFlow (e.g. multiple job-flows discovered with the same name)
+  class EmrDiscoveryError < Error
+  end
+
   # A bootstrap failure indicates the job can be safely retried
   class BootstrapFailureError < EmrExecutionError
   end
diff --git a/lib/snowplow-emr-etl-runner/monitoring/snowplow.rb b/lib/snowplow-emr-etl-runner/monitoring/snowplow.rb
index d7f52c6..766137c 100644
--- a/lib/snowplow-emr-etl-runner/monitoring/snowplow.rb
+++ b/lib/snowplow-emr-etl-runner/monitoring/snowplow.rb
@@ -101,26 +101,25 @@ def to_jsonschema_compatible_timestamp(time)
      end

      # Context for the entire job
-     Contract Elasticity::JobFlow => SnowplowTracker::SelfDescribingJson
-     def get_job_context(jobflow)
-       status = jobflow.cluster_status
+     Contract String, Elasticity::ClusterStatus => SnowplowTracker::SelfDescribingJson
+     def get_job_context(jobflow_id, jobflow_status)
        SnowplowTracker::SelfDescribingJson.new(
          JOB_STATUS_SCHEMA,
          {
-           :name => status.name,
-           :jobflow_id => jobflow.jobflow_id,
-           :state => status.state,
-           :created_at => to_jsonschema_compatible_timestamp(status.created_at),
-           :ended_at => to_jsonschema_compatible_timestamp(status.ended_at),
-           :last_state_change_reason => status.last_state_change_reason
+           :name => jobflow_status.name,
+           :jobflow_id => jobflow_id,
+           :state => jobflow_status.state,
+           :created_at => to_jsonschema_compatible_timestamp(jobflow_status.created_at),
+           :ended_at => to_jsonschema_compatible_timestamp(jobflow_status.ended_at),
+           :last_state_change_reason => jobflow_status.last_state_change_reason
          }
        )
      end

      # One context per job step
-     Contract Elasticity::JobFlow => ArrayOf[SnowplowTracker::SelfDescribingJson]
-     def get_job_step_contexts(jobflow)
-       jobflow.cluster_step_status.map { |step|
+     Contract ArrayOf[Elasticity::ClusterStepStatus] => ArrayOf[SnowplowTracker::SelfDescribingJson]
+     def get_job_step_contexts(jobflow_steps)
+       jobflow_steps.map { |step|
         SnowplowTracker::SelfDescribingJson.new(
           STEP_STATUS_SCHEMA,
           {
@@ -135,38 +134,38 @@ def get_job_step_contexts(jobflow)
     end

     # Track a job started event
-    Contract Elasticity::JobFlow => SnowplowTracker::Tracker
-    def track_job_started(jobflow)
+    Contract String, Elasticity::ClusterStatus, ArrayOf[Elasticity::ClusterStepStatus] => SnowplowTracker::Tracker
+    def track_job_started(jobflow_id, jobflow_status, jobflow_steps)
       @tracker.track_unstruct_event(
         SnowplowTracker::SelfDescribingJson.new(
           JOB_STARTED_SCHEMA,
           {}
         ),
-        [@@app_context, get_job_context(jobflow)] + get_job_step_contexts(jobflow)
+        [@@app_context, get_job_context(jobflow_id, jobflow_status)] + get_job_step_contexts(jobflow_steps)
       )
     end

     # Track a job succeeded event
-    Contract Elasticity::JobFlow => SnowplowTracker::Tracker
-    def track_job_succeeded(jobflow)
+    Contract String, Elasticity::ClusterStatus, ArrayOf[Elasticity::ClusterStepStatus] => SnowplowTracker::Tracker
+    def track_job_succeeded(jobflow_id, jobflow_status, jobflow_steps)
       @tracker.track_unstruct_event(
         SnowplowTracker::SelfDescribingJson.new(
           JOB_SUCCEEDED_SCHEMA,
           {}
         ),
-        [@@app_context, get_job_context(jobflow)] + get_job_step_contexts(jobflow)
+        [@@app_context, get_job_context(jobflow_id, jobflow_status)] + get_job_step_contexts(jobflow_steps)
       )
     end

     # Track a job failed event
-    Contract Elasticity::JobFlow => SnowplowTracker::Tracker
-    def track_job_failed(jobflow)
+    Contract String, Elasticity::ClusterStatus, ArrayOf[Elasticity::ClusterStepStatus] => SnowplowTracker::Tracker
+    def track_job_failed(jobflow_id, jobflow_status, jobflow_steps)
       @tracker.track_unstruct_event(
         SnowplowTracker::SelfDescribingJson.new(
           JOB_FAILED_SCHEMA,
           {}
         ),
-        [@@app_context, get_job_context(jobflow)] + get_job_step_contexts(jobflow)
+        [@@app_context, get_job_context(jobflow_id, jobflow_status)] + get_job_step_contexts(jobflow_steps)
       )
     end
diff --git a/lib/snowplow-emr-etl-runner/runner.rb b/lib/snowplow-emr-etl-runner/runner.rb
index 612f867..5b7edee 100755
--- a/lib/snowplow-emr-etl-runner/runner.rb
+++ b/lib/snowplow-emr-etl-runner/runner.rb
@@ -134,7 +134,7 @@ def run

          tries_left -= 1
          job = EmrJob.new(@args[:debug], steps[:staging], steps[:enrich], steps[:staging_stream_enrich], steps[:shred], steps[:es], steps[:archive_raw], steps[:rdb_load], archive_enriched, archive_shredded, @config,
-           @enrichments_array, @resolver_config, @targets, rdbloader_steps)
+           @enrichments_array, @resolver_config, @targets, rdbloader_steps, @args[:use_persistent_jobflow], @args[:persistent_jobflow_duration])
          job.run(@config)
          break
        rescue BootstrapFailureError => bfe
@@ -147,7 +147,7 @@
          else
            raise
          end
-       rescue DirectoryNotEmptyError, NoDataToProcessError => e
+       rescue DirectoryNotEmptyError, NoDataToProcessError, EmrClusterStateError => e
         # unlock on no-op
         if not lock.nil?
           lock.unlock
diff --git a/lib/snowplow-emr-etl-runner/utils.rb b/lib/snowplow-emr-etl-runner/utils.rb
index 25b23c9..b81dfa6 100644
--- a/lib/snowplow-emr-etl-runner/utils.rb
+++ b/lib/snowplow-emr-etl-runner/utils.rb
@@ -206,6 +206,24 @@ def glob_path(path)
      path.end_with?('/*') ? path : "#{path}/*"
    end

+    DURATION_TOKENS = {
+      "m" => (60),
+      "h" => (60 * 60),
+      "d" => (60 * 60 * 24),
+      "w" => (60 * 60 * 24 * 7)
+    }
+
+    # Converts a duration string into a seconds integer
+    # e.g. 5h 10m is converted into 18600
+    Contract String => Integer
+    def parse_duration(input)
+      time = 0
+      input.scan(/(\d+)(\w)/).each do |amount, measure|
+        time += amount.to_i * DURATION_TOKENS[measure]
+      end
+      time
+    end
+
   end
  end
 end
diff --git a/spec/snowplow-emr-etl-runner/utils_spec.rb b/spec/snowplow-emr-etl-runner/utils_spec.rb
index 260c63e..cca041e 100644
--- a/spec/snowplow-emr-etl-runner/utils_spec.rb
+++ b/spec/snowplow-emr-etl-runner/utils_spec.rb
@@ -185,4 +185,10 @@
     expect(subject.glob_path('p/*')).to eq('p/*')
    end
  end
+
+  describe '#parse_duration' do
+    it 'should successfully convert weeks, days, hours and minutes into seconds correctly' do
+      expect(subject.parse_duration("1w 5d 3h 13m")).to eq(1048380)
+    end
+  end
 end
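
A rough usage sketch, not taken from the diff itself: assuming a pipeline config whose aws:emr:jobflow:job_name matches an EMR cluster that is already up and sitting in the WAITING state, the two new flags added in cli.rb above would be passed to EmrEtlRunner roughly as follows (the --config and --resolver flags are the runner's pre-existing options and are shown only for completeness):

    ./snowplow-emr-etl-runner run \
      --config config.yml \
      --resolver resolver.json \
      --use-persistent-jobflow \
      --persistent-jobflow-duration "1d12h"

With parse_duration from utils.rb, "1d12h" works out to 86400 + 43200 = 129600 seconds, so a discovered cluster created more than 36 hours before the run's start timestamp would be shut down by the new expiry check at the end of EmrJob#run.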