Fix srcPattern for copying stream enriched data to HDFS (close snowpl…
chuwy authored and Piotr Limanowski committed May 25, 2020
1 parent ad94317 commit 600d72a
Showing 1 changed file with 11 additions and 7 deletions.
lib/snowplow-emr-etl-runner/emr_job.rb: 11 additions & 7 deletions
@@ -36,6 +36,7 @@ class EmrJob
       # Constants
       JAVA_PACKAGE = "com.snowplowanalytics.snowplow"
       PARTFILE_REGEXP = ".*part-.*"
+      STREAM_ENRICH_REGEXP = ".*\.gz"
       SUCCESS_REGEXP = ".*_SUCCESS"
       STANDARD_HOSTED_ASSETS = "s3://snowplow-hosted-assets"
       ENRICH_STEP_INPUT = 'hdfs:///local/snowplow/raw-events/'
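
The new STREAM_ENRICH_REGEXP matches the gzipped files produced by Stream Enrich's S3 sink, which lack the "part-" substring that PARTFILE_REGEXP looks for. A minimal standalone Ruby sketch, using hypothetical object keys, of which keys each pattern selects:

    # Constants copied from the diff above; the keys below are hypothetical.
    PARTFILE_REGEXP      = ".*part-.*"
    STREAM_ENRICH_REGEXP = ".*\.gz"

    keys = [
      "enriched/good/run=2020-05-25-10-00-00/part-00000",   # batch (Spark) enrich output
      "enriched/good/2020-05-25-10-05-00-abcdef.gz"         # hypothetical Stream Enrich sink output
    ]

    partfile_regex = Regexp.new PARTFILE_REGEXP
    gz_regex       = Regexp.new STREAM_ENRICH_REGEXP

    puts keys.select { |k| k =~ partfile_regex }.inspect   # only the part-00000 key
    puts keys.select { |k| k =~ gz_regex }.inspect         # only the .gz key
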
@@ -284,10 +285,15 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
           @jobflow.set_task_instance_group(instance_group)
         end
 
+        stream_enrich_mode = !csbe[:stream].nil?
+
+        # Get full path when we need to move data to enrich_final_output
+        # otherwise (when enriched/good is non-empty already)
+        # we can list files within folders using '.*'-regexps
         enrich_final_output = if enrich || staging_stream_enrich
           partition_by_run(csbe[:good], run_id)
         else
-          csbe[:good] # Doesn't make sense to partition if enrich has already been done
+          csbe[:good]
         end
 
         if enrich
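
The new stream_enrich_mode flag records whether an enriched stream bucket is configured at all, independently of whether the staging step runs in this invocation, and is reused further down for the copy-to-HDFS pattern and the RDB loader manifest decision. A minimal standalone Ruby sketch of the enrich_final_output selection; partition_by_run is stubbed here purely for illustration (in the real code it is an EmrJob helper), and the bucket path and run id are hypothetical:

    # Stub of the real partition_by_run helper: assumed to append a run-id
    # partitioned subfolder to the enriched/good location.
    def partition_by_run(folder, run_id)
      "#{folder}run=#{run_id}/"
    end

    csbe_good = "s3://my-bucket/enriched/good/"   # hypothetical config value
    run_id    = "2020-05-25-10-00-00"             # hypothetical run id

    # Fresh run=... folder when this run produces or stages enriched data,
    # otherwise the already-populated enriched/good location is reused as-is.
    enriching_this_run = true
    enrich_final_output =
      if enriching_this_run then partition_by_run(csbe_good, run_id) else csbe_good end
    puts enrich_final_output
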
@@ -393,16 +399,14 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
 
         # Staging data produced by Stream Enrich
         if staging_stream_enrich
-          src_pattern = '.+'
-
           csbe_good_loc = Sluice::Storage::S3::Location.new(csbe[:good])
           unless Sluice::Storage::S3::is_empty?(s3, csbe_good_loc)
             raise DirectoryNotEmptyError, "Cannot safely add stream staging step to jobflow, #{csbe_good_loc} is not empty"
           end
 
           stream_enrich_loc = Sluice::Storage::S3::Location.new(csbe[:stream])
 
-          src_pattern_regex = Regexp.new src_pattern
+          src_pattern_regex = Regexp.new STREAM_ENRICH_REGEXP
           files = Sluice::Storage::S3::list_files(s3, stream_enrich_loc).select { |f| !(f.key =~ src_pattern_regex).nil? }
           if files.empty?
             raise NoDataToProcessError, "No Snowplow enriched stream logs to process since last run"
@@ -413,7 +417,7 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
             "--src" , csbe[:stream],
             "--dest" , enrich_final_output,
             "--s3Endpoint" , s3_endpoint,
-            "--srcPattern" , src_pattern,
+            "--srcPattern" , STREAM_ENRICH_REGEXP,
             "--deleteOnSuccess"
           ]
           staging_step.name << ": Stream Enriched #{csbe[:stream]} -> Enriched Staging S3"
@@ -443,7 +447,7 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
         if enrich
           @jobflow.add_step(get_rmr_step(ENRICH_STEP_INPUT, standard_assets_bucket))
         else
-          src_pattern = unless staging_stream_enrich then PARTFILE_REGEXP else '.+' end
+          src_pattern = if stream_enrich_mode then STREAM_ENRICH_REGEXP else PARTFILE_REGEXP end
 
           copy_to_hdfs_step = Elasticity::S3DistCpStep.new(legacy = @legacy)
           copy_to_hdfs_step.arguments = [
@@ -539,7 +543,7 @@ def initialize(debug, staging, enrich, staging_stream_enrich, shred, es, archive
 
         if rdb_load
           rdb_loader_version = Gem::Version.new(config[:storage][:versions][:rdb_loader])
-          skip_manifest = staging_stream_enrich && rdb_loader_version > RDB_LOADER_WITH_PROCESSING_MANIFEST
+          skip_manifest = stream_enrich_mode && rdb_loader_version > RDB_LOADER_WITH_PROCESSING_MANIFEST
           get_rdb_loader_steps(config, targets[:ENRICHED_EVENTS], resolver, assets[:loader], rdbloader_steps, skip_manifest).each do |step|
             @jobflow.add_step(step)
           end
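
Taken together, the last two hunks let stream_enrich_mode, rather than the staging_stream_enrich flag, decide which srcPattern the copy-to-HDFS S3DistCp step uses and whether the RDB loader skips the processing manifest. A minimal standalone Ruby sketch of the fixed pattern selection; the hdfs_src_pattern helper is hypothetical and exists only to isolate the conditional, while the constants are copied from the diff above:

    PARTFILE_REGEXP      = ".*part-.*"
    STREAM_ENRICH_REGEXP = ".*\.gz"

    # Hypothetical helper isolating the fixed conditional from the diff:
    # stream_enrich_mode is true whenever an enriched stream bucket is configured,
    # even if the staging step did not run in this invocation.
    def hdfs_src_pattern(stream_enrich_mode)
      if stream_enrich_mode then STREAM_ENRICH_REGEXP else PARTFILE_REGEXP end
    end

    puts hdfs_src_pattern(true)    # pattern for gzipped Stream Enrich output
    puts hdfs_src_pattern(false)   # pattern for part-files written by batch enrich
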