Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
Add support for multipart uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
omarkhan authored and jubos committed Mar 15, 2015
1 parent 5229cc0 commit 4034897
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 60 deletions.
84 changes: 61 additions & 23 deletions lib/fakes3/file_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def get_object(bucket,object_name, request)
def object_metadata(bucket,object)
end

def copy_object(src_bucket_name,src_name,dst_bucket_name,dst_name,request)
def copy_object(src_bucket_name, src_name, dst_bucket_name, dst_name, request)
src_root = File.join(@root,src_bucket_name,src_name,SHUCK_METADATA_DIR)
src_metadata_filename = File.join(src_root,"metadata")
src_metadata = YAML.load(File.open(src_metadata_filename,'rb').read)
Expand Down Expand Up @@ -140,8 +140,8 @@ def copy_object(src_bucket_name,src_name,dst_bucket_name,dst_name,request)
end
end

src_bucket = self.get_bucket(src_bucket_name)
dst_bucket = self.get_bucket(dst_bucket_name)
src_bucket = get_bucket(src_bucket_name) || create_bucket(src_bucket_name)
dst_bucket = get_bucket(dst_bucket_name) || create_bucket(dst_bucket_name)

obj = S3Object.new
obj.name = dst_name
Expand All @@ -155,37 +155,44 @@ def copy_object(src_bucket_name,src_name,dst_bucket_name,dst_name,request)
return obj
end

def store_object(bucket,object_name,request)
def store_object(bucket, object_name, request)
filedata = ""

# TODO put a tmpfile here first and mv it over at the end
content_type = request.content_type || ""

match = content_type.match(/^multipart\/form-data; boundary=(.+)/)
boundary = match[1] if match
if boundary
boundary = WEBrick::HTTPUtils::dequote(boundary)
form_data = WEBrick::HTTPUtils::parse_form_data(request.body, boundary)

if form_data['file'] == nil or form_data['file'] == ""
raise WEBrick::HTTPStatus::BadRequest
end

filedata = form_data['file']
else
request.body { |chunk| filedata << chunk }
end

do_store_object(bucket, object_name, filedata, request)
end

def do_store_object(bucket, object_name, filedata, request)
begin
filename = File.join(@root,bucket.name,object_name)
FileUtils.mkdir_p(filename)

metadata_dir = File.join(filename,SHUCK_METADATA_DIR)
FileUtils.mkdir_p(metadata_dir)

content = File.join(filename,SHUCK_METADATA_DIR,"content")
content = File.join(filename,SHUCK_METADATA_DIR,"content")
metadata = File.join(filename,SHUCK_METADATA_DIR,"metadata")

# TODO put a tmpfile here first and mv it over at the end
File.open(content,'wb') { |f| f << filedata }

match=request.content_type.match(/^multipart\/form-data; boundary=(.+)/)
boundary = match[1] if match
if boundary
boundary = WEBrick::HTTPUtils::dequote(boundary)
filedata = WEBrick::HTTPUtils::parse_form_data(request.body, boundary)
raise HTTPStatus::BadRequest if filedata['file'].empty?
File.open(content, 'wb') do |f|
f << filedata['file']
end
else
File.open(content,'wb') do |f|
request.body do |chunk|
f << chunk
end
end
end
metadata_struct = create_metadata(content,request)

File.open(metadata,'w') do |f|
f << YAML::dump(metadata_struct)
end
Expand All @@ -206,6 +213,36 @@ def store_object(bucket,object_name,request)
end
end

def combine_object_parts(bucket, upload_id, object_name, parts, request)
upload_path = File.join(@root, bucket.name)
base_path = File.join(upload_path, "#{upload_id}_#{object_name}")

complete_file = ""
chunk = ""
part_paths = []

parts.sort_by { |part| part[:number] }.each do |part|
part_path = "#{base_path}_part#{part[:number]}"
content_path = File.join(part_path, SHUCK_METADATA_DIR, 'content')

File.open(content_path, 'rb') { |f| chunk = f.read }
etag = Digest::MD5.hexdigest(chunk)

raise new Error "invalid file chunk" unless part[:etag] == etag

This comment has been minimized.

Copy link
@antono

antono Dec 24, 2015

new Error? Do you think its ruby?

complete_file << chunk
part_paths << part_path
end

object = do_store_object(bucket, object_name, complete_file, request)

# clean up parts
part_paths.each do |path|
FileUtils.remove_dir(path)
end

object
end

def delete_object(bucket,object_name,request)
begin
filename = File.join(@root,bucket.name,object_name)
Expand All @@ -219,6 +256,7 @@ def delete_object(bucket,object_name,request)
end
end

