A simple, non-durable, in-memory queue for running background tasks using threads.
Projects like Resque, Delayed Job, Queue Classic, and Sidekiq are great. They use data stores like Postgres and Redis to store information to be processed later. If you're prototyping a system or don't have access to a data store, you might still want to push some work off to the background. If that's the case, an in-memory threaded queue might be a good fit.
$ bundle install
Define your task to be processed:
    class Archive
      def self.call(repo_id, branch = 'master')
        repo = Repository.find(repo_id)
        repo.create_archive(branch)
      end
    end
It can be any object that responds to call, but we recommend a class or module, which makes switching to a durable queue later easier.
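To make "responds to call" concrete, here is a small sketch in plain Ruby with no queue involved. The names ArchiveJob, Notifier, LogJob, and run_task are hypothetical and exist only for illustration.

```ruby
# Three hypothetical tasks. Each one responds to `call`, which is the only
# contract described above.
class ArchiveJob
  def self.call(repo_id, branch = 'master')
    "archived #{repo_id}@#{branch}"
  end
end

module Notifier
  def self.call(user_id)
    "notified #{user_id}"
  end
end

# A lambda also responds to `call`, but it is harder to serialize if you
# later move to a durable queue.
LogJob = ->(message) { "logged #{message}" }

# Illustrative dispatcher: check for `call`, then forward the arguments.
def run_task(task, *args)
  raise ArgumentError, "#{task.inspect} does not respond to call" unless task.respond_to?(:call)

  task.call(*args)
end

run_task(ArchiveJob, 42, 'staging')
run_task(Notifier, 7)
run_task(LogJob, 'hello')
```

A class or module has a stable name that a durable queue can store and look up later, which is one reason to prefer it over an anonymous lambda.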
Then, to enqueue a task to run in the background, use:

    repo = Repository.last
    Threaded.enqueue(Archive, repo.id, 'staging')
The first argument is a class that defines the task to be processed and the rest of the arguments are passed to the task when it is run.
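For readers curious how a queue of this kind can work, the following is a minimal sketch built on Ruby's thread-safe Queue class. It is not this library's implementation; TinyQueue and its methods are hypothetical names, and error handling is reduced to a warning.

```ruby
# A minimal sketch of an in-memory threaded queue (hypothetical, not the
# library's code). Jobs are [task, args] pairs stored in a thread-safe Queue.
class TinyQueue
  STOP = Object.new # sentinel that tells a worker to exit

  def initialize(size: 2)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        loop do
          task, args = @jobs.pop # blocks until a job is available
          break if task.equal?(STOP)

          begin
            task.call(*args)
          rescue StandardError => e
            warn "job failed: #{e.message}"
          end
        end
      end
    end
  end

  # The first argument is the task; the rest are passed to `call` later.
  def enqueue(task, *args)
    @jobs << [task, args]
    true
  end

  # Push one sentinel per worker, then wait for every worker to finish.
  def stop
    @workers.size.times { @jobs << [STOP, []] }
    @workers.each(&:join)
  end
end

queue = TinyQueue.new(size: 2)
queue.enqueue(->(name) { puts "hello #{name}" }, 'world')
queue.stop # waits for the job above to finish
```

Because the jobs live only in the Queue object, anything still waiting is lost if the process dies, which is what "non-durable" means here.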
The default number of worker threads is 16. You can change that when you configure your queue:

    Threaded.config do |config|
      config.size = 5
    end
By default jobs have a timeout value of 60 seconds. Since this is an in-memory queue (goes away when your process terminates) it is in your best interests to keep jobs small and quick, and not overload the queue. You can configure a different timeout on start:
    Threaded.config do |config|
      config.timeout = 90 # timeout is in seconds
    end
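One way a per-job timeout can be enforced in Ruby is the standard library's Timeout module. Whether this library does it that way is an assumption; run_with_timeout below is a hypothetical helper that only sketches the idea.

```ruby
require 'timeout'

# Hypothetical helper: run a task, but give up after `seconds`.
# Returns the task's result, or :timed_out if the limit was exceeded.
def run_with_timeout(task, args, seconds)
  Timeout.timeout(seconds) { task.call(*args) }
rescue Timeout::Error
  :timed_out
end

run_with_timeout(->(x) { x * 2 }, [21], 1) # finishes well within the limit
run_with_timeout(-> { sleep 1 }, [], 0.05) # interrupted after 0.05 seconds
```

A job that is interrupted this way may leave work half done, which is another reason to keep jobs small and quick.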
Want a different logger? Specify a different Logger:
    Threaded.config do |config|
      config.logger = Logger.new(STDOUT)
    end
As soon as you call enqueue, a new thread will be started. If you wish to explicitly start all threads, you can call Threaded.start. You can also pass your config inline when you start the queue:

    Threaded.start(size: 5, timeout: 90, logger: Logger.new(STDOUT))
For testing or guaranteed code execution, use the inline option:

    Threaded.inline = true
This option bypasses the queue and executes code as it comes.
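The idea behind an inline switch can be sketched as follows. This is a hypothetical illustration, not the library's code: InlineableQueue and pending_count are made-up names, and the return values are assumptions.

```ruby
# Hypothetical sketch of an inline switch. When `inline` is true the task
# runs immediately in the caller's thread; otherwise it is only recorded.
class InlineableQueue
  attr_accessor :inline

  def initialize
    @inline = false
    @pending = []
  end

  def enqueue(task, *args)
    if inline
      task.call(*args) # run now, so tests can assert on the effect right away
    else
      @pending << [task, args] # a real queue would hand this to a worker thread
      true
    end
  end

  def pending_count
    @pending.size
  end
end

queue = InlineableQueue.new
queue.inline = true
queue.enqueue(->(x) { x + 1 }, 1) # executed before enqueue returns
```

Running inline removes timing from the picture, so a test can check a job's side effects on the very next line.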
This worker operates in the same process as your app, which means that if your app is CPU bound, it will not be very useful. Because the worker uses threads, your app needs to either spend time on IO (database calls, file reads and writes, shelling out, etc.) or run on JRuby or Rubinius for it to be useful.
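The effect of IO-bound work on threads can be seen with a small timing sketch. Here sleep stands in for a database call or file read, and the helper names (elapsed, run_sequential, run_threaded) are hypothetical.

```ruby
# Measure how long a block takes, using a monotonic clock.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

# Run `count` simulated IO waits one after another.
def run_sequential(count, seconds)
  elapsed { count.times { sleep seconds } }
end

# Run the same waits in parallel threads and wait for all of them.
def run_threaded(count, seconds)
  elapsed { Array.new(count) { Thread.new { sleep seconds } }.each(&:join) }
end

puts format('sequential: %.2fs', run_sequential(4, 0.1))
puts format('threaded:   %.2fs', run_threaded(4, 0.1))
```

The threaded run finishes sooner because waiting threads do not block each other. Replacing the sleep with a tight CPU loop would not show the same gain on standard Ruby.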
To make sure all items in your queue are processed before your program exits, you can add an at_exit hook to your program:

    at_exit do
      Threaded.stop
    end
This call takes an optional timeout value (in seconds).
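A shutdown that waits for workers but gives up after a time limit can be sketched with Thread#join, which accepts a limit in seconds. The helper join_all is hypothetical and is not how this library is known to implement stop.

```ruby
# Hypothetical helper: wait for all threads, spending at most `timeout`
# seconds in total. Returns true if every thread finished in time.
def join_all(threads, timeout = nil)
  if timeout.nil?
    threads.each(&:join) # no limit: wait as long as it takes
    return true
  end

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads.all? do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    remaining = 0 if remaining.negative?
    thread.join(remaining) # returns nil if the thread is still running
  end
end

workers = Array.new(2) { Thread.new { sleep 0.01 } }
join_all(workers, 1)
```

Sharing one deadline across all threads keeps the total wait bounded, rather than allowing the full timeout for each thread in turn.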