Skip to content

Commit

Permalink
Merge branch 'master' into kill_exhausted_retries_error
Browse files Browse the repository at this point in the history
Conflicts:
	lib/qless/qless-core
  • Loading branch information
myronmarston committed Jun 12, 2013
2 parents 1008632 + d00817d commit 1cd5dad
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -20,7 +20,7 @@ put in. So if a worker is working on a job, and you move it, the worker's reques
complete the job will be ignored. complete the job will be ignored.


A job can be `canceled`, which means it disappears into the ether, and we'll never A job can be `canceled`, which means it disappears into the ether, and we'll never
pay it any mind every again. A job can be `dropped`, which is when a worker fails pay it any mind ever again. A job can be `dropped`, which is when a worker fails
to heartbeat or complete the job in a timely fashion, or a job can be `failed`, to heartbeat or complete the job in a timely fashion, or a job can be `failed`,
which is when a host recognizes some systematically problematic state about the which is when a host recognizes some systematically problematic state about the
job. A worker should only fail a job if the error is likely not a transient one; job. A worker should only fail a job if the error is likely not a transient one;
Expand Down
3 changes: 2 additions & 1 deletion lib/qless/queue.rb
Expand Up @@ -57,7 +57,8 @@ def heartbeat=(value)
end end


def max_concurrency def max_concurrency
Integer(get_config :"max-concurrency") value = get_config(:"max-concurrency")
value && Integer(value)
end end


def max_concurrency=(value) def max_concurrency=(value)
Expand Down
12 changes: 9 additions & 3 deletions lib/qless/subscriber.rb
Expand Up @@ -4,7 +4,7 @@
module Qless module Qless
class Subscriber class Subscriber
def self.start(*args, &block) def self.start(*args, &block)
new(*args, &block).start_pub_sub_listener new(*args, &block).tap(&:start_pub_sub_listener)
end end


attr_reader :client, :channel attr_reader :client, :channel
Expand All @@ -26,7 +26,7 @@ def start_pub_sub_listener
@listener_redis.subscribe(channel, @my_channel) do |on| @listener_redis.subscribe(channel, @my_channel) do |on|
on.message do |_channel, message| on.message do |_channel, message|
if _channel == @my_channel if _channel == @my_channel
@listener_redis.unsubscribe(@my_channel) @listener_redis.unsubscribe(channel, @my_channel) if message == 'disconnect'
else else
@message_received_callback.call(self, JSON.parse(message)) @message_received_callback.call(self, JSON.parse(message))
end end
Expand All @@ -37,9 +37,15 @@ def start_pub_sub_listener
wait_until_thread_listening wait_until_thread_listening
end end


def stop
@client_redis.publish(@my_channel, 'disconnect')
end

private

def wait_until_thread_listening def wait_until_thread_listening
Qless::WaitUntil.wait_until(10) do Qless::WaitUntil.wait_until(10) do
@client_redis.publish(@my_channel, 'disconnect') == 1 @client_redis.publish(@my_channel, 'listening?') == 1
end end
end end
end end
Expand Down
43 changes: 27 additions & 16 deletions lib/qless/worker.rb
Expand Up @@ -78,24 +78,25 @@ def self.start
def work(interval = 5.0) def work(interval = 5.0)
procline "Starting #{@job_reserver.description}" procline "Starting #{@job_reserver.description}"
register_parent_signal_handlers register_parent_signal_handlers
uniq_clients.each { |client| start_parent_pub_sub_listener_for(client) }


loop do with_pub_sub_listener_for_each_client do
break if shutdown? loop do
if paused? break if shutdown?
sleep interval if paused?
next sleep interval
next
end

unless job = reserve_job
break if interval.zero?
procline "Waiting for #{@job_reserver.description}"
log! "Sleeping for #{interval} seconds"
sleep interval
next
end

perform_job_in_child_process(job)
end end

unless job = reserve_job
break if interval.zero?
procline "Waiting for #{@job_reserver.description}"
log! "Sleeping for #{interval} seconds"
sleep interval
next
end

perform_job_in_child_process(job)
end end
ensure ensure
# make sure the worker deregisters on shutdown # make sure the worker deregisters on shutdown
Expand Down Expand Up @@ -334,6 +335,16 @@ def log!(message)
log message if very_verbose log message if very_verbose
end end


def with_pub_sub_listener_for_each_client
subscribers = uniq_clients.map do |client|
start_parent_pub_sub_listener_for(client)
end

yield
ensure
subscribers.each(&:stop)
end

def start_parent_pub_sub_listener_for(client) def start_parent_pub_sub_listener_for(client)
Subscriber.start(client, "ql:w:#{Qless.worker_name}") do |subscriber, message| Subscriber.start(client, "ql:w:#{Qless.worker_name}") do |subscriber, message|
if message["event"] == "lock_lost" && message["jid"] == current_job_jid if message["event"] == "lock_lost" && message["jid"] == current_job_jid
Expand Down
13 changes: 7 additions & 6 deletions spec/integration/qless_spec.rb
Expand Up @@ -535,16 +535,15 @@ def should_only_have_tracked_jid_for(type)
end end


context "when there is a max concurrency set on the queue" do context "when there is a max concurrency set on the queue" do
before do
q.max_concurrency = 2
q.heartbeat = 60
end

it 'exposes a reader method for the config value' do it 'exposes a reader method for the config value' do
expect(q.max_concurrency).to eq(2) expect {
q.max_concurrency = 2
}.to change { q.max_concurrency }.from(nil).to(2)
end end


it 'limits the number of jobs that can be worked on concurrently from that queue' do it 'limits the number of jobs that can be worked on concurrently from that queue' do
q.max_concurrency = 2

3.times { q.put(Qless::Job, {"test" => "put_get"}) } 3.times { q.put(Qless::Job, {"test" => "put_get"}) }


j1, j2 = 2.times.map { q.pop } j1, j2 = 2.times.map { q.pop }
Expand All @@ -559,6 +558,8 @@ def should_only_have_tracked_jid_for(type)
end end


it 'can still timeout the jobs' do it 'can still timeout the jobs' do
q.max_concurrency = 2
q.heartbeat = 60
Time.freeze Time.freeze


4.times { q.put(Qless::Job, {"test" => "put_get"}) } 4.times { q.put(Qless::Job, {"test" => "put_get"}) }
Expand Down
9 changes: 9 additions & 0 deletions spec/integration/worker_spec.rb
Expand Up @@ -99,6 +99,15 @@ def start_worker(run_as_single_process)
client.redis.get('retry_integration_job_count').should eq('11') client.redis.get('retry_integration_job_count').should eq('11')
end end


it 'does not leak threads' do
queue = client.queues["main"]
queue.put(WorkerIntegrationJob, "word" => "foo", "redis_url" => client.redis.client.id)

expect {
Qless::Worker.new(Qless::JobReservers::RoundRobin.new([queue])).work(0)
}.not_to change { Thread.list }
end

context 'when a job times out' do context 'when a job times out' do
include_context "stops all non-main threads" include_context "stops all non-main threads"
let(:queue) { client.queues["main"] } let(:queue) { client.queues["main"] }
Expand Down

0 comments on commit 1cd5dad

Please sign in to comment.