# TODO: abstract getting meta data from request.
def create_metadata(content,request)
metadata = {}
metadata[:md5] = Digest::MD5.file(content).hexdigest
Expand Down
175 changes: 138 additions & 37 deletions lib/fakes3/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
require 'webrick'
require 'webrick/https'
require 'openssl'
require 'securerandom'
require 'cgi'
require 'fakes3/file_store'
require 'fakes3/xml_adapter'
require 'fakes3/bucket_query'
Expand Down Expand Up @@ -109,7 +111,7 @@ def do_GET(request, response)
if time >= Time.iso8601(real_obj.modified_date)
response.status = 304
return
end
end
end

response.status = 200
Expand Down Expand Up @@ -160,6 +162,9 @@ def do_GET(request, response)

def do_PUT(request,response)
s_req = normalize_request(request)
query = CGI::parse(request.request_uri.query || "")

return do_multipartPUT(request, response) if query['uploadId'].first

response.status = 200
response.body = ""
Expand All @@ -184,44 +189,114 @@ def do_PUT(request,response)
end
end

def do_POST(request,response)
# check that we've received file data
unless request.content_type =~ /^multipart\/form-data; boundary=(.+)/
raise WEBrick::HTTPStatus::BadRequest
def do_multipartPUT(request, response)
s_req = normalize_request(request)
query = CGI::parse(request.request_uri.query)

part_number = query['partNumber'].first
upload_id = query['uploadId'].first
part_name = "#{upload_id}_#{s_req.object}_part#{part_number}"

# store the part
if s_req.type == Request::COPY
real_obj = @store.copy_object(
s_req.src_bucket, s_req.src_object,
s_req.bucket , part_name,
request
)

response['Content-Type'] = "text/xml"
response.body = XmlAdapter.copy_object_result real_obj
else
bucket_obj = @store.get_bucket(s_req.bucket)
if !bucket_obj
bucket_obj = @store.create_bucket(s_req.bucket)
end
real_obj = @store.store_object(
bucket_obj, part_name,
request
)

response.body = ""
response.header['ETag'] = "\"#{real_obj.md5}\""
end

response['Access-Control-Allow-Origin'] = '*'
response['Access-Control-Allow-Headers'] = 'Authorization, Content-Length'
response['Access-Control-Expose-Headers'] = 'ETag'

response.status = 200
end

def do_POST(request,response)
s_req = normalize_request(request)
key=request.query['key']
success_action_redirect=request.query['success_action_redirect']
success_action_status=request.query['success_action_status']
key = request.query['key']
query = CGI::parse(request.request_uri.query || "")

if query.has_key?('uploads')
upload_id = SecureRandom.hex

response.body = <<-eos.strip
<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<Bucket>#{ s_req.bucket }</Bucket>
<Key>#{ key }</Key>
<UploadId>#{ upload_id }</UploadId>
</InitiateMultipartUploadResult>
eos
elsif query.has_key?('uploadId')
upload_id = query['uploadId'].first
bucket_obj = @store.get_bucket(s_req.bucket)
real_obj = @store.combine_object_parts(
bucket_obj,
upload_id,
s_req.object,
parse_complete_multipart_upload(request),
request
)

filename = 'default'
filename = $1 if request.body =~ /filename="(.*)"/
key=key.gsub('${filename}', filename)
response.body = XmlAdapter.complete_multipart_result real_obj
elsif request.content_type =~ /^multipart\/form-data; boundary=(.+)/
key=request.query['key']

bucket_obj = @store.get_bucket(s_req.bucket) || @store.create_bucket(s_req.bucket)
real_obj=@store.store_object(bucket_obj, key, s_req.webrick_request)
success_action_redirect = request.query['success_action_redirect']
success_action_status = request.query['success_action_status']

response['Etag'] = "\"#{real_obj.md5}\""
response.body = ""
if success_action_redirect
response.status = 307
response['Location']=success_action_redirect
else
response.status = success_action_status || 204
if response.status=="201"
response.body= <<-eos.strip
<?xml version="1.0" encoding="UTF-8"?>
<PostResponse>
<Location>http://#{s_req.bucket}.localhost:#{@port}/#{key}</Location>
<Bucket>#{s_req.bucket}</Bucket>
<Key>#{key}</Key>
<ETag>#{response['Etag']}</ETag>
</PostResponse>
eos
filename = 'default'
filename = $1 if request.body =~ /filename="(.*)"/
key = key.gsub('${filename}', filename)

