Update Contracts for get_failure_details (closes snowplow/snowplow#4088)
benjben authored and peel committed May 25, 2020
1 parent 6b0f91e commit 3527213
Showing 3 changed files with 168 additions and 54 deletions.
56 changes: 2 additions & 54 deletions lib/snowplow-emr-etl-runner/emr_job.rb
@@ -1107,34 +1107,6 @@ def wait_for
JobResult.new(success, bootstrap_failure, rdb_loader_failure, rdb_loader_cancellation)
end

# Prettified string containing failure details
# for this job flow.
Contract String => String
def get_failure_details(jobflow_id, cluster_status, cluster_step_status_for_run)
[
"EMR jobflow #{jobflow_id} failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.",
"#{@jobflow.name}: #{cluster_status.state} [#{cluster_status.last_state_change_reason}] ~ #{self.class.get_elapsed_time(cluster_status.ready_at, cluster_status.ended_at)} #{self.class.get_timespan(cluster_status.ready_at, cluster_status.ended_at)}"
].concat(cluster_step_status_for_run
.sort { |a,b|
self.class.nilable_spaceship(a.started_at, b.started_at)
}
.each_with_index
.map { |s,i|
" - #{i + 1}. #{s.name}: #{s.state} ~ #{self.class.get_elapsed_time(s.started_at, s.ended_at)} #{self.class.get_timespan(s.started_at, s.ended_at)}"
})
.join("\n")
end

# Gets the time span.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def self.get_timespan(start, _end)
"[#{start} - #{_end}]"
end

# Spaceship operator supporting nils
#
# Parameters:
@@ -1154,32 +1126,6 @@ def self.nilable_spaceship(a, b)
end
end

# Gets the elapsed time in a
# human-readable format.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def self.get_elapsed_time(start, _end)
if start.nil? or _end.nil?
"elapsed time n/a"
else
# Adapted from http://stackoverflow.com/a/19596579/255627
seconds_diff = (start - _end).to_i.abs

hours = seconds_diff / 3600
seconds_diff -= hours * 3600

minutes = seconds_diff / 60
seconds_diff -= minutes * 60

seconds = seconds_diff

"#{hours.to_s.rjust(2, '0')}:#{minutes.to_s.rjust(2, '0')}:#{seconds.to_s.rjust(2, '0')}"
end
end

# Recursively change the keys of a YAML from symbols to strings
def recursive_stringify_keys(h)
if h.class == [].class
@@ -1200,6 +1146,7 @@ def deep_copy(o)
#
# Parameters:
# +jobflow+:: The jobflow to extract steps from
Contract Elasticity::JobFlow => ArrayOf[Elasticity::ClusterStepStatus]
def cluster_step_status_for_run(jobflow)
begin
retries ||= 0
@@ -1213,6 +1160,7 @@ def cluster_step_status_for_run(jobflow)
end
end

Contract Elasticity::JobFlow => Elasticity::ClusterStatus
def cluster_status(jobflow)
begin
retries ||= 0
54 changes: 54 additions & 0 deletions lib/snowplow-emr-etl-runner/utils.rb
@@ -14,6 +14,7 @@
# License:: Apache License Version 2.0

require 'contracts'
require 'elasticity'

# Module with diverse utilities dealing with a few quirks in EmrEtlRunner
module Snowplow
@@ -224,6 +225,59 @@ def parse_duration(input)
time
end

# Prettified string containing failure details
# for this job flow.
Contract String, Elasticity::ClusterStatus, ArrayOf[Elasticity::ClusterStepStatus] => String
def get_failure_details(jobflow_id, cluster_status, cluster_step_status_for_run)
[
"EMR jobflow #{jobflow_id} failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.",
"#{jobflow_id}: #{cluster_status.state} [#{cluster_status.last_state_change_reason}] ~ #{get_elapsed_time(cluster_status.ready_at, cluster_status.ended_at)} #{get_timespan(cluster_status.ready_at, cluster_status.ended_at)}"
].concat(cluster_step_status_for_run
.sort { |a,b|
self.class.nilable_spaceship(a.started_at, b.started_at)
}
.each_with_index
.map { |s,i|
" - #{i + 1}. #{s.name}: #{s.state} ~ #{get_elapsed_time(s.started_at, s.ended_at)} #{get_timespan(s.started_at, s.ended_at)}"
})
.join("\n")
end
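
# Illustrative only (not part of this commit): with the TERMINATED cluster and the
# single COMPLETED step used in the spec further down, the summary assembled above
# comes out as one multi-line string along these lines (timespans elided):
#
#   EMR jobflow JOB ID failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
#   JOB ID: TERMINATED [ALL_STEPS_COMPLETED] ~ 00:36:29 [... - ...]
#    - 1. Elasticity Custom Jar Step: COMPLETED ~ elapsed time n/a [ - ...]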

# Gets the elapsed time in a
# human-readable format.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def get_elapsed_time(start, _end)
if start.nil? or _end.nil?
"elapsed time n/a"
else
# Adapted from http://stackoverflow.com/a/19596579/255627
seconds_diff = (start - _end).to_i.abs

hours = seconds_diff / 3600
seconds_diff -= hours * 3600

minutes = seconds_diff / 60
seconds_diff -= minutes * 60

seconds = seconds_diff

"#{hours.to_s.rjust(2, '0')}:#{minutes.to_s.rjust(2, '0')}:#{seconds.to_s.rjust(2, '0')}"
end
end
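
# Worked example (illustrative, not part of this commit): the spec's cluster timeline
# below spans roughly 2189 seconds; 2189 / 3600 = 0 hours with 2189 left over, and
# 2189 / 60 = 36 minutes with 29 seconds left over, hence "00:36:29":
#
#   get_elapsed_time(Time.at(1436788842.195), Time.at(1436791032.097)) # => "00:36:29"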

# Gets the time span.
#
# Parameters:
# +start+:: start time
# +_end+:: end time
Contract Maybe[Time], Maybe[Time] => String
def get_timespan(start, _end)
"[#{start} - #{_end}]"
end
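# Illustrative only: get_timespan just brackets the two raw timestamps as "[<start> - <end>]";
# a nil endpoint leaves its slot empty, e.g. "[ - <ended_at>]" as in the per-step line
# expected by the spec further down.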
end
end
end
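
For orientation, here is a minimal sketch of how the relocated helper might be called now that it lives in the utilities module rather than on EmrJob. It assumes the full module constant is Snowplow::EmrEtlRunner::Utils (the module nesting is truncated in the hunk above) and reuses the aws_cluster_status and aws_cluster_steps JSON fixtures defined in the spec below; the require path and the mix-in style are assumptions, not part of this commit.

    require 'json'
    require 'elasticity'
    require 'snowplow-emr-etl-runner/utils' # assuming the gem's lib directory is on the load path

    # Mix the utilities into a throwaway object, much as the spec's subject is assumed to do
    helper = Object.new.extend(Snowplow::EmrEtlRunner::Utils)

    # Build Elasticity status objects from the AWS-shaped JSON fixtures in the spec below
    status = Elasticity::ClusterStatus.from_aws_data(JSON.parse(aws_cluster_status))
    steps  = Elasticity::ClusterStepStatus.from_aws_list_data(JSON.parse(aws_cluster_steps))

    # Print the prettified failure summary for the failed jobflow
    puts helper.get_failure_details('j-3T0PHNUXCY7SX', status, steps)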
112 changes: 112 additions & 0 deletions spec/snowplow-emr-etl-runner/utils_spec.rb
@@ -191,4 +191,116 @@
expect(subject.parse_duration("1w 5d 3h 13m")).to eq(1048380)
end
end

describe '#get_failure_details' do
let(:jobflow_id) { "JOB ID" }
let(:cluster_state) { 'TERMINATED' }
let(:timeline) do
<<-JSON
{
"CreationDateTime": 1436788464.415,
"EndDateTime": 1436791032.097,
"ReadyDateTime": 1436788842.195
}
JSON
end
let(:aws_cluster_status) do
<<-JSON
{
"Cluster": {
"Applications": [
{
"Name": "hadoop",
"Version": "1.0.3"
}
],
"AutoTerminate": true,
"Configurations": [
],
"Ec2InstanceAttributes": {
"Ec2AvailabilityZone": "us-east-1a",
"EmrManagedMasterSecurityGroup": "sg-b7de0adf",
"EmrManagedSlaveSecurityGroup": "sg-89de0ae1"
},
"Id": "j-3T0PHNUXCY7SX",
"MasterPublicDnsName": "ec2-54-81-173-103.compute-1.amazonaws.com",
"Name": "Elasticity Job Flow",
"NormalizedInstanceHours": 2,
"RequestedAmiVersion": "latest",
"RunningAmiVersion": "2.4.2",
"Status": {
"State": "#{cluster_state}",
"StateChangeReason": {
"Code": "ALL_STEPS_COMPLETED",
"Message": "Steps completed"
},
"Timeline": #{timeline}
},
"Tags": [
{
"Key": "key",
"Value": "value"
}
],
"TerminationProtected": false,
"VisibleToAllUsers": false
}
}
JSON
end

let (:cluster_status) { Elasticity::ClusterStatus.from_aws_data(JSON.parse(aws_cluster_status)) }

let(:aws_cluster_steps) do
<<-JSON
{
"Steps": [
{
"ActionOnFailure": "TERMINATE_CLUSTER",
"Config": {
"Args": [
"36",
"3",
"0"
],
"Jar": "s3n://elasticmapreduce/samples/cloudburst/cloudburst.jar",
"MainClass" : "MAIN_CLASS",
"Properties": {
"Key1" : "Value1",
"Key2" : "Value2"
}
},
"Id": "s-OYPPAC4XPPUC",
"Name": "Elasticity Custom Jar Step",
"Status": {
"State": "COMPLETED",
"StateChangeReason": {
"Code": "ALL_STEPS_COMPLETED",
"Message": "Steps completed"
},
"Timeline": #{timeline}
}
}
]
}
JSON
end

let(:cluster_step_statuses) { Elasticity::ClusterStepStatus.from_aws_list_data(JSON.parse(aws_cluster_steps)) }

let(:expected_output) {
[
"EMR jobflow JOB ID failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.",
"JOB ID: TERMINATED [ALL_STEPS_COMPLETED] ~ 00:36:29 #{subject.get_timespan(cluster_status.ready_at, cluster_status.ended_at)}",
" - 1. Elasticity Custom Jar Step: COMPLETED ~ elapsed time n/a [ - #{cluster_status.ended_at}]"
]
.join("\n")
}

it { should respond_to(:get_failure_details).with(3).argument }

it 'should create a string containing a summary of the failure' do
expect(subject.get_failure_details(jobflow_id, cluster_status, cluster_step_statuses)).to eq(expected_output)
end
end
end
