Merge branch 'master' of github.com:infochimps/swineherd

commit d4ebcd3d2299628bd71dee3e8cd2513f6d9e227b (2 parents: d7d9d90 + 3194958)
@kornypoet authored
lib/swineherd/filesystem/hadoopfilesystem.rb (39 changed lines)
@@ -116,19 +116,46 @@ def stream input, output
     end
     #
-    # BZIP
+    # BZIP
     #
     def bzip input, output
       system("#{@hadoop_home}/bin/hadoop \\
-        jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
+        jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
         -D mapred.output.compress=true \\
         -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \\
         -D mapred.reduce.tasks=1 \\
         -mapper \"/bin/cat\" \\
-        -reducer \"/bin/cat\" \\
+        -reducer \"/bin/cat\" \\
         -input \"#{input}\" \\
         -output \"#{output}\"")
-    end
+    end
+
+    #
+    # Merges many input files into :reduce_tasks amount of output files
+    #
+    def dist_merge inputs, output, options = {}
+      options[:reduce_tasks] ||= 25
+      options[:partition_fields] ||= 2
+      options[:sort_fields] ||= 2
+      options[:field_separator] ||= '/t'
+      names = inputs.map{|inp| File.basename(inp)}.join(',')
+      cmd = "#{@hadoop_home}/bin/hadoop \\
+        jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
+        -D mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\" \\
+        -D num.key.fields.for.partition=\"#{options[:partition_fields]}\" \\
+        -D stream.num.map.output.key.fields=\"#{options[:sort_fields]}\" \\
+        -D mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
+        -D stream.map.output.field.separator=\"'#{options[:field_separator]}'\" \\
+        -D mapred.min.split.size=1000000000 \\
+        -D mapred.reduce.tasks=#{options[:reduce_tasks]} \\
+        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \\
+        -mapper \"/bin/cat\" \\
+        -reducer \"/usr/bin/uniq\" \\
+        -input \"#{inputs.join(',')}\" \\
+        -output \"#{output}\""
+      puts cmd
+      system cmd
+    end
     #
     # Copy hdfs file to local filesystem
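A rough usage sketch of the new dist_merge (the class name, the bare constructor, and all paths are assumptions for illustration, not part of the commit). One thing worth flagging: the default :field_separator above is the two-character string '/t', which looks like it was meant to be a literal tab, so passing the separator explicitly is the safer route:

    hdfs = Swineherd::HadoopFileSystem.new
    hdfs.dist_merge ['/data/chunk_0000', '/data/chunk_0001', '/data/chunk_0002'],
                    '/data/merged',
                    :reduce_tasks     => 10,   # number of merged output files
                    :partition_fields => 2,    # key fields fed to KeyFieldBasedPartitioner
                    :sort_fields      => 2,    # key fields used to sort map output
                    :field_separator  => "\t"  # avoid the '/t' default
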
@@ -138,7 +165,7 @@ def copy_to_local srcfile, dstfile
     end
     #
-    # Copyy local file to hdfs filesystem
+    # Copy local file to hdfs filesystem
     #
     def copy_from_local srcfile, dstfile
       @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
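For orientation, a rough sketch of how the copy helpers pair with bzip on an instance of this filesystem class (the constructor and all paths are assumptions for illustration, not part of the commit):

    hdfs = Swineherd::HadoopFileSystem.new
    hdfs.copy_from_local '/tmp/events.tsv', '/data/raw/events.tsv'            # local -> HDFS
    hdfs.bzip '/data/raw', '/data/compressed'                                 # compress via streaming
    hdfs.copy_to_local '/data/compressed/part-00000.bz2', '/tmp/events.bz2'   # HDFS -> local
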
@@ -259,7 +286,7 @@ def set_env
       require 'java'
       @hadoop_conf = (ENV['HADOOP_CONF_DIR'] || File.join(@hadoop_home, 'conf'))
       @hadoop_conf += "/" unless @hadoop_conf.end_with? "/"
-      $CLASSPATH << @hadoop_conf
+      $CLASSPATH << @hadoop_conf
       Dir["#{@hadoop_home}/hadoop*.jar", "#{@hadoop_home}/lib/*.jar"].each{|jar| require jar}
       java_import 'org.apache.hadoop.conf.Configuration'
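The $CLASSPATH change above appears to be whitespace-only, but the surrounding lines show why the conf directory is pushed onto $CLASSPATH under JRuby: Hadoop's Configuration resolves core-site.xml and friends from the classpath. A stand-alone sketch of that effect (the HADOOP_HOME layout and the fs.default.name key assume a typical 0.20-era install):

    require 'java'
    hadoop_home = ENV['HADOOP_HOME']
    $CLASSPATH << "#{hadoop_home}/conf/"            # make *-site.xml visible to Hadoop classes
    Dir["#{hadoop_home}/hadoop*.jar", "#{hadoop_home}/lib/*.jar"].each { |jar| require jar }
    java_import 'org.apache.hadoop.conf.Configuration'
    puts Configuration.new.get('fs.default.name')   # read from core-site.xml on the classpath
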
lib/swineherd/filesystem/s3filesystem.rb (12 changed lines)
@@ -209,6 +209,16 @@ def put srcpath, destpath
     def close *args
     end
+    def put srcpath, destpath
+      dest_bucket = bucket(destpath)
+      if File.directory? srcpath
+
+      else
+        key = srcpath
+      end
+      @s3.interface.put(dest_path, key, File.open(srcpath))
+    end
+
     class S3File
       attr_accessor :path, :handle, :fs
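The newly added put has two loose ends worth noting: it assigns dest_bucket but then passes dest_path to the S3 call, and the File.directory? branch is left empty. A hedged sketch of one way it might be rounded out, reusing bucket and key_path as they are used elsewhere in this file (the directory handling here is an assumption, not the author's intent):

    def put srcpath, destpath
      dest_bucket = bucket(destpath)
      if File.directory? srcpath
        # assumption: mirror the directory, one S3 key per regular file
        Dir["#{srcpath}/**/*"].reject { |f| File.directory?(f) }.each do |f|
          rel = f.sub(%r{\A#{Regexp.escape(srcpath)}/?}, '')
          @s3.interface.put(dest_bucket, File.join(key_path(destpath), rel), File.open(f))
        end
      else
        @s3.interface.put(dest_bucket, key_path(destpath), File.open(srcpath))
      end
    end
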
@@ -244,7 +254,7 @@ def read
       # downloading...
       #
       def readline
-        @handle ||= fs.s3.interface.get_object(fs.bucket(path), fs.key_path(path)).each
+        @handle ||= fs.s3.interface.get_object(fs.bucket(path), fs.key_path(path)).each
         begin
           @handle.next
         rescue StopIteration, NoMethodError
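A rough usage sketch of S3File#readline as touched above (the open call, the bucket/key path, and the credential-free constructor are assumptions for illustration): each call advances the line iterator built over the object fetched by get_object, and the rescue above handles exhaustion.

    s3   = Swineherd::S3FileSystem.new              # credentials omitted here; real setup will differ
    file = s3.open('mybucket/logs/2011-03-01.tsv')
    3.times { puts file.readline }                  # each call yields the next line of the object
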
lib/swineherd/foo (1 changed line)
@@ -1 +0,0 @@
-@('_')@