Commit
Completed adding support for handling the boundary issue (COPY the day-after's files to Processing, then DELETE them after a successful run)
alexanderdean committed Oct 18, 2012
1 parent efb00f0 commit 72ab49c
Showing 2 changed files with 130 additions and 52 deletions.
4 changes: 3 additions & 1 deletion 3-etl/emr-etl-runner/lib/emr_jobs.rb
@@ -121,8 +121,10 @@ def run_etl()
status = wait_for(jobflow_id)

if !status
raise ExecutionError, "Hive jobflow #{jobflow_id} failed, check Amazon logs for details. Data files not archived."
raise ExecutionError, "EMR jobflow #{jobflow_id} failed, check Amazon logs for details. Data files not archived."
end

puts "EMR jobflow #{jobflow_id} completed successfully."
end

end
178 changes: 127 additions & 51 deletions 3-etl/emr-etl-runner/lib/s3_tasks.rb
@@ -22,6 +22,8 @@ module S3Tasks

class DirectoryNotEmptyError < StandardError; end

class UnsupportedFileOperationError < StandardError; end

# Class to describe an S3 location
class S3Location
attr_reader :bucket, :dir, :s3location
@@ -71,11 +73,15 @@ def stage_logs_for_emr(config)

# Move the files we need to move (within the date span)
files_to_move = files_between(config[:start], config[:end])
move_files(s3, in_location, processing_location, target_files)
move_files(s3, in_location, processing_location, files_to_move)

# To deal with boundary issues, we also copy over the available files for the day after our date range
files_to_copy = files_for_day_after(config[:end])
puts ">>>>>>>>> TODO: need to copy files that match %s" % files_to_copy
copy_files(s3, in_location, processing_location, files_to_copy)

# Wait for S3 to eventually become consistent
puts "Waiting a minute to allow S3 to settle (eventual consistency)"
sleep(60)

end
module_function :stage_logs_for_emr
@@ -96,21 +102,21 @@ def archive_logs(config)

add_date_path = lambda { |filepath|
if m = filepath.match('[^/]+\.(\d\d\d\d-\d\d-\d\d)-\d\d\.[^/]+\.gz$')
filename = m[0]
date = m[1]
return date + '/' + filename
filename = m[0]
date = m[1]
return date + '/' + filename
else
return filepath
end
}

# Move the files we need to move (within the date span)
files_to_move = files_between(config[:start], config[:end])
move_files(s3, processing_location, archive_location, files_to_move, add_date_path);
move_files(s3, processing_location, archive_location, files_to_move, add_date_path)

# Delete the copies of the files from the day after our date range
files_to_copy = files_for_day_after(config[:end])
puts ">>>>>>>>> TODO: need to copy files that match %s" % files_to_copy
files_to_delete = files_for_day_after(config[:end])
delete_files(s3, processing_location, files_to_delete)

end
module_function :archive_logs
@@ -122,9 +128,9 @@ def archive_logs(config)
def files_for_day_after(end_date)

day_after = Date.parse(end_date) + 1
'(' + day_after + ')[^/]+\.gz$'
'(' + day_after.strftime('%Y-%m-%d') + ')[^/]+\.gz$'
end
module_function :files_between
module_function :files_for_day_after

# Find files within the given date range
# (inclusive).
@@ -158,19 +164,81 @@ def new_s3_from(config)
end
module_function :new_s3_from

# Moves files between s3 locations concurrently
# Deletes files from S3 locations concurrently
#
# Parameters:
# +s3+:: A Fog::Storage s3 connection
# +from+:: S3Location to delete files from
# +match_regex+:: a regex string to match the files to delete
def delete_files(s3, from_location, match_regex='.+')

puts " deleting files from #{from_location}"
process_files(:delete, s3, from_location, match_regex)
end
module_function :delete_files

# Copies files between S3 locations concurrently
#
# Parameters:
# +s3+:: A Fog::Storage s3 connection
# +from+:: S3Location to copy files from
# +to+:: S3Location to copy files to
# +match_regex+:: a regex string to match the files to copy
# +alter_filename_lambda+:: lambda to alter the written filename
def copy_files(s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false)

puts " copying files from #{from_location} to #{to_location}"
process_files(:copy, s3, from_location, match_regex, to_location, alter_filename_lambda)
end
module_function :copy_files

# Moves files between S3 locations concurrently
#
# Parameters:
# +s3+:: A Fog::Storage s3 connection
# +from+:: S3Location to move files from
# +to+:: S3Location to move files to
# +match_regex+:: a regex string to match the files to copy
# +match_regex+:: a regex string to match the files to move
# +alter_filename_lambda+:: lambda to alter the written filename
def move_files(s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false)

puts " moving files from #{from_location} to #{to_location}"
process_files(:move, s3, from_location, match_regex, to_location, alter_filename_lambda)
end
module_function :move_files

# Concurrent file operations between S3 locations. Supports:
# - Copy
# - Delete
# - Move (= Copy + Delete)
#
# Parameters:
# +operation+:: Operation to perform. :copy, :delete, :move supported
# +s3+:: A Fog::Storage s3 connection
# +from+:: S3Location to process files from
# +match_regex+:: a regex string to match the files to process
# +to+:: S3Location to process files to
# +alter_filename_lambda+:: lambda to alter the written filename
def process_files(operation, s3, from_location, match_regex='.+', to_location=nil, alter_filename_lambda=false)

# Validate that the file operation makes sense
case operation
when :copy, :move
if to_location.nil?
raise UnsupportedFileOperationError, "File operation %s requires a to_location to be set" % operation
end
when :delete
unless to_location.nil?
raise UnsupportedFileOperationError, "File operation %s does not support the to_location argument" % operation
end
if alter_filename_lambda.class == Proc
raise UnsupportedFileOperationError, "File operation %s does not support the alter_filename_lambda argument" % operation
end
else
raise UnsupportedFileOperationError, "File operation %s is unsupported. Try :copy, :delete or :move" % operation
end

files_to_move = []
files_to_process = []
threads = []
mutex = Mutex.new
complete = false
@@ -183,7 +251,7 @@ def move_files(s3, from_location, to_location, match_regex='.+', alter_filename_
# create ruby threads to concurrently execute s3 operations
for i in (0...10)

# each thread pops a file off the files_to_move array, and moves it.
# each thread pops a file off the files_to_process array, and moves it.
# We loop until there are no more files
threads << Thread.new do
loop do
@@ -195,26 +263,26 @@ def move_files(s3, from_location, to_location, match_regex='.+', alter_filename_
mutex.synchronize do

while !complete && !match do
if files_to_move.size == 0
if files_to_process.size == 0
# s3 batches 1000 files per request
# we load up our array with the files to move
#puts "-- loading more results"
files_to_move = s3.directories.get(from_location.bucket, :prefix => from_location.dir).files.all(marker_opts)
#puts "-- got #{files_to_move.size} results"
files_to_process = s3.directories.get(from_location.bucket, :prefix => from_location.dir).files.all(marker_opts)
#puts "-- got #{files_to_process.size} results"
# if we don't have any files after the s3 request, we're complete
if files_to_move.size == 0
if files_to_process.size == 0
complete = true
next
else
marker_opts['marker'] = files_to_move.last.key
marker_opts['marker'] = files_to_process.last.key

# By reversing the array we can use pop and get FIFO behaviour
# instead of the performance penalty incurred by unshift
files_to_move = files_to_move.reverse
files_to_process = files_to_process.reverse
end
end

file = files_to_move.pop
file = files_to_process.pop
match = file.key.match(match_regex)
end
end
@@ -231,32 +299,44 @@ def move_files(s3, from_location, to_location, match_regex='.+', alter_filename_
filename = file_match[1]
end

puts " #{from_location.bucket}/#{file.key} -> #{to_location.bucket}/#{to_location.dir_as_path}#{filename}"

# copy file
i = 0
begin
file.copy(to_location.bucket, to_location.dir_as_path + filename)
puts " +-> #{to_location.bucket}/#{to_location.dir_as_path}#{filename}"
rescue
raise unless i < s3_retries
puts "Problem copying #{file.key}. Retrying.", $!, $@
sleep(10) # give us a bit of time before retrying
i += 1
retry
# What are we doing?
case operation
when :move
puts " MOVE #{from_location.bucket}/#{file.key} -> #{to_location.bucket}/#{to_location.dir_as_path}#{filename}"
when :copy
puts " COPY #{from_location.bucket}/#{file.key} +-> #{to_location.bucket}/#{to_location.dir_as_path}#{filename}"
when :delete
puts " DELETE x #{from_location.bucket}/#{file.key}"
end

# A move or copy starts with a file copy
if [:move, :copy].include? operation
i = 0
begin
file.copy(to_location.bucket, to_location.dir_as_path + filename)
puts " +-> #{to_location.bucket}/#{to_location.dir_as_path}#{filename}"
rescue
raise unless i < s3_retries
puts "Problem copying #{file.key}. Retrying.", $!, $@
sleep(10) # give us a bit of time before retrying
i += 1
retry
end
end

# delete file
i = 0
begin
file.destroy()
puts " x #{from_location.bucket}/#{file.key}"
rescue
raise unless i < s3_retries
puts "Problem destroying #{file.key}. Retrying.", $!, $@
sleep(10) # give us a bit of time before retrying
i += 1
retry
# A move or delete ends with a delete
if [:move, :delete].include? operation
i = 0
begin
file.destroy()
puts " x #{from_location.bucket}/#{file.key}"
rescue
raise unless i < s3_retries
puts "Problem destroying #{file.key}. Retrying.", $!, $@
sleep(10) # give us a bit of time before retrying
i += 1
retry
end
end
end
end
Expand All @@ -265,11 +345,7 @@ def move_files(s3, from_location, to_location, match_regex='.+', alter_filename_
# wait for threads to finish
threads.each { |aThread| aThread.join }

# wait for S3 to eventually become consistent
puts "Waiting a minute to allow S3 to settle (eventual consistency)"
sleep(60)

end
module_function :move_files
module_function :process_files

end
end
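For context, here is a rough sketch of how these S3 tasks are driven end to end. It is only an illustration: the :start and :end keys mirror the config values referenced in the diff, while the require path, the bucket/credential settings and the top-level call form (module namespacing omitted) are assumptions rather than code from this commit.

require_relative 's3_tasks'   # assumed load path for the module shown above

# Illustrative driver for the flow in this commit: stage the logs (move the
# in-range files and COPY the day-after's files into Processing), run the EMR
# job, then archive (move the in-range files to the archive bucket and DELETE
# the day-after copies from Processing).
config = {
  :start => '2012-10-17',   # first day of logs to process (inclusive)
  :end   => '2012-10-17'    # last day of logs to process (inclusive)
  # ... plus the S3 bucket and credential settings the runner expects
}

S3Tasks.stage_logs_for_emr(config)   # move + boundary copy into Processing
# ... run_etl() from emr_jobs.rb would execute the Hive job here ...
S3Tasks.archive_logs(config)         # archive files + delete boundary copies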

10 comments on commit 72ab49c

@mtibben
Contributor

Hey @alexanderdean ... I was initially confused by this, but I see what you're doing here, and I see now it's necessary if you're trying to capture an exact, specific date range.

My use-case is that I'm using it as a rolling ETL. The scripts are run daily, and so I have just been assuming that the overlap will be captured on the next day's run. I modified the etl.q script to load the existing partitions, and for the INSERT, I removed the date range conditions, so that records from a previous day could be inserted.

So I'm trying to decide whether I should add an option to allow the rolling approach, or use your approach here, to process exactly one day.

So, a couple of queries:

  • Should we be able to limit the overlap files to the first hour only, instead of copying across all the files? i.e. just copy the *-yyyy-mm-dd-00-* files?
  • Will you now also need to capture files from the previous day - could there be an overlap on the other side also?

@alexanderdean
Member Author

Hey @mtibben - yep exactly, I added that boundary code in after @yalisassoon's points (#46 (comment)) in the other thread. I've tried to explain the thinking in the new wiki page for the EmrEtlRunner here:

https://github.com/snowplow/snowplow/wiki/Deploying-the-EMR-ETL-Runner#wiki-usage-warnings

Your rolling approach makes sense as well - I guess the difference is that you only capture the last few events from Monday evening on the Wednesday morning run, not the Tuesday morning run. (But you do less file copying as a result.)

I don't know enough about Hive partitioning to quite understand the implications of the HiveQL changes you made for the rolling approach - could you share your current HiveQL script in a gist so that @yalisassoon could take a look?

On your two questions:

  1. Should we be able to limit the overlap files to the first hour only, instead of copying across all the files? i.e. just copy the *-yyyy-mm-dd-00-* files?

Yes - I think we could limit it to say the first 2 hours (2 rather than 1 to be safe, as I can't find anything official from Amazon about the upper limit on how long a CloudFront log takes to arrive). Even if this only saves 2 hours of files to copy (if people run it at 4am), I think it's a worthwhile addition - great idea.
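For illustration, a minimal sketch of what that restriction could look like, staying close to the files_for_day_after helper added in this commit; the method name, the two-hour default and the example output are hypothetical, not part of the codebase:

require 'date'

# Hypothetical variant of files_for_day_after: build a regex string that only
# matches files from the first `hours` hours of the day after the end date.
def files_for_start_of_day_after(end_date, hours=2)
  day_after = Date.parse(end_date) + 1
  hour_alternation = (0...hours).map { |h| '%02d' % h }.join('|')   # e.g. "00|01"
  '(' + day_after.strftime('%Y-%m-%d') + ')-(' + hour_alternation + ')[^/]+\.gz$'
end

# files_for_start_of_day_after('2012-10-17')
# # => "(2012-10-18)-(00|01)[^/]+\.gz$"

The resulting regex string would then be passed to copy_files in stage_logs_for_emr and to delete_files in archive_logs, just like the current day-after pattern.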

  2. Will you now also need to capture files from the previous day - could there be an overlap on the other side also?

No, I can't think of a situation where a file with a Wednesday timestamp has a Thursday event in it. So I think we're safe.

Anyway, let me know your thoughts Mike, and it would be great to have a look at your updated HiveQL script. Either way, we're getting close to having all the code needed to support either the exact-one-day or a rolling approach - I think the rolling approach becomes quite interesting once you start to reduce the batch size from 24 hours down to maybe 6 hours or even 1 hour (which I know some people e.g. @ramn will want to do eventually)...

@yalisassoon
Member

Hi @mtibben,

I'm intrigued by your rolling approach. The reason we didn't implement a rolling approach was that we populate each Hive partition using an INSERT OVERWRITE statement. When using this statement, any data already in that Hive partition is blown away, so we need to be sure, when we execute the statement, that all the raw data we want to transfer into that table for that time period is available to the query. The partition is then populated, and we don't load any more data into it.

In your rolling approach: if you've run the job on a Monday you'll primarily process data for Sunday. On Tuesday when you rerun the job, you'll process Monday's data, but also potentially some additional rows for Sunday. How do you make sure that that handful of rows does not overwrite all the rows you wrote when you ran the job on Monday? If you wouldn't mind sharing your HiveQL, I'd love to see how you worked around this issue, as a rolling approach is nicer than ours...

@mtibben
Contributor

Hey @yalisassoon ,

Hive 0.8+ supports INSERT INTO, which appends instead of overwrites. My HQL looks like this:

SET hive.exec.dynamic.partition=true ;
SET hive.exec.dynamic.partition.mode=nonstrict ;

ADD JAR ${SERDE_FILE} ;

CREATE EXTERNAL TABLE `extracted_logs`
ROW FORMAT SERDE 'com.snowplowanalytics.snowplow.hadoop.hive.SnowPlowEventDeserializer'
LOCATION '${CLOUDFRONT_LOGS}' ;

CREATE EXTERNAL TABLE IF NOT EXISTS `events` (
tm string,
txn_id string,
user_id string,
...
)
PARTITIONED BY (dt STRING)
LOCATION '${EVENTS_TABLE}' ;

ALTER TABLE events RECOVER PARTITIONS;

INSERT INTO TABLE `events`
PARTITION (dt)
SELECT
tm,
txn_id,
user_id,
...
dt
FROM `extracted_logs` ;

@yalisassoon
Member

Thanks @mtibben! The fact that Hive now supports INSERT INTO TABLE had completely passed me by :-) I'll have a think now about whether we should update our approach to utilise the INSERT INTO TABLE command (i.e. adopt your rolling approach), or stick to adding in exactly one day's worth of data at a time, now that I understand your approach...

@alexanderdean
Member Author

Hey @mtibben - just to let you know that we decided to go with your rolling approach in the end. Many thanks for suggesting it and bearing with us as we tried to understand it :-)

In the end your rolling approach seemed to be more flexible, but also less error-prone than our approach. So we've updated the codebase to support rolling mode, including:

  1. Tidying up the jobflow configuration
  2. Changing the command-line --start and --end parameters so that they work properly (only processing files within those dates)
  3. Adding in the new rolling HiveQL file
  4. Updating the documentation on the wiki: https://github.com/snowplow/snowplow/wiki/Deploying-EmrEtlRunner

Anyway, thanks again - we're doing our final tests and then will cut a release...

@mtibben
Contributor

Sure, no problems at all, I'm glad I can contribute back in some way :)

Something else I'm working on - I've been playing with using a columnar table format (RCFILE) in Hive, partitioning by date and bucketing by user. I'm hoping that will improve the performance of the table. I'll let you know how I go.

@alexanderdean
Member Author

Thanks again - that sounds promising about the RCFILE format! CCing @yalisassoon as I know he'll be interested too...

@yalisassoon
Member

@yalisassoon yalisassoon commented on 72ab49c Oct 25, 2012 via email

@mtibben
Contributor

Awesome. Qubole sounds very interesting. To be honest I've found using Hive on EMR a pretty frustrating experience at times, so I would be interested in what they have to offer.
