From 40348971b642e92a92e89c4846201acfd7cccca3 Mon Sep 17 00:00:00 2001 From: Omar Khan Date: Tue, 10 Mar 2015 11:34:44 +0700 Subject: [PATCH] Add support for multipart uploads --- lib/fakes3/file_store.rb | 84 +++++++++++----- lib/fakes3/server.rb | 175 +++++++++++++++++++++++++++------- lib/fakes3/xml_adapter.rb | 19 ++++ test/aws_sdk_commands_test.rb | 8 ++ 4 files changed, 226 insertions(+), 60 deletions(-) diff --git a/lib/fakes3/file_store.rb b/lib/fakes3/file_store.rb index 94bd2ac4..3f2d4111 100644 --- a/lib/fakes3/file_store.rb +++ b/lib/fakes3/file_store.rb @@ -103,7 +103,7 @@ def get_object(bucket,object_name, request) def object_metadata(bucket,object) end - def copy_object(src_bucket_name,src_name,dst_bucket_name,dst_name,request) + def copy_object(src_bucket_name, src_name, dst_bucket_name, dst_name, request) src_root = File.join(@root,src_bucket_name,src_name,SHUCK_METADATA_DIR) src_metadata_filename = File.join(src_root,"metadata") src_metadata = YAML.load(File.open(src_metadata_filename,'rb').read) @@ -140,8 +140,8 @@ def copy_object(src_bucket_name,src_name,dst_bucket_name,dst_name,request) end end - src_bucket = self.get_bucket(src_bucket_name) - dst_bucket = self.get_bucket(dst_bucket_name) + src_bucket = get_bucket(src_bucket_name) || create_bucket(src_bucket_name) + dst_bucket = get_bucket(dst_bucket_name) || create_bucket(dst_bucket_name) obj = S3Object.new obj.name = dst_name @@ -155,7 +155,31 @@ def copy_object(src_bucket_name,src_name,dst_bucket_name,dst_name,request) return obj end - def store_object(bucket,object_name,request) + def store_object(bucket, object_name, request) + filedata = "" + + # TODO put a tmpfile here first and mv it over at the end + content_type = request.content_type || "" + + match = content_type.match(/^multipart\/form-data; boundary=(.+)/) + boundary = match[1] if match + if boundary + boundary = WEBrick::HTTPUtils::dequote(boundary) + form_data = WEBrick::HTTPUtils::parse_form_data(request.body, boundary) + + if form_data['file'] == nil or form_data['file'] == "" + raise WEBrick::HTTPStatus::BadRequest + end + + filedata = form_data['file'] + else + request.body { |chunk| filedata << chunk } + end + + do_store_object(bucket, object_name, filedata, request) + end + + def do_store_object(bucket, object_name, filedata, request) begin filename = File.join(@root,bucket.name,object_name) FileUtils.mkdir_p(filename) @@ -163,29 +187,12 @@ def store_object(bucket,object_name,request) metadata_dir = File.join(filename,SHUCK_METADATA_DIR) FileUtils.mkdir_p(metadata_dir) - content = File.join(filename,SHUCK_METADATA_DIR,"content") + content = File.join(filename,SHUCK_METADATA_DIR,"content") metadata = File.join(filename,SHUCK_METADATA_DIR,"metadata") - # TODO put a tmpfile here first and mv it over at the end + File.open(content,'wb') { |f| f << filedata } - match=request.content_type.match(/^multipart\/form-data; boundary=(.+)/) - boundary = match[1] if match - if boundary - boundary = WEBrick::HTTPUtils::dequote(boundary) - filedata = WEBrick::HTTPUtils::parse_form_data(request.body, boundary) - raise HTTPStatus::BadRequest if filedata['file'].empty? - File.open(content, 'wb') do |f| - f << filedata['file'] - end - else - File.open(content,'wb') do |f| - request.body do |chunk| - f << chunk - end - end - end metadata_struct = create_metadata(content,request) - File.open(metadata,'w') do |f| f << YAML::dump(metadata_struct) end @@ -206,6 +213,36 @@ def store_object(bucket,object_name,request) end end + def combine_object_parts(bucket, upload_id, object_name, parts, request) + upload_path = File.join(@root, bucket.name) + base_path = File.join(upload_path, "#{upload_id}_#{object_name}") + + complete_file = "" + chunk = "" + part_paths = [] + + parts.sort_by { |part| part[:number] }.each do |part| + part_path = "#{base_path}_part#{part[:number]}" + content_path = File.join(part_path, SHUCK_METADATA_DIR, 'content') + + File.open(content_path, 'rb') { |f| chunk = f.read } + etag = Digest::MD5.hexdigest(chunk) + + raise new Error "invalid file chunk" unless part[:etag] == etag + complete_file << chunk + part_paths << part_path + end + + object = do_store_object(bucket, object_name, complete_file, request) + + # clean up parts + part_paths.each do |path| + FileUtils.remove_dir(path) + end + + object + end + def delete_object(bucket,object_name,request) begin filename = File.join(@root,bucket.name,object_name) @@ -219,6 +256,7 @@ def delete_object(bucket,object_name,request) end end + # TODO: abstract getting meta data from request. def create_metadata(content,request) metadata = {} metadata[:md5] = Digest::MD5.file(content).hexdigest diff --git a/lib/fakes3/server.rb b/lib/fakes3/server.rb index ac4971a1..00affde9 100644 --- a/lib/fakes3/server.rb +++ b/lib/fakes3/server.rb @@ -2,6 +2,8 @@ require 'webrick' require 'webrick/https' require 'openssl' +require 'securerandom' +require 'cgi' require 'fakes3/file_store' require 'fakes3/xml_adapter' require 'fakes3/bucket_query' @@ -109,7 +111,7 @@ def do_GET(request, response) if time >= Time.iso8601(real_obj.modified_date) response.status = 304 return - end + end end response.status = 200 @@ -160,6 +162,9 @@ def do_GET(request, response) def do_PUT(request,response) s_req = normalize_request(request) + query = CGI::parse(request.request_uri.query || "") + + return do_multipartPUT(request, response) if query['uploadId'].first response.status = 200 response.body = "" @@ -184,44 +189,114 @@ def do_PUT(request,response) end end - def do_POST(request,response) - # check that we've received file data - unless request.content_type =~ /^multipart\/form-data; boundary=(.+)/ - raise WEBrick::HTTPStatus::BadRequest + def do_multipartPUT(request, response) + s_req = normalize_request(request) + query = CGI::parse(request.request_uri.query) + + part_number = query['partNumber'].first + upload_id = query['uploadId'].first + part_name = "#{upload_id}_#{s_req.object}_part#{part_number}" + + # store the part + if s_req.type == Request::COPY + real_obj = @store.copy_object( + s_req.src_bucket, s_req.src_object, + s_req.bucket , part_name, + request + ) + + response['Content-Type'] = "text/xml" + response.body = XmlAdapter.copy_object_result real_obj + else + bucket_obj = @store.get_bucket(s_req.bucket) + if !bucket_obj + bucket_obj = @store.create_bucket(s_req.bucket) + end + real_obj = @store.store_object( + bucket_obj, part_name, + request + ) + + response.body = "" + response.header['ETag'] = "\"#{real_obj.md5}\"" end + + response['Access-Control-Allow-Origin'] = '*' + response['Access-Control-Allow-Headers'] = 'Authorization, Content-Length' + response['Access-Control-Expose-Headers'] = 'ETag' + + response.status = 200 + end + + def do_POST(request,response) s_req = normalize_request(request) - key=request.query['key'] - success_action_redirect=request.query['success_action_redirect'] - success_action_status=request.query['success_action_status'] + key = request.query['key'] + query = CGI::parse(request.request_uri.query || "") + + if query.has_key?('uploads') + upload_id = SecureRandom.hex + + response.body = <<-eos.strip + + + #{ s_req.bucket } + #{ key } + #{ upload_id } + + eos + elsif query.has_key?('uploadId') + upload_id = query['uploadId'].first + bucket_obj = @store.get_bucket(s_req.bucket) + real_obj = @store.combine_object_parts( + bucket_obj, + upload_id, + s_req.object, + parse_complete_multipart_upload(request), + request + ) - filename = 'default' - filename = $1 if request.body =~ /filename="(.*)"/ - key=key.gsub('${filename}', filename) + response.body = XmlAdapter.complete_multipart_result real_obj + elsif request.content_type =~ /^multipart\/form-data; boundary=(.+)/ + key=request.query['key'] - bucket_obj = @store.get_bucket(s_req.bucket) || @store.create_bucket(s_req.bucket) - real_obj=@store.store_object(bucket_obj, key, s_req.webrick_request) + success_action_redirect = request.query['success_action_redirect'] + success_action_status = request.query['success_action_status'] - response['Etag'] = "\"#{real_obj.md5}\"" - response.body = "" - if success_action_redirect - response.status = 307 - response['Location']=success_action_redirect - else - response.status = success_action_status || 204 - if response.status=="201" - response.body= <<-eos.strip - - - http://#{s_req.bucket}.localhost:#{@port}/#{key} - #{s_req.bucket} - #{key} - #{response['Etag']} - - eos + filename = 'default' + filename = $1 if request.body =~ /filename="(.*)"/ + key = key.gsub('${filename}', filename) + + bucket_obj = @store.get_bucket(s_req.bucket) || @store.create_bucket(s_req.bucket) + real_obj = @store.store_object(bucket_obj, key, s_req.webrick_request) + + response['Etag'] = "\"#{real_obj.md5}\"" + + if success_action_redirect + response.status = 307 + response.body = "" + response['Location'] = success_action_redirect + else + response.status = success_action_status || 204 + if response.status == "201" + response.body = <<-eos.strip + + + http://#{s_req.bucket}.localhost:#{@port}/#{key} + #{s_req.bucket} + #{key} + #{response['Etag']} + + eos + end end + else + raise WEBrick::HTTPStatus::BadRequest end - response['Content-Type'] = 'text/xml' - response['Access-Control-Allow-Origin'] = '*' + + response['Content-Type'] = 'text/xml' + response['Access-Control-Allow-Origin'] = '*' + response['Access-Control-Allow-Headers'] = 'Authorization, Content-Length' + response['Access-Control-Expose-Headers'] = 'ETag' end def do_DELETE(request,response) @@ -241,10 +316,11 @@ def do_DELETE(request,response) def do_OPTIONS(request, response) super - response["Access-Control-Allow-Origin"] = "*" - response["Access-Control-Allow-Methods"] = "HEAD, GET, PUT, POST" - response["Access-Control-Allow-Headers"] = "accept, content-type" - response["Access-Control-Expose-Headers"] = "ETag, x-amz-meta-custom-header" + + response['Access-Control-Allow-Origin'] = '*' + response['Access-Control-Allow-Methods'] = 'PUT, POST, HEAD, GET, OPTIONS' + response['Access-Control-Allow-Headers'] = 'Accept, Content-Type, Authorization, Content-Length, ETag' + response['Access-Control-Expose-Headers'] = 'ETag' end private @@ -336,9 +412,11 @@ def normalize_put(webrick_req,s_req) end end + # TODO: also parse the x-amz-copy-source-range:bytes=first-last header + # for multipart copy copy_source = webrick_req.header["x-amz-copy-source"] if copy_source and copy_source.size == 1 - src_elems = copy_source.first.split("/") + src_elems = copy_source.first.split("/") root_offset = src_elems[0] == "" ? 1 : 0 s_req.src_bucket = src_elems[root_offset] s_req.src_object = src_elems[1 + root_offset,src_elems.size].join("/") @@ -355,6 +433,14 @@ def normalize_post(webrick_req,s_req) s_req.path = webrick_req.query['key'] s_req.webrick_request = webrick_req + + if s_req.is_path_style + elems = path[1,path_len].split("/") + s_req.bucket = elems[0] + s_req.object = elems[1..-1].join('/') if elems.size >= 2 + else + s_req.object = path[1..-1] + end end # This method takes a webrick request and generates a normalized FakeS3 request @@ -391,6 +477,21 @@ def normalize_request(webrick_req) return s_req end + def parse_complete_multipart_upload request + parts_xml = "" + request.body { |chunk| parts_xml << chunk } + + # TODO: I suck at parsing xml + parts_xml = parts_xml.scan /\.*?<\/Part\>/m + + parts_xml.collect do |xml| + { + number: xml[/\(\d+)\<\/PartNumber\>/, 1].to_i, + etag: xml[/\\"(.+)\"\<\/ETag\>/, 1] + } + end + end + def dump_request(request) puts "----------Dump Request-------------" puts request.request_method diff --git a/lib/fakes3/xml_adapter.rb b/lib/fakes3/xml_adapter.rb index 1b22ce3e..f575df02 100644 --- a/lib/fakes3/xml_adapter.rb +++ b/lib/fakes3/xml_adapter.rb @@ -199,5 +199,24 @@ def self.copy_object_result(object) } output end + + # + # http://Example-Bucket.s3.amazonaws.com/Example-Object + # Example-Bucket + # Example-Object + # "3858f62230ac3c915f300c664312c11f-9" + # + def self.complete_multipart_result(object) + output = "" + xml = Builder::XmlMarkup.new(:target => output) + xml.instruct! :xml, :version=>"1.0", :encoding=>"UTF-8" + xml.CompleteMultipartUploadResult { |result| + result.Location("TODO: implement") + result.Bucket("TODO: implement") + result.Key(object.name) + result.ETag("\"#{object.md5}\"") + } + output + end end end diff --git a/test/aws_sdk_commands_test.rb b/test/aws_sdk_commands_test.rb index c885b2ca..a08a7992 100644 --- a/test/aws_sdk_commands_test.rb +++ b/test/aws_sdk_commands_test.rb @@ -20,4 +20,12 @@ def test_copy_to assert_equal 2, bucket.objects.count end + + def test_multipart_upload + bucket = @s3.buckets["test_multipart_upload"] + object = bucket.objects["key1"] + object.write("thisisaverybigfile", :multipart_threshold => 5) + assert object.exists? + assert_equal "thisisaverybigfile", object.read + end end