
- Rewriting message_queue_adapter interface (and superclass). Makes much more sense now.

- Broke Skynet::Message into 3 classes, TaskMessage, ResultMessage and ErrorMessage (which inherits from ResultMessage)

- MessageQueue adapters now treat a task type of :any as a request to pick randomly from the master or task queues.

- Added new SingleProcess message queue adapter to be the reference implementation and to mock how a real message queue would work.

- Add new QuiQ queue, though I'm not using it yet.

- Refactor TupleSpace message queue adapter to use new superclass

- Begin trying to move what was in Skynet::Task into Skynet::TaskMessage
- Skynet::TaskMessage now runs the task.

- Create a universal adapter tester. Just plug in your adapter and run the tests.
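
A minimal sketch of the `:any` behaviour described in the list above, assuming the requested task type is resolved to a concrete queue name before any queue is read. The helper `resolve_tasktype` and the constant `SKETCH_TASKTYPES` are hypothetical names, treating `:any` as an alias for `master_or_task` is an assumption, and `Array#sample` stands in for the `Array#rand` call in the diff, which appears to rely on an extension rather than core Ruby.

```ruby
# Hypothetical list mirroring VALID_PAYLOAD_TYPES from the diff.
SKETCH_TASKTYPES = %w[master_or_task master task error].freeze

# Resolve a requested task type to the name of one concrete queue.
# :any is treated as an alias for "master_or_task" (an assumption).
def resolve_tasktype(requested, rng: Random.new)
  tasktype = requested.to_s
  tasktype = "master_or_task" if tasktype == "any"
  unless SKETCH_TASKTYPES.include?(tasktype)
    raise ArgumentError, "#{tasktype} is not a valid task type"
  end
  return tasktype unless tasktype == "master_or_task"

  # Pick one of the two real queues at random.
  %w[master task].sample(random: rng)
end
```

Resolving the type once, up front, keeps the random choice and the queue lookup from drifting apart later in the dequeue loop.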
1 parent aee882d commit 5ad4baa7a0660c58c763d3b2e001e8cc08deac8d phlapjack committed May 28, 2008
4 Manifest.txt
@@ -3,6 +3,7 @@ License.txt
Manifest.txt
README.txt
Rakefile
+TODO.txt
app_generators/skynet_install/USAGE
app_generators/skynet_install/skynet_install_generator.rb
app_generators/skynet_install/templates/migration.rb
@@ -94,7 +95,7 @@ lib/skynet/mapreduce_helper.rb
lib/skynet/mapreduce_test.rb
lib/skynet/message_queue_adapters/message_queue_adapter.rb
lib/skynet/message_queue_adapters/mysql.rb
-lib/skynet/message_queue_adapters/qui_q.rb
+lib/skynet/message_queue_adapters/single_process.rb
lib/skynet/message_queue_adapters/tuple_space.rb
lib/skynet/skynet_active_record_extensions.rb
lib/skynet/skynet_config.rb
@@ -124,6 +125,7 @@ tasks/website.rake
test/test_active_record_extensions.rb
test/test_generator_helper.rb
test/test_helper.rb
+test/test_message_queue_adapter.rb
test/test_mysql_message_queue_adapter.rb
test/test_skynet.rb
test/test_skynet_install_generator.rb
97 TODO.txt
@@ -0,0 +1,97 @@
+Allow map_reduce class to include something or inherit from something and just say
+
+ map do |map_data|
+ end
+
+ job_error do ||
+ end
+
+ task_error do ||
+ end
+
+Let jobs have an on_error method (on_map_error, on_reduce_error), which gets called and passed a message on error
+
+App throws Skynet::Task::Error.
+Maybe map and reduce takes an OPTIONAL second argument which is the message or some response object. So they can return
+the data OR use the response object. Or at least modify the response object or something. Putting response codes etc...
+
+Instead of a skynet runner:
+ Install a conf/skynet.rb
+ If they run skynet where there's a conf/skynet.rb then it just requires that. No need for a runner
+ Even if they run it locally, there's a conf skynet
+ Allow a ~/.skynet/config.rb for global skynet config!
+ This file can contain the SKYNET_WORKER_VERSION
+ Workers can always RE check this file to see if there's a new version
+
+Error Logging adapter classes
+
+MessageQueueAdapter::SingleProcess
+ Deal with error messages (figure out when to write retry task)
+ Timeouts
+ Handle new Message object
+ timeout vs start_after
+ don't use start after for failover tasks
+
+Skynet::Worker
+ Deal with Task errors and Timeouts better vs other exceptions
+ Handle new Message object
+
+Skynet::Message
+ Deal with errors
+
+ renamed payload to task
+ removed payload_type, it's now using tasktype
+ added start_after
+ renamed expiry to timeout
+ really, expiry should be timeout
+
+ Timeouts are still funny in their naming and meaning between jobs, tasks and queues
+
+ Start After
+ Don't start until this TIME
+ Task Timeout
+ Time to complete the task (expiry)
+ Expire Time (we probably shouldn't write expire time to the Q even if it's in the object)
+ The time this task should expire (time task started + timeout)
+
+## These may only make sense for results, and errors, not tasks, or masters.
+ Result Timeout
+ How long after this message is put on the queue should it be removed?
+ Result Expire Time (I guess we have to write this to the Q)
+ What time should this message just disappear from the queue (time message put on the Q + result_timeout)
+
+ change result_timeout to timeout in Tasks
+
+ renamed task.result_timeout to task.task_timeout
+ reused task.result_timeout as the actual result_timeout
+
+ 3 Message Subclasses
+ Task
+ Runs task, returns error or result message within a transaction. Always returns a Skynet::Message subclass
+ Error
+ Result
+
+
+Skynet::Job
+ run() needs begin rescue blocks to handle errors with local master and such Master errors should flow back to job
+
+ Make sure we know the difference between result_timeout and timeout
+ timeout and result_timeout are being used for both sides
+ A task is made in job, then passed through the q in a message.
+ A worker takes a task from the task queue out of a message
+ Why can't the message be passed all the way through!
+
+ Be careful of persistent messages. If the expire_time is started on initialization. If the object gets passed around
+ by reference, it will never be reinitialized, even when we want the expire_time recalculated
+ PERHAPS we need start, stop methods. OR, if wrap the RUN in a message block... which handles the timeout!
+ The timeout could still be caught by Task
+ message.start do / end
+
+Errors
+ Maybe there should be a skynet error logger that watches the error Q
+ If there is no error q, we don't write to it
+ Or at least an error logger adapter class that can be subclassed
+ The error logger should have access to the message itself
+ There should be callbacks to distinguish task vs. master errors
+ You might only want to deal with master errors or vice versa
+ With access to the message you could only report on non retryable errors if you want. Maybe there's a convenience callback for that.
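
The timeout notes in TODO.txt (start_after, task timeout, expire time, and the `message.start do / end` idea) can be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchMessage` is a hypothetical class, not `Skynet::Message`, and the clock is assumed to start when `start` is called rather than at initialization, so an object passed around by reference gets a fresh expire time on every run.

```ruby
require "timeout"

# Hypothetical message object; not the Skynet::Message API.
class SketchMessage
  attr_reader :timeout, :start_after, :expire_time

  def initialize(timeout:, start_after: 0)
    @timeout = timeout          # seconds allowed to complete the task
    @start_after = start_after  # epoch seconds; do not start before this
    @expire_time = nil          # only known once the task is started
  end

  # True once the start_after time has passed.
  def ready?(now = Time.now.to_f)
    now >= start_after
  end

  # Recompute expire_time on every start, then run the block under the
  # task timeout. Raises Timeout::Error if the block runs too long.
  def start
    @expire_time = Time.now.to_f + timeout
    Timeout.timeout(timeout) { yield self }
  end
end
```

Because `expire_time` is derived inside `start`, a retried message never carries a stale deadline from its first attempt.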
2 lib/skynet.rb
@@ -19,7 +19,7 @@
require 'message_queue_adapters/message_queue_adapter'
require 'message_queue_adapters/tuple_space'
require 'qui_q'
-require 'message_queue_adapters/qui_q'
+require 'message_queue_adapters/single_process'
require "skynet_message_queue"
require 'skynet_partitioners'
require 'skynet_job'
100 lib/skynet/message_queue_adapters/message_queue_adapter.rb
@@ -14,41 +14,97 @@ class AbstractClassError < Skynet::Error
class MessageQueueAdapter
- def list_results(data,timeout=nil)
- raise AbstractClassError.new("You must implement list_results in a subclass.")
- end
- def list_tasks(template,timeout=nil)
- raise AbstractClassError.new("You must implement method in a subclass.")
+ VALID_PAYLOAD_TYPES = ["master_or_task","master","task","error"]
+
+ #superclass
+ def check_task_version(task)
+ ## IF this task isn't the right version and we find there's a new version, raise
+ ## Make sure NOT to dequeue task if we raised above
+ curver = get_worker_version
+ raise Skynet::NewWorkerVersion.new if curver != task.version and curver != get_worker_version
end
- def take_next_task(template,timeout=nil)
- raise AbstractClassError.new("You must implement method in a subclass.")
+ #superclass
+ def take_next_task(*args)
+ message = dequeue_task(*args)
+ raise InvalidMessage.new("Got a #{message.class}, expecting a Skynet::TaskMessage") unless message.is_a?(Skynet::TaskMessage)
+ check_task_version(message)
+ accept_unacknowledged_message(message)
+ message
end
- def write_message(template,timeout=nil)
- raise AbstractClassError.new("You must implement method in a subclass.")
+ def take_result(*args)
+ dequeue_result(*args)
+ end
+
+ #superclass
+ def write_failover(message)
+ failover_message = message.fallback_task_message
+ write_task(failover_message) if failover_message
end
- def write_result(template,timeout=nil)
- raise AbstractClassError.new("You must implement method in a subclass.")
+ #superclass
+ def write_task(message,timeout=nil)
+
+ raise InvalidMessage.new("Got a #{message.class}, expecting a Skynet::TaskMessage") unless message.is_a?(Skynet::TaskMessage)
+
+ # There are only two payload types, :task and :master
+ tasktype = message.tasktype.to_s
+ raise Error.new("#{tasktype} is not a valid payload type. Only allowed types are #{VALID_PAYLOAD_TYPES.join(',')}") unless VALID_PAYLOAD_TYPES.include?(tasktype.to_s)
+ enqueue_task(message)
+
+ return true # wrote successfully
+ end
+
+ #superclass
+ def write_message(message,timeout=nil)
+ write_task(message,timeout)
end
- def take_result(template,timeout=nil)
- raise AbstractClassError.new("You must implement method in a subclass.")
+ #superclass
+ def write_result_for_task(task_message,result,timeout=nil)
+ raise InvalidMessage.new("Got a #{task_message.class}, expecting a Skynet::TaskMessage or Skynet::ErrorMessage") unless task_message.is_a?(Skynet::TaskMessage) or task_message.is_a?(Skynet::ErrorMessage)
+ acknowledge(task_message)
+ if task_message.is_a?(Skynet::ErrorMessage)
+ message = task_message.error_message(error)
+
+ # FIXME Do we always write the failover when there's an error?
+ write_failover(task_message)
+ write_error(message,timeout)
+ else
+ message = task_message.result_message(result)
+ write_result(message,timeout)
+ end
end
+
+ #superclass
+ def write_result(message,timeout=nil)
+
+ raise InvalidMessage.new("Got a #{message.class}, expecting a Skynet::ResultMessage") unless message.is_a?(Skynet::ResultMessage)
- def write_error(template,timeout=nil)
- raise AbstractClassError.new("You must implement method in a subclass.")
- end
+ ## It is assumed a queue was created for the original publisher of the task.
+ ## Here we'll add this task to that publisher's queue
- def set_worker_version(template,timeout=nil)
- raise AbstractClassError.new("You must implement method in a subclass.")
+ # First we'll make sure that task is acknowledged so it is taken off the Q permanently
+
+ enqueue_result(message)
+ end
+
+ #superclass
+ def write_error_for_task(task_message,error,timeout=nil)
+ raise InvalidMessage.new("Got a #{task_message.class}, expecting a Skynet::TaskMessage") unless task_message.is_a?(Skynet::TaskMessage)
+ message = task_message.error_message(error)
+ write_error(message,timeout)
end
-
- def clear_outstanding_tasks
- raise AbstractClassError.new("You must implement clear_outstanding_tasks in a subclass.")
- end
+ #superclass
+ def write_error(message,timeout=nil)
+
+ raise InvalidMessage.new("Got a #{message.class}, expecting a Skynet::ErrorMessage") unless message.is_a?(Skynet::ErrorMessage)
+
+ ## Errors are forms of results and go back on the results queue
+ enqueue_result(message)
+ end
end
end
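
The split this file introduces (the superclass validates and orchestrates, subclasses supply `enqueue_*`/`dequeue_*`/`acknowledge` hooks) could look roughly like the sketch below. It is a simplified illustration, not the Skynet code: `SketchAdapter` and `InMemoryAdapter` are hypothetical classes, messages are plain hashes instead of `Skynet::TaskMessage` objects, and version checks and timeouts are left out.

```ruby
# Hypothetical superclass: owns validation and the call order.
class SketchAdapter
  class InvalidMessage < StandardError; end

  VALID_TASKTYPES = %w[master_or_task master task error].freeze

  # Validate the message, then hand storage to the subclass hook.
  def write_task(message)
    tasktype = message.fetch(:tasktype).to_s
    unless VALID_TASKTYPES.include?(tasktype)
      raise InvalidMessage, "#{tasktype} is not a valid task type"
    end
    enqueue_task(message)
    true
  end

  # Dequeue, then track the message until a worker acknowledges it.
  def take_next_task(tasktype)
    message = dequeue_task(tasktype.to_s)
    accept_unacknowledged_message(message) if message
    message
  end

  # Hooks every concrete adapter must provide.
  %i[enqueue_task dequeue_task accept_unacknowledged_message acknowledge].each do |hook|
    define_method(hook) do |*_args|
      raise NotImplementedError, "#{self.class} must implement #{hook}"
    end
  end
end

# Hypothetical in-memory adapter implementing only the hooks.
class InMemoryAdapter < SketchAdapter
  attr_reader :unacknowledged

  def initialize
    @tasks = Hash.new { |hash, tasktype| hash[tasktype] = [] }
    @unacknowledged = {}
  end

  def enqueue_task(message)
    @tasks[message[:tasktype].to_s].push(message)
  end

  def dequeue_task(tasktype)
    @tasks[tasktype].shift
  end

  def accept_unacknowledged_message(message)
    @unacknowledged[message[:task_id]] = message
  end

  def acknowledge(message)
    @unacknowledged.delete(message[:task_id])
  end
end
```

With this shape, a shared adapter test only needs to exercise the public superclass methods against whichever subclass is plugged in.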
4 lib/skynet/message_queue_adapters/mysql.rb
@@ -104,7 +104,7 @@ def message_to_hash(message,timeout=nil,fields=Skynet::Message.fields)
if message.send(field).is_a?(Symbol)
hash[field] = message.send(field).to_s
elsif field == :payload
- hash[:raw_payload] = message.raw_payload
+ hash[:raw_task] = message.raw_task
else
hash[field] = message.send(field)
end
@@ -207,7 +207,7 @@ def update_message_with_result(message,timeout=nil)
timeout_sql = (timeout ? ", timeout = #{timeout}, expire_time = #{Time.now.to_f + timeout}" : '')
rows = 0
raw_payload_sql = " raw_payload = "
- raw_payload_sql << (message.raw_payload ? "'#{::Mysql.escape_string(message.raw_payload)}'" : 'NULL')
+ raw_payload_sql << (message.raw_task ? "'#{::Mysql.escape_string(message.raw_task)}'" : 'NULL')
update_sql = %{
update #{message_queue_table}
set tasktype = "#{message.tasktype}",
127 lib/skynet/message_queue_adapters/qui_q.rb
@@ -1,127 +0,0 @@
-class Skynet
-
- class MessageQueueAdapter::QuiQ < Skynet::MessageQueueAdapter
- # def list_results(data,timeout=nil)
- # raise AbstractClassError.new("You must implement list_results in a subclass.")
- # end
-
- @@tasks = Hash.new(Hash.new([]))
- @@results = Hash.new(Hash.new([]))
- @@queues_inited = false
- @@worker_version = 0
-
- # @@task_queue = SingleQueues.new(:tasks)
- # @@results_queue = SingleQueues.new(:results)
-
- def initialize(options={})
- # init_queues_if_needed
- end
-
- # def init_queues_if_needed
- # if @@queues_inited
- # [:task,:master] do |payload_type|
- # Thread.new do
- # loop do
- # task = @@task_queue.get(payload_type)
- # @@tasks[task.queue_id][task.payload_type] << task
- # end
- # end
- # end
- #
- # Thread.new do
- # loop do
- # task = @@result_queue.get(payload_type)
- # @@tasks[task.queue_id][task.payload_type] << task
- # end
- # end
- # end
- # end
-
- ## Is this necessary? How can a queue do this!!!
- def list_tasks(template,timeout=nil)
- @@tasks.values.values.join if @@tasks.values.any? and tasks.values.values.any?
- end
-
- def take_next_task(curver,timeout=1,payload_type=nil,queue_id=0)
- start = Time.now
- loop do
- @@tasks[queue_id][payload_type].each_with_index do |task,ii|
-
- ## only take tasks that are ready to be worked on
- next unless task.expire_time.is_a?(Range) and task.expire_time.last < Time.now.to_i
-
- ## IF this task isn't the right version and we find there's a new version, raise
- raise Skynet::NewWorkerVersion.new if curver != task.version and curver != get_worker_version
- ## Make sure NOT to dequeue task if we raised above
-
- ## dequeue
- return @@tasks[queue_id][payload_type].slice!(ii)
- end
- raise Skynet::RequestExpiredError.new if timeout and Time.now > start + timeout
- end
- end
-
- def write_task(task,timeout=nil)
- # There are only two payload types, :task and :master
- @@tasks[task.queue_id][task.payload_type] << task
-
- # @@task_queue.put(task,task.payload_type,"#{task.queue_id}")
- end
-
- def write_message(task,timeout=nil)
- write_task(task,timeout)
- end
-
- def write_result(task,timeout=nil)
- ## It is assumed a queue was created for the original publisher of the task.
- ## Here we'll add this task to that publisher's queue
- @@results[task.queue_id][task.task_id]
- end
-
- def take_result(task,timeout=1)
- ## Before a worker adds tasks to the task_queue, it creates a results queue for itself
- ## It subscribes to that queue to watch for results
- start = Time.now
- loop do
- if @@results[queue_id][task.task_id]
- @@results[queue_id].delete(task.task_id)
- end
- raise Skynet::RequestExpiredError.new if timeout and Time.now > start + timeout
- end
- end
-
- def write_error(message,timeout=nil)
- ## Errors are forms of results and go back on the results queue
- task = message.error_message(message)
- @@results[task.queue_id][task.task_id]
- end
-
- ## There should be central code for determining the current worker version.
- ## Does the new version matter if we know we are of the wrong version?
- def set_worker_version(version)
- @@worker_version = version
- end
-
- ## Supposed to determine the global current worker version
- ## Does the new version matter if we know we are of the wrong version?
- def get_worker_version
- @@worker_version
- end
-
- ## Is this version still active in the queue?
- def version_active?(curver=nil, queue_id = 0)
- true
- end
-
- ## flush task queue
- def flush_tasks
- @@tasks = Hash.new(Hash.new([]))
- end
-
- ## flush task queue
- ## Is this possible?
- def flush_results
- @@results = Hash.new(Hash.new([]))
- end
- end
-end
207 lib/skynet/message_queue_adapters/single_process.rb
@@ -0,0 +1,207 @@
+class Skynet
+
+ class MessageQueueAdapter::SingleProcess < Skynet::MessageQueueAdapter
+ ## THIS SHOULD NOT BE HERE
+ # def list_results(data,timeout=nil)
+ # raise AbstractClassError.new("You must implement list_results in a subclass.")
+ # end
+
+ @@tasks = Hash.new(Hash.new([]))
+ @@results = {}
+ @@queues_inited = false
+ @@worker_version = 0
+
+
+
+ def initialize(options={})
+ @failover = {}
+ # @thread = Thread.new do
+ # loop do
+ # begin
+ # @failover.each do |task_id,task|
+ # # Grab expired tasks and put them back on the Q
+ # next if task.expire_time.to_i < Time.now.to_i
+ # ## FIXME Expire time should never be a range! We have to fix this in Skynet::Message
+ # next if task.expire_time.is_a?(Range) and task.expire_time.last < Time.now.to_i
+ # unacknowledge(task)
+ # write_failover(task)
+ # end
+ # rescue Exception => e
+ # pp "ERROR", e, e.backtrace.join("\n")
+ # end
+ # sleep 0.1
+ # end
+ # end
+ # init_queues_if_needed
+ end
+
+ # This is only for this single process testing
+ def reqeue_expired_unacknowlged_tasks
+ @failover.each do |task_id,task|
+ now = Time.now.to_f
+ # Grab expired tasks and put them back on the Q
+ next if task.expire_time.to_f > now
+ ## FIXME Expire time should never be a range! We have to fix this in Skynet::Message
+ next if task.expire_time.is_a?(Range) and task.expire_time.last > now
+ unacknowledge(task)
+ write_failover(task)
+ end
+ end
+
+ def flush
+ @failover = {}
+ @@tasks = Hash.new(Hash.new([]))
+ @@results = {}
+ end
+
+ # def init_queues_if_needed
+ # if @@queues_inited
+ # [:task,:master] do |tasktype|
+ # Thread.new do
+ # loop do
+ # task = @@task_queue.get(tasktype)
+ # @@tasks[task.queue_id][task.tasktype] << task
+ # end
+ # end
+ # end
+ #
+ # Thread.new do
+ # loop do
+ # task = @@result_queue.get(tasktype)
+ # @@tasks[task.queue_id][task.tasktype] << task
+ # end
+ # end
+ # end
+ # end
+
+ ## Is this necessary? How can a queue do this!!!
+ # def list_tasks(template,timeout=nil)
+ # @@tasks.values.values.join if @@tasks.values.any? and tasks.values.values.any?
+ # end
+
+ #subclass
+ def dequeue_task(version,tasktype,timeout=1,queue_id=0)
+ reqeue_expired_unacknowlged_tasks
+ tasktype = tasktype.to_s
+ raise Error.new("#{tasktype} is not a valid payload type. Only allowed types are #{VALID_PAYLOAD_TYPES.join(',')}") unless VALID_PAYLOAD_TYPES.include?(tasktype.to_s)
+ start = Time.now
+ loop do
+ # Handle workers that want to grab masters, tasks, or either
+ real_tasktype = tasktype
+ if tasktype == "master_or_task"
+ real_tasktype = ["master","task"].rand
+ end
+
+ # pp "LOOKING IN @@tasks[#{queue_id}][#{tasktype}]", @@tasks[queue_id][tasktype]
+ @@tasks[queue_id][real_tasktype].each_with_index do |task,ii|
+
+ ## only take tasks that are ready to be worked on
+ next if task.expire_time.to_i > Time.now.to_i
+ ## FIXME Expire time should never be a range! We have to fix this in Skynet::Message
+ next if task.expire_time.is_a?(Range) and task.expire_time.last > Time.now.to_i
+
+ check_task_version(task)
+
+ ## dequeue
+ message = @@tasks[queue_id][real_tasktype].slice!(ii)
+
+ ## We're simulating how a worker might fail catastrophically without acknowledging or not
+ # First we see if this message is eligible for retry
+ # Maybe we can abstract this as accept_unacknowledged_method or something
+
+ return message
+ end
+ raise Skynet::RequestExpiredError.new if timeout and Time.now > start + timeout
+ end
+ end
+
+ def accept_unacknowledged_message(message)
+ if message.retry?
+ fallbackmessage = message.clone
+ ## The problem is, we have to start counting from now to the timeout, maybe Message should do this for us somehow?
+ # FIXME Should this be here? When does the clock start?
+ fallbackmessage.expire_time = Time.now.to_i + fallbackmessage.timeout
+ ## This particular adapter uses an internal queue of tasks to make sure the workers respond in time. If not, they reput
+ ## them on the queue anyway. Since this is single process, it's not likely there would be more than one task
+ ## on here at a time
+ @failover[message.task_id] = fallbackmessage
+ end
+ end
+
+ #subclass
+ def acknowledge(message)
+ @failover.delete(message.task_id)
+ end
+
+ #subclass
+ def unacknowledge(message)
+ ## How do we unacknowledge
+ @failover.delete(message.task_id)
+ end
+
+ #subclass
+ def enqueue_task(message)
+ @@tasks[message.queue_id][message.tasktype].push(message)
+ end
+
+ # subclass this
+ # do we need timeout?
+ def enqueue_result(message)
+ # First we'll make sure that task is acknowledged so it is taken off the Q permanently
+ acknowledge(message)
+ @@results[message.job_id] ||= []
+ @@results[message.job_id] << message
+ end
+
+ #subclass
+ def dequeue_result(job_id,timeout=1)
+ ## Before a worker adds tasks to the task_queue, it creates a results queue for itself
+ ## It subscribes to that queue to watch for results
+ start = Time.now
+ loop do
+ if @@results[job_id]
+ if @@results[job_id].empty?
+ @@results.delete(job_id)
+ next
+ else
+ return @@results[job_id].pop
+ end
+ end
+ raise Skynet::RequestExpiredError.new if timeout and Time.now > start + timeout
+ end
+ end
+
+ #subclass
+ def connected?
+ true
+ end
+
+ ## There should be central code for determining the current worker version.
+ ## Does the new version matter if we know we are of the wrong version?
+ def set_worker_version(version)
+ @@worker_version = version
+ end
+
+ ## Supposed to determine the global current worker version
+ ## Does the new version matter if we know we are of the wrong version?
+ def get_worker_version
+ @@worker_version
+ end
+
+ ## Is this version still active in the queue?
+ def version_active?(curver=nil, queue_id = 0)
+ true
+ end
+
+ ## flush task queue
+ def flush_tasks
+ @@tasks = Hash.new(Hash.new([]))
+ end
+
+ ## flush task queue
+ ## Is this possible?
+ def flush_results
+ @@results = Hash.new([])
+ end
+ end
+end
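
One thing worth checking in this adapter is the `Hash.new(Hash.new([]))` initializer. In Ruby, a default passed as an argument is a single shared object that is returned for every missing key and never stored, so pushing onto it mixes all queues together. The sketch below contrasts that with the block form; the helper names `shared_default_queues` and `nested_queues` are hypothetical and are not part of the commit.

```ruby
# Argument form: one inner hash and one array shared by every missing key.
def shared_default_queues
  Hash.new(Hash.new([]))
end

# Block form: each missing key gets its own container, stored on first access.
def nested_queues
  Hash.new do |outer, queue_id|
    outer[queue_id] = Hash.new { |inner, tasktype| inner[tasktype] = [] }
  end
end

shared = shared_default_queues
shared[0]["task"].push(:a)
shared[1]["master"].push(:b)
# shared.keys is still empty, and shared[7]["anything"] returns the same
# array that received both pushes.

isolated = nested_queues
isolated[0]["task"].push(:a)
isolated[1]["master"].push(:b)
# Each (queue_id, tasktype) pair now has its own array.
```

The block form costs one stored entry per key touched, which is the behaviour a per-queue, per-type task store presumably wants.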
170 lib/skynet/message_queue_adapters/tuple_space.rb
@@ -23,7 +23,7 @@ class TupleSpace < Skynet::MessageQueueAdapter
include SkynetDebugger
- USE_FALLBACK_TASKS = true
+ USE_FALLBACK_TASKS = false
@@ts = nil
@@curhostidx = 0
@@ -55,35 +55,94 @@ def initialize(options={})
@ts = self.class.get_tuple_space(options)
end
- def take_next_task(curver,timeout=nil,payload_type=nil,queue_id=0)
- message = Skynet::Message.new(take(Skynet::Message.next_task_template(curver,payload_type, queue_id),timeout))
- write_fallback_task(message)
+ def dequeue_task(version,tasktype,timeout=0.00001,queue_id=0)
+ real_tasktype = tasktype.to_sym
+ if tasktype.to_s == "master_or_task"
+ real_tasktype = [:master,:task].rand
+ end
+ template = {
+ ### FIXME expire_time should just be a number
+ :expire_time => (0 .. Time.now.to_f),
+ :tasktype => real_tasktype,
+ :queue_id => queue_id,
+ :version => version,
+ :iteration => (0..Skynet::CONFIG[:MAX_RETRIES]),
+ }
+ ts_template = Skynet::TaskMessage.fields.collect do |field|
+ if template[field]
+ template[field]
+ else
+ nil
+ end
+ end
+ ts_template.unshift(:task)
+
+ tuple = take(ts_template,timeout)
+ tuple.shift
+ message = Skynet::TaskMessage.new(tuple)
message
end
-
- def write_message(message,timeout=nil)
- timeout ||= message.expiry
- write(message,timeout)
+
+ def enqueue_task(message)
+ timeout = message.result_timeout if message.result_timeout and message.result_timeout > 0
+ timeout ||= 10
+ message.tasktype = message.tasktype.to_s
+ message_array = message.to_a
+ message_array[Skynet::TaskMessage.fields.index(:expire_time)] ||= 0
+ message_array.unshift(:task)
+ write(message_array,timeout)
end
- def write_result(message,result=[],timeout=nil)
+ def enqueue_result(message)
+ # First we'll make sure that task is acknowledged so it is taken off the Q permanently
+ acknowledge(message)
result_message = message.result_message(result).to_a
- timeout ||= result_message.expiry
+ result_message.unshift(:result)
+ timeout = result_message.timeout
write(result_message,timeout)
- take_fallback_message(message)
- result_message
end
- def take_result(job_id,timeout=nil)
- Skynet::Message.new(take(Skynet::Message.result_template(job_id),timeout))
+ # def write_result(message,result=[],timeout=nil)
+ # result_message = message.result_message(result).to_a
+ # timeout ||= result_message.timeout
+ # write(result_message,timeout)
+ # take_fallback_message(message)
+ # result_message
+ # end
+
+ def dequeue_result(job_id,timeout=1)
+ template_message = Skynet::ResultMessage.new(:job_id => job_id)
+ template_message.tasktype = nil
+ result_template = template_message.to_a
+ result_template.unshift(:result)
+ tuple = Skynet::Message.new(take(result_template,timeout))
+ tuple.shift
+ tuple
+ end
+
+ def accept_unacknowledged_message(message)
+ write_failover(message)
+ end
+
+ def acknowledge(message)
+ take_fallback_message(message,timeout=0.01)
end
- def write_error(message,error='',timeout=nil)
- timeout ||= message.expiry
- write(message.error_message(error),timeout)
- take_fallback_message(message)
+ #subclass
+ def unacknowledge(message)
+ ## How do we unacknowledge
+ take_fallback_message(message,timeout=0.01)
end
+
+
+
+
+
+
+
+
+
def list_tasks(iteration=nil,queue_id=0)
read_all(Skynet::Message.outstanding_tasks_template(iteration,queue_id))
end
@@ -104,10 +163,11 @@ def version_active?(curver=nil, queue_id= 0)
end
def get_worker_version
+ 1
begin
- message = Skynet::WorkerVersionMessage.new(read(Skynet::WorkerVersionMessage.template,0.00001))
+ message = read([:workerversion,nil],0.00001)
if message
- curver = message.version
+ curver = message[1]
else
curver=0
end
@@ -118,15 +178,16 @@ def get_worker_version
end
def set_worker_version(ver=nil)
+ return true
begin
- messages = read_all(Skynet::WorkerVersionMessage.template).collect {|ret| Skynet::WorkerVersionMessage.new(ret)}
+ tuples = read_all([:workerversion,nil])
curver = 0
- messages.each do |message|
- curver = message.version
+ tuples.each do |tuple|
+ curver = tuple[1]
debug "CURRENT WORKER VERSION #{curver}"
- curvmessage = Skynet::WorkerVersionMessage.new(take(message.template,0.00001))
+ curvmessage = take([:workerversion,nil],0.00001)
if curvmessage
- curver = curvmessage.version
+ curver = curvmessage[1]
else
curver=0
end
@@ -137,29 +198,39 @@ def set_worker_version(ver=nil)
newver = ver ? ver : curver + 1
debug "WRITING CURRENT WORKER REV #{newver}"
- write(Skynet::WorkerVersionMessage.new(:version=>newver))
+ write([:workerversion,newver])
newver
end
+ def flush
+ clear_outstanding_tasks
+ end
+
def clear_outstanding_tasks
+ task_template = []
+ Skynet::ResultMessage.fields.each_with_index do
+ task_template << nil
+ end
+ task_template.unshift(:task)
begin
- tasks = read_all(Skynet::Message.outstanding_tasks_template)
+ tasks = read_all(task_template)
rescue DRb::DRbConnError, Errno::ECONNREFUSED => e
error "ERROR #{e.inspect}", caller
end
tasks.size.times do |ii|
- take(Skynet::Message.outstanding_tasks_template,0.00001)
+ take(task_template,0.00001)
end
-
- results = read_all(Skynet::Message.outstanding_results_template)
+
+ result_template = []
+ Skynet::ResultMessage.fields.each_with_index do
+ result_template << nil
+ end
+ result_template.unshift(:result)
+ results = read_all(result_template)
results.size.times do |ii|
- take(Skynet::Message.outstanding_results_template,0.00001)
+ take(result_template,0.00001)
end
-
- task_tuples = read_all(Skynet::Message.outstanding_tasks_template)
- result_tuples = read_all(Skynet::Message.outstanding_results_template)
- return task_tuples + result_tuples
end
def stats
@@ -192,25 +263,28 @@ def read_all(template)
end
###### FALLBACK METHODS
- def write_fallback_task(message)
- return unless USE_FALLBACK_TASKS
- debug "4 WRITING BACKUP TASK #{message.task_id}", message.to_h
- ftm = message.fallback_task_message
- debug "WRITE FALLBACK TASK", ftm.to_h
- timeout = message.expiry * 8
- write(ftm,timeout) unless ftm.iteration == -1
- ftm
- end
+ # def write_fallback_task(message)
+ # return unless USE_FALLBACK_TASKS
+ # debug "4 WRITING BACKUP TASK #{message.task_id}", message.to_h
+ # ftm = message.fallback_task_message
+ # debug "WRITE FALLBACK TASK", ftm.to_h
+ # timeout = message.timeout * 8
+ # write(ftm,timeout) unless ftm.iteration == -1
+ # ftm
+ # end
def take_fallback_message(message,timeout=0.01)
return unless USE_FALLBACK_TASKS
return if message.retry <= message.iteration
begin
- # debug "LOOKING FOR FALLBACK TEMPLATE", message.fallback_template
- fb_message = Skynet::Message.new(take(message.fallback_template,timeout))
- # debug "TOOK FALLBACK MESSAGE for TASKID: #{fb_message.task_id}"
+ fb_message = Skynet::TaskMessage.new(:task_id => message.task_id)
+ fb_template = fb_message.to_a
+ fb_template.unshift(:task)
+ debug "LOOKING FOR FALLBACK TEMPLATE", fb_template
+ take(fb_template,timeout)
+ debug "TOOK FALLBACK MESSAGE for TASKID: #{fb_message.task_id}"
rescue Skynet::RequestExpiredError => e
- error "Couldn't find expected FALLBACK MESSAGE", Skynet::Message.new(message.fallback_template).to_h
+ error "Couldn't find expected FALLBACK MESSAGE", fb_template
end
end
## END FALLBACK METHODS
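
The tuple templates built in `dequeue_task` and `clear_outstanding_tasks` follow one pattern: a leading tag, then one slot per message field, with `nil` as a wildcard. The sketch below shows that pattern in plain Ruby without a running tuple space. `SKETCH_FIELDS`, `build_template`, and `template_matches?` are hypothetical, and the matcher only approximates tuple-space matching (same length, `nil` matches anything, otherwise compare with `===`).

```ruby
# Hypothetical field order; the real list comes from Skynet::TaskMessage.fields.
SKETCH_FIELDS = %i[tasktype task_id job_id queue_id version expire_time].freeze

# Build [tag, slot, slot, ...]; fields without a constraint become nil.
def build_template(tag, fields, constraints)
  [tag] + fields.map { |field| constraints[field] }
end

# Approximate tuple matching: nil is a wildcard, everything else uses ===,
# so ranges and classes work as patterns.
def template_matches?(template, tuple)
  return false unless template.size == tuple.size

  template.zip(tuple).all? do |pattern, value|
    pattern.nil? || pattern === value
  end
end
```

An all-`nil` constraint set yields the "match every task" template used when flushing, while a range on `expire_time` selects only tasks that are already due.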
59 lib/skynet/skynet_job.rb
@@ -83,7 +83,7 @@ class Error < Skynet::Error; end
:map, :map_partitioner, :reduce, :reduce_partition, :map_reduce_class,
:master_retry, :map_retry, :reduce_retry,
:keep_map_tasks, :keep_reduce_tasks,
- :local_master, :async
+ :local_master, :async, :data_debug
]
FIELDS.each do |method|
@@ -96,7 +96,7 @@ class Error < Skynet::Error; end
end
end
- attr_accessor :use_local_queue
+ attr_accessor :use_local_queue, :data_debug
Skynet::CONFIG[:JOB_DEFAULTS] = {
:queue_id => 0,
@@ -283,6 +283,8 @@ def run(options = {})
info "RUN 1 BEGIN #{name}, job_id:#{job_id} vers: #{version} async:#{async}, local_master: #{local_master}, master?: #{master?}"
# run the master task if we're running async or local_master
+
+ ## FIXME we need begin rescue blocks here
if master?
master_enqueue
# ====================================================================================
@@ -327,7 +329,10 @@ def map_enqueue
self.use_local_queue = map_local?
if map_tasks
number_of_tasks = 0
- map_tasks.each do |task|
+ size = map_tasks.size - 1
+ printlog "MESSAGES TO MAP ENQUEUE #{size}" if data_debug?
+ map_tasks.each_with_index do |task,ii|
+ printlog "#{size - ii} MAP TASKS LEFT TO ENQUEUE" if data_debug?
number_of_tasks += 1
enqueue_messages(tasks_to_messages(task))
end
@@ -347,6 +352,7 @@ def map_results(number_of_tasks)
def partition_data(post_map_data)
debug "RUN REDUCE 3.1 BEFORE PARTITION #{display_info} reducers: #{reducers}"
debug "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data
+ printlog "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data if data_debug?
return unless post_map_data
partitioned_data = nil
if not @reduce_partition
@@ -368,17 +374,21 @@ def partition_data(post_map_data)
partitioned_data.compact! if partitioned_data
debug "RUN REDUCE 3.2 AFTER PARTITION #{display_info} reducers: #{reducers}"
debug "RUN REDUCE 3.2 AFTER PARTITION #{display_info} data:", partitioned_data if partitioned_data
+ printlog "RUN REDUCE 3.2 AFTER PARTITION #{display_info} data:", partitioned_data if data_debug?
partitioned_data
end
def reduce_enqueue(partitioned_data)
return partitioned_data unless @reduce and reducers and reducers > 0
debug "RUN REDUCE 3.3 CREATED REDUCE TASKS #{display_info}", partitioned_data
+ size = partitioned_data.size
+ printlog "REDUCE MESSAGES TO ENQUEUE #{size}" if data_debug?
reduce_tasks = self.reduce_tasks(partitioned_data)
self.use_local_queue = reduce_local?(reduce_tasks)
number_of_tasks = 0
- reduce_tasks.each do |task|
+ reduce_tasks.each_with_index do |task,ii|
+ printlog "#{size - ii} REDUCE TASKS LEFT TO ENQUEUE" if data_debug?
number_of_tasks += 1
enqueue_messages(tasks_to_messages(task))
end
@@ -387,6 +397,7 @@ def reduce_enqueue(partitioned_data)
def reduce_results(number_of_tasks)
results = gather_results(number_of_tasks, reduce_timeout, reduce_name)
+ printlog "REDUCE RESULTS", results if data_debug?
if results.is_a?(Array) and results.first.is_a?(Hash)
hash_results = Hash.new
results.each {|h| hash_results.merge!(h) if h.class == Hash}
@@ -401,8 +412,8 @@ def reduce_results(number_of_tasks)
def enqueue_messages(messages)
messages.each do |message|
- timeout = message.expiry || 5
- debug "RUN TASKS SUBMITTING #{message.name} job_id: #{job_id} #{message.payload.is_a?(Skynet::Task) ? 'task' + message.payload.task_id.to_s : ''}"
+ timeout = message.timeout || 5
+ debug "RUN TASKS SUBMITTING #{message.name} job_id: #{job_id} #{message.task.is_a?(Skynet::Task) ? 'task' + message.task.task_id.to_s : ''}"
debug "RUN TASKS WORKER MESSAGE #{message.name} job_id: #{job_id}", message.to_a
mq.write_message(message,timeout * 5)
end
@@ -411,8 +422,9 @@ def enqueue_messages(messages)
# Given a job_id, returns the results from the message queue. Used to retrieve results of asynchronous jobs.
def self.results_by_job_id(job_id,timeout=2)
result_message = mq.take_result(job_id,timeout)
- result = result_message.payload
+ result = result_message.result
return nil unless result
+ printlog "POST REDUCE RESULTS", result if data_debug?
return result
end
@@ -426,16 +438,17 @@ def gather_results(number_of_tasks, timeout=nil, description=nil)
loop do
# debug "LOOKING FOR RESULT MESSAGE TEMPLATE"
result_message = self.mq.take_result(job_id,timeout * 2)
- ret_result = result_message.payload
+ ret_result = result_message.result
- if result_message.payload_type == :error
+ if result_message.tasktype == :error
errors[result_message.task_id] = ret_result
error "ERROR RESULT TASK #{result_message.task_id} returned #{errors[result_message.task_id].inspect}"
else
results[result_message.task_id] = ret_result
debug "RESULT returned TASKID: #{result_message.task_id} #{results[result_message.task_id].inspect}"
end
debug "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}"
+ printlog "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}" if data_debug?
break if (number_of_tasks - (results.keys + errors.keys).uniq.size) <= 0
end
rescue Skynet::RequestExpiredError => e
@@ -489,11 +502,11 @@ def map_tasks
debug "RUN MAP 2.1 #{display_info} data before partition:", @map_data
task_options = {
- :process => @map,
- :name => map_name,
- :map_or_reduce => :map,
- :result_timeout => map_timeout,
- :retry => map_retry || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
+ :process => @map,
+ :name => map_name,
+ :map_or_reduce => :map,
+ :timeout => map_timeout,
+ :retry => map_retry || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
}
if @map_data.is_a?(Array)
@@ -522,11 +535,11 @@ def map_tasks
def reduce_tasks(partitioned_data)
@reduce_tasks ||= begin
task_options = {
- :name => reduce_name,
- :process => @reduce,
- :map_or_reduce => :reduce,
- :result_timeout => reduce_timeout,
- :retry => reduce_retry || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
+ :name => reduce_name,
+ :process => @reduce,
+ :map_or_reduce => :reduce,
+ :timeout => reduce_timeout,
+ :retry => reduce_retry || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
}
Skynet::TaskIterator.new(task_options, partitioned_data)
end
@@ -540,7 +553,7 @@ def tasks_to_messages(tasks)
end
tasks.collect do |task|
- Skynet::Message.new_task_message(task,self)
+ Skynet::TaskMessage.new_task_message(task,self)
end
end
@@ -605,6 +618,10 @@ def single?
@single
end
+ def data_debug?
+ @data_debug || Skynet::CONFIG[:SKYNET_JOB_DEBUG_DATA_LEVEL]
+ end
+
def reset!
@map_tasks = nil
@reduce_tasks = nil
@@ -781,7 +798,7 @@ def reset!
def run_message(message)
result = nil
(message.retry + 1).times do
- task = message.payload
+ task = message.task
debug "RUN TASKS LOCALLY SUBMITTING #{message.name} task #{task.task_id}", task
begin
result = task.run
18 lib/skynet/skynet_manager.rb
@@ -26,7 +26,7 @@ def initialize(options)
@required_libs = options[:required_libs] || []
@queue_id = options[:queue_id] || 0
@number_of_workers = 0
- @workers_by_type = {:master => [], :task => [], :any => []}
+ @workers_by_type = {:master => [], :task => [], :master_or_task => []}
@signaled_workers = []
@worker_queue = {}
@workers_restarting = 0
@@ -218,17 +218,17 @@ def add_worker(workers=1)
warn "Adding #{workers} WORKERS. Task Workers: #{num_task_only_workers}, Master Workers: #{num_master_only_workers} Master & Task Workers: #{workers - num_task_only_workers - num_master_only_workers}"
@all_workers_started = false
- worker_types = {:task => 0, :master => 0, :any => 0}
+ worker_types = {:task => 0, :master => 0, :master_or_task => 0}
(1..workers).collect do |ii|
- worker_type = :any
+ worker_type = :master_or_task
if (ii <= num_master_only_workers)
worker_type = :master
worker_types[:master] += 1
elsif (ii > num_master_only_workers and ii <= num_master_only_workers + num_task_only_workers)
worker_type = :task
worker_types[:task] += 1
else
- worker_types[:any] += 1
+ worker_types[:master_or_task] += 1
end
cmd = "#{@script_path} --worker_type=#{worker_type}"
cmd << " --queue_id=#{queue_id}"
@@ -302,7 +302,7 @@ def hard_restart_workers
signal_workers("TERM")
@restart = true
signal_workers("INT",:master)
- signal_workers("INT",:any)
+ signal_workers("INT",:master_or_task)
sleep @number_of_workers
check_started_workers
end
@@ -352,7 +352,7 @@ def setup_signals
def shutdown
info(:shutdown)
@shutdown = true
- signal_workers("TERM",[:idle,:master,:any])
+ signal_workers("TERM",[:idle,:master,:master_or_task])
end
def terminate
@@ -445,13 +445,13 @@ def stats
stats[:idle_workers] = idle_workers.size
stats[:shutdown_workers] = inactive_workers.size
stats[:masters] = active_workers.select{|worker|worker.tasktype.to_s == "master"}.size
- stats[:master_or_task_workers] = active_workers.select{|worker|worker.tasktype.to_s == "any"}.size
+ stats[:master_or_task_workers] = active_workers.select{|worker|worker.tasktype.to_s == "master_or_task"}.size
stats[:taskworkers] = active_workers.select{|worker|worker.tasktype.to_s == "task"}.size
stats[:active_masters] = currently_active_workers.select{|worker|worker.tasktype.to_s == "master"}.size
- stats[:active_master_or_task_workers] = currently_active_workers.select{|worker|worker.tasktype.to_s == "any"}.size
+ stats[:active_master_or_task_workers] = currently_active_workers.select{|worker|worker.tasktype.to_s == "master_or_task"}.size
stats[:active_taskworkers] = currently_active_workers.select{|worker|worker.tasktype.to_s == "task"}.size
stats[:idle_masters] = idle_workers.select{|worker|worker.tasktype.to_s == "master"}.size
- stats[:idle_master_or_task_workers] = idle_workers.select{|worker|worker.tasktype.to_s == "any"}.size
+ stats[:idle_master_or_task_workers] = idle_workers.select{|worker|worker.tasktype.to_s == "master_or_task"}.size
stats[:idle_taskworkers] = idle_workers.select{|worker|worker.tasktype.to_s == "task"}.size
stats
end
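
The `add_worker` hunk above assigns a type to each new worker by position: the first `num_master_only_workers` become `:master`, the next `num_task_only_workers` become `:task`, and the remainder become `:master_or_task`. A minimal sketch of that branching follows; `assign_worker_types` is a hypothetical helper written for illustration and is not part of Skynet.

```ruby
# Hypothetical helper mirroring the position-based branching in add_worker.
# It returns one worker type per worker, in start order.
def assign_worker_types(workers, num_master_only, num_task_only)
  (1..workers).map do |ii|
    if ii <= num_master_only
      :master
    elsif ii <= num_master_only + num_task_only
      :task
    else
      :master_or_task
    end
  end
end

# Tally how many workers of each type would be started.
def count_worker_types(types)
  counts = Hash.new(0)
  types.each { |type| counts[type] += 1 }
  counts
end

types = assign_worker_types(5, 1, 2)
puts types.inspect
puts count_worker_types(types).inspect
```

With five workers, one master-only and two task-only, the last two fall through to `:master_or_task`, which is the name this commit uses in place of `:any` for manager worker types.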
473 lib/skynet/skynet_message.rb
@@ -1,4 +1,5 @@
class Skynet
+
class Message
include SkynetDebugger
@@ -8,47 +9,18 @@ class BadMessage < Skynet::Error; end
class << self
attr_accessor :fields
end
-
- self.fields = [
- :tasktype,
- :drburi,
- :task_id,
- :job_id,
- :payload,
- :payload_type,
- :name,
- :expiry,
- :expire_time,
- :iteration,
- :version,
- :retry,
- :queue_id
- ]
-
- self.fields.each do |method|
- next if [:payload, :tasktype, :payload_type].include?(method)
- attr_accessor method
- end
-
- attr_reader :payload_type, :tasktype
- def self.new_task_message(task,job)
- self.new(
- :job_id => job.job_id,
- :expire_time => job.start_after,
- :version => job.version,
- :queue_id => job.queue_id || 0,
- :iteration => 0,
- :tasktype => :task,
- :task_id => task.task_id,
- :payload => task,
- :payload_type => task.task_or_master,
- :expiry => task.result_timeout,
- :name => task.name,
- :retry => task.retry
- )
+ attr_reader :tasktype
+
+
+ def self.set_fields(fields)
+ self.fields = fields
+ self.fields.each do |method|
+ next if [:task, :tasktype].include?(method)
+ attr_accessor method
+ end
end
-
+
def initialize(opts)
if opts.is_a?(Array)
self.class.fields.each_with_index do |field, ii|
@@ -59,55 +31,44 @@ def initialize(opts)
value = opts[field] || opts[field.to_s] || nil
self.send("#{field}=",value) if value
end
- opts_raw_payload = opts[:raw_payload] || opts["raw_payload"]
- if opts_raw_payload
- self.raw_payload = opts_raw_payload
- end
- self.retry ||= 0
end
- self.payload
+ self.task
end
def fields
self.class.fields
end
def tasktype=(ttype)
- @tasktype = ttype.to_sym
- end
-
- def payload_type=(ptype)
- @payload_type = ptype.to_sym if ptype
- end
-
- # alias for payload
- def task
- payload
+ if ttype.nil?
+ @tasktype=nil
+ else
+ @tasktype = ttype.to_sym
+ end
end
-
- def payload=(data)
- @payload = data
- self.raw_payload = data.to_yaml if data.respond_to?(:to_yaml) and not payload.kind_of?(Proc)
+
+ def task=(data)
+ @task = data
+ self.raw_task = data.to_yaml if data.respond_to?(:to_yaml) and not task.kind_of?(Proc)
end
- def payload
- @payload ||= begin
- YAML::load(self.raw_payload) if self.raw_payload
+ def task
+ @task ||= begin
+ YAML::load(self.raw_task) if self.raw_task
rescue Exception => e
- raise BadMessage.new("Couldnt marshal payload #{e.inspect} #{e.backtrace.join("\n")}")
+ raise BadMessage.new("Couldn't marshal task #{e.inspect} #{e.backtrace.join("\n")}")
end
end
- def raw_payload=(data)
- @raw_payload = data
- @payload=nil
+ def raw_task=(data)
+ @raw_task = data
+ @task=nil
end
- def raw_payload
- @raw_payload
+ def raw_task
+ @raw_task
end
-
- def [](ii)
+ def [](ii)
send(self.class.fields[ii])
end
@@ -132,54 +93,9 @@ def to_h
def to_s
to_a
end
-
- def timeout
- expire_time * 2
- end
-
- def self.next_task_template(version=nil, payload_type=nil, queue_id=0)
- template = {
- ### FIXME expire_time should just be a number
- :expire_time => (0 .. Time.now.to_i),
- :tasktype => :task,
- :queue_id => queue_id,
- :version => version,
- :payload_type => payload_type,
- :iteration => (0..Skynet::CONFIG[:MAX_RETRIES]),
- }
-
- fields.collect do |field|
- template[field]
- end
- end
-
- def self.result_template(job_id,tasktype=:result)
- template = {
- :tasktype => tasktype,
- :job_id => job_id
- }
- fields.collect do |field|
- template[field]
- end
- end
-
- def self.result_message(message,result,tasktype=:result, resulttype=:result)
- template = {
- :tasktype => tasktype,
- :payload => result,
- :payload_type => resulttype
- }
-
- fields.each do |field|
- template[field] = message.send(field) unless template.has_key?(field)
- end
- new(template)
- end
- def result_message(result,tasktype=:result, resulttype=:result)
- self.class.result_message(self,result,tasktype,resulttype)
- end
+ ## FIXME REMOVE
def self.outstanding_tasks_template(iteration=nil,queue_id=0)
template = {
:tasktype => :task,
@@ -191,6 +107,7 @@ def self.outstanding_tasks_template(iteration=nil,queue_id=0)
end
end
+ ## FIXME REMOVE
def self.outstanding_results_template(queue_id=0)
template = {
:tasktype => :result,
@@ -200,115 +117,288 @@ def self.outstanding_results_template(queue_id=0)
template[field]
end
end
+
+ end ## END class Message
+
+
+ class ResultMessage < Message
+
+ self.set_fields([
+ :task_id,
+ :job_id,
+ :task, ### FIXME Should results have the orig task?
+ :result,
+ :name,
+ :result_timeout,
+ :expire_time,
+ :iteration,
+ :version,
+ :retry,
+ :queue_id,
+ :result_code,
+ :sn_result_code,
+ ])
- def self.error_message(message,error)
- result_message(message,error,:result,:error)
+ def tasktype
+ :result
end
+
+ def timeout
+ result_timeout
+ end
+
+ def timeout=(timeout)
+ self.result_timeout = timeout
+ end
+
+ def payload
+ result
+ end
+
+ end
+
+ class ErrorMessage < ResultMessage
+
+ self.set_fields([
+ :task_id,
+ :job_id,
+ :task, ### FIXME Should results have the orig task?
+ :error,
+ :name,
+ :result_timeout,
+ :expire_time,
+ :iteration,
+ :version,
+ :retry,
+ :queue_id,
+ :result_code,
+ :sn_result_code,
+ ])
+
- def error_message(error)
- self.class.error_message(self,error)
+ def tasktype
+ :error
+ end
+
+ def payload
+ error
+ end
+
+ def error=(error)
+ self.result=error
end
+
+ def error
+ result
+ end
+
+ end
+
- def self.error_template(message)
- template = {
- :tasktype => message.tasktype,
- :drburi => message.drburi,
- :version => message.version,
- :task_id => message.task_id,
- :queue_id => message.queue_id
- }
- fields.collect do |field|
- template[field]
+ class TaskMessage < Message
+
+ self.set_fields([
+ :tasktype, ## replaces payload_type. :master, :task, :result, :error
+ :task_id,
+ :job_id,
+ :task,
+ :error,
+ :name,
+ :start_after,
+ :timeout,
+ :result_timeout,
+ :expire_time,
+ :iteration,
+ :version,
+ :retry,
+ :queue_id
+ ])
+
+
+ def initialize(opts)
+ if opts.is_a?(Array)
+ self.class.fields.each_with_index do |field, ii|
+ self.send("#{field}=",opts[ii] || nil)
+ end
+ elsif opts
+ self.class.fields.each do |field|
+ value = opts[field] || opts[field.to_s] || nil
+ self.send("#{field}=",value) if value
+ end
+ opts_raw_task = opts[:raw_task] || opts["raw_task"]
+ if opts_raw_task
+ self.raw_task = opts_raw_task
+ end
+ self.retry ||= 0
end
+ self.task
+ end
+
+ def payload
+ task
+ end
+
+ def retry?
+ return false unless self.retry
+
+ if self.retry
+ if (self.retry and self.iteration >= self.retry)
+ false
+ else
+ true
+ end
+ # FIXME The default retries should be SET in the message object by Skynet::job!!!
+ # Originally I was gonna do this for map and reduce, but we don't know that here, just whether its a master.
+ elsif self.tasktype.to_sym == :master and Skynet::CONFIG[:DEFAULT_MASTER_RETRY] and self.iteration >= Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
+ false
+ elsif Skynet::CONFIG[:MAX_RETRIES] and self.iteration >= Skynet::CONFIG[:MAX_RETRIES]
+ false
+ else
+ false
+ end
+ end
+
+ def run_task
+ raise BadMessage.new("run_task expecting type Skynet::Task got #{task.class}") unless task.is_a?(Skynet::Task)
+ raise Error.new("This task has reached max retries. Current iteration: #{iteration}, Retries: #{self.retry}") if self.retry and iteration > self.retry
+ info "running task #{name} TIMEOUT: #{timeout} task_id:#{task_id} MorR:#{task.map_or_reduce} PROCESS CLASS: #{task.process.class}"
+ begin
+ return result_message(task.run(iteration))
+
+ ## If we have an exception, how do we know whether we should acknowledge the first message or not
+ rescue Skynet::Task::Error => e
+ error "TASK ERROR: #{e.orig_exception.message} #{e.orig_exception.backtrace.join("\n")}"
+ ### Need to handle these errors better
+ return error_message(e)
+ rescue Exception => e
+ return error_message(e)
+ end
+ end
+
+ def self.new_task_message(task,job)
+ self.new(
+ :tasktype => task.task_or_master,
+ :task_id => task.task_id,
+ :job_id => job.job_id,
+ :task => task,
+ :name => task.name,
+ :start_after => job.start_after,
+ :timeout => task.timeout,
+ ## FIXME Where would we get this from the task? Should we be making these from tasks?
+ :result_timeout => task.result_timeout,
+ # :expire_time => Time.now.to_i + task.result_timeout ### expire time doesn't have to be stored
+ :version => job.version,
+ :queue_id => job.queue_id || 0,
+ :iteration => 0,
+ :retry => task.retry
+ )
end
+
+ ## FIXME Do we need a next task template?
+ # def self.next_task_template(version=nil, tasktype=nil, queue_id=0)
+ # template = {
+ # ### FIXME expire_time should just be a number
+ # :expire_time => (0 .. Time.now.to_i),
+ # :tasktype => :task,
+ # :queue_id => queue_id,
+ # :version => version,
+ # :iteration => (0..Skynet::CONFIG[:MAX_RETRIES]),
+ # }
+ #
+ # fields.collect do |field|
+ # template[field]
+ # end
+ # end
- def error_template
- self.class.error_template(self)
- end
+ def self.result_message(message,result)
+ template = {
+ :result => result,
+ :result_timeout => message.result_timeout,
+ }
+
+ Skynet::ResultMessage.new(message.to_h.merge(template))
+ end
def self.fallback_task_message(message)
template = {}
- if message.retry
- if (message.retry and message.iteration >= message.retry)
- template[:iteration] = -1
- else
- template[:iteration] = message.iteration + 1
- end
- # Originally I was gonna do this for map and reduce, but we don't know that here, just whether its a master.
- elsif message.payload_type.to_sym == :master and Skynet::CONFIG[:DEFAULT_MASTER_RETRY] and message.iteration >= Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
- template[:iteration] = -1
- elsif Skynet::CONFIG[:MAX_RETRIES] and message.iteration >= Skynet::CONFIG[:MAX_RETRIES]
- template[:iteration] = -1
- else
+ if message.retry?
template[:iteration] = message.iteration + 1
+ template[:expire_time] = Time.now.to_i + message.timeout
+ else
+ return nil
end
- template[:expire_time] = Time.now.to_i + message.expiry
-
fields.each do |field|
template[field] = message.send(field) unless template.has_key?(field)
end
# debug "BUILDING NEXT FALLBACK TASK MESSAGE OFF"#, template
- Skynet::Message.new(template)
+ Skynet::TaskMessage.new(template)
end
def fallback_task_message
self.class.fallback_task_message(self)
end
- def self.fallback_template(message)
+ def result_message(result)
+ self.class.result_message(self,result)
+ end
+
+ def self.error_message(message,error)
template = {
- :tasktype => message.tasktype,
- :drburi => message.drburi,
- :version => message.version,
- :task_id => message.task_id,
- :queue_id => message.queue_id,
- :iteration => (1..Skynet::CONFIG[:MAX_RETRIES]),
+ :error => error,
+ :result_timeout => message.result_timeout,
}
- fields.collect do |field|
- template[field]
+
+ fields.each do |field|
+ template[field] = message.send(field) unless template.has_key?(field)
end
+ Skynet::ErrorMessage.new(message.to_h.merge(template))
end
- def fallback_template
- self.class.fallback_template(self)
+ def error_message(error)
+ self.class.error_message(self,error)
end
- end ## END class Message
- class WorkerVersionMessage < Skynet::Message
-
- self.fields = self.superclass.fields
+ end
+
- def initialize(opts)
- super
- self.expire_time ||= Time.now.to_i
- self.tasktype = :current_worker_rev
- end
-
- def version
- @version.to_i
- end
+
+ # class WorkerVersionMessage < Skynet::Message
+ #
+ # self.fields = [:version,:expire_time]
+ #
+ # def initialize(opts)
+ # super
+ # self.expire_time ||= Time.now.to_i
+ # self.tasktype = :current_worker_rev
+ # end
+ #
+ # def version
+ # @version.to_i
+ # end
+ #
+ # def self.template
+ # template = {
+ # :tasktype => :current_worker_rev
+ # }
+ # fields.collect do |field|
+ # template[field]
+ # end
+ # end
+ #
+ # def template
+ # template = {
+ # :tasktype => :current_worker_rev,
+ # :expire_time => nil
+ # }
+ # fields.collect do |field|
+ # template[field] || self.send(field)
+ # end
+ # end
+ #
+ # end
- def self.template
- template = {
- :tasktype => :current_worker_rev
- }
- fields.collect do |field|
- template[field]
- end
- end
-
- def template
- template = {
- :tasktype => :current_worker_rev,
- :expire_time => nil
- }
- fields.collect do |field|
- template[field] || self.send(field)
- end
- end
- end
class WorkerStatusMessage < Skynet::Message
# I'd love to rename tasktype and tasksubtype to something more reasonable
@@ -335,6 +425,7 @@ def initialize(opts)
self.tasksubtype = :worker
end
+ ### REMOVE TEMPLATES
def self.worker_status_template(opts)
template = {
:tasksubtype => :worker,
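
The message refactor above relies on a class-level `fields` accessor plus a `set_fields` call in each subclass, so `ResultMessage`, `ErrorMessage` and `TaskMessage` each carry their own field list while sharing one constructor. The sketch below shows that pattern in isolation. The `Sketch*` class names and the reduced field lists are assumptions made for illustration; they are not Skynet's classes.

```ruby
# Hypothetical base class: each subclass declares its own field list.
class SketchMessage
  class << self
    attr_accessor :fields
  end

  # Store the field list on the calling class and define accessors for it.
  def self.set_fields(fields)
    self.fields = fields
    fields.each { |field| attr_accessor field }
  end

  # Accept a hash keyed by symbols or strings, as the diff's initialize does.
  def initialize(opts = {})
    self.class.fields.each do |field|
      value = opts[field] || opts[field.to_s]
      send("#{field}=", value) if value
    end
  end

  def to_h
    self.class.fields.each_with_object({}) { |field, h| h[field] = send(field) }
  end
end

class SketchResult < SketchMessage
  set_fields [:job_id, :task_id, :result]

  def tasktype
    :result
  end

  def payload
    result
  end
end

# An error is a result whose payload is stored through the inherited result slot.
class SketchError < SketchResult
  set_fields [:job_id, :task_id, :error]

  def tasktype
    :error
  end

  def payload
    error
  end

  def error=(error)
    self.result = error
  end

  def error
    result
  end
end

err = SketchError.new("job_id" => 1, :task_id => 2, :error => "boom")
puts err.tasktype.inspect
puts err.to_h.inspect
```

Because the error class inherits from the result class, code that collects results can branch on `tasktype` and still read `payload` uniformly, which is how `gather_results` separates errors from results in this commit.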
5 lib/skynet/skynet_message_queue.rb
@@ -84,8 +84,9 @@ def mq
)
end
- def_delegators :mq, :take_next_task, :write_message, :take_result, :write_error, :write_result,
- :list_tasks, :list_results, :clear_outstanding_tasks, :clear_outstanding_results, :stats
+ def_delegators :mq, :take_next_task, :write_task, :write_message, :take_result, :write_error, :write_result,
+ :list_tasks, :list_results, :clear_outstanding_tasks, :clear_outstanding_results, :stats,
+ :write_error_for_task, :write_result_for_task
48 lib/skynet/skynet_task.rb
@@ -4,7 +4,30 @@ class Task
class ConstructorError < StandardError; end
class TimeoutError < StandardError; end
-
+
+ ## Skynet::Task::Error can be thrown by your map or reduce to pass valuable logging information
+ ## Call initialize with an options hash
+ ## You can pass
+ # :error_code => YOURCODE,
+ # :error_type => YOURERRORTYPE,
+ # :exception => ORIGEXCEPTION_OBJECT,
+ # :message => YOUR_MESSAGE
+ class Error < StandardError
+
+ attr_accessor :error_code, :task, :orig_exception, :skynet_error_type, :error_type
+
+ def initialize(options={})
+ self.error_code = options[:error_code]
+ self.error_type = options[:error_type]
+ self.skynet_error_type = options[:skynet_error_type] || :task
+ self.task = options[:task]
+ self.orig_exception = options[:exception]
+ error_message = options[:message]
+ error_message ||= self.orig_exception if self.orig_exception
+ super(error_message)
+ end
+ end
+
attr_reader :data, :process, :result, :map_or_reduce, :marshalable
attr_accessor :name, :result_timeout, :retry
@@ -93,24 +116,19 @@ def run(iteration=nil)
end
end
rescue Timeout::Error => e
- # ==========
- # = XXX NEWSFEED HACK
- # = I'm printing the data hash, but that hash has all this shit added to it after runing through newsfeed.
- # = It's actually nice to be able to see what was added, but sometimes its too much data.
- # = Though the handy part will be adding instrumentation to the event_hash and seeing it onyl during a timeout.
- # ==========
-
- if @data.is_a?(Array) and @data.first.is_a?(Hash)
- @data.each {|h|h.delete(:event_object)}
- end
- raise TimeoutError.new("TASK TIMED OUT! #{name} IT:[#{iteration}] timeout:#{@result_timeout} #{e.inspect} DATA: #{@data.inspect} #{e.backtrace.join("\n")}")
+
+ raise Skynet::Task::Error.new(:skynet_error_type => :timeout, :task => self, :exception => e, :message => "TASK TIMED OUT! #{name} IT:[#{iteration}] timeout:#{@result_timeout} #{e.inspect} DATA: #{@data.inspect} #{e.backtrace.join("\n")}")
+ # raise TimeoutError.new("TASK TIMED OUT! #{name} IT:[#{iteration}] timeout:#{@result_timeout} #{e.inspect} DATA: #{@data.inspect} #{e.backtrace.join("\n")}")
+
+ rescue Skynet::Task::Error => e
+ raise e
# ==========
- # = XXX This rescue block is probably not necessary. Just for debugging for now. =
+ # = XXX This rescue block is probably not necessary. Just for debugging for now. Should be in worker right?
# ==========
rescue Exception => e
- error "Error running task #{e.inspect} TASK:", self, e.backtrace.join("\n")
- raise e
+ # error "Error running task #{e.inspect} TASK:", self, e.backtrace.join("\n")
+ raise Skynet::Task::Error.new(:skynet_error_type => :task, :task => self, :exception => e)
end
end
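
The `Skynet::Task#run` change above wraps both timeouts and arbitrary exceptions in a single error class that keeps the original exception, so the caller only has to rescue one type. A minimal sketch of that wrapping, assuming a stand-in `SketchTaskError` class and a hypothetical `run_wrapped` helper (neither exists in Skynet):

```ruby
require 'timeout'

# Stand-in for Skynet::Task::Error: carries an error type and the original exception.
class SketchTaskError < StandardError
  attr_accessor :error_type, :orig_exception

  def initialize(options = {})
    self.error_type     = options[:error_type] || :task
    self.orig_exception = options[:exception]
    message = options[:message]
    # Fall back to the original exception's message when none is given.
    message ||= orig_exception.message if orig_exception
    super(message)
  end
end

# Hypothetical helper: run the block under a timeout and normalize every failure.
def run_wrapped(timeout_seconds)
  Timeout.timeout(timeout_seconds) { yield }
rescue Timeout::Error => e
  raise SketchTaskError.new(:error_type => :timeout, :exception => e)
rescue SketchTaskError
  raise # already wrapped, pass through unchanged
rescue StandardError => e
  raise SketchTaskError.new(:error_type => :task, :exception => e)
end

begin
  run_wrapped(1) { raise ArgumentError, "bad input" }
rescue SketchTaskError => e
  puts "#{e.error_type}: #{e.message} (#{e.orig_exception.class})"
end
```

The sketch rescues `StandardError` rather than `Exception` so that interrupts and exit requests still propagate; the diff itself rescues `Exception`, which is a broader choice.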
46 lib/skynet/skynet_worker.rb
@@ -35,10 +35,9 @@ def initialize(worker_type, options = {})
debug "THIS WORKER TAKES #{worker_type}"
@worker_info = {
- :tasktype => worker_type,
:hostname => hostname,
:process_id => process_id,
- :worker_type => payload_type,
+ :worker_type => worker_type,
:worker_id => worker_id,
:version => mq.get_worker_version,
}
@@ -155,8 +154,8 @@ def manager
Skynet::Manager.get
end
- def payload_type
- return nil if worker_type == :any
+ def worker_type
+ # return nil if worker_type == :master_or_task
return @worker_type
end
@@ -195,6 +194,7 @@ def start
task = nil
loop do
+ @in_process = false
message = nil
begin
if Skynet::CONFIG[:WORKER_MAX_PROCESSED] and Skynet::CONFIG[:WORKER_MAX_PROCESSED] > 0 and @processed >= Skynet::CONFIG[:WORKER_MAX_PROCESSED]
@@ -220,11 +220,15 @@ def start
#
# debug "LOOK FOR WORK USING TEMPLATE", Skynet::Message.task_template(@curver)
# message = Skynet::Message.new(mq.take(Skynet::Message.task_template(@curver),0.00001))
+
+ ### FIXME These should be real messages not tasks!!!!!!!!!!
+ ## That way we'd have enough information to pass back to the queue that we wouldn't have to reuse fields in task
+ ## We'd be able to rename task.timeout back to task.timeout, or may not need it at all. We've got it in the message.
message = mq.take_next_task(@curver, 0.00001, worker_type, queue_id)
- next unless message.respond_to?(:payload)
+ next unless message.respond_to?(:task)
- task = message.payload
+ task = message.task
error "BAD MESSAGE", task unless task.respond_to?(:map_or_reduce)
info "STEP 2 GOT MESSAGE #{message.name} type:#{task.map_or_reduce}, jobid: #{message.job_id}, taskid:#{message.task_id} it: #{message.iteration}"
@@ -244,21 +248,16 @@ def start
:name => message.name,
:map_or_reduce => task.map_or_reduce
})
- result = task.run(message.iteration)
+ result = message.run(message.iteration)
info "STEP 5 GOT RESULT FROM RUN TASK #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"
debug "STEP 5.1 RESULT DATA:", result
- result_message = mq.write_result(message,result,task.result_timeout)
+ result_message = mq.write_result_for_task(message,result,task.result_timeout)
info "STEP 6 WROTE RESULT MESSAGE #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"
# debug "STEP 6.1 RESULT_MESSAGE:", result_message
notify_task_complete
- rescue Skynet::Task::TimeoutError => e
- error "Task timed out while executing #{e.inspect} #{e.backtrace.join("\n")}"
- @in_process = false
- next
-
rescue Skynet::Worker::RespawnWorker => e
info "Respawning and taking worker status #{e.message}"
notify_worker_stop
@@ -288,24 +287,33 @@ def start
rescue NoManagerError => e
fatal e.message
break
+
rescue Interrupt, SystemExit => e
info "Exiting..."
notify_worker_stop
break
+
+ rescue Skynet::Task::TimeoutError => e
+ error "Task timed out while executing #{e.inspect} #{e.backtrace.join("\n")}"
+ @in_process = false
+ ### FIXME We should be sending an error back to the queue?
+ ### Do we retry if there's a timeout?
+ ### We should raise a task error here with a special code
+ next
+
rescue Exception => e
error "skynet_worker.rb:#{__LINE__} #{e.inspect} #{e.backtrace.join("\n")}"
exceptions += 1
break if exceptions > 1000
#mq.take(@next_worker_message.task_template,0.0005) if message
+ ## FIXME Do we need this now that we're handling errors in message.run
if message
- mq.write_error(message,"#{e.inspect} #{e.backtrace.join("\n")}",(task.respond_to?(:result_timeout) ? task.result_timeout : 200))
+ mq.write_error_for_task(message,"#{e.inspect} #{e.backtrace.join("\n")}",(task.respond_to?(:result_timeout) ? task.result_timeout : 200))
else
# what do we do here
# mq.write_error(message,"ERROR in WORKER [#{$$}] #{e.inspect} #{e.backtrace.join("\n")}")
end
# mq.write_error("ERROR in WORKER [#{$$}] #{e.inspect} #{e.backtrace.join("\n")}")
- @in_process = false
- next
end
end
end
@@ -360,16 +368,16 @@ def ok_to_mem_check?
end
def self.start(options={})
- options[:worker_type] ||= :any
+ options[:worker_type] ||= :master_or_task
options[:required_libs] ||= []
OptionParser.new do |opt|
opt.banner = "Usage: worker [options]"
opt.on('-r', '--required LIBRARY', 'Include the specified libraries') do |v|
options[:required_libs] << v
end
- opt.on('-ot', '--worker_type WORKERTYPE', "master, task or any") do |v|
- if ["any","master","task"].include?(v)
+ opt.on('-ot', '--worker_type WORKERTYPE', "master, task or master_or_task") do |v|
+ if ["master_or_task","master","task"].include?(v)
options[:worker_type] = v
else
raise Skynet::Error.new("#{v} is not a valid worker_type")
180 test/test_message_queue_adapter.rb
@@ -0,0 +1,180 @@
+require File.dirname(__FILE__) + '/test_helper.rb'
+
+class MessageQueueAdapterTest < Test::Unit::TestCase
+
+ attr_reader :mq
+
+ Skynet.configure(
+ :ENABLE => false,
+ :SKYNET_LOG_FILE => STDOUT,
+ :SKYNET_LOG_LEVEL => Logger::ERROR,
+ :TS_DRBURI => "druby://localhost:40088",
+ :SKYNET_LOCAL_MANAGER_URL => "druby://localhost:40000",
+ :MESSAGE_QUEUE_ADAPTER => "Skynet::MessageQueueAdapter::TupleSpace"
+ # :MESSAGE_QUEUE_ADAPTER => "Skynet::MessageQueueAdapter::SingleProcess"
+ )
+
+ def setup
+
+ @ts ||= Rinda::TupleSpace.new
+ @@tss ||= DRb.start_service(Skynet::CONFIG[:TS_DRBURI], @ts)
+
+ @mq = Skynet::MessageQueue.new
+
+ @mq.mq.flush
+
+ @worker_message = Skynet::TaskMessage.new(
+ :tasktype => :task,
+ :drburi => "localhost",
+ :job_id => 1,
+ :task_id => 2,
+ :task => "task",
+ :timeout => 20,
+ :result => nil,
+ :expire_time => 0,
+ :iteration => 0,
+ :name => "test",
+ :version => 1,
+ :retry => 3,
+ :queue_id => 0
+ )
+ end
+
+ def test_write_and_take_next_task
+ assert mq.write_task(@worker_message)
+ assert_equal @worker_message.to_h, mq.take_next_task(@worker_message.version,:task).to_h
+ end
+
+ def test_task_failover
+ message = @worker_message.clone
+ message.timeout=0.4
+ assert mq.write_message(message)
+ taken = mq.take_next_task(message.version,message.tasktype)
+ assert_equal message.to_h, taken.to_h
+ sleep 1
+ assert_equal 2, mq.take_next_task(message.version,message.tasktype).task_id
+ end
+
+ def test_task_failover_no_retry
+ message = @worker_message.clone
+ message.retry = false
+ message.timeout=0.4
+ message.name = "hi"
+ assert mq.write_task(message)
+ assert_equal message.to_h, mq.take_next_task(message.version,message.tasktype).to_h
+ sleep 1.5
+ notask = false
+ begin
+ task = mq.take_next_task(message.version,message.tasktype).task_id
+ rescue Skynet::RequestExpiredError
+ notask = true
+ end
+ assert notask
+ end
+
+ def test_task_retries
+ message = @worker_message.clone
+ message.timeout=0.4
+ message.retry = 2
+ assert mq.write_message(message)
+ assert_equal message.to_h, mq.take_next_task(message.version,message.tasktype).to_h
+ sleep 1
+ assert_equal 2, mq.take_next_task(message.version,message.tasktype).task_id
+ sleep 1
+ assert_equal 2, mq.take_next_task(message.version,message.tasktype).task_id
+ notask = false
+ begin
+ mq.take_next_task(message.version,message.tasktype).task_id
+ rescue Skynet::RequestExpiredError
+ notask = true
+ end
+ assert notask
+ end
+
+ def test_write_and_take_result
+ assert mq.write_task(@worker_message)
+ message = mq.take_next_task(@worker_message.version,:task)
+ assert_equal @worker_message, message
+
+ set_result = {:blah => ['hi']}
+
+ mq.write_result_for_task(message,set_result,20)
+ result = mq.take_result(message.job_id)
+ assert_equal set_result, result.result
+
+ notask = false
+ begin
+ mq.take_next_task(message.version,message.tasktype).task_id
+ rescue Skynet::RequestExpiredError
+ notask = true
+ end
+ assert notask
+ end
+
+
+ ## do we want to retry if there's a basic error? What are the circumstances of a retry?
+ ## There are exceptions thrown IN execution of the task
+ ## such as timeout errors executing the task
+ ## Then there are errors in skynet during execution
+ ## I thought about having 2 error codes, a skynet_error_code and a task_error_code which
+ ## could be specified by the developer
+ def test_write_and_take_error
+ assert mq.write_task(@worker_message)
+ message = mq.take_next_task(@worker_message.version,:task)
+ assert_equal @worker_message, message
+
+
+ mq.write_error_for_task(message,"ERR",20)
+ result = mq.take_result(message.job_id)
+ assert_equal "ERR", result.error
+
+ notask = false
+ begin
+ mq.take_next_task(message.version,message.tasktype).task_id
+ rescue Skynet::RequestExpiredError
+ notask = true
+ end
+ assert notask
+ end
+
+ def test_write_complex_error
+ assert mq.write_task(@worker_message)
+ message = mq.take_next_task(@worker_message.version,:task)
+ assert_equal @worker_message, message
+
+ e = {:error => :now}
+ mq.write_error_for_task(message,e,20)
+ result = mq.take_result(message.job_id)
+ assert_equal e, result.error
+ end
+
+ ## TODO
+ # def test_unmarshable_object_error
+ # end
+
+ # def test_raw_vs_real_task_object
+ # end
+
+ # def test_task_error_messages
+ # end
+
+ # def expired_tasks_vs_start_after
+ # end
+
+ # def test_start_after
+ # end
+
+ # def test_multiple_jobs_and_tasks
+ # end
+
+ # def test_iterations
+ # know when to write failover
+ # end
+
+ def test_worker_version
+ mq.set_worker_version(2)
+ assert_equal 2, mq.get_worker_version
+ mq.set_worker_version(10)
+ assert_equal 10, mq.get_worker_version
+ end
+
+end
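
The adapter tests above exercise a small contract: a written task can be taken once, taking from an empty queue raises an expiry error, and a result written for a task can be taken back by `job_id`. The sketch below is a hypothetical in-memory queue that satisfies that contract with plain hashes. It is not Skynet's SingleProcess adapter; the method signatures are simplified, and it omits timeouts, versions and retries.

```ruby
# Stand-in for Skynet::RequestExpiredError.
class SketchExpiredError < StandardError; end

# Hypothetical in-memory queue; messages are plain hashes.
class SketchQueue
  def initialize
    @tasks   = []
    @results = Hash.new { |hash, job_id| hash[job_id] = [] }
  end

  def write_task(message)
    @tasks << message
    true
  end

  # Remove and return the first queued message of the requested type.
  def take_next_task(tasktype)
    index = @tasks.index { |message| message[:tasktype] == tasktype }
    raise SketchExpiredError, "no #{tasktype} message available" unless index
    @tasks.delete_at(index)
  end

  def write_result_for_task(message, result)
    @results[message[:job_id]] << { :task_id => message[:task_id], :result => result }
    true
  end

  # Remove and return the oldest result recorded for the job.
  def take_result(job_id)
    result = @results[job_id].shift
    raise SketchExpiredError, "no result for job #{job_id}" unless result
    result
  end
end

queue = SketchQueue.new
queue.write_task(:tasktype => :task, :job_id => 1, :task_id => 2)
taken = queue.take_next_task(:task)
queue.write_result_for_task(taken, :blah => ['hi'])
puts queue.take_result(1).inspect
```

A reference implementation of this kind makes it possible to run the same assertions against any adapter, which is the stated goal of the universal adapter tester in this commit.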
