Permalink
Browse files

add append

  • Loading branch information...
1 parent 351202a commit d421456f576643e5f890672fee1ecc006e17d643 @kzk committed Mar 12, 2012
Showing with 46 additions and 4 deletions.
  1. +42 −2 lib/webhdfs/fileutils.rb
  2. +4 −2 test/webhdfs/fileutils.rb
View
@@ -63,22 +63,55 @@ def copy_from_local(file, path, options={})
#
# path - HDFS file path
# file - local file path
+ # options - :offset, :length, :buffersize, :verbose
#
# Examples
#
# FileUtils.copy_from_local 'remote_file', 'local_file'
#
def copy_to_local(path, file, options={})
- fu_check_options options, OPT_TABLE['copy_from_local']
+ fu_check_options options, OPT_TABLE['copy_to_local']
fu_log "copy_to_local hdfs=#{path} local=#{file}" if options[:verbose]
File.open(file, "wb") do |f|
ret = fu_get(path, 'OPEN', options)
f.write ret
end
end
- OPT_TABLE['copy_to_local'] = [:offset, :length, :buffersize]
+ OPT_TABLE['copy_to_local'] = [:offset, :length, :buffersize, :verbose]
module_function :copy_to_local
+ # Public: Append to HDFS file
+ #
+ # path - HDFS file path
+ # body - contents
+ # options - :buffersize, :verbose
+ #
+ # Examples
+ #
+ # FileUtils.copy_from_local 'local_file', 'remote_file'
+ #
+ def append(path, body, options={})
+ fu_check_options options, OPT_TABLE['append']
+ fu_log "append #{body.bytesize} bytes to #{path}" if options[:verbose]
+ begin
+ fu_post(path, 'APPEND', options)
+ rescue RestClient::TemporaryRedirect => e
+ # must be redirected
+ raise e unless [301, 302, 307].include? e.response.code
+ # must have location
+ location = e.response.headers[:location]
+ raise e if location.nil? or location.empty?
+ # put contents
+ begin
+ RestClient.post location, body
+ rescue => e
+ p e.response
+ end
+ end
+ end
+ OPT_TABLE['append'] = [:buffersize, :verbose]
+ module_function :append
+
# Public: Create one or more directories.
#
# list - directory name, or list of them
@@ -318,6 +351,13 @@ def fu_put(path, op, params={}, payload='')
end
private_module_function :fu_put
+ # Internal: HTTP POST
+ def fu_post(path, op, params={}, payload='')
+ url = "http://#{@fu_host}:#{@fu_port}/webhdfs/v1/#{path}"
+ RestClient.post url, payload, :params => params.merge({:op => op})
+ end
+ private_module_function :fu_post
+
# Internal: HTTP DELETE
def fu_delete(path, op, params={})
url = "http://#{@fu_host}:#{@fu_port}/webhdfs/v1/#{path}"
@@ -6,8 +6,10 @@ def setup
end
def test_copy_from_local
- WebHDFS::FileUtils.copy_from_local('VERSION', 'VERSION')
- WebHDFS::FileUtils.copy_to_local('VERSION', 'VERSION2')
+ WebHDFS::FileUtils.copy_from_local('VERSION', 'VERSION', :verbose => true)
+ WebHDFS::FileUtils.copy_to_local('VERSION', 'VERSION2', :verbose => true)
+ WebHDFS::FileUtils.append('VERSION', 'foo-bar-buzz', :verbose => true)
+ WebHDFS::FileUtils.rm('VERSION', :verbose => true)
end
def test_rm

0 comments on commit d421456

Please sign in to comment.