Implemented ability to have multiple --output directories for `wu-sync prepare`.
1 parent cb62c80 commit 03a8ee0d7201965b14034d669a263e6432e762e6 Dhruv Bansal committed Sep 23, 2013
@@ -498,7 +498,7 @@ processed.
**Note:** The files created in the `--output` directory are
[hardlinks](http://en.wikipedia.org/wiki/Hard_link) pointing at the
original files in the `--input` directory.
-
+
#### Splitting input files
The `--split` option will make `wu-sync prepare` split large files in
@@ -723,6 +723,19 @@ Both `path` and `meta_path` are relative to the `--output` directory.
The `--metadata` option can also be combined with the `--split`
option.
+#### Multiple Output Directories
+
+Using multiple `--output` directories mounted on different devices can
+greatly speed up operations as large files are read & written. When
+using multiple `--output` directories, each consecutively processed
+file in the `--input` directory will be assigned one of the output
+directories in a round-robin fashion:
+
+```
+$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
+$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
+```
+
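As a rough sketch of the round-robin behavior described above (an illustration only, with hypothetical paths — not part of this commit), consecutive input files simply cycle through the configured output directories:

```
# Hypothetical illustration of round-robin assignment across output directories.
outputs = %w[/data/clean_1 /data/clean_2]
files   = %w[a.log b.log c.log d.log]

files.each_with_index do |file, counter|
  puts "#{file} -> #{outputs[counter % outputs.size]}/#{file}"
end
# a.log -> /data/clean_1/a.log
# b.log -> /data/clean_2/b.log
# c.log -> /data/clean_1/c.log
# d.log -> /data/clean_2/d.log
```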
### To S3
The `s3` sync will sync data from a local `--input` directory to an
@@ -2,32 +2,40 @@ module Wukong
module Load
# Syncs a "live" input directory, with possibly growing files,
- # with an output directory.
+ # with an output directory or directories. In the case of
+ # multiple output directories (useful for higher throughput),
+ # consecutive files will be placed round-robin at the same
+ # relative paths into each output directory: *merging* the output
+ # directories downstream should yield the same result as having
+ # just used a single output directory.
#
- # Files in the output directory will only appear when files in the
- # input directory stop growing.
+ # Files in the output directories will only appear when files in
+ # the input directory stop growing.
#
- # By default, files in the output directory will be hardlinks at
+ # By default, files in the output directories will be hardlinks at
# the same relative paths as their corresponding (complete) files
# in the input directory.
#
# A second mode of operation (activated using the `split` option
# in #initialize) will split all (complete) files in the input
# directory into equal-sized, smaller files in the output
- # directory with the same relative path as the original file in
+ # directories with the same relative path as the original file in
# the input directory but with numerically increasing suffixes.
# Size and splitting behavior are both configurable.
#
# In both of these modes of operation, two additional options are
# available:
#
# 1. Instead of placing files (or splits of a file) at the same
- # relative path in the output directory as in the input directory,
- # a complete ordering on the files, based on the time of syncing
- # and the file's original relative path, can be used instead.
- # This is a good choice when downstream consumers (e.g. - Storm)
- # need ordered input. This option is activated using the
- # `ordered` option in #initialize.
+ # relative path in the output directories as in the input
+ # directory, a complete ordering on the files, based on the time
+ # of syncing and the file's original relative path, can be used
+ # instead. This is a good choice when downstream consumers
+ # (e.g. - Storm) need ordered input. This option is activated
+ # using the `ordered` option in #initialize. When using multiple
+ # output directories, ordering is maintained **across**
+ # directories so that the downstream merge of multiple, ordered
+ # output directories is itself ordered.
#
# 2. In addition to creating the output file (or splits) a JSON
# metadata file can also be created for each input file. This
@@ -43,7 +51,10 @@ module Load
# in a partial state. The combination of these factors means that
# a downstream consumer can use the presence or absence of a
# metadata file as an indicator of whether the **actual** output
- # file (or splits) has finished arriving.
+ # file (or splits) has finished arriving. When using multiple
+ # output directories, metadata files are created within each
+ # output directory separately (but still in a way consistent with
+ # them being merged downstream).
class PrepareSyncer < Syncer
include Logging
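The merge claim in the class comment above holds because round-robin only changes which output directory a file lands in, never its relative path within that directory. A minimal sketch of such a downstream merge, assuming hypothetical directory names (not part of this commit):

```
require "fileutils"

outputs = %w[/data/clean_1 /data/clean_2]  # hypothetical prepare outputs
merged  = "/data/merged"

# Copying every file back to its relative path in one place should
# reproduce the tree a single --output directory would have produced.
outputs.each do |dir|
  Dir.glob(File.join(dir, "**", "*")).each do |source|
    next unless File.file?(source)
    relative = source.sub(%r{\A#{Regexp.escape(dir)}/}, "")
    target   = File.join(merged, relative)
    FileUtils.mkdir_p(File.dirname(target))
    FileUtils.cp(source, target)
  end
end
```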
@@ -58,11 +69,11 @@ class PrepareSyncer < Syncer
# @return [PrepareSyncer]
def self.from_source settings, source, name
raise Error.new("An --input directory is required") unless settings[:input]
- raise Error.new("An --output directory is required") unless settings[:output]
+ raise Error.new("At least one --output directory is required") if (settings[:output].nil? || settings[:output].empty?)
new(settings.dup.merge({
name: name.to_s,
input: File.join(settings[:input], name.to_s),
- output: File.join(settings[:output], name.to_s),
+ output: settings[:output].map { |dir| File.join(dir, name.to_s) },
}).merge(source[:prepare] || {}))
end
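For each named source, the per-source output directories are just the configured --output directories with the source name appended, e.g. (hypothetical values):

```
# Mirrors the mapping in from_source above, with hypothetical values.
output = %w[/data/clean_1 /data/clean_2]
name   = "apache"

output.map { |dir| File.join(dir, name) }
#=> ["/data/clean_1/apache", "/data/clean_2/apache"]
```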
@@ -71,7 +82,7 @@ def self.from_source settings, source, name
# @param [Configliere::Param] settings
def self.configure settings
settings.define :input, description: "Input directory of (possibly growing) files"
- settings.define :output, description: "Output directory of processed, complete files"
+ settings.define :output, description: "Output directory of processed, complete files", default: [], type: Array
settings.define :ordered, description: "Create a total ordering within the output directory", default: false, type: :boolean
settings.define :metadata, description: "Create a metadata file for each file in the output directory", default: false, type: :boolean
@@ -83,7 +94,7 @@ def self.configure settings
settings.description = <<-EOF
Syncs an --input directory, with possibly growing files, to an
---output directory.
+--output directory or directories.
Files will only appear in the --output directory when they stop
growing in the --input directory. At least two invocations of wu-sync
@@ -109,16 +120,21 @@ def self.configure settings
$ wu-sync prepare --input=/var/ftp/inbound --output=/data/ftp/outbound --split --bytes=1_048_576
The --ordered option can be used to create a complete ordering of
-files in the --output directory, useful for when downstream consumers
-(e.g. - Storm) require ordered input.
+files in the --output directories, useful for when downstream
+consumers (e.g. - Storm) require ordered input.
The --metadata option will create a JSON metadata file in the output
-directory in addition to each output file.
+directories in addition to each output file.
In situations where the input file and output file are in a one-to-one
correspondence (without the --split option), files in the --output
-directory will be hardlinks pointing at their equivalent files in the
---input directory.
+directories will be hardlinks pointing at their equivalent files in
+the --input directory.
+
+Multiple --output directories (usually on different hard drives) can
+dramatically speed up operations:
+
+ $ wu-sync prepare --input=/var/ftp/inbound --output=/data/ftp/outbound_1,/data/ftp/outbound_2,/data/ftp/outbound_3 --split --lines=100_000
EOF
end
@@ -132,9 +148,10 @@ def validate
raise Error.new("Input directory <#{settings[:input]}> does not exist") unless File.exist?(settings[:input])
raise Error.new("Input directory <#{settings[:input]}> is not a directory") unless File.directory?(settings[:input])
- raise Error.new("A local --output directory is required") if settings[:output].nil? || settings[:output].empty?
- raise Error.new("Output directory <#{settings[:output]}> exists but is not a directory") if File.exist?(settings[:output]) && !File.directory?(settings[:output])
-
+ raise Error.new("At least one --output directory is required") if (settings[:output].nil? || settings[:output].empty?)
+ settings[:output].each do |dir|
+ raise Error.new("Output directory <#{dir}> exists but is not a directory") if File.exist?(dir) && !File.directory?(dir)
+ end
true
end
@@ -166,11 +183,13 @@ def absolute_input_directory
Pathname.new(File.absolute_path(settings[:input]))
end
- # The absolute path to the output directory.
+ # The absolute path to the output directories.
#
- # @return [Pathname]
- def absolute_output_directory
- Pathname.new(File.absolute_path(settings[:output]))
+ # @return [Array<Pathname>]
+ def absolute_output_directories
+ settings[:output].map do |dir|
+ Pathname.new(File.absolute_path(dir))
+ end
end
# Setup this PrepareSyncer by loading any file size state that's
@@ -265,7 +284,7 @@ def remember_size!(path)
# @return [Handler]
def create_handler
settings[:ordered] = true if settings[:metadata]
- handler_settings = settings.dup.merge(input: absolute_input_directory, output: absolute_output_directory)
+ handler_settings = settings.dup.merge(input: absolute_input_directory, output: absolute_output_directories)
self.handler = (settings[:split] ? SplittingHandler : Handler).new(self, handler_settings)
end
@@ -15,6 +15,9 @@ class Handler
# from the PrepareSyncer that created it.
attr_accessor :settings
+ # A counter that increments for each input file processed.
+ attr_accessor :counter
+
include Logging
include MetadataHandler
@@ -23,13 +26,14 @@ class Handler
# @param [PrepareSyncer] syncer the syncer this handler is for
# @param [Configliere::Param] settings
# @option settings [Pathname] :input the input directory
- # @option settings [Pathname] :output the output directory
+ # @option settings [Array<Pathname>] :output the output directories
# @option settings [true, false] :dry_run log what would be done instead of doing it
# @option settings [true, false] :ordered create totally ordered output
# @option settings [true, false] :metadata create metadata files for each output file
def initialize syncer, settings
self.syncer = syncer
self.settings = settings
+ self.counter = 0
extend (settings[:dry_run] ? FileUtils::NoWrite : FileUtils)
extend OrderedHandler if settings[:ordered]
end
@@ -64,8 +68,11 @@ def before_process original
# Run after successfully processing each file.
#
+ # By default it increments the #counter.
+ #
# @param [Pathname] original the original file in the input directory
def after_process original
+ self.counter += 1
end
# Run upon an error during processing.
@@ -89,37 +96,47 @@ def create_hardlink original, copy
process_metadata_for(copy) if settings[:metadata]
end
- # Return a path in the `output` directory that has the same
+ # Return the current output directory, chosen by cycling
+ # through the given output directories based on the value of
+ # the current #counter.
+ #
+ # @return [Pathname]
+ def current_output_directory
+ settings[:output][self.counter % settings[:output].size]
+ end
+
+ # Return a path in an `output` directory that has the same
# relative path as `original` does in the input directory.
#
+ # The `output` directory chosen will cycle through the given
+ # output directories as the #counter increments.
+ #
# @param [Pathname] original
# @return [Pathname]
def path_for original
- relative_path = original.relative_path_from(settings[:input])
- settings[:output].join(relative_path)
+ current_output_directory.join(relative_path_of(original, settings[:input]))
end
- # Return the path of `original` relative to the containing
- # `dir`.
+ # Return the path of `file` relative to the containing `dir`.
#
- # @param [Pathname] original
+ # @param [Pathname] file
# @param [Pathname] dir
# @return [Pathname]
- def relative_path_of original, dir
- original.relative_path_from(dir)
+ def relative_path_of file, dir
+ file.relative_path_from(dir)
end
- # Returns the top-level directory of the `original` path,
- # relative to `dir`.
+ # Returns the top-level directory of the `file`, relative to
+ # `dir`.
#
- # If the `original` is in `dir` itself, and not a
- # subdirectory, returns the string "root".
+ # If the `file` is in `dir` itself, and not a subdirectory,
+ # returns the string "root".
#
- # @param [Pathname] original
+ # @param [Pathname] file
# @param [Pathname] dir
# @return [String, "root"]
- def top_level_of(original, dir)
- top_level, rest = relative_path_of(original, dir).to_s.split('/', 2)
+ def top_level_of(file, dir)
+ top_level, rest = relative_path_of(file, dir).to_s.split('/', 2)
rest ? top_level : 'root'
end
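A quick illustration of the two helpers above, including the "root" case for files that sit directly inside `dir` (paths are hypothetical):

```
require "pathname"

dir = Pathname.new("/data/clean_1")

# relative_path_of is just Pathname#relative_path_from:
Pathname.new("/data/clean_1/apache/access.log").relative_path_from(dir).to_s
#=> "apache/access.log"

# top_level_of takes the first path component, falling back to "root"
# when the file has no subdirectory under dir:
top_level, rest = "apache/access.log".split("/", 2)
rest ? top_level : "root"   #=> "apache"

top_level, rest = "access.log".split("/", 2)
rest ? top_level : "root"   #=> "root"
```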
@@ -26,8 +26,8 @@ def process_metadata_for(copy)
# @param [Pathname] copy
# @return [Pathname]
def metadata_path_for(copy)
- top_level, rest = relative_path_of(copy, settings[:output]).to_s.split('/', 2)
- settings[:output].join(top_level + '_meta', rest + ".meta")
+ top_level, rest = relative_path_of(copy, current_output_directory).to_s.split('/', 2)
+ current_output_directory.join(top_level + '_meta', rest + ".meta")
end
# Returns the metadata for the `copy` file.
@@ -36,8 +36,8 @@ def metadata_path_for(copy)
# @return [Hash]
def metadata_for(copy)
{
- path: relative_path_of(copy, settings[:output]),
- meta_path: relative_path_of(metadata_path_for(copy), settings[:output]),
+ path: relative_path_of(copy, current_output_directory),
+ meta_path: relative_path_of(metadata_path_for(copy), current_output_directory),
size: File.size(copy),
md5: Digest::MD5.file(copy).hexdigest, # streaming
}
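Concretely, metadata files land in a sibling "<top level>_meta" directory inside the same (current) output directory as the copy they describe. With a hypothetical copy path:

```
# Hypothetical copy inside the current output directory /data/clean_1,
# whose relative path there is "apache/2013/09/23.log".
top_level, rest = "apache/2013/09/23.log".split("/", 2)
#=> ["apache", "2013/09/23.log"]

# metadata_path_for then rejoins them under "<top_level>_meta":
File.join("/data/clean_1", top_level + "_meta", rest + ".meta")
#=> "/data/clean_1/apache_meta/2013/09/23.log.meta"
```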
@@ -7,32 +7,14 @@ class PrepareSyncer
# directory.
module OrderedHandler
- # Counter for keeping track of the number of files processed.
- #
- # Defaults to 0.
- #
- # @return [Integer]
- def counter
- @counter ||= 0
- end
- attr_writer :counter
-
- # Increments the #counter.
- #
- # @param [Pathname] original
- def after_process original
- super(original)
- self.counter += 1
- end
-
# Return the output path for the given `original` file.
#
# @param [Pathname] original
# @param [Time] time use this specific time instead of the current UTC time
# @return [Pathname]
def path_for original, time=nil
time ||= Time.now.utc
- settings[:output].join(daily_directory_for(time, original)).join(slug_for(time, original))
+ current_output_directory.join(daily_directory_for(time, original)).join(slug_for(time, original))
end
# Return the daily directory for the given `time`.
@@ -105,7 +105,7 @@ def suffix_stem_for(copy)
#
# @return [Integer]
def suffix_length
- 4
+ 6
end
# Suffix of the first split.
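The bump from 4 to 6 only widens the numeric suffix appended to each split (the exact stem and separator come from suffix_stem_for, which this diff does not show). Assuming zero-padded suffixes of suffix_length digits, this raises the per-file split limit from 10,000 to 1,000,000:

```
# Hypothetical illustration of zero-padded numeric split suffixes.
suffix_length = 6

3.times.map { |n| format("%0#{suffix_length}d", n) }
#=> ["000000", "000001", "000002"]

10 ** suffix_length   # maximum number of splits expressible at this width
#=> 1000000
```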
