Skip to content

Commit

Permalink
Merge branch 'release/r104-stoplesteinan'
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Apr 30, 2018
2 parents 877c7a2 + 26b9c19 commit d0f7117
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 73 deletions.
2 changes: 1 addition & 1 deletion 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@
module Snowplow
module EmrEtlRunner
NAME = "snowplow-emr-etl-runner"
VERSION = "0.31.0"
VERSION = "0.32.0"
end
end
13 changes: 12 additions & 1 deletion 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,20 @@ def self.validate_and_coalesce(args, config)
raise ConfigError, 'resume-from and skip are mutually exclusive'
end

if args[:resume_from] == "staging_stream_enrich" && config[:aws][:s3][:buckets][:enriched][:stream].nil?
if args[:resume_from] == "staging_stream_enrich" && config.dig(:aws, :s3, :buckets, :enriched, :stream).nil?
raise ConfigError, 'staging_stream_enrich is invalid step to resume from without aws.s3.buckets.enriched.stream settings'
end
unless config.dig(:aws, :s3, :buckets, :enriched, :stream).nil?
if args[:resume_from] == "enrich"
raise ConfigError, 'cannot resume from enrich in stream enrich mode'
end
if args[:skip].include?('staging') || args[:skip].include?('enrich')
raise ConfigError, 'cannot skip staging nor enrich in stream enrich mode. Either skip staging_stream_enrich or resume from shred'
end
if args[:skip].include?('archive_raw') || args[:resume_from] == "archive_raw"
raise ConfigError, 'cannot skip nor resume from archive_raw in stream enrich mode'
end
end

args[:include].each { |opt|
unless INCLUDES.include?(opt)
Expand Down
15 changes: 15 additions & 0 deletions 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,20 @@ module EmrEtlRunner
:include => ArrayOf[String]
})

# Contract describing the full set of step flags for an EMR job.
# Every key names a step that can be launched; each value asserts
# the corresponding flag is a Bool.
EmrSteps = Hash[
  %i[
    staging
    enrich
    staging_stream_enrich
    shred
    es
    archive_raw
    rdb_load
    consistency_check
    load_manifest_check
    analyze
    archive_enriched
    archive_shredded
  ].map { |step| [step, Bool] }
]
end
end
29 changes: 17 additions & 12 deletions 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class EmrJob
# Constants
JAVA_PACKAGE = "com.snowplowanalytics.snowplow"
PARTFILE_REGEXP = ".*part-.*"
STREAM_ENRICH_REGEXP = ".*\.gz"
SUCCESS_REGEXP = ".*_SUCCESS"
STANDARD_HOSTED_ASSETS = "s3://snowplow-hosted-assets"
ENRICH_STEP_INPUT = 'hdfs:///local/snowplow/raw-events/'
Expand Down Expand Up @@ -284,10 +285,15 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
@jobflow.set_task_instance_group(instance_group)
end

stream_enrich_mode = !csbe[:stream].nil?

# Get full path when we need to move data to enrich_final_output
# otherwise (when enriched/good is non-empty already)
# we can list files within folders using '*.'-regexps
enrich_final_output = if enrich || staging_stream_enrich
partition_by_run(csbe[:good], run_id)
else
csbe[:good] # Doesn't make sense to partition if enrich has already been done
csbe[:good]
end

if enrich
Expand Down Expand Up @@ -393,17 +399,14 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive

# Staging data produced by Stream Enrich
if staging_stream_enrich
enrich_final_output_loc = Sluice::Storage::S3::Location.new(enrich_final_output)

src_pattern = '.+'

unless Sluice::Storage::S3::is_empty?(s3, enrich_final_output_loc)
raise DirectoryNotEmptyError, "Cannot safely add stream staging step to jobflow, #{enrich_final_output_loc} is not empty"
csbe_good_loc = Sluice::Storage::S3::Location.new(csbe[:good])
unless Sluice::Storage::S3::is_empty?(s3, csbe_good_loc)
raise DirectoryNotEmptyError, "Cannot safely add stream staging step to jobflow, #{csbe_good_loc} is not empty"
end

stream_enrich_loc = Sluice::Storage::S3::Location.new(csbe[:stream])

