Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'databus23-remove_worker_on_shutdown'

  • Loading branch information...
commit 7a9fb564dd8a2ee6894d0ede9d2e1e40409325ec 2 parents c720cd2 + 1408ded
Myron Marston myronmarston authored
10 lib/qless.rb
View
@@ -150,7 +150,7 @@ class Client
# Lua scripts
attr_reader :_cancel, :_config, :_complete, :_fail, :_failed, :_get, :_heartbeat, :_jobs, :_peek, :_pop
attr_reader :_priority, :_put, :_queues, :_recur, :_retry, :_stats, :_tag, :_track, :_workers, :_depends
- attr_reader :_pause, :_unpause
+ attr_reader :_pause, :_unpause, :_deregister_workers
# A real object
attr_reader :config, :redis, :jobs, :queues, :workers
@@ -161,7 +161,8 @@ def initialize(options = {})
assert_minimum_redis_version("2.5.5")
@config = Config.new(self)
['cancel', 'config', 'complete', 'depends', 'fail', 'failed', 'get', 'heartbeat', 'jobs', 'peek', 'pop',
- 'priority', 'put', 'queues', 'recur', 'retry', 'stats', 'tag', 'track', 'workers', 'pause', 'unpause'].each do |f|
+ 'priority', 'put', 'queues', 'recur', 'retry', 'stats', 'tag', 'track', 'workers', 'pause', 'unpause',
+ 'deregister_workers'].each do |f|
self.instance_variable_set("@_#{f}", Qless.lua_script_cache.script_for(f, @redis))
end
@@ -192,6 +193,11 @@ def untrack(jid)
def tags(offset=0, count=100)
JSON.parse(@_tag.call([], ['top', offset, count]))
end
+
+ def deregister_workers(*worker_names)
+ _deregister_workers.call([], worker_names)
+ end
+
private
def assert_minimum_redis_version(version)
2  lib/qless/qless-core
@@ -1 +1 @@
-Subproject commit 819647eb0b99d4eaae38726b111c300dc8cbc4bd
+Subproject commit b593d849591a677df610c112451040dcb7dd6b81
13 lib/qless/worker.rb
View
@@ -103,6 +103,9 @@ def work(interval = 5.0)
exit!
end
end
+ ensure
+ # make sure the worker deregisters on shutdown
+ deregister
end
def perform(job)
@@ -145,6 +148,16 @@ def unpause_processing
private
+ def deregister
+ uniq_clients.each do |client|
+ client.deregister_workers(Qless.worker_name)
+ end
+ end
+
+ def uniq_clients
+ @uniq_clients ||= @job_reserver.queues.map(&:client).uniq
+ end
+
def retryable_exception_classes(job)
return [] unless job.klass.respond_to?(:retryable_exception_classes)
job.klass.retryable_exception_classes
14 spec/integration/qless_spec.rb
View
@@ -1694,6 +1694,20 @@ def should_only_have_tracked_jid_for(type)
"stalled" => {}
})
end
+
+ def registered_worker_names
+ client.workers.counts.map { |w| w['name'] }
+ end
+
+ it 'removes deregistered workers' do
+ q.put(Qless::Job, {"test" => "workers_reput"})
+ q.pop
+
+ expect {
+ client.deregister_workers(q.worker_name)
+ }.to change { registered_worker_names }.from([q.worker_name]).to([])
+ end
+
end
describe "#jobs" do
2  spec/unit/worker_spec.rb
View
@@ -4,7 +4,7 @@
module Qless
describe Worker do
- let(:reserver) { fire_double("Qless::JobReservers::Ordered", description: "job reserver") }
+ let(:reserver) { fire_double("Qless::JobReservers::Ordered", description: "job reserver", queues: []) }
let(:client ) { stub.as_null_object }
def procline
Please sign in to comment.
Something went wrong with that request. Please try again.