Comparing changes
  • 2 commits
  • 7 files changed
  • 0 commit comments
  • 1 contributor
24 README.md
@@ -498,7 +498,7 @@ processed.
**Note:** The files created in the `--output` directory are
[hardlinks](http://en.wikipedia.org/wiki/Hard_link) pointing at the
original files in the `--input` directory.
-
+
#### Splitting input files
The `--split` option will make `wu-sync prepare` split large files in
@@ -723,6 +723,19 @@ Both `path` and `meta_path` are relative to the `--output` directory.
The `--metadata` option can also be combined with the `--split`
option.
+#### Multiple Output Directories
+
+Using multiple `--output` directories mounted on different devices can
+greatly speed up operations as large files are read and written. When
+using multiple `--output` directories, each consecutively processed
+file in the `--input` directory will be assigned one of the output
+directories in a round-robin fashion:
+
+```
+$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
+$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
+```
+
### To S3
The `s3` sync will sync data from a local `--input` directory to an
@@ -744,6 +757,15 @@ file:
```
$ wu-sync s3 --input=/data/clean --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
```
+
+The `wu-sync s3` command also works with multiple `--input`
+directories. This pairs nicely with the `wu-sync prepare` command
+when it uses multiple `--output` directories:
+
+```
+$ wu-sync s3 --input=/data/clean_1,/data/clean_2 --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
+```
+
<a name="wu-sync-all">
### Working with multiple data sources
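The round-robin behavior documented in this README hunk can be sketched in a few lines of Ruby (a minimal illustration only; the file and directory names are hypothetical, and `wu-sync` itself tracks the cycle with a per-handler counter):

```ruby
# Consecutive input files are assigned output directories in a
# round-robin fashion; merging the outputs downstream reproduces the
# layout a single --output directory would have produced.
outputs = ["/data/clean_1", "/data/clean_2"]
files   = ["a.csv", "b.csv", "c.csv", "d.csv"]

assignments = files.each_with_index.map do |file, counter|
  [file, outputs[counter % outputs.size]]
end

assignments.each { |file, dir| puts "#{file} -> #{dir}" }
```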
79 lib/wukong-load/syncers/prepare_syncer.rb
@@ -2,19 +2,24 @@ module Wukong
module Load
# Syncs a "live" input directory, with possibly growing files,
- # with an output directory.
+ # with an output directory or directories. In the case of
+  # multiple output directories, useful for higher throughput,
+ # consecutive files will be placed round-robin at the same
+ # relative paths into each output directory: *merging* the output
+ # directories downstream should yield the same result as having
+ # just used a single output directory.
#
- # Files in the output directory will only appear when files in the
- # input directory stop growing.
+ # Files in the output directories will only appear when files in
+ # the input directory stop growing.
#
- # By default, files in the output directory will be hardlinks at
+ # By default, files in the output directories will be hardlinks at
# the same relative paths as their corresponding (complete) files
# in the input directory.
#
# A second mode of operation (activated using the `split` option
# in #initialize) will split all (complete) files in the input
# directory into equal-sized, smaller files in the output
- # directory with the same relative path as the original file in
+ # directories with the same relative path as the original file in
# the input directory but with numerically increasing suffixes.
# Size and splitting behavior are both configurable.
#
@@ -22,12 +27,15 @@ module Load
# available:
#
# 1. Instead of placing files (or splits of a file) at the same
- # relative path in the output directory as in the input directory,
- # a complete ordering on the files, based on the time of syncing
- # and the file's original relative path, can be used instead.
- # This is a good choice when downstream consumers (e.g. - Storm)
- # need ordered input. This option is activated using the
- # `ordered` option in #initialize.
+ # relative path in the output directories as in the input
+ # directory, a complete ordering on the files, based on the time
+ # of syncing and the file's original relative path, can be used
+ # instead. This is a good choice when downstream consumers
+ # (e.g. - Storm) need ordered input. This option is activated
+ # using the `ordered` option in #initialize. When using multiple
+ # output directories, ordering is maintained **across**
+ # directories so that the downstream merge of multiple, ordered
+  # output directories is itself ordered.
#
# 2. In addition to creating the output file (or splits) a JSON
# metadata file can also be created for each input file. This
@@ -43,7 +51,10 @@ module Load
# in a partial state. The combination of these factors means that
# a downstream consumer can use the presence or absence of a
# metadata file as an indicator of whether the **actual** output
- # file (or splits) has finished arriving.
+  # file (or splits) has finished arriving. When using multiple
+ # output directories, metadata files are created within each
+ # output directory separately (but still in a way consistent with
+ # them being merged downstream).
class PrepareSyncer < Syncer
include Logging
@@ -58,11 +69,11 @@ class PrepareSyncer < Syncer
# @return [PrepareSyncer]
def self.from_source settings, source, name
raise Error.new("An --input directory is required") unless settings[:input]
- raise Error.new("An --output directory is required") unless settings[:output]
+ raise Error.new("At least one --output directory is required") if (settings[:output].nil? || settings[:output].empty?)
new(settings.dup.merge({
name: name.to_s,
input: File.join(settings[:input], name.to_s),
- output: File.join(settings[:output], name.to_s),
+ output: settings[:output].map { |dir| File.join(dir, name.to_s) },
}).merge(source[:prepare] || {}))
end
@@ -71,7 +82,7 @@ def self.from_source settings, source, name
# @param [Configliere::Param] settings
def self.configure settings
settings.define :input, description: "Input directory of (possibly growing) files"
- settings.define :output, description: "Output directory of processed, complete files"
+ settings.define :output, description: "Comma-separated list of output directories", default: [], type: Array
settings.define :ordered, description: "Create a total ordering within the output directory", default: false, type: :boolean
settings.define :metadata, description: "Create a metadata file for each file in the output directory", default: false, type: :boolean
@@ -82,8 +93,8 @@ def self.configure settings
settings.define :split_program, description: "Path to the `split` program", default: 'split'
settings.description = <<-EOF
-Syncs an --input directory, with possibly growing flies, to an
---output directory.
+Syncs an --input directory, with possibly growing files, to an
+--output directory or directories.
Files will only appear in the --output directory when they stop
growing in the --input directory. At least two invocations of wu-sync
@@ -109,16 +120,21 @@ def self.configure settings
$ wu-sync prepare --input=/var/ftp/inbound --output=/data/ftp/outbound --split --bytes=1_048_576
The --ordered option can be used to create a complete ordering of
-files in the --output directory, useful for when downstream consumers
-(e.g. - Storm) require ordered input.
+files in the --output directories, useful for when downstream
+consumers (e.g. - Storm) require ordered input.
The --metadata option will create a JSON metadata file in the output
-directory in addition to each output file.
+directories in addition to each output file.
In situations where the input file and output file are in a one-to-one
correspondence (without the --split option), files in the --output
-directory will be hardlinks pointing at their equivalent files in the
---input directory.
+directories will be hardlinks pointing at their equivalent files in
+the --input directory.
+
+Multiple --output directories (usually on different hard drives) can
+dramatically speed up operations:
+
+ $ wu-sync prepare --input=/var/ftp/inbound --output=/data/ftp/outbound_1,/data/ftp/outbound_2,/data/ftp/outbound_3 --split --lines=100_000
EOF
end
@@ -132,9 +148,10 @@ def validate
raise Error.new("Input directory <#{settings[:input]}> does not exist") unless File.exist?(settings[:input])
raise Error.new("Input directory <#{settings[:input]}> is not a directory") unless File.directory?(settings[:input])
- raise Error.new("A local --output directory is required") if settings[:output].nil? || settings[:output].empty?
- raise Error.new("Output directory <#{settings[:output]}> exists but is not a directory") if File.exist?(settings[:output]) && !File.directory?(settings[:output])
-
+ raise Error.new("At least one --output directory is required") if (settings[:output].nil? || settings[:output].empty?)
+ settings[:output].each do |dir|
+ raise Error.new("Output directory <#{dir}> exists but is not a directory") if File.exist?(dir) && !File.directory?(dir)
+ end
true
end
@@ -166,11 +183,13 @@ def absolute_input_directory
Pathname.new(File.absolute_path(settings[:input]))
end
- # The absolute path to the output directory.
+ # The absolute path to the output directories.
#
- # @return [Pathname]
- def absolute_output_directory
- Pathname.new(File.absolute_path(settings[:output]))
+ # @return [Array<Pathname>]
+ def absolute_output_directories
+ settings[:output].map do |dir|
+ Pathname.new(File.absolute_path(dir))
+ end
end
# Setup this PrepareSyncer by loading any file size state that's
@@ -265,7 +284,7 @@ def remember_size!(path)
# @return [Handler]
def create_handler
settings[:ordered] = true if settings[:metadata]
- handler_settings = settings.dup.merge(input: absolute_input_directory, output: absolute_output_directory)
+ handler_settings = settings.dup.merge(input: absolute_input_directory, output: absolute_output_directories)
self.handler = (settings[:split] ? SplittingHandler : Handler).new(self, handler_settings)
end
49 lib/wukong-load/syncers/prepare_syncer/handler.rb
@@ -15,6 +15,9 @@ class Handler
# from the PrepareSyncer that created it.
attr_accessor :settings
+ # A counter that increments for each input file processed.
+ attr_accessor :counter
+
include Logging
include MetadataHandler
@@ -23,13 +26,14 @@ class Handler
# @param [PrepareSyncer] syncer the syncer this handler is for
# @param [Configliere::Param] settings
# @option settings [Pathname] :input the input directory
- # @option settings [Pathname] :output the output directory
+ # @option settings [Array<Pathname>] :output the output directories
# @option settings [true, false] :dry_run log what would be done instead of doing it
# @option settings [true, false] :ordered create totally ordered output
# @option settings [true, false] :metadata create metadata files for each output file
def initialize syncer, settings
self.syncer = syncer
self.settings = settings
+ self.counter = 0
extend (settings[:dry_run] ? FileUtils::NoWrite : FileUtils)
extend OrderedHandler if settings[:ordered]
end
@@ -64,8 +68,11 @@ def before_process original
# Run after successfully processing each file.
#
+ # By default it increments the #counter.
+ #
# @param [Pathname] original the original file in the input directory
def after_process original
+ self.counter += 1
end
# Run upon an error during processing.
@@ -89,37 +96,47 @@ def create_hardlink original, copy
process_metadata_for(copy) if settings[:metadata]
end
- # Return a path in the `output` directory that has the same
+ # Return the current output directory, chosen by cycling
+ # through the given output directories based on the value of
+ # the current #counter.
+ #
+ # @return [Pathname]
+ def current_output_directory
+ settings[:output][self.counter % settings[:output].size]
+ end
+
+ # Return a path in an `output` directory that has the same
# relative path as `original` does in the input directory.
#
+ # The `output` directory chosen will cycle through the given
+ # output directories as the #counter increments.
+ #
# @param [Pathname] original
# @return [Pathname]
def path_for original
- relative_path = original.relative_path_from(settings[:input])
- settings[:output].join(relative_path)
+ current_output_directory.join(relative_path_of(original, settings[:input]))
end
- # Return the path of `original` relative to the containing
- # `dir`.
+ # Return the path of `file` relative to the containing `dir`.
#
- # @param [Pathname] original
+ # @param [Pathname] file
# @param [Pathname] dir
# @return [Pathname]
- def relative_path_of original, dir
- original.relative_path_from(dir)
+ def relative_path_of file, dir
+ file.relative_path_from(dir)
end
- # Returns the top-level directory of the `original` path,
- # relative to `dir`.
+ # Returns the top-level directory of the `file`, relative to
+ # `dir`.
#
- # If the `original` is in `dir` itself, and not a
- # subdirectory, returns the string "root".
+ # If the `file` is in `dir` itself, and not a subdirectory,
+ # returns the string "root".
#
- # @param [Pathname] original
+ # @param [Pathname] file
# @param [Pathname] dir
# @return [String, "root"]
- def top_level_of(original, dir)
- top_level, rest = relative_path_of(original, dir).to_s.split('/', 2)
+ def top_level_of(file, dir)
+ top_level, rest = relative_path_of(file, dir).to_s.split('/', 2)
rest ? top_level : 'root'
end
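The two pieces added in this hunk (`counter` bumped in `after_process`, `current_output_directory` computed modulo the number of outputs) compose as below; `MiniHandler` is a stripped-down stand-in for illustration, not the PR's actual `Handler` class:

```ruby
require 'pathname'

# Stand-in for Handler showing only the counter-based cycling.
class MiniHandler
  attr_accessor :counter, :outputs

  def initialize(outputs)
    @outputs = outputs.map { |d| Pathname.new(d) }
    @counter = 0
  end

  # Mirrors Handler#current_output_directory in the diff: cycle
  # through the output directories based on the counter's value.
  def current_output_directory
    outputs[counter % outputs.size]
  end

  # Mirrors Handler#after_process: bump the counter once per file.
  def after_process(_original)
    self.counter += 1
  end
end

h = MiniHandler.new(%w[/out_1 /out_2 /out_3])
dirs = 4.times.map do
  dir = h.current_output_directory
  h.after_process(nil)
  dir.to_s
end
# dirs cycles back to the first directory on the fourth file
```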
8 lib/wukong-load/syncers/prepare_syncer/metadata_handler.rb
@@ -26,8 +26,8 @@ def process_metadata_for(copy)
# @param [Pathname] copy
# @return [Pathname]
def metadata_path_for(copy)
- top_level, rest = relative_path_of(copy, settings[:output]).to_s.split('/', 2)
- settings[:output].join(top_level + '_meta', rest + ".meta")
+ top_level, rest = relative_path_of(copy, current_output_directory).to_s.split('/', 2)
+ current_output_directory.join(top_level + '_meta', rest + ".meta")
end
# Returns the metadata for the `copy` file.
@@ -36,8 +36,8 @@ def metadata_path_for(copy)
# @param [Hash]
def metadata_for(copy)
{
- path: relative_path_of(copy, settings[:output]),
- meta_path: relative_path_of(metadata_path_for(copy), settings[:output]),
+ path: relative_path_of(copy, current_output_directory),
+ meta_path: relative_path_of(metadata_path_for(copy), current_output_directory),
size: File.size(copy),
md5: Digest::MD5.file(copy).hexdigest, # streaming
}
20 lib/wukong-load/syncers/prepare_syncer/ordered_handler.rb
@@ -7,24 +7,6 @@ class PrepareSyncer
# directory.
module OrderedHandler
- # Counter for keeping track of the number of files processed.
- #
- # Defaults to 0.
- #
- # @return [Integer]
- def counter
- @counter ||= 0
- end
- attr_writer :counter
-
- # Increments the #counter.
- #
- # @param [Pathname] original
- def after_process original
- super(original)
- self.counter += 1
- end
-
# Return the output path for the given `original` file.
#
# @param [Pathname] original
@@ -32,7 +14,7 @@ def after_process original
# @return [Pathname]
def path_for original, time=nil
time ||= Time.now.utc
- settings[:output].join(daily_directory_for(time, original)).join(slug_for(time, original))
+ current_output_directory.join(daily_directory_for(time, original)).join(slug_for(time, original))
end
# Return the daily directory for the given `time`.
2  lib/wukong-load/syncers/prepare_syncer/splitting_handler.rb
@@ -105,7 +105,7 @@ def suffix_stem_for(copy)
#
# @return [Integer]
def suffix_length
- 4
+ 6
end
# Suffix of the first split.
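For context on the `suffix_length` bump from 4 to 6: assuming `split`'s default lowercase alphabetic suffixes, the number of distinct split files it can emit is 26 raised to the suffix length, so the change raises the per-file ceiling from under half a million splits to roughly 309 million (illustrative arithmetic only; the exact limit depends on the flags passed to `split`):

```ruby
# Capacity of alphabetic suffixes of a given length: 26**length.
old_capacity = 26**4  # suffix_length 4
new_capacity = 26**6  # suffix_length 6

puts old_capacity
puts new_capacity
```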
61 lib/wukong-load/syncers/s3_syncer.rb
@@ -1,7 +1,8 @@
module Wukong
module Load
- # Syncs a local directory to an AWS S3 bucket and path.
+ # Syncs a local directory or directories to an AWS S3 bucket and
+ # path.
#
# Uses the [`s3cmd`](http://s3tools.org/s3cmd) tool behind the
# scenes to do the heavy-lifting.
@@ -20,10 +21,10 @@ class S3Syncer < Syncer
#
# @return [S3Syncer]
def self.from_source settings, source, name
- raise Error.new("An --input directory is required") unless settings[:input]
+ raise Error.new("At least one --input directory is required") if (settings[:input].nil? || settings[:input].empty?)
new(settings.dup.merge({
name: name.to_s,
- input: File.join(settings[:input], name.to_s),
+ input: settings[:input].map { |dir| File.join(dir, name.to_s) },
}.tap { |s|
s[:bucket] = File.join(settings[:bucket], name.to_s) if settings[:bucket]
}).merge(source[:s3] || {}))
@@ -33,7 +34,7 @@ def self.from_source settings, source, name
#
# @param [Configliere::Param] settings
def self.configure settings
- settings.define :input, description: "Local directory to archive to S3"
+    settings.define :input, description: "Comma-separated list of input directories", default: [], type: Array
settings.define :bucket, description: "S3 bucket and path to in which to archive data"
settings.define :s3cmd_program, description: "Path to the `s3cmd` executable", default: 's3cmd'
@@ -41,7 +42,8 @@ def self.configure settings
settings.define :region, description: "AWS region to create bucket in, one of: #{REGIONS.join(',')}", default: "US"
settings.description = <<-EOF
-Sync data from a local directory to an S3 bucket and path:
+Sync data from a local directory or directories to an S3 bucket and
+path:
$ wu-sync s3 --input=/data --bucket=s3://example.com/data
@@ -55,6 +57,11 @@ def self.configure settings
specify a path to the right configuration file for s3cmd:
$ wu-sync s3 --input=/data --bucket=s3://example.com/data --s3cmd_config=my_aws_account.s3cfg
+
+You can sync multiple --input directories to the same --bucket (using
+parallel invocations of s3cmd), useful for higher throughput:
+
+ $ wu-sync s3 --input=/data_1,/data_2,/data_3 --bucket=s3://example.com/data --s3cmd_config=my_aws_account.s3cfg
EOF
end
@@ -66,9 +73,11 @@ def self.configure settings
# @raise [Wukong::Error] if the AWS `region` is invalid
# @return [true]
def validate
- raise Error.new("A local --input directory is required") if settings[:input].nil? || settings[:input].empty?
- raise Error.new("Input directory <#{settings[:input]}> does not exist") unless File.exist?(settings[:input])
- raise Error.new("Input directory <#{settings[:input]}> is not a directory") unless File.directory?(settings[:input])
+ raise Error.new("At least one --input directory is required") if (settings[:input].nil? || settings[:input].empty?)
+ settings[:input].each do |dir|
+ raise Error.new("Input directory <#{dir}> does not exist") unless File.exist?(dir)
+ raise Error.new("Input directory <#{dir}> is not a directory") unless File.directory?(dir)
+ end
raise Error.new("An S3 --bucket is required") if settings[:bucket].nil? || settings[:bucket].empty?
@@ -87,7 +96,7 @@ def setup
# Log a message.
def before_sync
- log.info("Syncing from #{local_directory} to #{s3_uri}")
+ log.info("Syncing from #{local_directories.inspect} to #{s3_uri}")
end
# Perform the sync.
@@ -97,19 +106,23 @@ def before_sync
# execute the command-line in a subprocess and log its output at
# the `DEBUG` level.
def sync
- if settings[:dry_run]
- log.info(sync_command)
- else
- IO.popen(sync_command).each { |line| handle_output(line) }
+ local_directories.each do |local_directory|
+ if settings[:dry_run]
+ log.info(sync_command(local_directory))
+ else
+ IO.popen(sync_command(local_directory)).each { |line| handle_output(line) }
+ end
end
end
- # The local filesystem directory to use as the source for the
+ # The local filesystem directories to use as the sources for the
# sync.
#
- # @return [String]
- def local_directory
- File.join(File.expand_path(settings[:input].to_s), "") # always use a trailing '/' with s3cmd
+ # @return [Array<String>]
+ def local_directories
+ settings[:input].map do |dir|
+ File.join(File.expand_path(dir.to_s), "") # always use a trailing '/' with s3cmd
+ end
end
# The S3 bucket and path to use as the destination for the sync.
@@ -121,8 +134,10 @@ def s3_uri
File.join(settings[:bucket] =~ %r{^s3://}i ? settings[:bucket].to_s : "s3://#{settings[:bucket]}", "") # always use a trailing '/' with s3cmd
end
- # The command that will be run.
- def sync_command
+ # The command that will be run for a given local directory.
+ #
+ # @param [String] local_directory
+ def sync_command local_directory
config_file = settings[:s3cmd_config] ? "--config=#{Shellwords.escape(settings[:s3cmd_config])}" : ""
"#{s3cmd_program} sync #{Shellwords.escape(local_directory)} #{Shellwords.escape(s3_uri)} --no-delete-removed --bucket-location=#{settings[:region]} #{config_file} 2>&1"
end
@@ -143,8 +158,12 @@ def s3cmd_program
def handle_output line
case
when line =~ /^Done. Uploaded (\d+) bytes in ([\d\.]+) seconds/
- self.bytes = $1.to_i
- self.duration = $2.to_f
+ self.bytes ||= 0
+ self.bytes += $1.to_i
+
+ self.duration ||= 0.0
+ self.duration += $2.to_f
+
log.debug(line.chomp)
when line =~ /^ERROR:\s+(.*)$/
raise Error.new($1.chomp)
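The `handle_output` change switches `bytes` and `duration` from plain assignment to accumulation, since with multiple `--input` directories `s3cmd` now runs once per directory and emits one `Done.` line per run. A stand-alone sketch of that accumulation (the `Stats` struct is illustrative, not part of the PR):

```ruby
Stats = Struct.new(:bytes, :duration)

# Sum bytes and duration across every s3cmd run instead of letting
# the last run's "Done." line overwrite the totals.
def handle_line(stats, line)
  if line =~ /^Done. Uploaded (\d+) bytes in ([\d\.]+) seconds/
    stats.bytes    = (stats.bytes    || 0)   + $1.to_i
    stats.duration = (stats.duration || 0.0) + $2.to_f
  end
  stats
end

stats = Stats.new
handle_line(stats, "Done. Uploaded 1024 bytes in 1.5 seconds")
handle_line(stats, "Done. Uploaded 2048 bytes in 0.5 seconds")
# totals now reflect both runs combined
```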