src_pattern_regex = Regexp.new src_pattern
src_pattern_regex = Regexp.new STREAM_ENRICH_REGEXP
files = Sluice::Storage::S3::list_files(s3, stream_enrich_loc).select { |f| !(f.key =~ src_pattern_regex).nil? }
if files.empty?
raise NoDataToProcessError, "No Snowplow enriched stream logs to process since last run"
Expand All @@ -414,7 +417,7 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
"--src" , csbe[:stream],
"--dest" , enrich_final_output,
"--s3Endpoint" , s3_endpoint,
"--srcPattern" , src_pattern,
"--srcPattern" , STREAM_ENRICH_REGEXP,
"--deleteOnSuccess"
]
staging_step.name << ": Stream Enriched #{csbe[:stream]} -> Enriched Staging S3"
Expand Down Expand Up @@ -444,15 +447,17 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
if enrich
@jobflow.add_step(get_rmr_step(ENRICH_STEP_INPUT, standard_assets_bucket))
else
src_pattern = unless staging_stream_enrich then PARTFILE_REGEXP else '.+' end
src_pattern = if stream_enrich_mode then STREAM_ENRICH_REGEXP else PARTFILE_REGEXP end

copy_to_hdfs_step = Elasticity::S3DistCpStep.new(legacy = @legacy)
copy_to_hdfs_step.arguments = [
"--src" , enrich_final_output, # Opposite way round to normal
"--dest" , ENRICH_STEP_OUTPUT,
"--srcPattern" , src_pattern,
"--outputCodec", "none",
"--s3Endpoint" , s3_endpoint
] # Either user doesn't want compression, or files are already compressed
]

copy_to_hdfs_step.name << ": Enriched S3 -> HDFS"
@jobflow.add_step(copy_to_hdfs_step)
end
Expand Down Expand Up @@ -540,7 +545,7 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive

if rdb_load
rdb_loader_version = Gem::Version.new(config[:storage][:versions][:rdb_loader])
skip_manifest = staging_stream_enrich && rdb_loader_version > RDB_LOADER_WITH_PROCESSING_MANIFEST
skip_manifest = stream_enrich_mode && rdb_loader_version > RDB_LOADER_WITH_PROCESSING_MANIFEST
get_rdb_loader_steps(config, targets[:ENRICHED_EVENTS], resolver, assets[:loader], rdbloader_steps, skip_manifest).each do |step|
@jobflow.add_step(step)
end
Expand Down
107 changes: 55 additions & 52 deletions 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,10 @@ class Runner

include Monitoring::Logging

# Initialize the class.
Contract ArgsHash, ConfigHash, ArrayOf[String], String, ArrayOf[JsonFileHash] => Runner
def initialize(args, config, enrichments_array, resolver, targets_array)

# Let's set our logging level immediately
Monitoring::Logging::set_level config[:monitoring][:logging][:level]

@args = args
@config = config
@enrichments_array = enrichments_array
@resolver_config = resolver
resolver_config_json = JSON.parse(@resolver_config, {:symbolize_names => true})
@resolver = Iglu::Resolver.parse(resolver_config_json)
@targets = group_targets(validate_targets(targets_array))
self
end

# Our core flow
Contract None => nil
def run

resume = @args[:resume_from]
skips = @args[:skip]
enriched_stream = @config[:aws][:s3][:buckets][:enriched][:stream]
steps = {
# Decide what steps should be submitted to EMR job
Contract ArrayOf[String], Maybe[String], Maybe[String] => EmrSteps
def self.get_steps(skips, resume, enriched_stream)
{
:staging => (enriched_stream.nil? and resume.nil? and not skips.include?('staging')),
:enrich => (enriched_stream.nil? and (resume.nil? or resume == 'enrich') and not skips.include?('enrich')),
:staging_stream_enrich => ((not enriched_stream.nil? and resume.nil?) and not skips.include?('staging_stream_enrich')),
Expand All @@ -75,6 +54,56 @@ def run
not skips.include?('archive_enriched')),
:archive_shredded => (not skips.include?('archive_shredded'))
}
end


# Translate the resolved step flags and CLI inclusions into the
# skip/include lists consumed by RDB Loader.
Contract HashOf[Symbol, Bool], ArrayOf[String] => RdbLoaderSteps
def self.get_rdbloader_steps(steps, inclusions)
  # Optional loader phases are skipped whenever their flag is off
  skipped = [:analyze, :consistency_check, :load_manifest_check]
    .reject { |phase| steps[phase] }
    .map(&:to_s)

  # vacuum is the only opt-in extra recognised here
  included = inclusions.include?("vacuum") ? ["vacuum"] : []

  {
    :skip => skipped,
    :include => included
  }
end

