New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max Concurrent Jobs #2402

Merged
merged 17 commits into from Dec 6, 2017

Conversation

Projects
None yet
2 participants
@mpchadwick
Contributor

mpchadwick commented Dec 2, 2017

An implementation of the feature described in #2393

/cc @zendesk/samson

Tasks

  • 馃憤 from team

References

  • Jira link:

Risks

  • Level: Low/Med/High
@@ -23,11 +24,24 @@ def executing?(id)
end

def queued?(id)
@queue.values.detect { |jes| jes.detect { |je| return je if je.id == id } }
@queue.each do |i|

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

can do .detect to avoid the nil below

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

how about putting queue, job_execution in the queue so these each/detect get simpler ?

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

something like @queue.each do |queue, job_execution|

This comment has been minimized.

@mpchadwick

mpchadwick Dec 3, 2017

Contributor

Not really understanding these comments. This change would make @queue an array of hashes. I'm interpreting this comment as a suggestion to make it a hash with pairs of queue => job_execution which wouldn't work because there could be multiple entries for each queue. I'm probably misunderstanding something.

This comment has been minimized.

@mpchadwick

mpchadwick Dec 3, 2017

Contributor

It seems like the easiest way to make this more succinct is to do...

@queue.detect  { |i| i[:job_execution].id == id}

Then the places that consume queued? can be updated to expect the hash with queue: 'foo', 'job_execution': bar and extract job_execution as needed.

This comment has been minimized.

@mpchadwick

mpchadwick Dec 3, 2017

Contributor

Actually it seems like it can just be this...

@queue.detect  { |i| return i[:job_execution] if i[:job_execution].id == id}
if @executing[queue]
@queue[queue] << job_execution
if should_queue_job?(queue)
@queue.push('queue': queue, 'job_execution': job_execution)

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

use queue: and not 'queue':

@@ -99,19 +123,32 @@ def perform_job(job_execution, queue)
end
end

def should_queue_job?(queue)

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

should_ is kinda redundant since the method is already ? -> def queue_job?

@lock = Mutex.new
@executing = {}
@threads = {}
JobQueue.concurrency = ENV['MAX_CONCURRENT_JOBS'].to_i

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

this looks wrong ... add a delegate and then self.concurrency = ?

This comment has been minimized.

@mpchadwick

mpchadwick Dec 3, 2017

Contributor

Sorry, but I'm confused by this comment. I did some quick research and it looks like delegate in Ruby is a way to have a method in one class reference a method from another class. In this case I don't see understand what what what would be delegated to who?

This comment has been minimized.

@grosser

grosser Dec 5, 2017

Member

what is strange here is that initializing a class sets global state

I think we can simplify this to:

def concurrency
  ENV['MAX_CONCURRENT_JOBS'].to_i
end

and then in the test you can either do job_queue.expects(:concurrency).returns(1) or

with_env MAX_CONCURRENT_JOBS: '1' do
 ... tests ...
end
if JobQueue.enabled && (next_execution = @queue[queue].shift)
@executing[queue] = next_execution
perform_job(next_execution, queue)
if JobQueue.enabled && !@queue.empty?

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

!empty? -> any?

@@ -12,6 +12,6 @@
<%= render 'job_executions', job_executions: active %>

<h2>Queued</h2>
<%= render 'job_executions', job_executions: queued %>
<%= render 'job_executions', job_executions: JobQueue.grouped_queue %>

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

this looks suspicious ... should this assign queue = JobQueue.grouped_queue somewhere ?

This comment has been minimized.

@mpchadwick

mpchadwick Dec 3, 2017

Contributor

What do you think about debug accepting a grouped argument (default to false) and handle the grouping in debug. Then we can call it here by passing true?

This comment has been minimized.

@grosser

grosser Dec 5, 2017

Member

how about debug keeps returning what it returned before: a grouped queue ... then less tests have to change ?

it 'queues a job if max concurrent jobs is hit' do
with_job_execution do
locked do
JobQueue.concurrency = 1

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

need to unset that in the ensure

This comment has been minimized.

@grosser

grosser Dec 5, 2017

Member

something like:

begin
  test
ensure
  ... unset ...
end

but as discussed above if you can do this via an instance method then we don't need to change global state at all

end

def dequeue(id)
!!@queue.values.detect { |jes| jes.reject! { |je| je.id == id } }
@queue.each do |i|

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

could be a !!@queue.reject! { |i| } so on delete it would return true

