Skip to content
Browse files

Broken state - about to completely refactor

  • Loading branch information...
1 parent 9051e4c commit 5dc84370ef72b617b5b08f655a79e0b4ad66c623 @mloughran committed
Showing with 115 additions and 26 deletions.
  1. +4 −0 README.md
  2. +3 −1 examples/test_consumer.rb
  3. +20 −7 examples/test_producer.rb
  4. +1 −2 lib/juggler.rb
  5. +87 −16 lib/juggler/runner.rb
View
4 README.md
@@ -23,3 +23,7 @@ For example
end
http
end
+
+Important points to note:
+
+* If your deferrable code raises errors, this will not be handled by juggler.
View
4 examples/test_consumer.rb
@@ -2,6 +2,8 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'juggler'
+Juggler.logger.level = Logger::DEBUG
+
EM.run {
Juggler.juggle(:http, 3) do |path|
http = EM::Protocols::HttpClient.request({
@@ -19,7 +21,7 @@
Juggler.juggle(:timer, 5) do |params|
defer = EM::DefaultDeferrable.new
- EM::Timer.new(1) do
+ EM::Timer.new(10) do
defer.set_deferred_status :succeeded, nil
# defer.set_deferred_status :failed, nil
end
View
27 examples/test_producer.rb
@@ -4,10 +4,23 @@
# Throw some jobs
-EM.run {
- 10.times do |i|
- path = ['/fast', '/slow'][i % 2]
- Juggler.throw(:http, path, :ttr => 40)
- end
- 10.times { Juggler.throw(:timer, {:foo => 'bar'}, :ttr => 20) }
-}
+# EM.run {
+# # 10.times do |i|
+# # path = ['/fast', '/slow'][i % 2]
+# # Juggler.throw(:http, path, :ttr => 40)
+# # end
+# 1.times { Juggler.throw(:timer, {:foo => 'bar'}, :ttr => 5) }
+# }
+
+Thread.new do
+ EM.run
+end
+
+loop do
+ puts "Choose your ttr!"
+ time = gets.to_i
+ puts "Creating job with ttr #{time}"
+ EM.next_tick {
+ Juggler.throw(:timer, {:foo => 'bar'}, :ttr => time)
+ }
+end
View
3 lib/juggler.rb
@@ -1,14 +1,13 @@
require 'em-jack'
require 'eventmachine'
-autoload :Logger, 'logger'
-
class Juggler
class << self
attr_writer :logger
def logger
@logger ||= begin
+ require 'logger'
logger = Logger.new(STDOUT)
logger.level = Logger::WARN
logger.debug("Created logger")
View
103 lib/juggler/runner.rb
@@ -74,8 +74,11 @@ def reserve
reserve_call.callback do |job|
@reserved = false
+
+
EM.next_tick {
- # Reserve in next tick so that on error deletes get scheduled first
+ # Reserve in next tick so that any errors during this reserve can be
+ # excecuted before the next blocking reserve
reserve_if_necessary
}
@@ -91,39 +94,52 @@ def reserve
connection.delete(job)
next
end
+
+ Juggler.logger.debug {
+ "Job #{job.jobid} body: #{params}"
+ }
begin
job_deferrable = @strategy.call(params)
rescue => e
handle_exception(e, "Exception calling #{@queue} strategy")
-
- # TODO: exponential backoff, error catching
- connection.release(job, :delay => 1)
-
+ release_for_retry(job)
next
end
- @running << job
+ # job.stats do |stats|
+ # Juggler.logger.debug { "Job #{job.jobid} stats: #{stats.inspect}" }
+ #
+ # EM::Timer.new(stats["ttr"] - 2) {
+ # Juggler.logger.debug {
+ # "Job timeout exceeded - failing"
+ # }
+ # job_deferrable.fail "Timeout"
+ # }
+ # end
+
+ @running << job_deferrable
Juggler.logger.debug {
"Queue #{@queue}: Excecuting #{@running.size} jobs"
}
+ Juggler.logger.debug {
+ @running.map { |e| e.inspect }.join("\n")
+ }
job_deferrable.callback do
- @running.delete(job)
+ @running.delete(job_deferrable)
- # TODO: error catching
- connection.delete(job)
-
- reserve_if_necessary
+ delete_job(job).callback {
+ reserve_if_necessary
+ }
end
job_deferrable.errback do |e|
- @running.delete(job)
-
- # TODO: exponential backoff, error catching
- connection.release(job, :delay => 1)
+ @running.delete(job_deferrable)
- reserve_if_necessary
+ release_for_retry(job).callback {
+ reserve_if_necessary
+ }
end
end
@@ -132,6 +148,8 @@ def reserve
Juggler.logger.warn "Reserve call failed: #{error}"
+ check_all_reserved_jobs
+
# Wait 1s before reserving or we'll just get DEALINE_SOON again
# "If the client issues a reserve command during the safety margin,
# <snip>, the server will respond with: DEADLINE_SOON"
@@ -173,5 +191,58 @@ def connection
:tube => @queue
})
end
+
+ # TODO: exponential backoff
+ def release_for_retry(job)
+ dd = EM::DefaultDeferrable.new
+ Juggler.logger.debug { "Job #{job.jobid} releasing" }
+ stats_def = job.stats
+ stats_def.callback do |stats|
+ Juggler.logger.debug { "Job #{job.jobid} stats: #{stats.inspect}"}
+
+ release_def = connection.release(job, :delay => 1)
+
+ release_def.callback {
+ Juggler.logger.info { "Job #{job.jobid} released for retry" }
+ dd.succeed
+ }
+ release_def.errback {
+ Juggler.logger.error do
+ "Job #{job.jobid } release failed (could not release)"
+ end
+ dd.succeed
+ }
+ end
+ stats_def.errback {
+ Juggler.logger.error do
+ "Job #{job.jobid } release failed (could not retrieve stats)"
+ end
+ dd.succeed
+ }
+ dd
+ end
+
+ def delete_job(job)
+ delete_def = connection.delete(job)
+ delete_def.callback do
+ Juggler.logger.debug "Job #{job.jobid} deleted"
+ end
+ delete_def.errback do
+ Juggler.logger.debug "Job #{job.jobid} delete operation failed"
+ end
+ delete_def
+ end
+
+ def check_all_reserved_jobs
+ @running.each do |job|
+ puts "Checking job #{job.jobid}"
+ job.stats { |stats|
+ puts "Job #{job.jobid} has #{stats["time-left"]}s left"
+ if stats["time-left"] < 1
+ job.fail "Timed out"
+ end
+ }
+ end
+ end
end
end

0 comments on commit 5dc8437

Please sign in to comment.
Something went wrong with that request. Please try again.