Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of ActiveJob AsyncAdapter. #21257

Merged
merged 1 commit into from Aug 25, 2015
Merged

Conversation

@jdantonio
Copy link
Contributor

@jdantonio jdantonio commented Aug 16, 2015

Now that activesupport has a runtime dependency on concurrent-ruby, we can begin taking advantage of those tools in more ways. This PR creates a simple asynchronous ActiveJob adapter that posts jobs to a concurrent-ruby thread pool. Within the context of ActiveJob it provides functionality comparable to sucker_punch. Rails 5 users will now be able to create simple asynchronous jobs without installing additional gems simply by setting the new adapter:

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_adapter = :async
  end
end

A simple benchmark script which compares enqueue performance vs. sucker_punch can be found here. Performance is comparable on both Ruby 2.2.2 and JRuby 9000.

If this PR is accepted I can add more features such as job prioritization and per-queue thread pools.

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 16, 2015

NOTE: I think my benchmarks are uninformative. Since I used the test helpers to setup ActiveJob, it looks like both adapters are performing synchronously, not asynchronously. I will gather better benchmarks as soon as I figure out how to setup the test properly.

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 16, 2015

The benchmark script and associated data have been updated. They now represent asynchronous behavior. Performance is roughly the same for AsyncAdapter and SuckerPunchAdapter.

@rafaelfranca
Copy link
Member

@rafaelfranca rafaelfranca commented Aug 16, 2015

Thank you for the pull request but I don't think we should maintain a queue
system inside Rails. There are a lot of good implementations out there that
I think we just end up with a poor implementation inside the framework, to
not say the maintenance overhead that it will give us.

On Sun, Aug 16, 2015, 15:46 jdantonio notifications@github.com wrote:

The benchmark script and associated data have been updated. They now
represent asynchronous behavior. Performance is roughly the same for
AsyncAdapter and SuckerPunchAdapter.


Reply to this email directly or view it on GitHub
#21257 (comment).

@rafaelfranca
Copy link
Member

@rafaelfranca rafaelfranca commented Aug 16, 2015

@dhh @tenderlove @matthewd @senny @chancancode what are your thoughts about a default async adapter in Rails itself?

@dhh
Copy link
Member

@dhh dhh commented Aug 16, 2015

I actually would like to see a default async queue for ActiveJob, mostly for testing. No, you shouldn't use that in production unless you don't care about losing jobs if a process crashes, but it would be nice for testing, so you wouldn't have to install Sucker Punch.

@rafaelfranca
Copy link
Member

@rafaelfranca rafaelfranca commented Aug 16, 2015

If that is the goal 👍 too. It should be explicit that it should not be used on production.

@dhh
Copy link
Member

@dhh dhh commented Aug 16, 2015

Agree. Or only used in production for things with no criticality.

On Sun, Aug 16, 2015 at 5:21 PM, Rafael Mendonça França <
notifications@github.com> wrote:

If that is the goal [image: 👍] too. It should be explicit that it
should not be used on production.


Reply to this email directly or view it on GitHub
#21257 (comment).

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 17, 2015

I'll begin working on the full feature set tomorrow (Monday). Scheduled tasks (#set(wait: 1.week)) won't survive a restart, but that should be fine for testing. I'll update the guide to be abundantly clear that the :async adapter is for testing only. I'll update this PR when it is feature-complete.

@dhh
Copy link
Member

@dhh dhh commented Aug 17, 2015

Thanks! Yeah, would be really great to have scheduled tasks work for
testing. Sucker Punch doesn't support those. I had that issue with some
tasks that had a 15s delay that I couldn't test.

On Sun, Aug 16, 2015 at 11:23 PM, jdantonio notifications@github.com
wrote:

