Skip to content

Commit

Permalink
Limit requests to the EMR API (close snowplow/snowplow#4056)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet authored and Piotr Limanowski committed May 26, 2020
1 parent 81a780a commit 041b7c9
Showing 1 changed file with 43 additions and 44 deletions.
87 changes: 43 additions & 44 deletions lib/snowplow-emr-etl-runner/emr_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -747,15 +747,7 @@ def run(config)

if snowplow_tracking_enabled
Monitoring::Snowplow.parameterize(config)
cluster_status =
begin
retries ||= 0
@jobflow.cluster_status
rescue Elasticity::ThrottlingException, RestClient::RequestTimeout, RestClient::InternalServerError, RestClient::ServiceUnavailable, RestClient::SSLCertificateNotVerified
retries += 1
sleep(2 ** retries + 30)
retry if retries < 3
Monitoring::Snowplow.instance.track_job_started(jobflow_id, cluster_status, cluster_step_status_for_run(@jobflow))
Monitoring::Snowplow.instance.track_job_started(jobflow_id, cluster_status(@jobflow), cluster_step_status_for_run(@jobflow))
end

status = wait_for
Expand All @@ -772,28 +764,31 @@ def run(config)
config[:aws][:secret_access_key], log_level)
end

cluster_status = cluster_status(@jobflow)
cluster_step_status_for_run = cluster_step_status_for_run(@jobflow)

if status.successful
logger.debug "EMR jobflow #{jobflow_id} completed successfully."
if snowplow_tracking_enabled
Monitoring::Snowplow.instance.track_job_succeeded(jobflow_id, @jobflow.cluster_status, cluster_step_status_for_run(@jobflow))
Monitoring::Snowplow.instance.track_job_succeeded(jobflow_id, cluster_status, cluster_step_status_for_run)
end

elsif status.bootstrap_failure
if snowplow_tracking_enabled
Monitoring::Snowplow.instance.track_job_failed(jobflow_id, @jobflow.cluster_status, cluster_step_status_for_run(@jobflow))
Monitoring::Snowplow.instance.track_job_failed(jobflow_id, cluster_status, cluster_step_status_for_run)
end
raise BootstrapFailureError, get_failure_details(jobflow_id)
raise BootstrapFailureError, get_failure_details(jobflow_id, cluster_status, cluster_step_status_for_run)

else
if snowplow_tracking_enabled
Monitoring::Snowplow.instance.track_job_failed(jobflow_id, @jobflow.cluster_status, cluster_step_status_for_run(@jobflow))
Monitoring::Snowplow.instance.track_job_failed(jobflow_id, cluster_status, cluster_step_status_for_run)
end
raise EmrExecutionError, get_failure_details(jobflow_id)
raise EmrExecutionError, get_failure_details(jobflow_id, cluster_status, cluster_step_status_for_run)
end

if @use_persistent_jobflow and
@persistent_jobflow_duration_s > 0 and
@jobflow.cluster_status.created_at + @persistent_jobflow_duration_s < @run_tstamp
cluster_status.created_at + @persistent_jobflow_duration_s < @run_tstamp
logger.debug "EMR jobflow has expired and will be shutdown."
begin
retries ||= 0
Expand Down Expand Up @@ -1049,23 +1044,28 @@ def wait_for
# Loop until we can quit...
while true do
begin
# Count up running tasks and failures
statuses = cluster_step_status_for_run(@jobflow).map(&:state).inject([0, 0]) do |sum, state|
[ sum[0] + (@@running_states.include?(state) ? 1 : 0), sum[1] + (@@failed_states.include?(state) ? 1 : 0) ]
end

# If no step is still running, then quit
if statuses[0] == 0
cluster_step_status = cluster_step_status_for_run(@jobflow)
cluster_step_status_for_run = cluster_step_status_for_run(@jobflow)

success = statuses[1] == 0 # True if no failures
bootstrap_failure = EmrJob.bootstrap_failure?(@jobflow, cluster_step_status)
rdb_loader_failure = EmrJob.rdb_loader_failure?(cluster_step_status)
rdb_loader_cancellation = EmrJob.rdb_loader_cancellation?(cluster_step_status)
break
if cluster_step_status_for_run.nil?
logger.warn "Could not retrieve cluster status, waiting 5 minutes before checking jobflow again"
sleep(300)
else
# Sleep a while before we check again
sleep(30)
# Count up running tasks and failures
statuses = cluster_step_status_for_run.map(&:state).inject([0, 0]) do |sum, state|
[ sum[0] + (@@running_states.include?(state) ? 1 : 0), sum[1] + (@@failed_states.include?(state) ? 1 : 0) ]
end

# If no step is still running, then quit
if statuses[0] == 0
success = statuses[1] == 0 # True if no failures
bootstrap_failure = EmrJob.bootstrap_failure?(@jobflow, cluster_step_status_for_run)
rdb_loader_failure = EmrJob.rdb_loader_failure?(cluster_step_status_for_run)
rdb_loader_cancellation = EmrJob.rdb_loader_cancellation?(cluster_step_status_for_run)
break
else
# Sleep a while before we check again
sleep(60)
end
end

rescue SocketError => se
Expand Down Expand Up @@ -1110,23 +1110,11 @@ def wait_for
# Prettified string containing failure details
# for this job flow.
Contract String => String
def get_failure_details(jobflow_id)

cluster_step_status = cluster_step_status_for_run(@jobflow)
cluster_status =
begin
retries ||= 0
@jobflow.cluster_status
rescue Elasticity::ThrottlingException, RestClient::RequestTimeout, RestClient::InternalServerError, RestClient::ServiceUnavailable, RestClient::SSLCertificateNotVerified
retries += 1
sleep(2 ** retries + 30)
retry if retries < 3
end

def get_failure_details(jobflow_id, cluster_status, cluster_step_status_for_run)
[
"EMR jobflow #{jobflow_id} failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.",
"#{@jobflow.name}: #{cluster_status.state} [#{cluster_status.last_state_change_reason}] ~ #{self.class.get_elapsed_time(cluster_status.ready_at, cluster_status.ended_at)} #{self.class.get_timespan(cluster_status.ready_at, cluster_status.ended_at)}"
].concat(cluster_step_status
].concat(cluster_step_status_for_run
.sort { |a,b|
self.class.nilable_spaceship(a.started_at, b.started_at)
}
Expand Down Expand Up @@ -1225,6 +1213,17 @@ def cluster_step_status_for_run(jobflow)
end
end

def cluster_status(jobflow)
begin
retries ||= 0
jobflow.cluster_status
rescue Elasticity::ThrottlingException, RestClient::RequestTimeout, RestClient::InternalServerError, RestClient::ServiceUnavailable, RestClient::SSLCertificateNotVerified
retries += 1
sleep(2 ** retries + 30)
retry if retries < 3
end
end

# Returns true if the jobflow failed at a rdb loader step
Contract ArrayOf[Elasticity::ClusterStepStatus] => Bool
def self.rdb_loader_failure?(cluster_step_statuses)
Expand Down

0 comments on commit 041b7c9

Please sign in to comment.