Skip to content

Commit

Permalink
Add support for running steps on a persistent EMR cluster (closes sno…
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster authored and peel committed May 25, 2020
1 parent 7afb864 commit 3b718ed
Show file tree
Hide file tree
Showing 12 changed files with 296 additions and 80 deletions.
3 changes: 3 additions & 0 deletions bin/snowplow-emr-etl-runner
Expand Up @@ -93,6 +93,9 @@ rescue runner::LinterError => e
rescue runner::LockHeldError => e
puts "#{e.message}"
exit 17
rescue runner::EmrClusterStateError => e
puts "#{e.message}"
exit 18
# Special retval so rest of pipeline knows not to continue
rescue runner::NoDataToProcessError => e
puts "No logs to process: #{e.message}"
Expand Down
28 changes: 28 additions & 0 deletions bin/snowplow-hadoop-fs-rmr-0.2.0.sh
@@ -0,0 +1,28 @@
#!/bin/sh

# Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0,
# and you may not use this file except in compliance with the Apache License Version 2.0.
# You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the Apache License Version 2.0 is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.

# Author:: Joshua Beemster (mailto:support@snowplowanalytics.com)
# Copyright:: Copyright (c) 2012-2018 Snowplow Analytics Ltd
# License:: Apache License Version 2.0

# Recursively removes a list of directories on HDFS.
#
# Arguments: one or more HDFS directory paths.
# For each path, checks whether it is an existing directory and, if so,
# deletes it recursively (bypassing the trash); otherwise reports that the
# directory does not exist and moves on to the next path.
for i in "$@"
do
  # Branch on the exit status of the test command directly: the script runs
  # under /bin/sh, where `[ $? == 0 ]` is not portable (`==` is a bashism).
  # Paths are quoted so names containing spaces or glob characters are passed
  # to hadoop as a single argument.
  if hadoop fs -test -d "${i}"; then
    echo "Removing directory ${i} ..."
    hadoop fs -rm -r -skipTrash "${i}"
  else
    echo "Directory ${i} does not exist"
  fi
done
1 change: 1 addition & 0 deletions lib/snowplow-emr-etl-runner.rb
Expand Up @@ -22,6 +22,7 @@
require_relative 'snowplow-emr-etl-runner/cli'
require_relative 'snowplow-emr-etl-runner/job_result'
require_relative 'snowplow-emr-etl-runner/s3'
require_relative 'snowplow-emr-etl-runner/emr'
require_relative 'snowplow-emr-etl-runner/emr_job'
require_relative 'snowplow-emr-etl-runner/lock/lock'
require_relative 'snowplow-emr-etl-runner/lock/file_lock'
Expand Down
8 changes: 7 additions & 1 deletion lib/snowplow-emr-etl-runner/cli.rb
Expand Up @@ -58,6 +58,8 @@ def self.get_args_config_enrichments_resolver
:skip => [],
:include => [],
:ignore_lock_on_start => false,
:use_persistent_jobflow => false,
:persistent_jobflow_duration => "0m",
}

commands = {
Expand All @@ -75,6 +77,8 @@ def self.get_args_config_enrichments_resolver
opts.on('-l', '--lock PATH', 'where to store the lock') { |config| options[:lock] = config }
opts.on('--ignore-lock-on-start', 'ignore the lock if it is set when starting') { |config| options[:ignore_lock_on_start] = true }
opts.on('--consul ADDRESS', 'address to the Consul server') { |config| options[:consul] = config }
opts.on('--use-persistent-jobflow', 'discovers and uses a persistent cluster for steps') { |config| options[:use_persistent_jobflow] = true }
opts.on('--persistent-jobflow-duration DURATION', 'the length of time a persistent cluster can run for before being terminated (e.g. 1d12h)') { |config| options[:persistent_jobflow_duration] = config }
end,
'generate emr-config' => OptionParser.new do |opts|
opts.banner = 'Usage: generate emr-config [options]'
Expand Down Expand Up @@ -186,7 +190,9 @@ def self.process_options(options, optparse, cmd_name)
:include => options[:include],
:lock => options[:lock],
:ignore_lock_on_start => options[:ignore_lock_on_start],
:consul => options[:consul]
:consul => options[:consul],
:use_persistent_jobflow => options[:use_persistent_jobflow],
:persistent_jobflow_duration => options[:persistent_jobflow_duration],
}

summary = optparse.to_s
Expand Down
4 changes: 3 additions & 1 deletion lib/snowplow-emr-etl-runner/contracts.rb
Expand Up @@ -43,7 +43,9 @@ module EmrEtlRunner
:include => Maybe[ArrayOf[String]],
:lock => Maybe[String],
:consul => Maybe[String],
:ignore_lock_on_start => Maybe[Bool]
:ignore_lock_on_start => Maybe[Bool],
:use_persistent_jobflow => Maybe[Bool],
:persistent_jobflow_duration => Maybe[String]
})

# The Hash containing the buckets field from the configuration YAML
Expand Down
69 changes: 69 additions & 0 deletions lib/snowplow-emr-etl-runner/emr.rb
@@ -0,0 +1,69 @@
# Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0,
# and you may not use this file except in compliance with the Apache License Version 2.0.
# You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the Apache License Version 2.0 is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.

# Author:: Joshua Beemster (mailto:support@snowplowanalytics.com)
# Copyright:: Copyright (c) 2012-2018 Snowplow Analytics Ltd
# License:: Apache License Version 2.0

require 'contracts'
require 'pathname'
require 'uri'

module Snowplow
module EmrEtlRunner
module EMR

include Contracts

# Attempts to find an active EMR JobFlow with a given name.
#
# Pages through every cluster listing returned by +list_clusters+ and keeps
# the clusters whose 'Name' equals +name+.
#
# Parameters:
# +client+:: EMR client
# +name+:: EMR cluster name
#
# Returns the 'Id' of the single matching cluster, or nil when no cluster
# with that name is found.
#
# Raises EmrClusterStateError when the matching cluster is RUNNING, and
# EmrDiscoveryError when more than one cluster shares the name.
def get_emr_jobflow_id(client, name)
  # Marker is used for paginating through all results
  marker = nil
  emr_clusters = []

  loop do
    response = list_clusters(client, marker)
    emr_clusters.concat(response['Clusters'].select { |c| c['Name'] == name })
    # Always take the marker from the current page. The final page carries no
    # 'Marker', so this becomes nil and ends the loop; keeping the previous
    # page's marker here would re-request the same page forever.
    marker = response['Marker']
    break if marker.nil?
  end

  case emr_clusters.size
  when 0
    nil
  when 1
    emr_cluster = emr_clusters.first
    state = emr_cluster['Status']['State']
    # A RUNNING cluster is still executing steps, so it is rejected.
    if state == "RUNNING"
      raise EmrClusterStateError, "EMR Cluster must be in WAITING state before new job steps can be submitted - found #{state}"
    end
    emr_cluster['Id']
  else
    raise EmrDiscoveryError, "EMR Cluster name must be unique for safe discovery - found #{emr_clusters.size} with name #{name}"
  end
end

private

# Requests one page of clusters in the WAITING or RUNNING state.
#
# Parameters:
# +client+:: EMR client
# +marker+:: pagination marker from a previous response, or nil for the first page
def list_clusters(client, marker)
  # The marker key is only sent when a marker was actually supplied
  pagination = marker.nil? ? {} : { marker: marker }
  client.list_clusters({ states: ["WAITING", "RUNNING"] }.merge(pagination))
end

end
end
end

0 comments on commit 3b718ed

Please sign in to comment.