I'll begin working on the full feature set tomorrow (Monday). Scheduled
tasks (#set(wait: 1.week)) won't survive a restart, but that should be
fine for testing. I'll update the guide to be abundantly clear that the
:async adapter is for testing only. I'll update this PR when it is
feature-complete.


Reply to this email directly or view it on GitHub
#21257 (comment).

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 20, 2015

@dhh @rafaelfranca I'm sorry it took me a few days to get back to this, but it's ready now. I've added job scheduling and support for custom queues. I've also updated the documentation to briefly explain why this adapter is for dev/test and not prod.

@cristianbica
Copy link
Member

@cristianbica cristianbica commented Aug 20, 2015

Really nice. I do have 2 suggestions:

  • Follow the model of all the adapters: have a different class (ex: ActiveJob::AsyncJob) that handles the queueing, execution, queue creating and the configurations (ex: executor) and the adapter should just call enqueue / enqueue_at on the AsyncJob class.
  • it seems a bit complicated to have to create the queues upfront. I would rather see the async class create the queue if not exists
@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 20, 2015

@carllerche I'll start work on both of those changes this evening.

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 20, 2015

@cristianbica This update implements both of your suggestions. Now that ActiveJob::AsyncJob is a separate class I would like to add a set of tests that test it directly, but I'd like to get your feedback on this update before I write those. (The adapter tests still pass.)

@kaspth
Copy link
Member

@kaspth kaspth commented Aug 20, 2015

executor might be close to the concurrent-ruby terminology, but it feels very removed from Active Job. It would be great to find a term closer to home.

In fact we might not need to expose that executor information at all. As the adapter seems to have a lot of configuration for something that's mostly for testing. What need is there to change the pool size in testing as long as we're using a minimum of 2?

@kaspth
kaspth reviewed Aug 20, 2015
View changes
activejob/lib/active_job/async_job.rb Outdated
}.freeze

QUEUES = ThreadSafe::Cache.new do |hash, queue_name|
hash[queue_name] = case queue_name

This comment has been minimized.

@kaspth

kaspth Aug 20, 2015
Member

Lets do:

queue_name = queue_name.to_sym

hash[queue_name] = \
  if queue_name == :default
    ActiveJob::AsyncJob.default_executor
  else
    ActiveJob::AsyncJob.create_executor
  end
@kaspth
kaspth reviewed Aug 20, 2015
View changes
activejob/lib/active_job/async_job.rb Outdated
end

class << self

This comment has been minimized.

@kaspth

kaspth Aug 20, 2015
Member

✂️ this line

@kaspth
kaspth reviewed Aug 20, 2015
View changes
activejob/lib/active_job/async_job.rb Outdated

class << self

def create_executor(opts = {})

This comment has been minimized.

@kaspth

kaspth Aug 20, 2015
Member

Keyword argument could work great here:

def create_executor(workers: DEFAULT_MIN_THREADS)
  options = DEFAULT_EXECUTOR_OPTIONS.merge(min_threads: workers, max_threads: workers)
  Concurrent::ThreadPoolExecutor.new(options)
end
@kaspth
kaspth reviewed Aug 20, 2015
View changes
activejob/lib/active_job/async_job.rb Outdated
Concurrent::ThreadPoolExecutor.new(opts)
end

def create_queue(name, opts = {})

This comment has been minimized.

@kaspth

kaspth Aug 20, 2015
Member

Another place for a keyword argument:

def create_queue(name, executor: nil, **options)
  QUEUES[name] = executor || create_executor(options)
end
@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 21, 2015

@kaspth I was definitely overthinking things. Running the tests requires that the job runner perform synchronously. The original version was a proof-of-concept that didn't support queues or scheduled tasks, so I simply injected an ImmediateExecutor when running the tests. When I started adding new features I lost sight of the original intent and made things more complicated than necessary.

This update has the same features yet is simpler:

  • ActiveJob::AsyncJob has a set_test_mode! class method that the test helper calls. It is undocumented and irreversible. I expect that it will only be called from the test helper in this repo.
  • All queues, including the default queue, are created with the same configuration.
  • Queues are automatically created the first time a job is post to the queue.
  • Although it's a narrow use case, queues can still be manually created and configured. The only option in this case is to pass a manually created thread pool as an argument. I think we can assume that anyone choosing to manually configure a queue is also capable of manually configuring the thread pool.

This update also includes a set of unit tests for the AsyncJob class.

class QueueCreationError < ArgumentError; end

