basefilesystem should have the correct interface, going to use it now to define hadoopfilesystem
Jacob Perkins committed Feb 11, 2011
1 parent 3abd539 commit 90c90ae
Showing 5 changed files with 247 additions and 130 deletions.
1 change: 0 additions & 1 deletion lib/swineherd.rb
@@ -4,7 +4,6 @@

module Swineherd
autoload :Template, 'swineherd/template'
- autoload :HDFS, 'swineherd/hdfs'
autoload :Script, 'swineherd/script'
autoload :Workflow, 'swineherd/workflow'
end
14 changes: 7 additions & 7 deletions lib/swineherd/filesystem.rb
@@ -1,16 +1,16 @@
module Swineherd
- autoload :BaseFileSystem, 'filesystem/basefilesystem'
- autoload :LocalFileSystem, 'filesystem/localfilesystem'
- autoload :HadoopFileSystem, 'filesystem/hadoopfilesystem'
+ autoload :BaseFileSystem, 'swineherd/filesystem/basefilesystem'
+ autoload :LocalFileSystem, 'swineherd/filesystem/localfilesystem'
+ autoload :HadoopFileSystem, 'swineherd/filesystem/hadoopfilesystem'

class FileSystem
# A factory function that returns an instance of the requested class
- def self.get(scheme, *args)
-   case sheme
+ def self.get scheme
+   case scheme
when :file then
- LocalFileSystem.new
+ Swineherd::LocalFileSystem.new
when :hdfs then
- HadoopFileSystem.new
+ Swineherd::HadoopFileSystem.new
else
raise "Filesystem with scheme #{scheme} does not exist."
end
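As a quick orientation, here is how the factory might be called once this change lands; the require line and sample paths are illustrative assumptions, not part of the diff:

require 'swineherd'

# :file and :hdfs are the two schemes handled by the case statement above
fs = Swineherd::FileSystem.get(:file)      # => Swineherd::LocalFileSystem instance
fs.mkpath '/tmp/swineherd_example'         # BaseFileSystem interface method
puts fs.exists?('/tmp/swineherd_example')  # assuming LocalFileSystem implements exists?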
114 changes: 114 additions & 0 deletions lib/swineherd/filesystem/basefilesystem.rb
@@ -0,0 +1,114 @@
module Swineherd

#
# All methods a filesystem should have
#
module BaseFileSystem

#
# Return a new instance of 'this' filesystem. Classes that include this
# module are expected to know how to pull their particular set of arguments
# from *args and initialize themselves by opening any required connections, &c.
#
def initialize *args
end

#
# Open a file in this filesystem. Should return a usable file handle in
# the given mode ('r' for read or 'w' for write). File classes should, at
# minimum, have the methods defined in BaseFile.
#
def open path, mode="r", &blk
end

#
# Recursively delete the path and all paths below it.
#
def rm path
end

#
# Returns true if the file or path exists and false otherwise.
#
def exists? path
end

#
# Moves the source path to the destination path
#
def mv srcpath, dstpath
end

#
# Recursively copies all files and directories under srcpath to dstpath
#
def cp srcpath, dstpath
end

#
# Make the directory path, creating any intermediate directories that do not already exist
#
def mkpath path
end

#
# Return file type ("directory" or "file" or "symlink")
#
def type path
end

#
# Return the files and directories contained in dirpath
#
def entries dirpath
end

#
# Needs to close the filesystem by cleaning up any open connections, &c.
#
def close *args
end

class BaseFile
attr_accessor :path, :scheme, :mode

def initialize *args, &blk
end

#
# A new file in the filesystem needs to be instantiated with a path and
# a mode ('r' for read or 'w' for write).
#
def open path, mode="r", &blk
end

#
# Return the whole file as a string
#
def read
end

#
# Return a line from the stream
#
def readline
end

#
# Writes a string to the file
#
def write string
end

#
# Close the file
#
def close *args
end

end

end

end
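To make the contract above concrete, here is a hypothetical minimal implementation sketch; InMemoryFileSystem is illustrative only and does not appear anywhere in this commit:

module Swineherd
  # Toy filesystem backed by a Hash, fleshing out a few interface methods
  class InMemoryFileSystem
    include BaseFileSystem

    def initialize *args
      @entries = {} # path => contents (or :directory)
    end

    def exists? path
      @entries.key? path
    end

    def mkpath path
      @entries[path] = :directory
    end

    def rm path
      @entries.delete_if { |p, _| p == path || p.start_with?("#{path}/") }
    end
  end
end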
126 changes: 126 additions & 0 deletions lib/swineherd/filesystem/hadoopfilesystem.rb
@@ -0,0 +1,126 @@
module Swineherd

#
# Methods for dealing with hadoop distributed file system (hdfs)
#
class HadoopFileSystem

include Swineherd::BaseFileSystem
# include Swineherd::BaseFileSystem::ClassMethods

# #
# # Test if this file exists on the hdfs
# #
# def self.exist? target
# system %Q{hadoop fs -test -e #{target}}
# end
#
# #
# # Make a new hdfs dir; returns false (non-zero exit) if it already exists
# #
# def self.mkdir target
# system %Q{hadoop fs -mkdir #{target}}
# end
#
# #
# # Make a new hdfs dir if and only if it does not already exist
# #
# def self.mkdir_p target
# mkdir target unless exist? target
# end
#
# #
# # Recursively removes hdfs files and directories
# #
# def self.rmr target
# system %Q{hadoop fs -rmr #{target}}
# end
#
# #
# # Get an array of paths in the targeted hdfs path
# #
# def self.dir_entries target
# stuff = `hadoop fs -ls #{target}`
# stuff = stuff.split(/\n/).map{|l| l.split(/\s+/).last}
# stuff[1..-1] rescue []
# end
#
# #
# # Removes hdfs file
# #
# def self.rm target
# system %Q{hadoop fs -rm #{target}}
# end
#
# #
# # Moves hdfs file from source to dest
# #
# def self.mv source, dest
# system %Q{hadoop fs -mv #{source} #{dest}}
# end
#
# #
# # Distributed streaming from input to output
# #
# def self.stream input, output
# system("${HADOOP_HOME}/bin/hadoop \\
# jar ${HADOOP_HOME}/contrib/streaming/hadoop-*streaming*.jar \\
# -D mapred.job.name=\"Swineherd Stream (#{File.basename(input)} -> #{output})\" \\
# -D mapred.min.split.size=1000000000 \\
# -D mapred.reduce.tasks=0 \\
# -mapper \"/bin/cat\" \\
# -input \"#{input}\" \\
# -output \"#{output}\"")
# end
#
# #
# # Given an array of input dirs, stream all into output dir and remove duplicate records.
# # Reasonable default hadoop streaming options are chosen.
# #
# def self.merge inputs, output, options = {}
# options[:reduce_tasks] ||= 25
# options[:partition_fields] ||= 2
# options[:sort_fields] ||= 2
# options[:field_separator] ||= '\t'
# names = inputs.map{|inp| File.basename(inp)}.join(',')
# cmd = "${HADOOP_HOME}/bin/hadoop \\
# jar ${HADOOP_HOME}/contrib/streaming/hadoop-*streaming*.jar \\
# -D mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\" \\
# -D num.key.fields.for.partition=\"#{options[:partition_fields]}\" \\
# -D stream.num.map.output.key.fields=\"#{options[:sort_fields]}\" \\
# -D mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
# -D stream.map.output.field.separator=\"'#{options[:field_separator]}'\" \\
# -D mapred.min.split.size=1000000000 \\
# -D mapred.reduce.tasks=#{options[:reduce_tasks]} \\
# -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \\
# -mapper \"/bin/cat\" \\
# -reducer \"/usr/bin/uniq\" \\
# -input \"#{inputs.join(',')}\" \\
# -output \"#{output}\""
# puts cmd
# system cmd
# end
#
# #
# # Concatenates a hadoop dir or file into a local file
# #
# def self.cat_to_local src, dest
# system %Q{hadoop fs -cat #{src}/[^_]* > #{dest}} unless File.exist?(dest)
# end
#
# #
# # Needs to return true if no outputs exist, false otherwise, and
# # raise an error if some do and some don't
# #
# def self.check_paths paths
# exist_count = 0 # no outputs exist
# paths.each{|hdfs_path| exist_count += 1 if exist?(hdfs_path) }
# raise "Indeterminate output state" if (exist_count > 0) && (exist_count < paths.size)
# return true if exist_count == 0
# false
# end

end

end
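The commented-out class methods above hint at how the instance-level interface will shell out to the hadoop CLI; a sketch of that recasting (an assumption about where this refactor is headed, not code from the diff):

module Swineherd
  class HadoopFileSystem
    # exists? wraps `hadoop fs -test -e`, which exits zero when the path exists
    def exists? path
      system %Q{hadoop fs -test -e #{path}}
    end

    # rm maps onto the recursive `hadoop fs -rmr`, matching BaseFileSystem#rm
    def rm path
      system %Q{hadoop fs -rmr #{path}}
    end
  end
end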
122 changes: 0 additions & 122 deletions lib/swineherd/filesystem/hdfs.rb

This file was deleted.
