diff --git a/.gitignore b/.gitignore index b39c062e..4bce588b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ pkg/* .DS_STORE spec/redis.config.yml +bundle +spec/tmp diff --git a/.rvmrc b/.rvmrc new file mode 100644 index 00000000..27b0a605 --- /dev/null +++ b/.rvmrc @@ -0,0 +1 @@ +rvm use 1.9.3@qless --create diff --git a/Gemfile b/Gemfile index 81da76d6..4cc2c682 100644 --- a/Gemfile +++ b/Gemfile @@ -7,3 +7,5 @@ gemspec # We can't do that until qless-core has been released # to rubygems.org, so for now it's hear. gem 'qless-core', :git => 'git://github.com/seomoz/qless-core.git' + +gem 'debugger', :platform => :mri diff --git a/Rakefile b/Rakefile index 903b5915..c24ae7ab 100644 --- a/Rakefile +++ b/Rakefile @@ -7,3 +7,8 @@ RSpec::Core::RakeTask.new(:spec) do |t| end task :default => :spec + +require 'bundler' +Bundler.setup + +require 'qless/tasks' diff --git a/lib/qless.rb b/lib/qless.rb index 89bf1bba..b493afa7 100644 --- a/lib/qless.rb +++ b/lib/qless.rb @@ -1,6 +1,7 @@ require "socket" require "redis" require "json" +require "securerandom" require "qless/version" require "qless/config" @@ -9,17 +10,32 @@ require "qless/lua" module Qless + extend self + + def generate_jid + SecureRandom.uuid.gsub('-', '') + end + + def stringify_hash_keys(hash) + hash.each_with_object({}) do |(key, value), result| + result[key.to_s] = value + end + end + + # This is a unique identifier for the worker + def worker_name + @worker_name ||= [Socket.gethostname, Process.pid.to_s].join('-') + end + class Client # Lua scripts attr_reader :_cancel, :_complete, :_fail, :_failed, :_get, :_getconfig, :_heartbeat, :_jobs, :_peek, :_pop, :_put, :_queues, :_setconfig, :_stats, :_track, :_workers, :_depends # A real object - attr_reader :config, :redis, :worker + attr_reader :config, :redis def initialize(options = {}) - # This is a unique identifier for the worker - @worker = Socket.gethostname + "-" + Process.pid.to_s # This is the redis instance we're connected to - @redis = Redis.new(options) + @redis = Redis.connect(options) # use connect so REDIS_URL will be honored @config = Config.new(self) ['cancel', 'complete', 'depends', 'fail', 'failed', 'get', 'getconfig', 'heartbeat', 'jobs', 'peek', 'pop', 'put', 'queues', 'setconfig', 'stats', 'track', 'workers'].each do |f| @@ -28,7 +44,7 @@ def initialize(options = {}) end def queue(name) - Queue.new(name, self, @worker) + Queue.new(name, self) end def queues(qname=nil) @@ -79,4 +95,4 @@ def job(id) Job.new(self, JSON.parse(results)) end end -end \ No newline at end of file +end diff --git a/lib/qless/job.rb b/lib/qless/job.rb index 8bebd052..4216d25b 100644 --- a/lib/qless/job.rb +++ b/lib/qless/job.rb @@ -1,28 +1,55 @@ +require "qless" require "qless/lua" require "redis" require "json" module Qless class Job - attr_reader :jid, :expires, :state, :queue, :history, :worker, :retries, :remaining, :failure, :klass, :delay, :tracked + attr_reader :jid, :expires, :state, :queue, :history, :worker_name, :retries, :remaining, :failure, :klass, :delay, :tracked attr_accessor :data, :priority, :tags def perform - klass = @klass.split('::').inject(nil) { |m, el| (m || Kernel).const_get(el) } + klass = @klass.split('::').inject(Kernel) { |context, name| context.const_get(name) } + klass.perform(self) + end + + def self.mock(client, klass, attributes = {}) + defaults = { + "jid" => Qless.generate_jid, + "data" => {}, + "klass" => klass.to_s, + "priority" => 0, + "tags" => [], + "worker" => "mock_worker", + "expires" => Time.now + (60 * 60), # an hour from now + "state" => "running", + "tracked" => false, + "queue" => "mock_queue", + "retries" => 5, + "remaining" => 5, + "failure" => "maybe", + "history" => [], + "dependencies" => [], + "dependents" => [] + } + attributes = defaults.merge(Qless.stringify_hash_keys(attributes)) + new(client, attributes) end def initialize(client, atts) @client = client - %w{jid data klass priority tags worker expires state tracked queue + %w{jid data klass priority tags expires state tracked queue retries remaining failure history dependencies dependents}.each do |att| self.instance_variable_set("@#{att}".to_sym, atts.fetch(att)) end @delay = atts.fetch('delay', 0) + @worker_name = atts.fetch('worker') # This is a silly side-effect of Lua doing JSON parsing - @tags = [] unless @tags != {} - @dependents = [] unless @depenents != {} - @dependencies = [] unless @dependencies != {} + @tags = [] if @tags == {} + @dependents = [] if @dependents == {} + @dependencies = [] if @dependencies == {} + @state_changed = false end def [](key) @@ -36,6 +63,10 @@ def []=(key, val) def to_s inspect end + + def description + "#{@jid} (#{@klass} / #{@queue})" + end def inspect "< Qless::Job #{@jid} >" @@ -47,26 +78,30 @@ def ttl # Move this from it's current queue into another def move(queue) - @client._put.call([queue], [ - @jid, @klass, JSON.generate(@data), Time.now.to_f, 0 - ]) + note_state_change do + @client._put.call([queue], [ + @jid, @klass, JSON.generate(@data), Time.now.to_f, 0 + ]) + end end # Fail a job def fail(group, message) - @client._fail.call([], [ - @jid, - @worker, - group, message, - Time.now.to_f, - JSON.generate(@data)]) || false + note_state_change do + @client._fail.call([], [ + @jid, + @worker_name, + group, message, + Time.now.to_f, + JSON.generate(@data)]) || false + end end # Heartbeat a job def heartbeat() @client._heartbeat.call([], [ @jid, - @worker, + @worker_name, Time.now.to_f, JSON.generate(@data)]) || false end @@ -76,19 +111,27 @@ def heartbeat() # => next (String) the next queue # => delay (int) how long to delay it in the next queue def complete(nxt=nil, options={}) - if nxt.nil? - response = @client._complete.call([], [ - @jid, @worker, @queue, Time.now.to_f, JSON.generate(@data)]) - else - response = @client._complete.call([], [ - @jid, @worker, @queue, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', - options.fetch(:delay, 0), 'depends', JSON.generate(options.fetch(:depends, []))]) + response = note_state_change do + if nxt.nil? + @client._complete.call([], [ + @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data)]) + else + @client._complete.call([], [ + @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', + options.fetch(:delay, 0), 'depends', JSON.generate(options.fetch(:depends, []))]) + end end response.nil? ? false : response end + + def state_changed? + @state_changed + end def cancel - @client._cancel.call([], [@jid]) + note_state_change do + @client._cancel.call([], [@jid]) + end end def track(*tags) @@ -106,5 +149,13 @@ def depend(*jids) def undepend(*jids) !!@client._depends.call([], [@jid, 'off'] + jids) end + + private + + def note_state_change + result = yield + @state_changed = true + result + end end end diff --git a/lib/qless/job_reservers/ordered.rb b/lib/qless/job_reservers/ordered.rb new file mode 100644 index 00000000..1f331b8f --- /dev/null +++ b/lib/qless/job_reservers/ordered.rb @@ -0,0 +1,23 @@ +module Qless + module JobReservers + class Ordered + attr_reader :queues + + def initialize(queues) + @queues = queues + end + + def reserve + @queues.each do |q| + job = q.pop + return job if job + end + nil + end + + def description + @description ||= @queues.map(&:name).join(', ') + " (ordered)" + end + end + end +end diff --git a/lib/qless/job_reservers/round_robin.rb b/lib/qless/job_reservers/round_robin.rb new file mode 100644 index 00000000..51a408b5 --- /dev/null +++ b/lib/qless/job_reservers/round_robin.rb @@ -0,0 +1,34 @@ +module Qless + module JobReservers + class RoundRobin + attr_reader :queues + + def initialize(queues) + @queues = queues + @num_queues = queues.size + @last_popped_queue_index = @num_queues - 1 + end + + def reserve + @num_queues.times do |i| + if job = next_queue.pop + return job + end + end + nil + end + + def description + @description ||= @queues.map(&:name).join(', ') + " (round robin)" + end + + private + + def next_queue + @last_popped_queue_index = (@last_popped_queue_index + 1) % @num_queues + @queues[@last_popped_queue_index] + end + end + end +end + diff --git a/lib/qless/queue.rb b/lib/qless/queue.rb index 4c66f159..826b365a 100644 --- a/lib/qless/queue.rb +++ b/lib/qless/queue.rb @@ -1,4 +1,3 @@ -require "securerandom" require "qless/lua" require "qless/job" require "redis" @@ -8,12 +7,12 @@ module Qless # A configuration class associated with a qless client class Queue attr_reader :name - attr_accessor :worker + attr_accessor :worker_name - def initialize(name, client, worker) + def initialize(name, client) @client = client @name = name - @worker = worker + self.worker_name = Qless.worker_name end # Put the described job in this queue @@ -23,7 +22,7 @@ def initialize(name, client, worker) # => delay (int) def put(klass, data, opts={}) @client._put.call([@name], [ - SecureRandom.uuid.gsub('-', ''), + Qless.generate_jid, klass.to_s, JSON.generate(data), Time.now.to_f, @@ -37,7 +36,7 @@ def put(klass, data, opts={}) # Pop a work item off the queue def pop(count=nil) - results = @client._pop.call([@name], [@worker, (count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) } + results = @client._pop.call([@name], [worker_name, (count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) } count.nil? ? results[0] : results end diff --git a/lib/qless/server.rb b/lib/qless/server.rb index 6545bfd5..f3ff600f 100755 --- a/lib/qless/server.rb +++ b/lib/qless/server.rb @@ -1,12 +1,8 @@ - - require 'sinatra/base' require 'qless' # Much of this is shamelessly poached from the resque web client -client = Qless::Client.new - module Qless class Server < Sinatra::Base # Path-y-ness @@ -23,6 +19,10 @@ class Server < Sinatra::Base def self.client @client ||= Qless::Client.new end + + def self.client=(client) + @client = client + end helpers do include Rack::Utils @@ -296,4 +296,4 @@ def strftime(t) # start the server if ruby file executed directly run! if app_file == $0 end -end \ No newline at end of file +end diff --git a/lib/qless/server/views/_job.erb b/lib/qless/server/views/_job.erb index aa83772f..8a228f24 100644 --- a/lib/qless/server/views/_job.erb +++ b/lib/qless/server/views/_job.erb @@ -3,7 +3,7 @@

- <%= job.jid %> | <%= job.state %> / <%= job.queue %><%= job.worker.nil? ? "/ #{job.worker}" : "" %> + <%= job.jid %> | <%= job.state %> / <%= job.queue %><%= job.worker_name.nil? ? "/ #{job.worker_name}" : "" %>

diff --git a/lib/qless/tasks.rb b/lib/qless/tasks.rb new file mode 100644 index 00000000..3962dc8d --- /dev/null +++ b/lib/qless/tasks.rb @@ -0,0 +1,10 @@ +namespace :qless do + task :setup # no-op; users should define their own setup + + desc "Start a Qless worker using env vars: QUEUES, JOB_RESERVER, REDIS_URL, INTERVAL, VERBOSE, VVERBOSE" + task :work => :setup do + require 'qless/worker' + Qless::Worker.start + end +end + diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb new file mode 100644 index 00000000..a87c55aa --- /dev/null +++ b/lib/qless/worker.rb @@ -0,0 +1,174 @@ +require 'qless' +require 'time' +require 'qless/job_reservers/ordered' +require 'qless/job_reservers/round_robin' + +module Qless + # This is heavily inspired by Resque's excellent worker: + # https://github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb + class Worker + def initialize(client, job_reserver, options = {}) + @client, @job_reserver = client, job_reserver + @shutdown = @paused = false + self.very_verbose = options[:very_verbose] + self.verbose = options[:verbose] + end + + # Whether the worker should log basic info to STDOUT + attr_accessor :verbose + + # Whether the worker should log lots of info to STDOUT + attr_accessor :very_verbose + + # Starts a worker based on ENV vars. Supported ENV vars: + # - REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically) + # - QUEUES=high,medium,low or QUEUE=blah + # - JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin + # - INTERVAL=3.2 + # - VERBOSE=true (to enable logging) + # - VVERBOSE=true (to enable very verbose logging) + # This is designed to be called from a rake task + def self.start + client = Qless::Client.new + queues = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',').map { |q| client.queue(q.strip) } + if queues.none? + raise "No queues provided. You must pass QUEUE or QUEUES when starting a worker." + end + + reserver = JobReservers.const_get(ENV.fetch('JOB_RESERVER', 'Ordered')).new(queues) + interval = Float(ENV.fetch('INTERVAL', 5.0)) + + options = {} + options[:verbose] = !!ENV['VERBOSE'] + options[:very_verbose] = !!ENV['VVERBOSE'] + + new(client, reserver, options).work(interval) + end + + def work(interval = 5.0) + procline "Starting #{@job_reserver.description}" + register_signal_handlers + + loop do + break if shutdown? + next if paused? + + unless job = @job_reserver.reserve + break if interval.zero? + procline "Waiting for #{@job_reserver.description}" + log! "Sleeping for #{interval} seconds" + sleep interval + next + end + + log "got: #{job.inspect}" + + if @child = fork + # We're in the parent process + procline "Forked #{@child} for #{job.description}" + Process.wait(@child) + else + # We're in the child process + procline "Processing #{job.description}" + perform(job) + exit! + end + end + end + + def perform(job) + job.perform + rescue => error + fail_job(job, error) + else + job.complete unless job.state_changed? + end + + def shutdown + @shutdown = true + end + + def shutdown! + shutdown + kill_child + end + + def shutdown? + @shutdown + end + + def paused? + @paused + end + + def pause_processing + log "USR2 received; pausing job processing" + @paused = true + procline "Paused -- #{@job_reserver.description}" + end + + def unpause_processing + log "CONT received; resuming job processing" + @paused = false + end + + private + + def fail_job(job, error) + group = "#{job.klass}:#{error.class}" + message = "#{error.message}\n\n#{error.backtrace.join("\n")}" + log "Got #{group} failure from #{job.inspect}" + job.fail(group, message) + end + + def procline(value) + $0 = "Qless-#{Qless::VERSION}: #{value} at #{Time.now.iso8601}" + log! $0 + end + + def kill_child + return unless @child + return unless system("ps -o pid,state -p #{@child}") + Process.kill("KILL", @child) rescue nil + end + + # This is stolen directly from resque... (thanks, @defunkt!) + # Registers the various signal handlers a worker responds to. + # + # TERM: Shutdown immediately, stop processing jobs. + # INT: Shutdown immediately, stop processing jobs. + # QUIT: Shutdown after the current job has finished processing. + # USR1: Kill the forked child immediately, continue processing jobs. + # USR2: Don't process any new jobs + # CONT: Start processing jobs again after a USR2 + def register_signal_handlers + trap('TERM') { shutdown! } + trap('INT') { shutdown! } + + begin + trap('QUIT') { shutdown } + trap('USR1') { kill_child } + trap('USR2') { pause_processing } + trap('CONT') { unpause_processing } + rescue ArgumentError + warn "Signals QUIT, USR1, USR2, and/or CONT not supported." + end + end + + # Log a message to STDOUT if we are verbose or very_verbose. + def log(message) + if verbose + puts "*** #{message}" + elsif very_verbose + time = Time.now.strftime('%H:%M:%S %Y-%m-%d') + puts "** [#{time}] #$$: #{message}" + end + end + + # Logs a very verbose message to STDOUT. + def log!(message) + log message if very_verbose + end + end +end + diff --git a/qless.gemspec b/qless.gemspec index 3d3dac59..b43187bf 100644 --- a/qless.gemspec +++ b/qless.gemspec @@ -34,10 +34,11 @@ Gem::Specification.new do |s| s.add_dependency "sinatra", "~> 1.3.2" s.add_dependency "vegas" , "~> 0.1.11" - s.add_dependency "hiredis", "~> 0.4.5" s.add_dependency "redis" , "~> 2.2.2" - s.add_dependency "json" , "~> 1.6.6" s.add_development_dependency "rspec" , "~> 2.6" + s.add_development_dependency "rspec-fire", "~> 0.4" s.add_development_dependency "rake" , "~> 0.9.2.2" + s.add_development_dependency 'capybara', '~> 1.1.2' + s.add_development_dependency 'launchy', '~> 2.1.0' end diff --git a/spec/qless_spec.rb b/spec/integration/qless_spec.rb similarity index 96% rename from spec/qless_spec.rb rename to spec/integration/qless_spec.rb index 1c1e8284..cb72a694 100644 --- a/spec/qless_spec.rb +++ b/spec/integration/qless_spec.rb @@ -9,45 +9,14 @@ class FooJob # An empty class end - describe Qless::Client do - # Our main client - let(:client) { Qless::Client.new(redis_config) } + describe Qless::Client, :integration do # Our main test queue let(:q) { client.queue("testing") } # Point to the main queue, but identify as different workers - let(:a) { client.queue("testing").tap { |o| o.worker = "worker-a" } } - let(:b) { client.queue("testing").tap { |o| o.worker = "worker-b" } } + let(:a) { client.queue("testing").tap { |o| o.worker_name = "worker-a" } } + let(:b) { client.queue("testing").tap { |o| o.worker_name = "worker-b" } } # And a second queue let(:other) { client.queue("other") } - - let(:redis_config) do - if File.exist?('./spec/redis.config.yml') - YAML.load_file('./spec/redis.config.yml') - else - {} - end - end - - def assert_minimum_redis_version(version) - redis_version = Gem::Version.new(@redis.info["redis_version"]) - if redis_version < Gem::Version.new(version) - pending "You are running redis #{redis_version}, but qless requires at least #{version}" - end - end - - before(:each) do - # Sometimes we need raw redis access - @redis = Redis.new(redis_config) - assert_minimum_redis_version("2.6") - if @redis.keys("*").length > 0 - raise "Must start with empty Redis DB" - end - @redis.script(:flush) - end - - after(:each) do - @redis.flushdb - end describe "#config" do it "can set, get and erase configuration" do @@ -71,7 +40,7 @@ def assert_minimum_redis_version(version) job.priority.should eq(0) job.data.should eq({"test" => "put_get"}) job.tags.should eq([]) - job.worker.should eq("") + job.worker_name.should eq("") job.state.should eq("waiting") job.history.length.should eq(1) job.history[0]['q'].should eq("testing") @@ -104,7 +73,7 @@ def assert_minimum_redis_version(version) client.config['heartbeat'] = 60 job = q.pop job.data.should eq({'test' => 'test_put_pop_attributes'}) - job.worker.should eq(client.worker) + job.worker_name.should eq(Qless.worker_name) job.expires.should > (Time.new.to_i - 20) job.state.should eq('running') job.queue.should eq('testing') @@ -127,7 +96,7 @@ def assert_minimum_redis_version(version) jid = q.put(Qless::Job, {'test' => 'test_put_pop_attributes'}) job = q.peek job.data.should eq({'test' => 'test_put_pop_attributes'}) - job.worker.should eq('') + job.worker_name.should eq('') job.state.should eq('waiting') job.queue.should eq('testing') job.remaining.should eq(5) @@ -339,7 +308,7 @@ def assert_minimum_redis_version(version) job.jid.should eq(jid) job.queue.should eq("testing") job.data.should eq({"test" => "fail_failed"}) - job.worker.should eq("") + job.worker_name.should eq("") job.state.should eq("failed") job.queue.should eq("testing") job.remaining.should eq(5) @@ -489,7 +458,7 @@ def assert_minimum_redis_version(version) job = client.job(jid) job.history.length.should eq(1) job.state.should eq("complete") - job.worker.should eq("") + job.worker_name.should eq("") job.queue.should eq("") q.length.should eq(0) end @@ -510,7 +479,7 @@ def assert_minimum_redis_version(version) job = client.job(jid) job.history.length.should eq(2) job.state.should eq("waiting") - job.worker.should eq("") + job.worker_name.should eq("") job.queue.should eq("testing") q.length.should eq(1) end @@ -534,7 +503,7 @@ def assert_minimum_redis_version(version) job = client.job(jid) job.history.length.should eq(1) job.state.should eq("complete") - job.worker.should eq("") + job.worker_name.should eq("") job.queue.should eq("") q.length.should eq(0) end @@ -923,11 +892,11 @@ def assert_minimum_redis_version(version) client.workers.should eq({}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) @@ -944,21 +913,21 @@ def assert_minimum_redis_version(version) jid = q.put(Qless::Job, {"test" => "workers_cancel"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) job.cancel client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -979,11 +948,11 @@ def assert_minimum_redis_version(version) client.config["heartbeat"] = -10 job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 1 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => [jid] }) @@ -992,15 +961,15 @@ def assert_minimum_redis_version(version) job = a.pop a.pop client.workers.should eq([{ - "name" => a.worker, + "name" => a.worker_name, "jobs" => 1, "stalled" => 0 }, { - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -1016,11 +985,11 @@ def assert_minimum_redis_version(version) jid = q.put(Qless::Job, {"test" => "workers_fail"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) @@ -1028,11 +997,11 @@ def assert_minimum_redis_version(version) # Now, let's fail it job.fail("foo", "bar") client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -1047,22 +1016,22 @@ def assert_minimum_redis_version(version) jid = q.put(Qless::Job, {"test" => "workers_complete"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) job.complete client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -1078,22 +1047,22 @@ def assert_minimum_redis_version(version) jid = q.put(Qless::Job, {"test" => "workers_reput"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) job.move("other") client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) diff --git a/spec/integration/server_spec.rb b/spec/integration/server_spec.rb new file mode 100644 index 00000000..0644b85c --- /dev/null +++ b/spec/integration/server_spec.rb @@ -0,0 +1,24 @@ +ENV['RACK_ENV'] = 'test' +require 'spec_helper' +require 'yaml' +require 'qless/server' +require 'capybara/rspec' + +module Qless + describe Server, :type => :request do + before(:all) do + Qless::Server.client = Qless::Client.new(redis_config) + Capybara.app = Qless::Server.new + end + + it 'can visit each top-nav tab' do + visit '/' + + links = all('ul.nav a') + links.should have_at_least(6).links + links.each do |link| + click_link link.text + end + end + end +end diff --git a/spec/integration/worker_spec.rb b/spec/integration/worker_spec.rb new file mode 100644 index 00000000..825ac440 --- /dev/null +++ b/spec/integration/worker_spec.rb @@ -0,0 +1,61 @@ +require 'spec_helper' +require 'redis' +require 'yaml' +require 'qless/worker' +require 'qless' + +class WorkerIntegrationJob + def self.perform(job) + File.open(job['file'], "w") { |f| f.write("done") } + end +end + +describe "Worker integration", :integration do + def start_worker + unless @child = fork + with_env_vars 'REDIS_URL' => redis_url, 'QUEUE' => 'main', 'INTERVAL' => '0.0001' do + Qless::Worker.start + exit! # once the work ends, exit hte worker process + end + end + end + + def output_file(i) + File.join(temp_dir, "job.out.#{Time.now.to_i}.#{i}") + end + + def wait_for_job_completion + 20.times do |i| + sleep 0.1 + return if `ps -p #{@child}` =~ /Waiting/i + end + + raise "Didn't complete: #{`ps -p #{@child}`}" + end + + let(:temp_dir) { "./spec/tmp" } + + before do + FileUtils.rm_rf temp_dir + FileUtils.mkdir_p temp_dir + end + + it 'can start a worker and then shut it down' do + files = 3.times.map { |i| output_file(i) } + files.select { |f| File.exist?(f) }.should eq([]) + + start_worker + + queue = client.queue("main") + files.each do |f| + queue.put(WorkerIntegrationJob, "file" => f) + end + + wait_for_job_completion + Process.kill("QUIT", @child) # send shutdown signal + + contents = files.map { |f| File.read(f) } + contents.should eq(['done'] * files.size) + end +end + diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 7efab34d..7a14b018 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,6 +1,71 @@ +begin + # use `bundle install --standalone' to get this... + require_relative '../bundle/bundler/setup' +rescue LoadError + # fall back to regular bundler if the developer hasn't bundled standalone + require 'bundler' + Bundler.setup +end + +require 'rspec/fire' + +module QlessSpecHelpers + def with_env_vars(vars) + original = ENV.to_hash + vars.each { |k, v| ENV[k] = v } + + begin + yield + ensure + ENV.replace(original) + end + end + + def redis_config + @redis_config ||= if File.exist?('./spec/redis.config.yml') + YAML.load_file('./spec/redis.config.yml') + else + {} + end + end + + def redis_url + return "redis://localhost:6379" if redis_config.empty? + "redis://#{redis_config[:host]}:#{redis_config[:port]}" + end +end + RSpec.configure do |c| c.treat_symbols_as_metadata_keys_with_true_values = true c.filter_run :f c.run_all_when_everything_filtered = true + c.include RSpec::Fire + c.include QlessSpecHelpers end +shared_context "redis integration", :integration do + let(:client) { Qless::Client.new(redis_config) } + + def assert_minimum_redis_version(version) + redis_version = Gem::Version.new(@redis.info["redis_version"]) + if redis_version < Gem::Version.new(version) + pending "You are running redis #{redis_version}, but qless requires at least #{version}" + end + end + + before(:each) do + # Sometimes we need raw redis access + @redis = Redis.new(redis_config) + assert_minimum_redis_version("2.6") + if @redis.keys("*").length > 0 + raise "Must start with empty Redis DB" + end + @redis.script(:flush) + end + + after(:each) do + @redis.flushdb + end +end + + diff --git a/spec/unit/job_reservers/ordered_spec.rb b/spec/unit/job_reservers/ordered_spec.rb new file mode 100644 index 00000000..bf8f2596 --- /dev/null +++ b/spec/unit/job_reservers/ordered_spec.rb @@ -0,0 +1,57 @@ +require 'spec_helper' +require 'qless/queue' +require 'qless/job_reservers/ordered' + +module Qless + module JobReservers + describe Ordered do + let(:q1) { fire_double("Qless::Queue") } + let(:q2) { fire_double("Qless::Queue") } + let(:q3) { fire_double("Qless::Queue") } + let(:reserver) { Ordered.new([q1, q2, q3]) } + + describe "#reserve" do + it 'always pops jobs from the first queue as long as it has jobs' do + q1.should_receive(:pop).and_return(:j1, :j2, :j3) + q2.should_not_receive(:pop) + q3.should_not_receive(:pop) + + reserver.reserve.should eq(:j1) + reserver.reserve.should eq(:j2) + reserver.reserve.should eq(:j3) + end + + it 'falls back to other queues when earlier queues lack jobs' do + call_count = 1 + q1.should_receive(:pop).exactly(4).times { :q1_job if [2, 4].include?(call_count) } + q2.should_receive(:pop).exactly(2).times { :q2_job if call_count == 1 } + q3.should_receive(:pop).once { :q3_job if call_count == 3 } + + reserver.reserve.should eq(:q2_job) + call_count = 2 + reserver.reserve.should eq(:q1_job) + call_count = 3 + reserver.reserve.should eq(:q3_job) + call_count = 4 + reserver.reserve.should eq(:q1_job) + end + + it 'returns nil if none of the queues have jobs' do + [q1, q2, q3].each { |q| q.stub(:pop) } + reserver.reserve.should be_nil + end + end + + describe "#description" do + it 'returns a useful human readable string' do + q1.stub(:name) { "Queue1" } + q2.stub(:name) { "Queue2" } + q3.stub(:name) { "Queue3" } + + reserver.description.should eq("Queue1, Queue2, Queue3 (ordered)") + end + end + end + end +end + diff --git a/spec/unit/job_reservers/round_robin_spec.rb b/spec/unit/job_reservers/round_robin_spec.rb new file mode 100644 index 00000000..f034f1e0 --- /dev/null +++ b/spec/unit/job_reservers/round_robin_spec.rb @@ -0,0 +1,45 @@ +require 'spec_helper' +require 'qless/queue' +require 'qless/job_reservers/round_robin' + +module Qless + module JobReservers + describe RoundRobin do + let(:q1) { fire_double("Qless::Queue") } + let(:q2) { fire_double("Qless::Queue") } + let(:q3) { fire_double("Qless::Queue") } + let(:reserver) { RoundRobin.new([q1, q2, q3]) } + + describe "#reserve" do + it 'round robins the queues' do + q1.should_receive(:pop).twice { :q1_job } + q2.should_receive(:pop).once { :q2_job } + q3.should_receive(:pop).once { :q3_job } + + reserver.reserve.should eq(:q1_job) + reserver.reserve.should eq(:q2_job) + reserver.reserve.should eq(:q3_job) + reserver.reserve.should eq(:q1_job) + end + + it 'returns nil if none of the queues have jobs' do + q1.should_receive(:pop).once { nil } + q2.should_receive(:pop).once { nil } + q3.should_receive(:pop).once { nil } + reserver.reserve.should be_nil + end + end + + describe "#description" do + it 'returns a useful human readable string' do + q1.stub(:name) { "Queue1" } + q2.stub(:name) { "Queue2" } + q3.stub(:name) { "Queue3" } + + reserver.description.should eq("Queue1, Queue2, Queue3 (round robin)") + end + end + end + end +end + diff --git a/spec/unit/job_spec.rb b/spec/unit/job_spec.rb new file mode 100644 index 00000000..b6286c6d --- /dev/null +++ b/spec/unit/job_spec.rb @@ -0,0 +1,67 @@ +require 'spec_helper' +require 'qless/job' + +module Qless + describe Job do + class JobClass + class Nested + end + end + + let(:client) { stub.as_null_object } + + describe ".mock" do + it 'creates a job instance' do + Job.mock(client, JobClass).should be_a(Job) + end + + it 'honors attributes passed as a symbol' do + job = Job.mock(client, JobClass, data: { "a" => 5 }) + job.data.should eq("a" => 5) + end + end + + describe "#perform" do + it 'calls the #perform method on the job class with the job as an argument' do + job = Job.mock(client, JobClass) + JobClass.should_receive(:perform).with(job).once + job.perform + end + + it 'properly finds nested classes' do + job = Job.mock(client, JobClass::Nested) + JobClass::Nested.should_receive(:perform).with(job).once + job.perform + end + end + + [ + [:fail, 'group', 'message'], + [:complete], + [:cancel], + [:move, 'queue'] + ].each do |meth, *args| + describe "##{meth}" do + let(:job) { Job.mock(client, JobClass) } + + it 'updates #state_changed? from false to true' do + expect { + job.send(meth, *args) + }.to change(job, :state_changed?).from(false).to(true) + end + + class MyCustomError < StandardError; end + it 'does not update #state_changed? if there is a redis connection error' do + client.stub(:"_#{meth}") { raise MyCustomError, "boom" } + client.stub(:"_put") { raise MyCustomError, "boom" } # for #move + + expect { + job.send(meth, *args) + }.to raise_error(MyCustomError) + job.state_changed?.should be_false + end + end + end + end +end + diff --git a/spec/unit/qless_spec.rb b/spec/unit/qless_spec.rb new file mode 100644 index 00000000..5992a214 --- /dev/null +++ b/spec/unit/qless_spec.rb @@ -0,0 +1,21 @@ +require 'spec_helper' +require 'qless' + +describe Qless do + describe ".generate_jid" do + it "generates a UUID suitable for use as a jid" do + Qless.generate_jid.should match(/\A[a-f0-9]{32}\z/) + end + end + + describe ".worker_name" do + it 'includes the hostname in the worker name' do + Qless.worker_name.should include(Socket.gethostname) + end + + it 'includes the pid in the worker name' do + Qless.worker_name.should include(Process.pid.to_s) + end + end +end + diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb new file mode 100644 index 00000000..9eeadd35 --- /dev/null +++ b/spec/unit/worker_spec.rb @@ -0,0 +1,319 @@ +require 'spec_helper' +require 'yaml' +require 'qless/worker' + +module Qless + describe Worker do + let(:reserver) { fire_double("Qless::JobReservers::Ordered", description: "job reserver") } + let(:client) { stub.as_null_object } + let(:worker) { Worker.new(client, reserver) } + + describe "#perform" do + class MyJobClass; end + let(:job) { Job.mock(client, MyJobClass) } + + it 'performs the job' do + MyJobClass.should_receive(:perform) + worker.perform(job) + end + + it 'fails the job if performing it raises an error' do + MyJobClass.stub(:perform) { raise StandardError.new("boom") } + expected_line_number = __LINE__ - 1 + job.should respond_to(:fail).with(2).arguments + + job.should_receive(:fail) do |group, message| + group.should eq("Qless::MyJobClass:StandardError") + message.should include("boom") + message.should include("#{__FILE__}:#{expected_line_number}") + end + + worker.perform(job) + end + + it 'completes the job if it finishes with no errors' do + MyJobClass.stub(:perform) + job.should respond_to(:complete).with(0).arguments + job.should_receive(:complete).with(no_args) + worker.perform(job) + end + + it 'does not complete the job if the job logic itself changes the state of it (e.g. moves it to a new queue)' do + MyJobClass.stub(:perform) { |j| j.move("other") } + job.should_not_receive(:complete) + worker.perform(job) + end + end + + describe "#work" do + around(:each) do |example| + old_procline = procline + example.run + $0 = old_procline + end + + def procline + $0 + end + + class FileWriterJob + def self.perform(job) + sleep(job['sleep']) if job['sleep'] + File.open(job['file'], "w") { |f| f.write("done: #{$0}") } + end + end + + let(:output_file) { File.join(temp_dir, "job.out.#{Time.now.to_i}") } + let(:job) { Job.mock(client, FileWriterJob, data: { 'file' => output_file }) } + + let(:temp_dir) { "./spec/tmp" } + before do + FileUtils.rm_rf temp_dir + FileUtils.mkdir_p temp_dir + reserver.stub(:reserve).and_return(job, nil) + end + + it "performs the job in a child process and waits for it to complete" do + worker.work(0) + File.read(output_file).should include("done") + end + + it 'begins with a "starting" procline' do + starting_procline = nil + reserver.stub(:reserve) do + starting_procline = procline + nil + end + + worker.work(0) + starting_procline.should include("Starting") + end + + it 'sets an appropriate procline for the parent process' do + parent_procline = nil + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + parent_procline = procline + old_wait.call(child) + end + + worker.work(0) + parent_procline.should match(/Forked .* at/) + end + + it 'sets an appropriate procline in the child process' do + worker.work(0) + output = File.read(output_file) + output.should include("Processing", job.queue, job.klass, job.jid) + end + + it 'stops working when told to shutdown' do + num_jobs_performed = 0 + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + worker.shutdown if num_jobs_performed == 1 + num_jobs_performed += 1 + old_wait.call(child) + end + + reserver.stub(:reserve).and_return(job, job) + worker.work(0.0001) + num_jobs_performed.should eq(2) + end + + it 'kills the child immediately when told to #shutdown!' do + job['sleep'] = 0.5 # to ensure the parent has a chance to kill the child before it does work + + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + worker.shutdown! + old_wait.call(child) + end + + File.exist?(output_file).should be_false + worker.work(0) + File.exist?(output_file).should be_false + end + + it 'can be paused' do + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + worker.pause_processing # pause the worker after starting the first job + old_wait.call(child) + end + + paused_procline = nil + paused_checks = 0 + old_paused = worker.method(:paused?) + worker.stub(:paused?) do + paused_checks += 1 # count the number of loop iterations + worker.shutdown if paused_checks == 20 # so we don't loop forever + old_paused.call.tap do |paused| + paused_procline = procline if paused + end + end + + # a job should only be reserved once because it is paused while processing the first one + reserver.should_receive(:reserve).once { job } + + worker.work(0) + paused_checks.should eq(20) + paused_procline.should include("Paused") + end + + it 'can be unpaused' do + worker.pause_processing + + paused_checks = 0 + old_paused = worker.method(:paused?) + worker.stub(:paused?) do + paused_checks += 1 # count the number of loop iterations + worker.unpause_processing if paused_checks == 20 # so we don't loop forever + old_paused.call + end + + worker.work(0) + paused_checks.should be >= 20 + end + end + + describe ".start" do + def with_env_vars(vars = {}) + defaults = { + 'REDIS_URL' => redis_url, + 'QUEUE' => 'mock_queue' + } + super(defaults.merge(vars)) { yield } + end + + it 'uses REDIS_URL to make a redis connection' do + with_env_vars "REDIS_URL" => (redis_url + '/4') do + Worker.should_receive(:new) do |client, reserver| + client.redis.client.db.should eq(4) + stub.as_null_object + end + + Worker.start + end + end + + it 'starts working with sleep interval INTERVAL' do + with_env_vars "INTERVAL" => "2.3" do + worker = fire_double("Qless::Worker") + Worker.stub(:new).and_return(worker) + worker.should_receive(:work).with(2.3) + + Worker.start + end + end + + it 'defaults the sleep interval to 5.0' do + with_env_vars do + worker = fire_double("Qless::Worker") + Worker.stub(:new).and_return(worker) + worker.should_receive(:work).with(5.0) + + Worker.start + end + end + + it 'uses the named QUEUE' do + with_env_vars 'QUEUE' => 'normal' do + Worker.should_receive(:new) do |client, reserver| + reserver.queues.map(&:name).should eq(["normal"]) + stub.as_null_object + end + + Worker.start + end + end + + it 'uses the named QUEUES (comma delimited)' do + with_env_vars 'QUEUES' => 'high,normal, low' do + Worker.should_receive(:new) do |client, reserver| + reserver.queues.map(&:name).should eq(["high", "normal", "low"]) + stub.as_null_object + end + + Worker.start + end + end + + it 'raises an error if no queues are provided' do + with_env_vars 'QUEUE' => '', 'QUEUES' => '' do + expect { + Worker.start + }.to raise_error(/must pass QUEUE or QUEUES/) + end + end + + it 'uses the Ordered reserver by default' do + with_env_vars do + Worker.should_receive(:new) do |client, reserver| + reserver.should be_a(JobReservers::Ordered) + stub.as_null_object + end + + Worker.start + end + end + + it 'uses the RoundRobin reserver if so configured' do + with_env_vars 'JOB_RESERVER' => 'RoundRobin' do + Worker.should_receive(:new) do |client, reserver| + reserver.should be_a(JobReservers::RoundRobin) + stub.as_null_object + end + + Worker.start + end + end + + it 'sets verbose and very_verbose to false by default' do + with_env_vars do + orig_new = Worker.method(:new) + Worker.should_receive(:new) do |client, reserver, options = {}| + worker = orig_new.call(client, reserver, options) + worker.verbose.should be_false + worker.very_verbose.should be_false + + stub.as_null_object + end + + Worker.start + end + end + + it 'sets verbose=true when passed VERBOSE' do + with_env_vars 'VERBOSE' => '1' do + orig_new = Worker.method(:new) + Worker.should_receive(:new) do |client, reserver, options = {}| + worker = orig_new.call(client, reserver, options) + worker.verbose.should be_true + worker.very_verbose.should be_false + + stub.as_null_object + end + + Worker.start + end + end + + it 'sets very_verbose=true when passed VVERBOSE' do + with_env_vars 'VVERBOSE' => '1' do + orig_new = Worker.method(:new) + Worker.should_receive(:new) do |client, reserver, options = {}| + worker = orig_new.call(client, reserver, options) + worker.verbose.should be_false + worker.very_verbose.should be_true + + stub.as_null_object + end + + Worker.start + end + end + end + end +end +