Skip to content

Commit

Permalink
the wu-sync s3 command now works with multiple --input directories
Browse files Browse the repository at this point in the history
  • Loading branch information
Dhruv Bansal committed Sep 23, 2013
1 parent 03a8ee0 commit a43ecdc
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 23 deletions.
9 changes: 9 additions & 0 deletions README.md
Expand Up @@ -757,6 +757,15 @@ file:
```
$ wu-sync s3 --input=/data/clean --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
```

The `wu-sync s3` command also works with multiple `--input`
directories. This is to work nicely with the `wu-sync prepare`
command which has multiple `--output` directories:

```
$ wu-sync s3 --input=/data/clean_1,/data/clean_2 --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
```

<a name="wu-sync-all">
### Working with multiple data sources

Expand Down
4 changes: 2 additions & 2 deletions lib/wukong-load/syncers/prepare_syncer.rb
Expand Up @@ -82,7 +82,7 @@ def self.from_source settings, source, name
# @param [Configliere::Param] settings
def self.configure settings
settings.define :input, description: "Input directory of (possibly growing) files"
settings.define :output, description: "Output directory of processed, complete files", default: [], type: Array
settings.define :output, description: "Comma-separated list of output directories", default: [], type: Array

settings.define :ordered, description: "Create a total ordering within the output directory", default: false, type: :boolean
settings.define :metadata, description: "Create a metadata file for each file in the output directory", default: false, type: :boolean
Expand All @@ -93,7 +93,7 @@ def self.configure settings
settings.define :split_program, description: "Path to the `split` program", default: 'split'

settings.description = <<-EOF
Syncs an --input directory, with possibly growing flies, to an
Syncs an --input directory, with possibly growing files, to an
--output directory or directories.
Files will only appear in the --output directory when they stop
Expand Down
61 changes: 40 additions & 21 deletions lib/wukong-load/syncers/s3_syncer.rb
@@ -1,7 +1,8 @@
module Wukong
module Load

# Syncs a local directory to an AWS S3 bucket and path.
# Syncs a local directory or directories to an AWS S3 bucket and
# path.
#
# Uses the [`s3cmd`](http://s3tools.org/s3cmd) tool behind the
# scenes to do the heavy-lifting.
Expand All @@ -20,10 +21,10 @@ class S3Syncer < Syncer
#
# @return [S3Syncer]
def self.from_source settings, source, name
raise Error.new("An --input directory is required") unless settings[:input]
raise Error.new("At least one --input directory is required") if (settings[:input].nil? || settings[:input].empty?)
new(settings.dup.merge({
name: name.to_s,
input: File.join(settings[:input], name.to_s),
input: settings[:input].map { |dir| File.join(dir, name.to_s) },
}.tap { |s|
s[:bucket] = File.join(settings[:bucket], name.to_s) if settings[:bucket]
}).merge(source[:s3] || {}))
Expand All @@ -33,15 +34,16 @@ def self.from_source settings, source, name
#
# @param [Configliere::Param] settings
def self.configure settings
settings.define :input, description: "Local directory to archive to S3"
settings.define :input, description: "Comma-separated list of input diretories", default: [], type: Array
settings.define :bucket, description: "S3 bucket and path to in which to archive data"

settings.define :s3cmd_program, description: "Path to the `s3cmd` executable", default: 's3cmd'
settings.define :s3cmd_config, description: "Path to the `s3cmd` config file"
settings.define :region, description: "AWS region to create bucket in, one of: #{REGIONS.join(',')}", default: "US"

settings.description = <<-EOF
Sync data from a local directory to an S3 bucket and path:
Sync data from a local directory or directories to an S3 bucket and
path:
$ wu-sync s3 --input=/data --bucket=s3://example.com/data
Expand All @@ -55,6 +57,11 @@ def self.configure settings
specify a path to the right configuration file for s3cmd:
$ wu-sync s3 --input=/data --bucket=s3://example.com/data --s3cmd_config=my_aws_account.s3cfg
You can sync multiple --input directories to the same --bucket (using
parallel invocations of s3cmd), useful for higher throughput:
$ wu-sync s3 --input=/data_1,/data_2,/data_3 --bucket=s3://example.com/data --s3cmd_config=my_aws_account.s3cfg
EOF
end

Expand All @@ -66,9 +73,11 @@ def self.configure settings
# @raise [Wukong::Error] if the AWS `region` is invalid
# @return [true]
def validate
raise Error.new("A local --input directory is required") if settings[:input].nil? || settings[:input].empty?
raise Error.new("Input directory <#{settings[:input]}> does not exist") unless File.exist?(settings[:input])
raise Error.new("Input directory <#{settings[:input]}> is not a directory") unless File.directory?(settings[:input])
raise Error.new("At least one --input directory is required") if (settings[:input].nil? || settings[:input].empty?)
settings[:input].each do |dir|
raise Error.new("Input directory <#{dir}> does not exist") unless File.exist?(dir)
raise Error.new("Input directory <#{dir}> is not a directory") unless File.directory?(dir)
end

raise Error.new("An S3 --bucket is required") if settings[:bucket].nil? || settings[:bucket].empty?

Expand All @@ -87,7 +96,7 @@ def setup

# Log a message.
def before_sync
log.info("Syncing from #{local_directory} to #{s3_uri}")
log.info("Syncing from #{local_directories.inspect} to #{s3_uri}")
end

# Perform the sync.
Expand All @@ -97,19 +106,23 @@ def before_sync
# execute the command-line in a subprocess and log its output at
# the `DEBUG` level.
def sync
if settings[:dry_run]
log.info(sync_command)
else
IO.popen(sync_command).each { |line| handle_output(line) }
local_directories.each do |local_directory|
if settings[:dry_run]
log.info(sync_command(local_directory))
else
IO.popen(sync_command(local_directory)).each { |line| handle_output(line) }
end
end
end

# The local filesystem directory to use as the source for the
# The local filesystem directories to use as the sources for the
# sync.
#
# @return [String]
def local_directory
File.join(File.expand_path(settings[:input].to_s), "") # always use a trailing '/' with s3cmd
# @return [Array<String>]
def local_directories
settings[:input].map do |dir|
File.join(File.expand_path(dir.to_s), "") # always use a trailing '/' with s3cmd
end
end

# The S3 bucket and path to use as the destination for the sync.
Expand All @@ -121,8 +134,10 @@ def s3_uri
File.join(settings[:bucket] =~ %r{^s3://}i ? settings[:bucket].to_s : "s3://#{settings[:bucket]}", "") # always use a traling '/' with s3cmd
end

# The command that will be run.
def sync_command
# The command that will be run for a given local directory.
#
# @param [String] local_directory
def sync_command local_directory
config_file = settings[:s3cmd_config] ? "--config=#{Shellwords.escape(settings[:s3cmd_config])}" : ""
"#{s3cmd_program} sync #{Shellwords.escape(local_directory)} #{Shellwords.escape(s3_uri)} --no-delete-removed --bucket-location=#{settings[:region]} #{config_file} 2>&1"
end
Expand All @@ -143,8 +158,12 @@ def s3cmd_program
def handle_output line
case
when line =~ /^Done. Uploaded (\d+) bytes in ([\d\.]+) seconds/
self.bytes = $1.to_i
self.duration = $2.to_f
self.bytes ||= 0
self.bytes += $1.to_i

self.duration ||= 0.0
self.duration += $2.to_f

log.debug(line.chomp)
when line =~ /^ERROR:\s+(.*)$/
raise Error.new($1.chomp)
Expand Down

0 comments on commit a43ecdc

Please sign in to comment.