bucket_obj = @store.get_bucket(s_req.bucket) || @store.create_bucket(s_req.bucket)
real_obj = @store.store_object(bucket_obj, key, s_req.webrick_request)

response['Etag'] = "\"#{real_obj.md5}\""

if success_action_redirect
response.status = 307
response.body = ""
response['Location'] = success_action_redirect
else
response.status = success_action_status || 204
if response.status == "201"
response.body = <<-eos.strip
<?xml version="1.0" encoding="UTF-8"?>
<PostResponse>
<Location>http://#{s_req.bucket}.localhost:#{@port}/#{key}</Location>
<Bucket>#{s_req.bucket}</Bucket>
<Key>#{key}</Key>
<ETag>#{response['Etag']}</ETag>
</PostResponse>
eos
end
end
else
raise WEBrick::HTTPStatus::BadRequest
end
response['Content-Type'] = 'text/xml'
response['Access-Control-Allow-Origin'] = '*'

response['Content-Type'] = 'text/xml'
response['Access-Control-Allow-Origin'] = '*'
response['Access-Control-Allow-Headers'] = 'Authorization, Content-Length'
response['Access-Control-Expose-Headers'] = 'ETag'
end

def do_DELETE(request,response)
Expand All @@ -241,10 +316,11 @@ def do_DELETE(request,response)

def do_OPTIONS(request, response)
super
response["Access-Control-Allow-Origin"] = "*"
response["Access-Control-Allow-Methods"] = "HEAD, GET, PUT, POST"
response["Access-Control-Allow-Headers"] = "accept, content-type"
response["Access-Control-Expose-Headers"] = "ETag, x-amz-meta-custom-header"

response['Access-Control-Allow-Origin'] = '*'
response['Access-Control-Allow-Methods'] = 'PUT, POST, HEAD, GET, OPTIONS'
response['Access-Control-Allow-Headers'] = 'Accept, Content-Type, Authorization, Content-Length, ETag'
response['Access-Control-Expose-Headers'] = 'ETag'
end

private
Expand Down Expand Up @@ -336,9 +412,11 @@ def normalize_put(webrick_req,s_req)
end
end

# TODO: also parse the x-amz-copy-source-range:bytes=first-last header
# for multipart copy
copy_source = webrick_req.header["x-amz-copy-source"]
if copy_source and copy_source.size == 1
src_elems = copy_source.first.split("/")
src_elems = copy_source.first.split("/")
root_offset = src_elems[0] == "" ? 1 : 0
s_req.src_bucket = src_elems[root_offset]
s_req.src_object = src_elems[1 + root_offset,src_elems.size].join("/")
Expand All @@ -355,6 +433,14 @@ def normalize_post(webrick_req,s_req)
s_req.path = webrick_req.query['key']

s_req.webrick_request = webrick_req

if s_req.is_path_style
elems = path[1,path_len].split("/")
s_req.bucket = elems[0]
s_req.object = elems[1..-1].join('/') if elems.size >= 2
else
s_req.object = path[1..-1]
end
end

# This method takes a webrick request and generates a normalized FakeS3 request
Expand Down Expand Up @@ -391,6 +477,21 @@ def normalize_request(webrick_req)
return s_req
end

def parse_complete_multipart_upload request
parts_xml = ""
request.body { |chunk| parts_xml << chunk }

# TODO: I suck at parsing xml
parts_xml = parts_xml.scan /\<Part\>.*?<\/Part\>/m

parts_xml.collect do |xml|
{
number: xml[/\<PartNumber\>(\d+)\<\/PartNumber\>/, 1].to_i,
etag: xml[/\<ETag\>\"(.+)\"\<\/ETag\>/, 1]
}
end
end

def dump_request(request)
puts "----------Dump Request-------------"
puts request.request_method
Expand Down
19 changes: 19 additions & 0 deletions lib/fakes3/xml_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,24 @@ def self.copy_object_result(object)
}
output
end

# <CompleteMultipartUploadResult>
# <Location>http://Example-Bucket.s3.amazonaws.com/Example-Object</Location>
# <Bucket>Example-Bucket</Bucket>
# <Key>Example-Object</Key>
# <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
# </CompleteMultipartUploadResult>
def self.complete_multipart_result(object)
output = ""
xml = Builder::XmlMarkup.new(:target => output)
xml.instruct! :xml, :version=>"1.0", :encoding=>"UTF-8"
xml.CompleteMultipartUploadResult { |result|
result.Location("TODO: implement")
result.Bucket("TODO: implement")
result.Key(object.name)
result.ETag("\"#{object.md5}\"")
}
output
end
end
end
Loading

0 comments on commit 4034897

Please sign in to comment.