fixed SEGMENTATION issue by discarding rsruby and using command-line execution; works very robustly so far. Also added more error-handling conditions for cases such as bad parameters.
commit 38c31ad9ee595b37e863b7a3423179e474894d28 1 parent e7e163f
@rdulepet authored
Showing with 89 additions and 39 deletions.
  1. +2 −2 instrument_developer_script.rb
  2. +79 −34 job.rb
  3. +8 −3 processing_node.rb
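
As the commit message says, the fix replaces in-process evaluation of R through RSRuby with a plain command-line invocation of R, so a segmentation fault inside the R interpreter can no longer take the Ruby worker down with it. A minimal sketch of that pattern, with hypothetical paths and a simplified status check (the actual logic is in Job#run in job.rb below):

require 'fileutils'

# assumed per-job results directory and instrumented job script (hypothetical names)
work_dir = "/tmp/results/some-uuid"
script   = "some-uuid.r"

FileUtils.cd work_dir
# command-line execution replaces RSRuby#eval_R; a crash here only kills the R process
system "r --no-save < #{script};"

# the job outcome is read back from the log the instrumented script writes
job_failed = File.readlines("job.log").any? { |line| line =~ /FAILED/ }
puts(job_failed ? "FAILED JOB" : "SUCCESSFUL JOB")
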
4 instrument_developer_script.rb
@@ -55,7 +55,7 @@ def self.instrument_code orig_r_script
if amatch
arr_instrumented[arr_instrumented.length] = "library(\"R2HTML\")\ncrdata_job_log <- file(\"job.log\", open=\"wt\")\nsink(crdata_job_log)\ncrdata_target <- HTMLInitFile(getwd(), filename=\"index\")\n"
elsif amatch = /#{CRDATA_FOOTER}/.match(line)
- arr_instrumented[arr_instrumented.length] = "HTMLEndFile()\nsink()\n"
+ arr_instrumented[arr_instrumented.length] = "HTMLEndFile()\nsink()\nclose(crdata_job_log)\n"
elsif
=end
if amatch = /#{CRDATA_SECTION}/.match(line)
@@ -106,7 +106,7 @@ def self.instrument_code orig_r_script
end
# automatically take care of FOOTER mandatory tag
- arr_instrumented[arr_instrumented.length] = "\n}, interrupt = function(ex) {\nprint (\"got exception: Failed Job\");\n returnstatus=\"FAILED JOB, PLEASE CHECK LOG\"; \nHTML(returnstatus, file=crdata_target);\nprint(ex);\n}, error = function(ex) {\nprint (\"got error: Failed Job\");\n returnstatus=\"FAILED JOB, PLEASE CHECK LOG\"; \nHTML(returnstatus, file=crdata_target);\nprint(ex);\n}, finally = {\nHTMLEndFile()\nsink()\n\n})\n"
+ arr_instrumented[arr_instrumented.length] = "\n}, interrupt = function(ex) {\nprint (\"got exception: Failed Job\");\n returnstatus=\"FAILED JOB, PLEASE CHECK LOG\"; \nHTML(returnstatus, file=crdata_target);\nprint(ex);\n}, error = function(ex) {\nprint (\"got error: Failed Job\");\n returnstatus=\"FAILED JOB, PLEASE CHECK LOG\"; \nHTML(returnstatus, file=crdata_target);\nprint(ex);\n}, finally = {\nprint(\"JOB ENDED\");\nHTMLEndFile()\nsink()\nclose(crdata_job_log)\n\n})\n"
# now write instrumented array into the original R script
r_script_file_handle = File.open(orig_r_script, aModeString="w")
113 job.rb
@@ -7,7 +7,7 @@
require 'right_aws'
require 'fileutils'
require 'rake'
-require 'rsruby'
+#require 'rsruby'
require 'cgi'
require 'global'
@@ -26,14 +26,16 @@ class Job
VALUE_ENUMERATION = "Enumeration"
VALUE_STRING = "String"
- attr_reader :r_script_filename, :job_id, :curr_uuid, :r_call_interface, :job_status
+ attr_reader :r_script_filename, :job_id, :curr_uuid, :r_call_interface
+ attr_reader :job_status, :r_script_inc_filename, :doc
def initialize(xml_response)
- @r_call_interface = RSRuby.instance
+ #@r_call_interface = RSRuby.instance
@r_script_filename = nil
+ @r_script_inc_filename = nil
@job_id = 0
@curr_uuid = Global.rand_uuid
- @job_status = Global::SUCCESSFUL_JOB
+ @job_status = Global::FAILED_JOB
# log request
Global.logger.info(xml_response)
@@ -42,61 +44,91 @@ def initialize(xml_response)
Global.create_if_missing_directory Global.results_dir + "/" + @curr_uuid
@r_script_filename = "#{Global.results_dir}/#{@curr_uuid}/#{@curr_uuid}.r"
- doc = Hpricot::XML(xml_response)
+ # this include file is used to pass variables to R
+ @r_script_inc_filename = "#{Global.results_dir}/#{@curr_uuid}/inc_#{@curr_uuid}.r"
+ @doc = Hpricot::XML(xml_response)
+ end
+ def fetch_source_code
# at the moment we extract only JOB ID and script content
# rest such as data we will look at it in later phases.
- @job_id = (doc/'job'/'id').inner_text
+ @job_id = (@doc/'job'/'id').inner_text
Global.logger.info("JOB_ID = #{@job_id}, LOCAL_DIR = #{Global.results_dir}/#{@curr_uuid}, SCRIPT_NAME = #{@r_script_filename}")
- r_script = (doc/'source-code').inner_text
+ r_script = (@doc/'source-code').inner_text
r_script_file_handle = File.open(@r_script_filename, aModeString="w")
+ # this is done to pass variables
+ r_script_file_handle.puts "source(\"#{@r_script_inc_filename}\")\n"
+ # ok write the actual script now
r_script_file_handle.puts r_script
r_script_file_handle.close
+ end
+
+ def fetch_params
# just some temporary logic/hack for data if script uses some .dat,.csv file
# this will be removed when we have data support in CRdata
#`cp /tmp/*.dat /tmp/*.csv #{Dir::pwd}/#{@curr_uuid}`
- (doc/:param).each do |param|
- job_params = {}
+ # write variables inside the include file as we are having memory issues
+ # with rsruby
+ begin
+ r_script_inc_file_handle = File.open(@r_script_inc_filename, aModeString="w")
- JOB_FIELDS.each do |el|
- job_params[el] = CGI::unescapeHTML(param.at(el).innerHTML)
- end
+ (@doc/:param).each do |param|
+ job_params = {}
- if job_params[PARAM_KIND] == VALUE_DATA_SET
- just_name = job_params[PARAM_DATA_SET].to_s.last_part
- @r_call_interface.assign(job_params[PARAM_NAME], just_name)
- Global.logger.info("R_PARAMETER::#{job_params[PARAM_NAME]} = #{just_name}")
+ JOB_FIELDS.each do |el|
+ job_params[el] = CGI::unescapeHTML(param.at(el).innerHTML)
+ end
- data_file_handle = File.new("#{Global.results_dir}/#{@curr_uuid}/#{just_name}", 'wb')
- # stream file in chunks especially makes more sense for larger files
- rhdr = Global.s3if.get(Global::MAIN_BUCKET, job_params[PARAM_DATA_SET].to_s.clean_s3_url) do |chunk|
- data_file_handle.write chunk
+ if job_params[PARAM_KIND] == VALUE_DATA_SET
+ just_name = job_params[PARAM_DATA_SET].to_s.last_part
+ #@r_call_interface.assign(job_params[PARAM_NAME], just_name)
+ r_script_inc_file_handle.puts "#{job_params[PARAM_NAME]} = \"#{just_name}\""
+ Global.logger.info("R_PARAMETER::#{job_params[PARAM_NAME]} = #{just_name}")
+
+ data_file_handle = File.new("#{Global.results_dir}/#{@curr_uuid}/#{just_name}", 'wb')
+ # stream file in chunks especially makes more sense for larger files
+ rhdr = Global.s3if.get(Global::MAIN_BUCKET, job_params[PARAM_DATA_SET].to_s.clean_s3_url) do |chunk|
+ data_file_handle.write chunk
+ end
+ data_file_handle.close
+ elsif job_params[PARAM_KIND] == VALUE_STRING
+ #@r_call_interface.assign(job_params[PARAM_NAME], job_params[PARAM_VALUE].to_s)
+ r_script_inc_file_handle.puts "#{job_params[PARAM_NAME]} = \"#{job_params[PARAM_VALUE].to_s}\""
+ Global.logger.info("R_PARAMETER::#{job_params[PARAM_NAME]} = #{job_params[PARAM_VALUE]}")
+ else
+ #@r_call_interface.assign(job_params[PARAM_NAME], job_params[PARAM_VALUE].to_f)
+ r_script_inc_file_handle.puts "#{job_params[PARAM_NAME]} = #{job_params[PARAM_VALUE].to_f}"
+ Global.logger.info("R_PARAMETER::#{job_params[PARAM_NAME]} = #{job_params[PARAM_VALUE]}")
end
- data_file_handle.close
- elsif job_params[PARAM_KIND] == VALUE_STRING
- @r_call_interface.assign(job_params[PARAM_NAME], job_params[PARAM_VALUE].to_s)
- Global.logger.info("R_PARAMETER::#{job_params[PARAM_NAME]} = #{job_params[PARAM_VALUE]}")
- else
- @r_call_interface.assign(job_params[PARAM_NAME], job_params[PARAM_VALUE].to_f)
- Global.logger.info("R_PARAMETER::#{job_params[PARAM_NAME]} = #{job_params[PARAM_VALUE]}")
end
+
+ r_script_inc_file_handle.close
+ rescue => err
+ # something wrong with params, log it and make it visible to user
+ log_file_handle = File.open("#{Global.results_dir}/#{@curr_uuid}/job.log", aModeString="w")
+ # this is done to pass variables
+ log_file_handle.puts "FAILED JOB, BAD PARAMETERS. PLEASE CHECK AGAIN."
+ log_file_handle.close
+
+ # raise again so outer loop catches the error
+ raise
end
end
def run
Global.logger.info('successfully created job and saved R file')
# this will run the R program that generates log file and results
- #system "cd #{Dir::pwd}/#{@curr_uuid}; r CMD BATCH #{@curr_uuid}.r; mv #{@curr_uuid}.r.Rout job.log; "
- @r_call_interface.setwd("#{Global.results_dir}/#{@curr_uuid}")
+ #system "cd #{Global.results_dir}/#{@curr_uuid}; r --no-save #{@curr_uuid}.r; mv #{@curr_uuid}.r.Rout job.log; "
+ #@r_call_interface.setwd("#{Global.results_dir}/#{@curr_uuid}")
# check if the R code was already instrumented by the developer
# if so then skip instrumentation and just trust it
# otherwise instrument it
- if !InstrumentDeveloperScript::checkif_already_instrumented_code "#{@curr_uuid}.r"
+ if !InstrumentDeveloperScript::checkif_already_instrumented_code "#{Global.results_dir}/#{@curr_uuid}/#{@curr_uuid}.r"
# instrument the R code before running the job to capture output
# to capture HTML output as well as log stuff
- InstrumentDeveloperScript::instrument_code "#{@curr_uuid}.r"
+ InstrumentDeveloperScript::instrument_code "#{Global.results_dir}/#{@curr_uuid}/#{@curr_uuid}.r"
end
# assume that job will be successful by default
@@ -104,7 +136,14 @@ def run
@job_status = Global::SUCCESSFUL_JOB
# run the instrumented script
- @r_call_interface.eval_R("source('#{@curr_uuid}.r')")
+ #@r_call_interface.eval_R("source('#{@curr_uuid}.r')")
+
+ # go back to root dir before starting
+ FileUtils.cd "#{Global.results_dir}/#{@curr_uuid}"
+ system "r --no-save < #{@curr_uuid}.r;"
+
+ # mark default as successful job
+ @job_status = Global::SUCCESSFUL_JOB
# fetch the r program execution status
File.open( Global::JOB_LOG ) {|io| io.grep(/#{Global::FAILED_JOB}/) { |s| @job_status = Global::FAILED_JOB }}
@@ -132,7 +171,10 @@ def store_results_and_logs
# first store log
begin
puts "LOG_FILE = logs/job_#{normalized_get_id}/job.log"
- Global.s3if.put(Global::MAIN_BUCKET, "logs/job_#{normalized_get_id}/job.log", File.open("#{Global.results_dir}/#{@curr_uuid}/job.log"), Global::S3_OPTIONS)
+ Global.s3if.put(Global::MAIN_BUCKET,
+ "logs/job_#{normalized_get_id}/job.log",
+ File.open("#{Global.results_dir}/#{@curr_uuid}/job.log"),
+ Global::S3_OPTIONS)
rescue => err_store_log
Global.logger.info('probably no error log generated, happens for successful jobs that have no output or error')
end
@@ -152,7 +194,10 @@ def store_results_and_logs
File.extname(file) == '.pdf')}.each{|name|
name = name.split("/").last
puts "RESULTS_FILE = #{Global.results_dir}/#{@curr_uuid}/#{name}"
- Global.s3if.put(Global::MAIN_BUCKET, "results/job_#{normalized_get_id}/#{name}", File.open("#{Global.results_dir}/#{@curr_uuid}/#{name}"), Global::S3_OPTIONS)
+ Global.s3if.put(Global::MAIN_BUCKET,
+ "results/job_#{normalized_get_id}/#{name}",
+ File.open("#{Global.results_dir}/#{@curr_uuid}/#{name}"),
+ Global::S3_OPTIONS)
}
end
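
With RSRuby gone, parameters can no longer be pushed into the R session with assign; instead fetch_params writes each parameter as a plain R assignment into an include file, and fetch_source_code makes the generated script source that file on its first line. A rough sketch of the same pattern, with hypothetical parameter values:

require 'fileutils'

# hypothetical parameters as they might arrive from the job XML
params   = { "alpha" => 0.05, "input_file" => "data.csv" }
inc_path = "/tmp/results/some-uuid/inc_some-uuid.r"   # assumed include-file path

FileUtils.mkdir_p(File.dirname(inc_path))
File.open(inc_path, "w") do |f|
  params.each do |name, value|
    # strings are quoted, numbers are written as-is, mirroring fetch_params above
    f.puts(value.is_a?(String) ? "#{name} = \"#{value}\"" : "#{name} = #{value}")
  end
end

# the generated job script then begins with:
#   source("/tmp/results/some-uuid/inc_some-uuid.r")
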
11 processing_node.rb
@@ -51,7 +51,11 @@ def run
# STEP 1
job = fetch_next_job()
- # STEP 3-6
+ # STEP 3-5
+ job.fetch_source_code if !job.nil?
+ job.fetch_params if !job.nil?
+
+ # STEP 6
job.run if !job.nil?
# STEP 7
@@ -62,11 +66,12 @@ def run
rescue => err
Global.logger.fatal(err)
+ # STEP 8
+ job_completed(job) if !job.nil?
+
# STEP 7
job.store_results_and_logs if !job.nil?
- # STEP 8
- job_completed(job) if job.nil?
job = nil
end
rescue => err2
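
The processing_node.rb changes wire in the two new fetch steps and fix the error path: job_completed used to run only if job was nil, so a failed job was never reported back; now it runs before the log upload whenever a job exists. A condensed sketch of the loop body after this change (fetch_next_job, job_completed and Global come from the files above; anything not visible in the hunks is assumed):

begin
  job = fetch_next_job                      # STEP 1
  job.fetch_source_code if !job.nil?        # STEPS 3-5: write job script and include stub
  job.fetch_params      if !job.nil?        # write parameter include file, download data sets
  job.run               if !job.nil?        # STEP 6: shell out to R
  # ... STEP 7 upload and STEP 8 report for the success path (not shown in the diff) ...
rescue => err
  Global.logger.fatal(err)
  job_completed(job) if !job.nil?           # STEP 8: previously `if job.nil?`, so it never ran
  job.store_results_and_logs if !job.nil?   # STEP 7: upload whatever log was produced
  job = nil
end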