class << self
# Force all jobs to run synchronously when testing the activejob gem.

This comment has been minimized.

@kaspth

kaspth Aug 23, 2015
Member

It's Active Job 😁

This comment has been minimized.

@kaspth

kaspth Aug 23, 2015
Member

Also why do we need to run async adapter jobs synchronously in our own tests? Won't that hurt ourselves down the line?

This comment has been minimized.

@jdantonio

jdantonio Aug 23, 2015
Author Contributor

My apologies regarding the gem name. I've fixed that.

I must have misunderstood what was happening in the test helpers. Some of the test helpers set a "test" backend or mode. When I run the tests without a test mode one of the serialization tests fails. I'll try to figure out what's happening there.

This comment has been minimized.

@jdantonio

jdantonio Aug 23, 2015
Author Contributor

Correction--all the adapter tests fail when there is no test mode.

test/adapters/delayed_job.rb:

Delayed::Worker.delay_jobs = false
Delayed::Worker.backend    = :test

test/adapters/qu.rb:

require 'qu-immediate'

test/adapters/resque.rb:

Resque.inline = true

This comment has been minimized.

@kaspth

kaspth Aug 23, 2015
Member

No worries about the name 😁

I'm not sure I follow you. Perhaps we're talking about different things?

My confusion is how the async adapter implements a test mode by not doing what it says on the tin. I can understand if we need the adapter to run synchronously for its own tests, but shouldn't we also have integration tests that mimic how an actual user tests with the adapter?

This comment has been minimized.

@jdantonio

jdantonio Aug 23, 2015
Author Contributor

Oh. I think I see what you are saying. If I follow, you are suggesting another set of tests that actually post background jobs then verify that the jobs actually run in the background. Is that correct? That's not a problem. I'll add a set of tests for that.

The latest update is a rebase against the latest Rails master and also incorporates the other suggestions.

This comment has been minimized.

@kaspth

kaspth Aug 23, 2015
Member

Exactly that 👍

This comment has been minimized.

@jdantonio

jdantonio Aug 23, 2015
Author Contributor

@kaspth I apologize if I'm missing something, but it appears that the file test/integration/queuing_test.rb currently tests the asynchronous behavior of all adapters. It posts jobs with perform_later then uses timers (such as wait_for_jobs_to_finish_for(2.seconds)) to verify that the jobs post. It looks like the adapter tests put all the adapters in synchronous mode, the existing integration tests put the adapters in asynchronous mode, and the Rakefile controls the environment appropriately for each set of tests. However, I didn't notice this earlier so I have not setup the necessary test helper for making the integration tests work with the Async Job. The integration tests (which do not run with rake test) are failing. I'll fix that.

@test_mode = true
end

def test_mode? #:nodoc:

This comment has been minimized.

@kaspth

kaspth Aug 23, 2015
Member

Would we ever need to query this somewhere outside the async adapter? Probably doesn't even need to be a method at all.

class AsyncJobTest < ActiveSupport::TestCase

def job_queue_spy
ActiveJob::AsyncJob.const_get(:QUEUES)

This comment has been minimized.

@kaspth

kaspth Aug 23, 2015
Member