@@ -41,8 +55,8 @@ def perform_later(job_execution, queue: nil)

if JobQueue.enabled
@lock.synchronize do
if @executing[queue]
@queue[queue] << job_execution
if should_queue_job?(queue)

This comment has been minimized.

@grosser

grosser Dec 2, 2017

Member

could read nicer with if full?(queue)

@grosser

This comment has been minimized.

Member

grosser commented Dec 2, 2017

logic looks about right ... just a bit rough around the edges :)

I think @queue should get a rename ... it's getting confusing :D

@mpchadwick

This comment has been minimized.

Contributor

mpchadwick commented Dec 3, 2017

Still working through these. @queue seems like a logical name for me. What would you suggest as a replacement?

mpchadwick added some commits Dec 3, 2017

@mpchadwick mpchadwick changed the title from Max Concurrecnt Jobs to Max Concurrent Jobs Dec 5, 2017

if @executing[queue]
@queue[queue] << job_execution
if full?(queue)
@queue.push(queue: queue, 'job_execution': job_execution)

This comment has been minimized.

@grosser

grosser Dec 5, 2017

Member

'job_execution': is the same as job_execution:

if JobQueue.enabled && (next_execution = @queue[queue].shift)
@executing[queue] = next_execution
perform_job(next_execution, queue)
if JobQueue.enabled && @queue.any?

This comment has been minimized.

@grosser

grosser Dec 5, 2017

Member

don't need the @queue.any since the .each will be a noop if it is empty

@@ -7,6 +7,7 @@ def with_job_execution
JobQueue.enabled = true
yield
ensure
JobQueue.concurrency = 0

This comment has been minimized.

@grosser

grosser Dec 5, 2017

Member

don't need a global reset when it is only used in one place

@mpchadwick

This comment has been minimized.

Contributor

mpchadwick commented Dec 6, 2017

@grosser I think all the comments have been addressed. Now I just have one test that's failing

I've been pulling my hair over this for the past 30 minutes, but am really struggling understand what is going on. If I'm interpreting the backtrace correctly the test is hanging on the code that executes starting from with_a_queued_job on line 180. What doesn't make sense to me is that with_a_queued_job is used without any issue in all the other tests starting in the it 'has a queued job' block.

Do you have any thoughts about what is happening?

------ >>> test/models/job_queue_test.rb 
Run options: --seed 11288
# Running:
...............E
Error:
JobQueue::#perform_later::with queued job#test_0003_does not perform the next job when job execution is disabled:
Maxitest::Timeout::TestCaseTimeout: Test took too long to finish, aborting. To use a debugger, set Maxitest.timeout = false at the top of the test file.
    test/models/job_queue_test.rb:24:in `sleep'
    test/models/job_queue_test.rb:24:in `wait_for_jobs_to_finish'
    test/models/job_queue_test.rb:60:in `locked'
    test/models/job_queue_test.rb:44:in `block in with_a_queued_job'
    test/support/job_queue_support.rb:8:in `with_job_execution'
    test/models/job_queue_test.rb:43:in `with_a_queued_job'
    test/models/job_queue_test.rb:181:in `block (4 levels) in <top (required)>'
bin/rails test test/models/job_queue_test.rb:180
@mpchadwick

This comment has been minimized.

Contributor

mpchadwick commented Dec 6, 2017

Actually I figured out the problem with the test. The clear wasn't doing what it needed to do because debug no longer directly exposed the queue. Pushing up a fix. The stack trace was a bit misleading...

mpchadwick added some commits Dec 6, 2017

@grosser

grosser approved these changes Dec 6, 2017

@grosser grosser merged commit 957f5fa into zendesk:master Dec 6, 2017

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
@grosser

This comment has been minimized.

Member

grosser commented Dec 6, 2017

good job figuring all this stuff out ... the job-queue is a fun place to loose countless hours of productivity :D

@mpchadwick

This comment has been minimized.

Contributor

mpchadwick commented Dec 6, 2017

@grosser thanks for you're review. My company (Something Digital) is a happy user of Samson for our deployments, but it sometimes gets killed by oom-killer with too many jobs running at once (we've found Gulp to particularly be a memory hog). Hope this feature comes in useful for others as well.

@grosser

This comment has been minimized.

Member

grosser commented Dec 6, 2017

we nice all our jobs, but we still run into issues from time to time, but mostly from a single super-slow job ... that's why we added gcb-builds so all that can happen on gcloud :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment