Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

integrated signed url or query string authentication integration for …

…s3 storage
  • Loading branch information...
commit a44617803604e2509d4250f1e64cbe3397f7e275 1 parent 4763a3f
@rdulepet authored
Showing with 96 additions and 12 deletions.
  1. +11 −11 job.rb
  2. +1 −1  processing_node.rb
  3. +84 −0 s3_upload.rb
View
22 job.rb
@@ -12,7 +12,7 @@
require 'global'
require 'instrument_developer_script'
-
+require 's3_upload'
class Job
JOB_FIELDS = %w[name value kind data_set_s3_file]
@@ -28,14 +28,17 @@ class Job
attr_reader :r_script_filename, :job_id, :curr_uuid, :r_call_interface
attr_reader :job_status, :r_script_inc_filename, :doc
+ attr_reader :server_node
- def initialize(xml_response)
+ def initialize(xml_response, server_node)
#@r_call_interface = RSRuby.instance
@r_script_filename = nil
@r_script_inc_filename = nil
@job_id = 0
@curr_uuid = Global.rand_uuid
@job_status = Global::FAILED_JOB
+ @server_node = server_node
+
# log request
Global.logger.info(xml_response)
@@ -184,10 +187,7 @@ def store_results_and_logs
# first store log
begin
puts "LOG_FILE = logs/job_#{normalized_get_id}/job.log"
- Global.s3if.put(Global::MAIN_BUCKET,
- "logs/job_#{normalized_get_id}/job.log",
- File.open("#{Global.results_dir}/#{@curr_uuid}/job.log"),
- Global::S3_OPTIONS)
+ upload_results_to_s3(@server_node, @job_id, "logs", "job.log", "#{Global.results_dir}/#{@curr_uuid}/job.log")
rescue => err_store_log
Global.logger.info('probably no error log generated, happens for successful jobs that have no output or error')
end
@@ -207,11 +207,11 @@ def store_results_and_logs
File.extname(file) == '.pdf')}.each{|name|
name = name.split("/").last
puts "RESULTS_FILE = #{Global.results_dir}/#{@curr_uuid}/#{name}"
- Global.s3if.put(Global::MAIN_BUCKET,
- "results/job_#{normalized_get_id}/#{name}",
- File.open("#{Global.results_dir}/#{@curr_uuid}/#{name}"),
- Global::S3_OPTIONS)
+ upload_results_to_s3(@server_node,
+ @job_id,
+ "results",
+ name,
+ "#{Global.results_dir}/#{@curr_uuid}/#{name}")
}
-
end
end
View
2  processing_node.rb
@@ -86,7 +86,7 @@ def fetch_next_job
# issue command to fetch next job
begin
xml_response = @site['jobs_queues/run_next_job'].put '', {:content_length => '0', :content_type => 'text/xml'}
- job = Job.new(xml_response)
+ job = Job.new(xml_response, @server_node)
return job
rescue Exception => exception_not_found
return_status = exception_not_found.to_s
View
84 s3_upload.rb
@@ -0,0 +1,84 @@
+require 'rubygems'
+require 'xmlsimple'
+require 'cgi'
+require 'net/https'
+
+def upload_results_to_s3 (server_name, job_id, type, fname, fpath_name)
+ s3_data = ''
+ response = Net::HTTP.get(URI.parse("http://#{server_name}/jobs/#{job_id}/uploadurls.xml?upload_type=#{type}&files=#{fname}"))
+
+ s3_data = XmlSimple.xml_in(response)
+ # puts s3_data.inspect
+
+ #this is just so we can parse it fast
+ host = s3_data["files"].first[fname].first['host'].first
+ port = s3_data["files"].first[fname].first['port'].first
+ header = s3_data["files"].first[fname].first['header'].first
+ ssl = s3_data["files"].first[fname].first['ssl'].first['content']
+ path = s3_data["files"].first[fname].first['path'].first
+
+ File.open(fpath_name, 'rb') do |up_file|
+ send_request('Put', host, port, path, header, up_file, ssl)
+ end
+
+end
+
+# Helper to generate REST requests, handle authentication and errors
+def send_request(req_type, host, port, url, fields_hash, up_file, ssl)
+ res = false
+ conn_start(host, port, ssl) do |http|
+ request = Module.module_eval("Net::HTTP::#{req_type}").new(url)
+ fields_hash.each_pair{|n,v| request[n] = v} # Add user defined header fields
+ request.content_length = up_file.lstat.size
+ request.body_stream = up_file # For uploads using streaming
+ begin
+ response = http.request(request)
+
+ rescue Exception => ex
+ response = ex.to_s
+ end
+
+ # Handle auth errors and other errors!
+ if !response.is_a?(Net::HTTPOK)
+ puts 'Failed to upload!'
+ log_request(request, response)
+ else
+ res = true
+ end
+ end
+end
+
+
+# Create a connection with all the needed setting
+# And yeald to the block
+def conn_start host, port, ssl
+ con = Net::HTTP.new(host, port)
+ if ssl
+ con.use_ssl = true
+ con.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ end
+ con.start
+ if block_given?
+ begin
+ return yield(con)
+ ensure
+ con.finish
+ end
+ end
+ con
+end
+
+# Display the request and response...
+def log_request(request, response)
+ puts "\nRequest Headers"
+ request.each {|head,val| puts "#{head}: #{val}"}
+ puts "\nResponse Headers"
+ response.each {|head,val| puts "#{head}: #{val}"}
+ puts "\nResponse Body"
+ puts response.body
+ puts "\nResponse Type"
+ puts "#{response.class} (#{response.code})"
+end
+
+#upload_results_to_s3("crdata.org", 184, "logs", "job.log", "job.log")
+#upload_results_to_s3("crdata.org", 184, "results", "index.html", "index.html")
Please sign in to comment.
Something went wrong with that request. Please try again.