Making the constant public (with a # :nodoc:) would feel more apt than disregarding what you've marked it as 😄


# Create a new job queue with the given +name+. Jobs will run on the
# given +thread pool+. The thread pool must be a concurrent-ruby
# executor. Raises +QueueCreationError+ when the queue already exists.

This comment has been minimized.

@kaspth

kaspth Aug 23, 2015
Member

Can we add a link to some documentation for the executors?

@kaspth
Copy link
Member

@kaspth kaspth commented Aug 23, 2015

This just keeps getting better 👏

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 24, 2015

@kaspth The test cases in test/integration/queuing_test.rb are precisely the type of test you suggested. When I created an appropriate test helper and ran those tests I discovered a localization bug. The latest version has the following changes:

  • Fix the aforementioned locale (serialization) bug
  • Follows the pattern of the other adapters and serializes the job within the adapter
  • Follows the pattern of the other adapters and explicitly passes the queue name from the adapter
  • Adds a test job and AsyncJob unit test case for queue_as
  • Adds a set_normal_mode! method for consistent unit test setup/teardown.

All of the tests unit tests (rake test:async) and integration tests (rake test:integration:async) pass.

require 'jobs/queue_as_job'

class AsyncJobTest < ActiveSupport::TestCase

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

✂️ this line

#
# Rails.application.config.active_job.queue_adapter = :async
class AsyncAdapter

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

✂️ this line

end

def enqueue(job_data, queue: 'default') #:nodoc:
QUEUES[queue].post(job_data) {|job| ActiveJob::Base.execute(job) }

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

Needs a space after the open brace.

This comment has been minimized.

@ianks

ianks Aug 25, 2015
Contributor

What happens if there is no queue defined here?

This comment has been minimized.

@jdantonio

jdantonio Aug 25, 2015
Author Contributor

@ianks The block passed to the constructor of ThreadSafe::Cache on line 35 specifies what to do when an attempt is made to assign a value to a key that does not exist. In this case, we automatically create the queue. It's a synchronized operation.

This comment has been minimized.

@thedarkone

thedarkone Aug 25, 2015
Contributor

@jdantonio we need to use compute_if_absent up above:

QUEUES = ThreadSafe::Cache.new do |hash, queue_name| #:nodoc:
  hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
end

It is my fault there are no docs for ThreadSafe::Cache - there are no exclusivity/synchronization guarantees about the "default block" of ThreadSafe::Cache.new. The cache will happily let multiple threads inside.

delay = timestamp - Time.current.to_f
if delay > 0
args = [job_data]
executor = QUEUES[queue]

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

We're already using keyword arguments so the args and executor variables just create noise. Lets inline them - 80 chars per line be damned 😁

# Raises +QueueCreationError+ when the queue already exists.
def create_queue(name, thread_pool)
raise QueueCreationError.new('queue already exists') if QUEUES.key? name
# possible race condition here but the use case is very narrow

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

If the use case is so very narrow should we really add this method? Generally we'd like to keep as closed off a public interface as possible.

# This should only be called from within unit tests.
def set_normal_mode! #:nodoc:
@test_mode = false
end

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

I'm not feeling it on these mode names - especially because they're both used in tests. Why not rename to perform_immediately! and perform_asynchronously!?

# Async Job supports job queues specified with +queue_as+. Queues
# are implemented as individual thread pools. Queues will be created
# automatically as needed using a default configuration. Should there be a
# need to customize the thread pool behind one or more queues, any

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

Should there be a need

Muddy use case counts further against making create_queue public.

This comment has been minimized.

@jdantonio

jdantonio Aug 25, 2015
Author Contributor

@kaspth You're right. I've spent so many hours building/refining/debugging those thread pools that I want users to have access to their full potential, but in this case it isn't really necessary. It's a feature that can easily be added in the future should the need arise.

This comment has been minimized.

@kaspth

kaspth Aug 25, 2015
Member

It's not easy to let go when you've spent a lot of hours on something.

This has definitely made me want to look more into concurrent-ruby ❤️❤️❤️

# asynchronously on a +concurrent-ruby+ thread pool. All job data
# is retained in memory. Because job data is not saved to a persistent
# datastore there is no additional infrastructure needed and most jobs
# will process very quickly. The lack of persistence, however, means

This comment has been minimized.

@kaspth

kaspth Aug 24, 2015
Member

rm will and very

@kaspth
Copy link
Member

@kaspth kaspth commented Aug 24, 2015

@robin850 can you check the documentation?

@cristianbica what do you think about this? I'm not too familiar with Active Job's tests, so how are these looking?

# pool directly then call the +create_queue+ method passing the thread
# pool as the second parameter:
#
# thread_pool = Concurrent::FixedThreadPool.new(10, max_queue: 100, fallback_polcy: :caller_runs)

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

There's a missing "i" in fallback_policy.

# ActiveJob::AsyncJob.create_queue(:my_queue, thread_pool)
#
# The +create_queue+ method must be called during application initialization,
# before any jobs are post to the queue. Attempting to create a queue that

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

I guess it should be "are posted" rather than "are post".

class QueueCreationError < ArgumentError; end

class << self
# Force all jobs to run synchronously when testing the Active Job gem.

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

s/Force/Forces

def set_test_mode! #:nodoc:
@test_mode = true
end
# Allow jobs to run asynchronously when testing the Active Job gem.

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

s/Allow/Allows

@test_mode = false
end

# Create a new job queue with the given +name+. Jobs will run on the

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

s/Create/Creates

@@ -12,6 +12,8 @@ module ActiveJob
# * {Sidekiq}[http://sidekiq.org]
# * {Sneakers}[https://github.com/jondot/sneakers]
# * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
# * {Active Job Async Job}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

You should rather link to the AsyncAdapter adapter rather than the AsyncJob here, no ?

# When enqueueing jobs with the Inline adapter the job will be executed
# immediately on the calling thread. Job scheduling is not supported. This
# adapter is intended for use in tests. For more information see the
# {api documentation}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html]

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

This doesn't seem to be the right link and "for more information" is redundant here.

Actually, I guess you can simply remove this part and the previous one as well (i.e. "Async Job Queue Adapter"). They don't bring anything and may get out of sync when these adapters get updated. What do you think ?

#
# Concurrent Ruby: Modern concurrency tools including agents, futures,
# promises, thread pools, supervisors, and more. Inspired by Erlang,
# Clojure, Scala, Go, Java, JavaScript, and classic concurrency patterns.

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

I guess you can also remove comments starting from the top to here. The first part is duplicated from AsyncJob documentation and people will have to checkout concurrent-ruby anyway so there's no need to describe it. What do you think ?

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

You can certainly simply specify that this adapter creates AsyncJob objects under the hood.

end

test "enqueuing without specifying a queue uses the default queue" do
break unless using_async_adapter?

This comment has been minimized.

@robin850

robin850 Aug 24, 2015
Member

This is nit-picky but what do you think about using skip instead of break ?

This comment has been minimized.

@jdantonio

jdantonio Aug 25, 2015
Author Contributor

@robin850 You're right--skip is better. I'm an RSpec fan so my knowledge of minitest is limited. I didn't realize there was a skip function.

@robin850
Copy link
Member

@robin850 robin850 commented Aug 24, 2015

Great job @jdantonio! This is looking good! 👏

def create_queue(name, thread_pool)
raise QueueCreationError.new('queue already exists') if QUEUES.key? name
# possible race condition here but the use case is very narrow
QUEUES[name] = thread_pool

This comment has been minimized.

@ianks

ianks Aug 25, 2015
Contributor

Why not synchronize this method?

# Allow jobs to run asynchronously when testing the Active Job gem.
# This should only be called from within unit tests.
def set_normal_mode! #:nodoc:
@test_mode = false

This comment has been minimized.

@ianks

ianks Aug 25, 2015
Contributor

Might need to ensure_ivar_visibility here to be safe

This comment has been minimized.

@jdantonio

jdantonio Aug 25, 2015
Author Contributor

@ianks Strictly speaking, the methods for setting test mode (now perform_immediately) are not thread safe. But that should be OK since they are special-purpose, private methods. The unit tests need the adapters to run immediately. Every one of the supported adapters, therefore, has such a mode. But that should never be used anywhere except in this test suite, where we can ensure that it's called safely.

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 25, 2015

@kaspth @robin850 Updated based on latest feedback and rebased against latest master.

kaspth added a commit that referenced this pull request Aug 25, 2015
Initial implementation of ActiveJob AsyncAdapter.
@kaspth kaspth merged commit c5a88e5 into rails:master Aug 25, 2015
@kaspth
Copy link
Member

@kaspth kaspth commented Aug 25, 2015

Thanks @jdantonio, this is great 👍

@jdantonio
Copy link
Contributor Author

@jdantonio jdantonio commented Aug 25, 2015

@kaspth Thank you very much for your feedback! This PR is much better because of your help.

@kaspth
Copy link
Member

@kaspth kaspth commented Aug 25, 2015

❤️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

8 participants
You can’t perform that action at this time.