# Initialize the class.
Contract ArgsHash, ConfigHash, ArrayOf[String], String, ArrayOf[JsonFileHash] => Runner
def initialize(args, config, enrichments_array, resolver, targets_array)
  # Configure logging first so everything that follows is captured
  Monitoring::Logging::set_level config[:monitoring][:logging][:level]

  @args, @config, @enrichments_array, @resolver_config =
    args, config, enrichments_array, resolver

  # Parse the Iglu resolver config (symbol keys) into a Resolver instance
  parsed_resolver = JSON.parse(@resolver_config, {:symbolize_names => true})
  @resolver = Iglu::Resolver.parse(parsed_resolver)

  # Validate the storage targets and group them by purpose
  @targets = group_targets(validate_targets(targets_array))
  self
end

# Our core flow
Contract None => nil
def run
steps = Runner.get_steps(@args[:skip], @args[:resume_from], @config[:aws][:s3][:buckets][:enriched][:stream])

archive_enriched = if not steps[:archive_enriched]
'skip'
Expand All @@ -99,7 +128,7 @@ def run

# Keep relaunching the job until it succeeds or fails for a reason other than a bootstrap failure
tries_left = @config[:aws][:emr][:bootstrap_failure_tries]
rdbloader_steps = get_rdbloader_steps(steps, @args[:include])
rdbloader_steps = Runner.get_rdbloader_steps(steps, @args[:include])
while true
begin
tries_left -= 1
Expand Down Expand Up @@ -158,32 +187,6 @@ def self.add_trailing_slashes(bucketData)
end
end

Contract HashOf[Symbol, Bool], ArrayOf[String] => RdbLoaderSteps
def get_rdbloader_steps(steps, inclusions)
s = {
:skip => [],
:include => []
}

if not steps[:analyze]
s[:skip] << "analyze"
end

if not steps[:consistency_check]
s[:skip] << "consistency_check"
end

if not steps[:load_manifest_check]
s[:skip] << "load_manifest_check"
end

if inclusions.include?("vacuum")
s[:include] << "vacuum"
end

s
end

# Validate array of self-describing JSONs
Contract ArrayOf[JsonFileHash] => ArrayOf[Iglu::SelfDescribingJson]
def validate_targets(targets)
Expand Down
28 changes: 24 additions & 4 deletions 3-enrich/emr-etl-runner/spec/snowplow-emr-etl-runner/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ def resource(name)
it 'raises a ConfigError if two storage targets with identical ids are passed' do
expect {
Cli.load_targets(resource('invalid-targets'))
}.to raise_exception( ConfigError, "Duplicate storage target ids: [\"id1\"]" )
}.to raise_exception(ConfigError, "Duplicate storage target ids: [\"id1\"]" )
end
end

describe '#load_config' do
it 'raises a ConfigError if the config file argument was nil' do
expect {
Cli.load_config(nil, "<<usage message>>")
}.to raise_exception( ConfigError, "Missing option: config\n<<usage message>>" )
}.to raise_exception(ConfigError, "Missing option: config\n<<usage message>>" )
end

it 'raises a ConfigError if the config file argument could not be found' do
Expand Down Expand Up @@ -239,9 +239,29 @@ def resource(name)
Cli.validate_and_coalesce(a, c)
end

it 'should accept stream-mode config' do
it 'should accept stream enrich mode config' do
Cli.validate_and_coalesce(a, s)
end
end

it 'should reject --skip staging in stream enrich mode' do
a[:skip] = [ 'staging' ]
expect {
Cli.validate_and_coalesce(a, s)
}.to raise_exception(ConfigError, "cannot skip staging nor enrich in stream enrich mode. Either skip staging_stream_enrich or resume from shred")
end

it 'should reject --resume-from enrich in stream enrich mode' do
a[:resume_from] = 'enrich'
expect {
Cli.validate_and_coalesce(a, s)
}.to raise_exception(ConfigError, "cannot resume from enrich in stream enrich mode")
end

it 'should reject --skip archive_raw in stream enrich mode' do
a[:skip] = [ 'archive_raw' ]
expect {
Cli.validate_and_coalesce(a, s)
}.to raise_exception(ConfigError, "cannot skip nor resume from archive_raw in stream enrich mode")
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ aws:
enriched:
good: eg # e.g. s3://my-out-bucket/enriched/good
archive: ea # Where to archive enriched events to, e.g. s3://my-out-bucket/enriched/archive
stream: es # Where Stream Enrich writes enriched events, e.g. s3://my-out-bucket/enriched/stream
shredded:
good: sg # e.g. s3://my-out-bucket/shredded/good
bad: sb # e.g. s3://my-out-bucket/shredded/bad
Expand Down
Loading

0 comments on commit d0f7117

Please sign in to comment.