Permalink
Browse files

Merge pull request #233 from iron-io/feature-experimental-pseudosync-…

…support

Experimental support for synchronous tasks
  • Loading branch information...
2 parents 01a0661 + 8c30b2f commit 812c886ad9d33610fcbac0710cc30dd38c20fb32 @rkononov rkononov committed Dec 21, 2017
Showing with 72 additions and 7 deletions.
  1. +11 −0 lib/iron_worker_ng/api_client.rb
  2. +61 −7 lib/iron_worker_ng/client.rb
@@ -116,6 +116,17 @@ def tasks_log(id)
end
end
+ def tasks_stdout(id)
+ check_id(id)
+ if block_given?
+ stream_get("projects/#{@project_id}/tasks/#{id}/outlog") do |chunk|
+ yield chunk
+ end
+ else
+ parse_response(get("projects/#{@project_id}/tasks/#{id}/outlog"), false)
+ end
+ end
+
def tasks_set_progress(id, options = {})
check_id(id)
parse_response(post("projects/#{@project_id}/tasks/#{id}/progress", options))
@@ -2,6 +2,7 @@
require 'base64'
require 'tmpdir'
require 'fileutils'
+require 'timeout'
require 'iron_worker_ng/api_client'
@@ -327,6 +328,20 @@ def tasks_create_legacy(code_name, params = {}, options = {})
OpenStruct.new(t)
end
+ def tasks_run(code_name, params = {}, options = {})
+ options['sync'] = true
+ IronCore::Logger.debug 'IronWorkerNG', "Calling tasks.run with code_name='#{code_name}', params='#{params.to_s}' and options='#{options.to_s}'"
+
+ res = @api.tasks_create(code_name, params.is_a?(String) ? params : params.to_json, options)
+
+ t = res['tasks'][0]
+ task_id = t['id']
+
+ tasks_wait_for(task_id)
+
+ tasks_wait_for_stdout(task_id)
+ end
+
def tasks_cancel(task_id)
IronCore::Logger.debug 'IronWorkerNG', "Calling tasks.cancel with task_id='#{task_id}'"
@@ -353,6 +368,16 @@ def tasks_log(task_id)
end
end
+ def tasks_stdout(task_id)
+ IronCore::Logger.debug 'IronWorkerNG', "Calling tasks.stdout with task_id='#{task_id}'"
+
+ if block_given?
+ @api.tasks_stdout(task_id) {|chunk| yield(chunk)}
+ else
+ @api.tasks_stdout(task_id)
+ end
+ end
+
def tasks_set_progress(task_id, options = {})
IronCore::Logger.debug 'IronWorkerNG', "Calling tasks.set_progress with task_id='#{task_id}' and options='#{options.to_s}'"
@@ -364,19 +389,40 @@ def tasks_set_progress(task_id, options = {})
def tasks_wait_for(task_id, options = {}, &block)
IronCore::Logger.debug 'IronWorkerNG', "Calling tasks.wait_for with task_id='#{task_id}' and options='#{options.to_s}'"
- options[:sleep] ||= options['sleep'] || 5
+ options[:sleep] ||= options['sleep'] || 0.5
task = tasks_get(task_id)
-
- while task.status == 'queued' || task.status == 'running'
- block.call(task) unless block.nil?
- sleep options[:sleep]
- task = tasks_get(task_id)
+ Timeout::timeout(task.timeout + 15) do
+ while task.status == 'queued' || task.status == 'preparing' || task.status == 'running'
+ block.call(task) unless block.nil?
+ sleep options[:sleep]
+ options[:sleep] = sleep_between_retries options[:sleep]
+ task = tasks_get(task_id)
+ end
end
-
task
end
+ def tasks_wait_for_stdout(task_id, options = {}, &block)
+ IronCore::Logger.debug 'IronWorkerNG', "Calling tasks.wait_for_stdout with task_id='#{task_id}' and options='#{options.to_s}'"
+
+ options[:sleep] ||= options['sleep'] || 0.5
+
+ Timeout::timeout(60) do
+ while true
+ begin
+ stdout = tasks_stdout(task_id)
+ block.call(stdout) unless block.nil?
+ return stdout
+ rescue Rest::HttpError => e
+ raise e if e.code != 404
+ end
+ sleep options[:sleep]
+ options[:sleep] = sleep_between_retries options[:sleep]
+ end
+ end
+ end
+
def tasks_retry(task_id, options = {})
IronCore::Logger.debug 'IronWorkerNG', "Calling tasks.retry with task_id='#{task_id}' and options='#{options.to_s}'"
@@ -525,5 +571,13 @@ def clusters_list_base(options = {})
res = @api.clusters_list(options)
(res['clusters'] || []).map { |s| OpenStruct.new(s.merge('_id' => s['id'])) }
end
+
+ # sleep between retries
+ def sleep_between_retries(previous_duration)
+ if previous_duration >= 60
+ return previous_duration
+ end
+ return previous_duration * 2
+ end
end
end

0 comments on commit 812c886

Please sign in to comment.