From f14493b98a80a65a2a697b47c92a0b95dbb3a887 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 11:29:59 -0700 Subject: [PATCH 01/30] Remove unused client. --- lib/qless/server.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/qless/server.rb b/lib/qless/server.rb index cf28cfae..63c432be 100755 --- a/lib/qless/server.rb +++ b/lib/qless/server.rb @@ -1,12 +1,8 @@ - - require 'sinatra/base' require 'qless' # Much of this is shamelessly poached from the resque web client -client = Qless::Client.new - module Qless class Server < Sinatra::Base # Path-y-ness From 81332df04fdfe3ec25d24e36cb59c31b1279779b Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 11:32:11 -0700 Subject: [PATCH 02/30] Allow the server client to be set. This is important when you need to have it hit a different redis. --- lib/qless/server.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/qless/server.rb b/lib/qless/server.rb index 63c432be..216501b5 100755 --- a/lib/qless/server.rb +++ b/lib/qless/server.rb @@ -19,6 +19,10 @@ class Server < Sinatra::Base def self.client @client ||= Qless::Client.new end + + def self.client=(client) + @client = client + end helpers do include Rack::Utils @@ -286,4 +290,4 @@ def strftime(t) # start the server if ruby file executed directly run! if app_file == $0 end -end \ No newline at end of file +end From adefd1f7023b7feeb108eba37473c9f5010da0df Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 12:02:40 -0700 Subject: [PATCH 03/30] Extract common spec helper stuff into spec_helper. --- spec/qless_spec.rb | 33 +-------------------------------- spec/spec_helper.rb | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/spec/qless_spec.rb b/spec/qless_spec.rb index 1c1e8284..3987d84b 100644 --- a/spec/qless_spec.rb +++ b/spec/qless_spec.rb @@ -9,9 +9,7 @@ class FooJob # An empty class end - describe Qless::Client do - # Our main client - let(:client) { Qless::Client.new(redis_config) } + describe Qless::Client, :integration do # Our main test queue let(:q) { client.queue("testing") } # Point to the main queue, but identify as different workers @@ -19,35 +17,6 @@ class FooJob let(:b) { client.queue("testing").tap { |o| o.worker = "worker-b" } } # And a second queue let(:other) { client.queue("other") } - - let(:redis_config) do - if File.exist?('./spec/redis.config.yml') - YAML.load_file('./spec/redis.config.yml') - else - {} - end - end - - def assert_minimum_redis_version(version) - redis_version = Gem::Version.new(@redis.info["redis_version"]) - if redis_version < Gem::Version.new(version) - pending "You are running redis #{redis_version}, but qless requires at least #{version}" - end - end - - before(:each) do - # Sometimes we need raw redis access - @redis = Redis.new(redis_config) - assert_minimum_redis_version("2.6") - if @redis.keys("*").length > 0 - raise "Must start with empty Redis DB" - end - @redis.script(:flush) - end - - after(:each) do - @redis.flushdb - end describe "#config" do it "can set, get and erase configuration" do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 7efab34d..d3ca05cd 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,3 +4,36 @@ c.run_all_when_everything_filtered = true end +shared_context "redis integration", :integration do + let(:client) { Qless::Client.new(redis_config) } + let(:redis_config) do + if File.exist?('./spec/redis.config.yml') + YAML.load_file('./spec/redis.config.yml') + else + {} + end + end + + def assert_minimum_redis_version(version) + redis_version = Gem::Version.new(@redis.info["redis_version"]) + if redis_version < Gem::Version.new(version) + pending "You are running redis #{redis_version}, but qless requires at least #{version}" + end + end + + before(:each) do + # Sometimes we need raw redis access + @redis = Redis.new(redis_config) + assert_minimum_redis_version("2.6") + if @redis.keys("*").length > 0 + raise "Must start with empty Redis DB" + end + @redis.script(:flush) + end + + after(:each) do + @redis.flushdb + end +end + + From 1ca2559ca54fee8477aefd4d5db2de26d0cc634e Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 12:51:28 -0700 Subject: [PATCH 04/30] Move existing specs into integration folder. The specs all integrate with redis and the qless-core lua scripts. --- spec/{ => integration}/qless_spec.rb | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename spec/{ => integration}/qless_spec.rb (100%) diff --git a/spec/qless_spec.rb b/spec/integration/qless_spec.rb similarity index 100% rename from spec/qless_spec.rb rename to spec/integration/qless_spec.rb From 64dbba20568c3aa9d80723e068341069347fc5d2 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 12:58:04 -0700 Subject: [PATCH 05/30] Add rvmrc file. --- .rvmrc | 1 + 1 file changed, 1 insertion(+) create mode 100644 .rvmrc diff --git a/.rvmrc b/.rvmrc new file mode 100644 index 00000000..27b0a605 --- /dev/null +++ b/.rvmrc @@ -0,0 +1 @@ +rvm use 1.9.3@qless --create From 724de2fbf3d5f8d900f8328112014eccd2a37977 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 13:01:01 -0700 Subject: [PATCH 06/30] Set things up to support bundle --standalone. See http://myronmars.to/n/dev-blog/2012/03/faster-test-boot-times-with-bundler-standalone for details on this approach. --- .gitignore | 1 + spec/spec_helper.rb | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/.gitignore b/.gitignore index b39c062e..1a6994b6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ pkg/* .DS_STORE spec/redis.config.yml +bundle diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d3ca05cd..572262af 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,3 +1,12 @@ +begin + # use `bundle install --standalone' to get this... + require_relative '../bundle/bundler/setup' +rescue LoadError + # fall back to regular bundler if the developer hasn't bundled standalone + require 'bundler' + Bundler.setup +end + RSpec.configure do |c| c.treat_symbols_as_metadata_keys_with_true_values = true c.filter_run :f From 58f154d761b0e9310f276d0fc2338127900f1179 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 12:52:06 -0700 Subject: [PATCH 07/30] Extract JID generation into a helper method. --- lib/qless.rb | 9 ++++++++- lib/qless/queue.rb | 3 +-- spec/unit/qless_spec.rb | 11 +++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 spec/unit/qless_spec.rb diff --git a/lib/qless.rb b/lib/qless.rb index 89bf1bba..2787c91b 100644 --- a/lib/qless.rb +++ b/lib/qless.rb @@ -1,6 +1,7 @@ require "socket" require "redis" require "json" +require "securerandom" require "qless/version" require "qless/config" @@ -9,6 +10,12 @@ require "qless/lua" module Qless + extend self + + def generate_jid + SecureRandom.uuid.gsub('-', '') + end + class Client # Lua scripts attr_reader :_cancel, :_complete, :_fail, :_failed, :_get, :_getconfig, :_heartbeat, :_jobs, :_peek, :_pop, :_put, :_queues, :_setconfig, :_stats, :_track, :_workers, :_depends @@ -79,4 +86,4 @@ def job(id) Job.new(self, JSON.parse(results)) end end -end \ No newline at end of file +end diff --git a/lib/qless/queue.rb b/lib/qless/queue.rb index 4c66f159..49aaae53 100644 --- a/lib/qless/queue.rb +++ b/lib/qless/queue.rb @@ -1,4 +1,3 @@ -require "securerandom" require "qless/lua" require "qless/job" require "redis" @@ -23,7 +22,7 @@ def initialize(name, client, worker) # => delay (int) def put(klass, data, opts={}) @client._put.call([@name], [ - SecureRandom.uuid.gsub('-', ''), + Qless.generate_jid, klass.to_s, JSON.generate(data), Time.now.to_f, diff --git a/spec/unit/qless_spec.rb b/spec/unit/qless_spec.rb new file mode 100644 index 00000000..6bea31d0 --- /dev/null +++ b/spec/unit/qless_spec.rb @@ -0,0 +1,11 @@ +require 'spec_helper' +require 'qless' + +describe Qless do + describe ".generate_jid" do + it "generates a UUID suitable for use as a jid" do + Qless.generate_jid.should match(/\A[a-f0-9]{32}\z/) + end + end +end + From 4a31db96c17952b3b97ae7fa833ab1c167b92a16 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 13:18:12 -0700 Subject: [PATCH 08/30] Fix Job#initialize. - @dependents, not @depenents - `unless ... !=` is a double negative and is confusing. `if ... ==` is more clear. --- lib/qless/job.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/qless/job.rb b/lib/qless/job.rb index 8bebd052..d8588988 100644 --- a/lib/qless/job.rb +++ b/lib/qless/job.rb @@ -20,9 +20,9 @@ def initialize(client, atts) @delay = atts.fetch('delay', 0) # This is a silly side-effect of Lua doing JSON parsing - @tags = [] unless @tags != {} - @dependents = [] unless @depenents != {} - @dependencies = [] unless @dependencies != {} + @tags = [] if @tags == {} + @dependents = [] if @dependents == {} + @dependencies = [] if @dependencies == {} end def [](key) From a152c2059c016b0dfddbfd6710c00f19f4c4cdbe Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 13:38:37 -0700 Subject: [PATCH 09/30] Add Job.mock. --- lib/qless.rb | 6 ++++++ lib/qless/job.rb | 24 ++++++++++++++++++++++++ spec/unit/job_spec.rb | 22 ++++++++++++++++++++++ 3 files changed, 52 insertions(+) create mode 100644 spec/unit/job_spec.rb diff --git a/lib/qless.rb b/lib/qless.rb index 2787c91b..080dfdcc 100644 --- a/lib/qless.rb +++ b/lib/qless.rb @@ -16,6 +16,12 @@ def generate_jid SecureRandom.uuid.gsub('-', '') end + def stringify_hash_keys(hash) + hash.each_with_object({}) do |(key, value), result| + result[key.to_s] = value + end + end + class Client # Lua scripts attr_reader :_cancel, :_complete, :_fail, :_failed, :_get, :_getconfig, :_heartbeat, :_jobs, :_peek, :_pop, :_put, :_queues, :_setconfig, :_stats, :_track, :_workers, :_depends diff --git a/lib/qless/job.rb b/lib/qless/job.rb index d8588988..7536abb5 100644 --- a/lib/qless/job.rb +++ b/lib/qless/job.rb @@ -1,3 +1,4 @@ +require "qless" require "qless/lua" require "redis" require "json" @@ -10,6 +11,29 @@ class Job def perform klass = @klass.split('::').inject(nil) { |m, el| (m || Kernel).const_get(el) } end + + def self.mock(client, klass, attributes = {}) + defaults = { + "jid" => Qless.generate_jid, + "data" => {}, + "klass" => klass.to_s, + "priority" => 0, + "tags" => [], + "worker" => "mock_worker", + "expires" => Time.now + (60 * 60), # an hour from now + "state" => "running", + "tracked" => false, + "queue" => "mock_queue", + "retries" => 5, + "remaining" => 5, + "failure" => "maybe", + "history" => [], + "dependencies" => [], + "dependents" => [] + } + attributes = defaults.merge(Qless.stringify_hash_keys(attributes)) + new(client, attributes) + end def initialize(client, atts) @client = client diff --git a/spec/unit/job_spec.rb b/spec/unit/job_spec.rb new file mode 100644 index 00000000..90166dc0 --- /dev/null +++ b/spec/unit/job_spec.rb @@ -0,0 +1,22 @@ +require 'spec_helper' +require 'qless/job' + +module Qless + describe Job do + class JobClass; end + + describe ".mock" do + let(:client) { stub.as_null_object } + + it 'creates a job instance' do + Job.mock(client, JobClass).should be_a(Job) + end + + it 'honors attributes passed as a symbol' do + job = Job.mock(client, JobClass, data: { "a" => 5 }) + job.data.should eq("a" => 5) + end + end + end +end + From 49dc45a7e4a2fd17fb70efbef1fe4894d6fc5c2c Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 13:43:49 -0700 Subject: [PATCH 10/30] Fix Job#perform so it actually performs the job. --- lib/qless/job.rb | 3 ++- spec/unit/job_spec.rb | 23 ++++++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/lib/qless/job.rb b/lib/qless/job.rb index 7536abb5..26717a3e 100644 --- a/lib/qless/job.rb +++ b/lib/qless/job.rb @@ -9,7 +9,8 @@ class Job attr_accessor :data, :priority, :tags def perform - klass = @klass.split('::').inject(nil) { |m, el| (m || Kernel).const_get(el) } + klass = @klass.split('::').inject(Kernel) { |context, name| context.const_get(name) } + klass.perform(self) end def self.mock(client, klass, attributes = {}) diff --git a/spec/unit/job_spec.rb b/spec/unit/job_spec.rb index 90166dc0..45de9f90 100644 --- a/spec/unit/job_spec.rb +++ b/spec/unit/job_spec.rb @@ -3,11 +3,14 @@ module Qless describe Job do - class JobClass; end + class JobClass + class Nested + end + end - describe ".mock" do - let(:client) { stub.as_null_object } + let(:client) { stub.as_null_object } + describe ".mock" do it 'creates a job instance' do Job.mock(client, JobClass).should be_a(Job) end @@ -17,6 +20,20 @@ class JobClass; end job.data.should eq("a" => 5) end end + + describe "#perform" do + it 'calls the #perform method on the job class with the job as an argument' do + job = Job.mock(client, JobClass) + JobClass.should_receive(:perform).with(job).once + job.perform + end + + it 'properly finds nested classes' do + job = Job.mock(client, JobClass::Nested) + JobClass::Nested.should_receive(:perform).with(job).once + job.perform + end + end end end From daa71089db35a4cc737406b44ba9c859c0b81e5f Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Tue, 17 Apr 2012 13:38:44 -0700 Subject: [PATCH 11/30] Rename `worker` to `worker_name`. I'm going to build a Worker class next. The worker attributes of these different objects are really just the name of the worker, not the worker object itself, so it makes sense to name the attribute worker_name to make that more clear. --- lib/qless.rb | 11 +++--- lib/qless/job.rb | 13 +++---- lib/qless/queue.rb | 8 ++--- spec/integration/qless_spec.rb | 64 +++++++++++++++++----------------- spec/unit/qless_spec.rb | 10 ++++++ 5 files changed, 60 insertions(+), 46 deletions(-) diff --git a/lib/qless.rb b/lib/qless.rb index 080dfdcc..9019c71b 100644 --- a/lib/qless.rb +++ b/lib/qless.rb @@ -22,15 +22,18 @@ def stringify_hash_keys(hash) end end + # This is a unique identifier for the worker + def worker_name + @worker_name ||= [Socket.gethostname, Process.pid.to_s].join('-') + end + class Client # Lua scripts attr_reader :_cancel, :_complete, :_fail, :_failed, :_get, :_getconfig, :_heartbeat, :_jobs, :_peek, :_pop, :_put, :_queues, :_setconfig, :_stats, :_track, :_workers, :_depends # A real object - attr_reader :config, :redis, :worker + attr_reader :config, :redis def initialize(options = {}) - # This is a unique identifier for the worker - @worker = Socket.gethostname + "-" + Process.pid.to_s # This is the redis instance we're connected to @redis = Redis.new(options) @config = Config.new(self) @@ -41,7 +44,7 @@ def initialize(options = {}) end def queue(name) - Queue.new(name, self, @worker) + Queue.new(name, self) end def queues(qname=nil) diff --git a/lib/qless/job.rb b/lib/qless/job.rb index 26717a3e..5ac5f3d4 100644 --- a/lib/qless/job.rb +++ b/lib/qless/job.rb @@ -5,7 +5,7 @@ module Qless class Job - attr_reader :jid, :expires, :state, :queue, :history, :worker, :retries, :remaining, :failure, :klass, :delay, :tracked + attr_reader :jid, :expires, :state, :queue, :history, :worker_name, :retries, :remaining, :failure, :klass, :delay, :tracked attr_accessor :data, :priority, :tags def perform @@ -38,11 +38,12 @@ def self.mock(client, klass, attributes = {}) def initialize(client, atts) @client = client - %w{jid data klass priority tags worker expires state tracked queue + %w{jid data klass priority tags expires state tracked queue retries remaining failure history dependencies dependents}.each do |att| self.instance_variable_set("@#{att}".to_sym, atts.fetch(att)) end @delay = atts.fetch('delay', 0) + @worker_name = atts.fetch('worker') # This is a silly side-effect of Lua doing JSON parsing @tags = [] if @tags == {} @@ -81,7 +82,7 @@ def move(queue) def fail(group, message) @client._fail.call([], [ @jid, - @worker, + @worker_name, group, message, Time.now.to_f, JSON.generate(@data)]) || false @@ -91,7 +92,7 @@ def fail(group, message) def heartbeat() @client._heartbeat.call([], [ @jid, - @worker, + @worker_name, Time.now.to_f, JSON.generate(@data)]) || false end @@ -103,10 +104,10 @@ def heartbeat() def complete(nxt=nil, options={}) if nxt.nil? response = @client._complete.call([], [ - @jid, @worker, @queue, Time.now.to_f, JSON.generate(@data)]) + @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data)]) else response = @client._complete.call([], [ - @jid, @worker, @queue, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', + @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', options.fetch(:delay, 0), 'depends', JSON.generate(options.fetch(:depends, []))]) end response.nil? ? false : response diff --git a/lib/qless/queue.rb b/lib/qless/queue.rb index 49aaae53..826b365a 100644 --- a/lib/qless/queue.rb +++ b/lib/qless/queue.rb @@ -7,12 +7,12 @@ module Qless # A configuration class associated with a qless client class Queue attr_reader :name - attr_accessor :worker + attr_accessor :worker_name - def initialize(name, client, worker) + def initialize(name, client) @client = client @name = name - @worker = worker + self.worker_name = Qless.worker_name end # Put the described job in this queue @@ -36,7 +36,7 @@ def put(klass, data, opts={}) # Pop a work item off the queue def pop(count=nil) - results = @client._pop.call([@name], [@worker, (count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) } + results = @client._pop.call([@name], [worker_name, (count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) } count.nil? ? results[0] : results end diff --git a/spec/integration/qless_spec.rb b/spec/integration/qless_spec.rb index 3987d84b..cb72a694 100644 --- a/spec/integration/qless_spec.rb +++ b/spec/integration/qless_spec.rb @@ -13,8 +13,8 @@ class FooJob # Our main test queue let(:q) { client.queue("testing") } # Point to the main queue, but identify as different workers - let(:a) { client.queue("testing").tap { |o| o.worker = "worker-a" } } - let(:b) { client.queue("testing").tap { |o| o.worker = "worker-b" } } + let(:a) { client.queue("testing").tap { |o| o.worker_name = "worker-a" } } + let(:b) { client.queue("testing").tap { |o| o.worker_name = "worker-b" } } # And a second queue let(:other) { client.queue("other") } @@ -40,7 +40,7 @@ class FooJob job.priority.should eq(0) job.data.should eq({"test" => "put_get"}) job.tags.should eq([]) - job.worker.should eq("") + job.worker_name.should eq("") job.state.should eq("waiting") job.history.length.should eq(1) job.history[0]['q'].should eq("testing") @@ -73,7 +73,7 @@ class FooJob client.config['heartbeat'] = 60 job = q.pop job.data.should eq({'test' => 'test_put_pop_attributes'}) - job.worker.should eq(client.worker) + job.worker_name.should eq(Qless.worker_name) job.expires.should > (Time.new.to_i - 20) job.state.should eq('running') job.queue.should eq('testing') @@ -96,7 +96,7 @@ class FooJob jid = q.put(Qless::Job, {'test' => 'test_put_pop_attributes'}) job = q.peek job.data.should eq({'test' => 'test_put_pop_attributes'}) - job.worker.should eq('') + job.worker_name.should eq('') job.state.should eq('waiting') job.queue.should eq('testing') job.remaining.should eq(5) @@ -308,7 +308,7 @@ class FooJob job.jid.should eq(jid) job.queue.should eq("testing") job.data.should eq({"test" => "fail_failed"}) - job.worker.should eq("") + job.worker_name.should eq("") job.state.should eq("failed") job.queue.should eq("testing") job.remaining.should eq(5) @@ -458,7 +458,7 @@ class FooJob job = client.job(jid) job.history.length.should eq(1) job.state.should eq("complete") - job.worker.should eq("") + job.worker_name.should eq("") job.queue.should eq("") q.length.should eq(0) end @@ -479,7 +479,7 @@ class FooJob job = client.job(jid) job.history.length.should eq(2) job.state.should eq("waiting") - job.worker.should eq("") + job.worker_name.should eq("") job.queue.should eq("testing") q.length.should eq(1) end @@ -503,7 +503,7 @@ class FooJob job = client.job(jid) job.history.length.should eq(1) job.state.should eq("complete") - job.worker.should eq("") + job.worker_name.should eq("") job.queue.should eq("") q.length.should eq(0) end @@ -892,11 +892,11 @@ class FooJob client.workers.should eq({}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) @@ -913,21 +913,21 @@ class FooJob jid = q.put(Qless::Job, {"test" => "workers_cancel"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) job.cancel client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -948,11 +948,11 @@ class FooJob client.config["heartbeat"] = -10 job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 1 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => [jid] }) @@ -961,15 +961,15 @@ class FooJob job = a.pop a.pop client.workers.should eq([{ - "name" => a.worker, + "name" => a.worker_name, "jobs" => 1, "stalled" => 0 }, { - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -985,11 +985,11 @@ class FooJob jid = q.put(Qless::Job, {"test" => "workers_fail"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) @@ -997,11 +997,11 @@ class FooJob # Now, let's fail it job.fail("foo", "bar") client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -1016,22 +1016,22 @@ class FooJob jid = q.put(Qless::Job, {"test" => "workers_complete"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) job.complete client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) @@ -1047,22 +1047,22 @@ class FooJob jid = q.put(Qless::Job, {"test" => "workers_reput"}) job = q.pop client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 1, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => [jid], "stalled" => {} }) job.move("other") client.workers.should eq([{ - "name" => q.worker, + "name" => q.worker_name, "jobs" => 0, "stalled" => 0 }]) - client.workers(q.worker).should eq({ + client.workers(q.worker_name).should eq({ "jobs" => {}, "stalled" => {} }) diff --git a/spec/unit/qless_spec.rb b/spec/unit/qless_spec.rb index 6bea31d0..5992a214 100644 --- a/spec/unit/qless_spec.rb +++ b/spec/unit/qless_spec.rb @@ -7,5 +7,15 @@ Qless.generate_jid.should match(/\A[a-f0-9]{32}\z/) end end + + describe ".worker_name" do + it 'includes the hostname in the worker name' do + Qless.worker_name.should include(Socket.gethostname) + end + + it 'includes the pid in the worker name' do + Qless.worker_name.should include(Process.pid.to_s) + end + end end From a045c90a54830cfdd4ab5b1956aaf6fb9eb8a78d Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 09:52:02 -0700 Subject: [PATCH 12/30] Install rspec-fire. --- qless.gemspec | 1 + spec/spec_helper.rb | 3 +++ 2 files changed, 4 insertions(+) diff --git a/qless.gemspec b/qless.gemspec index 3d3dac59..886286ed 100644 --- a/qless.gemspec +++ b/qless.gemspec @@ -39,5 +39,6 @@ Gem::Specification.new do |s| s.add_dependency "json" , "~> 1.6.6" s.add_development_dependency "rspec" , "~> 2.6" + s.add_development_dependency "rspec-fire", "~> 0.4" s.add_development_dependency "rake" , "~> 0.9.2.2" end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 572262af..13709f45 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -7,10 +7,13 @@ Bundler.setup end +require 'rspec/fire' + RSpec.configure do |c| c.treat_symbols_as_metadata_keys_with_true_values = true c.filter_run :f c.run_all_when_everything_filtered = true + c.include RSpec::Fire end shared_context "redis integration", :integration do From 0acb86212588a1c1fcbe99ccdc6bbfeb3f89259e Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 10:30:50 -0700 Subject: [PATCH 13/30] Add job reservers. These are classes that implement a specific job reservation strategy. --- lib/qless/job_reservers/ordered.rb | 17 ++++++++ lib/qless/job_reservers/round_robin.rb | 26 ++++++++++++ spec/unit/job_reservers/ordered_spec.rb | 47 +++++++++++++++++++++ spec/unit/job_reservers/round_robin_spec.rb | 35 +++++++++++++++ 4 files changed, 125 insertions(+) create mode 100644 lib/qless/job_reservers/ordered.rb create mode 100644 lib/qless/job_reservers/round_robin.rb create mode 100644 spec/unit/job_reservers/ordered_spec.rb create mode 100644 spec/unit/job_reservers/round_robin_spec.rb diff --git a/lib/qless/job_reservers/ordered.rb b/lib/qless/job_reservers/ordered.rb new file mode 100644 index 00000000..38fdf396 --- /dev/null +++ b/lib/qless/job_reservers/ordered.rb @@ -0,0 +1,17 @@ +module Qless + module JobReservers + class Ordered + def initialize(queues) + @queues = queues + end + + def reserve + @queues.each do |q| + job = q.pop + return job if job + end + nil + end + end + end +end diff --git a/lib/qless/job_reservers/round_robin.rb b/lib/qless/job_reservers/round_robin.rb new file mode 100644 index 00000000..c699284f --- /dev/null +++ b/lib/qless/job_reservers/round_robin.rb @@ -0,0 +1,26 @@ +module Qless + module JobReservers + class RoundRobin + def initialize(queues) + @queues = queues + @num_queues = queues.size + @last_popped_queue_index = @num_queues - 1 + end + + def reserve + @num_queues.times do |i| + if job = next_queue.pop + return job + end + end + nil + end + + def next_queue + @last_popped_queue_index = (@last_popped_queue_index + 1) % @num_queues + @queues[@last_popped_queue_index] + end + end + end +end + diff --git a/spec/unit/job_reservers/ordered_spec.rb b/spec/unit/job_reservers/ordered_spec.rb new file mode 100644 index 00000000..d075f1e7 --- /dev/null +++ b/spec/unit/job_reservers/ordered_spec.rb @@ -0,0 +1,47 @@ +require 'spec_helper' +require 'qless/queue' +require 'qless/job_reservers/ordered' + +module Qless + module JobReservers + describe Ordered do + describe "#reserve" do + let(:q1) { fire_double("Qless::Queue") } + let(:q2) { fire_double("Qless::Queue") } + let(:q3) { fire_double("Qless::Queue") } + let(:reserver) { Ordered.new([q1, q2, q3]) } + + it 'always pops jobs from the first queue as long as it has jobs' do + q1.should_receive(:pop).and_return(:j1, :j2, :j3) + q2.should_not_receive(:pop) + q3.should_not_receive(:pop) + + reserver.reserve.should eq(:j1) + reserver.reserve.should eq(:j2) + reserver.reserve.should eq(:j3) + end + + it 'falls back to other queues when earlier queues lack jobs' do + call_count = 1 + q1.should_receive(:pop).exactly(4).times { :q1_job if [2, 4].include?(call_count) } + q2.should_receive(:pop).exactly(2).times { :q2_job if call_count == 1 } + q3.should_receive(:pop).once { :q3_job if call_count == 3 } + + reserver.reserve.should eq(:q2_job) + call_count = 2 + reserver.reserve.should eq(:q1_job) + call_count = 3 + reserver.reserve.should eq(:q3_job) + call_count = 4 + reserver.reserve.should eq(:q1_job) + end + + it 'returns nil if none of the queues have jobs' do + [q1, q2, q3].each { |q| q.stub(:pop) } + reserver.reserve.should be_nil + end + end + end + end +end + diff --git a/spec/unit/job_reservers/round_robin_spec.rb b/spec/unit/job_reservers/round_robin_spec.rb new file mode 100644 index 00000000..64eb7719 --- /dev/null +++ b/spec/unit/job_reservers/round_robin_spec.rb @@ -0,0 +1,35 @@ +require 'spec_helper' +require 'qless/queue' +require 'qless/job_reservers/round_robin' + +module Qless + module JobReservers + describe RoundRobin do + describe "#reserve" do + let(:q1) { fire_double("Qless::Queue") } + let(:q2) { fire_double("Qless::Queue") } + let(:q3) { fire_double("Qless::Queue") } + let(:reserver) { RoundRobin.new([q1, q2, q3]) } + + it 'round robins the queues' do + q1.should_receive(:pop).twice { :q1_job } + q2.should_receive(:pop).once { :q2_job } + q3.should_receive(:pop).once { :q3_job } + + reserver.reserve.should eq(:q1_job) + reserver.reserve.should eq(:q2_job) + reserver.reserve.should eq(:q3_job) + reserver.reserve.should eq(:q1_job) + end + + it 'returns nil if none of the queues have jobs' do + q1.should_receive(:pop).once { nil } + q2.should_receive(:pop).once { nil } + q3.should_receive(:pop).once { nil } + reserver.reserve.should be_nil + end + end + end + end +end + From 5ed275979e4df69a89fbd4ea37fe82b7bcd644ca Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 11:36:09 -0700 Subject: [PATCH 14/30] Got a working worker. --- .gitignore | 1 + lib/qless/worker.rb | 49 ++++++++++++++++++++++++++++++ spec/unit/worker_spec.rb | 65 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+) create mode 100644 lib/qless/worker.rb create mode 100644 spec/unit/worker_spec.rb diff --git a/.gitignore b/.gitignore index 1a6994b6..4bce588b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ pkg/* spec/redis.config.yml bundle +spec/tmp diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb new file mode 100644 index 00000000..d937c0be --- /dev/null +++ b/lib/qless/worker.rb @@ -0,0 +1,49 @@ +require 'qless' + +module Qless + class Worker + def initialize(client, job_reserver) + @client, @job_reserver = client, job_reserver + end + + def work(interval = 5.0) + loop do + unless job = @job_reserver.reserve + break if interval.zero? + sleep interval + next + end + + if child = fork + # We're in the parent process + Process.wait(child) + else + # We're in the child process + perform(job) + exit! + end + end + end + + def perform(job) + job.perform + rescue => error + fail_job(job, error) + else + job.complete + end + + private + + def fail_job(job, error) + group = "#{job.klass}:#{error.class}" + message = "#{error.message}\n\n#{error.backtrace.join("\n")}" + job.fail(group, message) + end + + def handle_child_from_parent(child) + Process.wait(child) + end + end +end + diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb new file mode 100644 index 00000000..426ad8f3 --- /dev/null +++ b/spec/unit/worker_spec.rb @@ -0,0 +1,65 @@ +require 'spec_helper' +require 'qless/worker' +require 'qless/job_reservers/ordered' + +module Qless + describe Worker do + let(:reserver) { fire_double("Qless::JobReservers::Ordered") } + let(:client) { stub.as_null_object } + let(:worker) { Worker.new(client, reserver) } + + describe "#perform" do + class MyJobClass; end + let(:job) { Job.mock(client, MyJobClass) } + + it 'performs the job' do + MyJobClass.should_receive(:perform) + worker.perform(job) + end + + it 'fails the job if performing it raises an error' do + MyJobClass.stub(:perform) { raise StandardError.new("boom") } + expected_line_number = __LINE__ - 1 + job.should respond_to(:fail).with(2).arguments + + job.should_receive(:fail) do |group, message| + group.should eq("Qless::MyJobClass:StandardError") + message.should include("boom") + message.should include("#{__FILE__}:#{expected_line_number}") + end + + worker.perform(job) + end + + it 'completes the job if it finishes with no errors' do + MyJobClass.stub(:perform) + job.should respond_to(:complete).with(0).arguments + job.should_receive(:complete).with(no_args) + worker.perform(job) + end + end + + describe "#work" do + class FileWriterJob + def self.perform(job) + File.open(job['file'], "w") { |f| f.write("done") } + end + end + let(:output_file) { File.join(temp_dir, "job.out") } + let(:job) { Job.mock(client, FileWriterJob, data: { 'file' => output_file }) } + + let(:temp_dir) { "./spec/tmp" } + before do + FileUtils.mkdir_p temp_dir + FileUtils.rm_r Dir.glob("#{temp_dir}/*") + reserver.stub(:reserve).and_return(job, nil) + end + + it "performs the job in a child process and waits for it to complete" do + worker.work(0) + File.read(output_file).should eq("done") + end + end + end +end + From 903df1550af38cd58d9170c907f220e805fcf9cb Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 12:59:58 -0700 Subject: [PATCH 15/30] Add useful/readable proclines to worker. --- lib/qless/job.rb | 4 ++ lib/qless/job_reservers/ordered.rb | 4 ++ lib/qless/job_reservers/round_robin.rb | 6 +++ lib/qless/worker.rb | 10 +++++ spec/unit/job_reservers/ordered_spec.rb | 20 +++++++--- spec/unit/job_reservers/round_robin_spec.rb | 20 +++++++--- spec/unit/worker_spec.rb | 43 ++++++++++++++++++++- 7 files changed, 95 insertions(+), 12 deletions(-) diff --git a/lib/qless/job.rb b/lib/qless/job.rb index 5ac5f3d4..3cd32e07 100644 --- a/lib/qless/job.rb +++ b/lib/qless/job.rb @@ -62,6 +62,10 @@ def []=(key, val) def to_s inspect end + + def description + "#{@jid} (#{@klass} / #{@queue})" + end def inspect "< Qless::Job #{@jid} >" diff --git a/lib/qless/job_reservers/ordered.rb b/lib/qless/job_reservers/ordered.rb index 38fdf396..c5f9bfd5 100644 --- a/lib/qless/job_reservers/ordered.rb +++ b/lib/qless/job_reservers/ordered.rb @@ -12,6 +12,10 @@ def reserve end nil end + + def description + @description ||= @queues.map(&:name).join(', ') + " (ordered)" + end end end end diff --git a/lib/qless/job_reservers/round_robin.rb b/lib/qless/job_reservers/round_robin.rb index c699284f..02d1e1ca 100644 --- a/lib/qless/job_reservers/round_robin.rb +++ b/lib/qless/job_reservers/round_robin.rb @@ -16,6 +16,12 @@ def reserve nil end + def description + @description ||= @queues.map(&:name).join(', ') + " (round robin)" + end + + private + def next_queue @last_popped_queue_index = (@last_popped_queue_index + 1) % @num_queues @queues[@last_popped_queue_index] diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index d937c0be..e5bae6c8 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -1,4 +1,5 @@ require 'qless' +require 'time' module Qless class Worker @@ -7,18 +8,23 @@ def initialize(client, job_reserver) end def work(interval = 5.0) + procline "Starting #{@job_reserver.description}" + loop do unless job = @job_reserver.reserve break if interval.zero? + procline "Waiting for #{@job_reserver.description}" sleep interval next end if child = fork # We're in the parent process + procline "Forked #{child} for #{job.description}" Process.wait(child) else # We're in the child process + procline "Processing #{job.description}" perform(job) exit! end @@ -44,6 +50,10 @@ def fail_job(job, error) def handle_child_from_parent(child) Process.wait(child) end + + def procline(value) + $0 = "Qless: #{value} at #{Time.now.iso8601}" + end end end diff --git a/spec/unit/job_reservers/ordered_spec.rb b/spec/unit/job_reservers/ordered_spec.rb index d075f1e7..bf8f2596 100644 --- a/spec/unit/job_reservers/ordered_spec.rb +++ b/spec/unit/job_reservers/ordered_spec.rb @@ -5,12 +5,12 @@ module Qless module JobReservers describe Ordered do - describe "#reserve" do - let(:q1) { fire_double("Qless::Queue") } - let(:q2) { fire_double("Qless::Queue") } - let(:q3) { fire_double("Qless::Queue") } - let(:reserver) { Ordered.new([q1, q2, q3]) } + let(:q1) { fire_double("Qless::Queue") } + let(:q2) { fire_double("Qless::Queue") } + let(:q3) { fire_double("Qless::Queue") } + let(:reserver) { Ordered.new([q1, q2, q3]) } + describe "#reserve" do it 'always pops jobs from the first queue as long as it has jobs' do q1.should_receive(:pop).and_return(:j1, :j2, :j3) q2.should_not_receive(:pop) @@ -41,6 +41,16 @@ module JobReservers reserver.reserve.should be_nil end end + + describe "#description" do + it 'returns a useful human readable string' do + q1.stub(:name) { "Queue1" } + q2.stub(:name) { "Queue2" } + q3.stub(:name) { "Queue3" } + + reserver.description.should eq("Queue1, Queue2, Queue3 (ordered)") + end + end end end end diff --git a/spec/unit/job_reservers/round_robin_spec.rb b/spec/unit/job_reservers/round_robin_spec.rb index 64eb7719..f034f1e0 100644 --- a/spec/unit/job_reservers/round_robin_spec.rb +++ b/spec/unit/job_reservers/round_robin_spec.rb @@ -5,12 +5,12 @@ module Qless module JobReservers describe RoundRobin do - describe "#reserve" do - let(:q1) { fire_double("Qless::Queue") } - let(:q2) { fire_double("Qless::Queue") } - let(:q3) { fire_double("Qless::Queue") } - let(:reserver) { RoundRobin.new([q1, q2, q3]) } + let(:q1) { fire_double("Qless::Queue") } + let(:q2) { fire_double("Qless::Queue") } + let(:q3) { fire_double("Qless::Queue") } + let(:reserver) { RoundRobin.new([q1, q2, q3]) } + describe "#reserve" do it 'round robins the queues' do q1.should_receive(:pop).twice { :q1_job } q2.should_receive(:pop).once { :q2_job } @@ -29,6 +29,16 @@ module JobReservers reserver.reserve.should be_nil end end + + describe "#description" do + it 'returns a useful human readable string' do + q1.stub(:name) { "Queue1" } + q2.stub(:name) { "Queue2" } + q3.stub(:name) { "Queue3" } + + reserver.description.should eq("Queue1, Queue2, Queue3 (round robin)") + end + end end end end diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index 426ad8f3..39f1f8c5 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -40,9 +40,19 @@ class MyJobClass; end end describe "#work" do + around(:each) do |example| + old_procline = procline + example.run + $0 = old_procline + end + + def procline + $0 + end + class FileWriterJob def self.perform(job) - File.open(job['file'], "w") { |f| f.write("done") } + File.open(job['file'], "w") { |f| f.write("done: #{$0}") } end end let(:output_file) { File.join(temp_dir, "job.out") } @@ -57,7 +67,36 @@ def self.perform(job) it "performs the job in a child process and waits for it to complete" do worker.work(0) - File.read(output_file).should eq("done") + File.read(output_file).should include("done") + end + + it 'begins with a "starting" procline' do + starting_procline = nil + reserver.stub(:reserve) do + starting_procline = procline + nil + end + + worker.work(0) + starting_procline.should include("Qless: Starting") + end + + it 'sets an appropriate procline for the parent process' do + parent_procline = nil + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + parent_procline = procline + old_wait.call(child) + end + + worker.work(0) + parent_procline.should match(/Forked .* at/) + end + + it 'sets an appropriate procline in the child process' do + worker.work(0) + output = File.read(output_file) + output.should include("Processing", job.queue, job.klass, job.jid) end end end From 8e826deba36ffc0edd36a7367486d883bab69edc Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 15:03:29 -0700 Subject: [PATCH 16/30] Fix worker spec. --- spec/unit/worker_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index 39f1f8c5..c2b49307 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -4,7 +4,7 @@ module Qless describe Worker do - let(:reserver) { fire_double("Qless::JobReservers::Ordered") } + let(:reserver) { fire_double("Qless::JobReservers::Ordered", description: "job reserver") } let(:client) { stub.as_null_object } let(:worker) { Worker.new(client, reserver) } From 44d43fc2bb8ec6a9076b0f88a7af91f006c0f42b Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 15:02:22 -0700 Subject: [PATCH 17/30] Add Job#state_changed? method. --- lib/qless/job.rb | 55 ++++++++++++++++++++++++++++++------------- spec/unit/job_spec.rb | 28 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 17 deletions(-) diff --git a/lib/qless/job.rb b/lib/qless/job.rb index 3cd32e07..4216d25b 100644 --- a/lib/qless/job.rb +++ b/lib/qless/job.rb @@ -49,6 +49,7 @@ def initialize(client, atts) @tags = [] if @tags == {} @dependents = [] if @dependents == {} @dependencies = [] if @dependencies == {} + @state_changed = false end def [](key) @@ -77,19 +78,23 @@ def ttl # Move this from it's current queue into another def move(queue) - @client._put.call([queue], [ - @jid, @klass, JSON.generate(@data), Time.now.to_f, 0 - ]) + note_state_change do + @client._put.call([queue], [ + @jid, @klass, JSON.generate(@data), Time.now.to_f, 0 + ]) + end end # Fail a job def fail(group, message) - @client._fail.call([], [ - @jid, - @worker_name, - group, message, - Time.now.to_f, - JSON.generate(@data)]) || false + note_state_change do + @client._fail.call([], [ + @jid, + @worker_name, + group, message, + Time.now.to_f, + JSON.generate(@data)]) || false + end end # Heartbeat a job @@ -106,19 +111,27 @@ def heartbeat() # => next (String) the next queue # => delay (int) how long to delay it in the next queue def complete(nxt=nil, options={}) - if nxt.nil? - response = @client._complete.call([], [ - @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data)]) - else - response = @client._complete.call([], [ - @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', - options.fetch(:delay, 0), 'depends', JSON.generate(options.fetch(:depends, []))]) + response = note_state_change do + if nxt.nil? + @client._complete.call([], [ + @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data)]) + else + @client._complete.call([], [ + @jid, @worker_name, @queue, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', + options.fetch(:delay, 0), 'depends', JSON.generate(options.fetch(:depends, []))]) + end end response.nil? ? false : response end + + def state_changed? + @state_changed + end def cancel - @client._cancel.call([], [@jid]) + note_state_change do + @client._cancel.call([], [@jid]) + end end def track(*tags) @@ -136,5 +149,13 @@ def depend(*jids) def undepend(*jids) !!@client._depends.call([], [@jid, 'off'] + jids) end + + private + + def note_state_change + result = yield + @state_changed = true + result + end end end diff --git a/spec/unit/job_spec.rb b/spec/unit/job_spec.rb index 45de9f90..b6286c6d 100644 --- a/spec/unit/job_spec.rb +++ b/spec/unit/job_spec.rb @@ -34,6 +34,34 @@ class Nested job.perform end end + + [ + [:fail, 'group', 'message'], + [:complete], + [:cancel], + [:move, 'queue'] + ].each do |meth, *args| + describe "##{meth}" do + let(:job) { Job.mock(client, JobClass) } + + it 'updates #state_changed? from false to true' do + expect { + job.send(meth, *args) + }.to change(job, :state_changed?).from(false).to(true) + end + + class MyCustomError < StandardError; end + it 'does not update #state_changed? if there is a redis connection error' do + client.stub(:"_#{meth}") { raise MyCustomError, "boom" } + client.stub(:"_put") { raise MyCustomError, "boom" } # for #move + + expect { + job.send(meth, *args) + }.to raise_error(MyCustomError) + job.state_changed?.should be_false + end + end + end end end From 68c5b69171da1ab8e00298605e0abc82e48ad361 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 15:07:14 -0700 Subject: [PATCH 18/30] Don't auto-complete jobs if the job logic changes its state. --- lib/qless/worker.rb | 2 +- spec/unit/worker_spec.rb | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index e5bae6c8..b89e9866 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -36,7 +36,7 @@ def perform(job) rescue => error fail_job(job, error) else - job.complete + job.complete unless job.state_changed? end private diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index c2b49307..b4c61d2a 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -37,6 +37,12 @@ class MyJobClass; end job.should_receive(:complete).with(no_args) worker.perform(job) end + + it 'does not complete the job if the job logic itself changes the state of it (e.g. moves it to a new queue)' do + MyJobClass.stub(:perform) { |j| j.move("other") } + job.should_not_receive(:complete) + worker.perform(job) + end end describe "#work" do From 9f87476efb54e78ccccf7ffc2a151672eaa99ca0 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 17:04:30 -0700 Subject: [PATCH 19/30] Add debugger to gemfile. --- Gemfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Gemfile b/Gemfile index 81da76d6..4cc2c682 100644 --- a/Gemfile +++ b/Gemfile @@ -7,3 +7,5 @@ gemspec # We can't do that until qless-core has been released # to rubygems.org, so for now it's hear. gem 'qless-core', :git => 'git://github.com/seomoz/qless-core.git' + +gem 'debugger', :platform => :mri From 7d9ba0438089979de06fc9a1ea21f78f4ccbdf9b Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Wed, 18 Apr 2012 17:04:39 -0700 Subject: [PATCH 20/30] Add signal handling support (borrowed from Resque) to worker. --- lib/qless/worker.rb | 67 ++++++++++++++++++++++++++++++++-- spec/unit/worker_spec.rb | 77 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 136 insertions(+), 8 deletions(-) diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index b89e9866..44dd88cf 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -2,15 +2,22 @@ require 'time' module Qless + # This is heavily inspired by Resque's excellent worker: + # https://github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb class Worker def initialize(client, job_reserver) @client, @job_reserver = client, job_reserver + @shutdown = @paused = false end def work(interval = 5.0) procline "Starting #{@job_reserver.description}" + register_signal_handlers loop do + break if shutdown? + next if paused? + unless job = @job_reserver.reserve break if interval.zero? procline "Waiting for #{@job_reserver.description}" @@ -18,10 +25,10 @@ def work(interval = 5.0) next end - if child = fork + if @child = fork # We're in the parent process - procline "Forked #{child} for #{job.description}" - Process.wait(child) + procline "Forked #{@child} for #{job.description}" + Process.wait(@child) else # We're in the child process procline "Processing #{job.description}" @@ -39,6 +46,31 @@ def perform(job) job.complete unless job.state_changed? end + def shutdown + @shutdown = true + end + + def shutdown! + shutdown + kill_child + end + + def shutdown? + @shutdown + end + + def paused? + @paused + end + + def pause_processing + @paused = true + end + + def unpause_processing + @paused = false + end + private def fail_job(job, error) @@ -54,6 +86,35 @@ def handle_child_from_parent(child) def procline(value) $0 = "Qless: #{value} at #{Time.now.iso8601}" end + + def kill_child + return unless @child + return unless system("ps -o pid,state -p #{@child}") + Process.kill("KILL", @child) rescue nil + end + + # This is stolen directly from resque... (thanks, @defunkt!) + # Registers the various signal handlers a worker responds to. + # + # TERM: Shutdown immediately, stop processing jobs. + # INT: Shutdown immediately, stop processing jobs. + # QUIT: Shutdown after the current job has finished processing. + # USR1: Kill the forked child immediately, continue processing jobs. + # USR2: Don't process any new jobs + # CONT: Start processing jobs again after a USR2 + def register_signal_handlers + trap('TERM') { shutdown! } + trap('INT') { shutdown! } + + begin + trap('QUIT') { shutdown } + trap('USR1') { kill_child } + trap('USR2') { pause_processing } + trap('CONT') { unpause_processing } + rescue ArgumentError + warn "Signals QUIT, USR1, USR2, and/or CONT not supported." + end + end end end diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index b4c61d2a..e590f6f8 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -58,18 +58,20 @@ def procline class FileWriterJob def self.perform(job) + sleep(job['sleep']) if job['sleep'] File.open(job['file'], "w") { |f| f.write("done: #{$0}") } end end - let(:output_file) { File.join(temp_dir, "job.out") } + + let(:output_file) { File.join(temp_dir, "job.out.#{Time.now.to_i}") } let(:job) { Job.mock(client, FileWriterJob, data: { 'file' => output_file }) } let(:temp_dir) { "./spec/tmp" } before do - FileUtils.mkdir_p temp_dir - FileUtils.rm_r Dir.glob("#{temp_dir}/*") - reserver.stub(:reserve).and_return(job, nil) - end + FileUtils.rm_rf temp_dir + FileUtils.mkdir_p temp_dir + reserver.stub(:reserve).and_return(job, nil) + end it "performs the job in a child process and waits for it to complete" do worker.work(0) @@ -104,6 +106,71 @@ def self.perform(job) output = File.read(output_file) output.should include("Processing", job.queue, job.klass, job.jid) end + + it 'stops working when told to shutdown' do + num_jobs_performed = 0 + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + worker.shutdown if num_jobs_performed == 1 + num_jobs_performed += 1 + old_wait.call(child) + end + + reserver.stub(:reserve).and_return(job, job) + worker.work(0.0001) + num_jobs_performed.should eq(2) + end + + it 'kills the child immediately when told to #shutdown!' do + job['sleep'] = 0.5 # to ensure the parent has a chance to kill the child before it does work + + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + worker.shutdown! + old_wait.call(child) + end + + File.exist?(output_file).should be_false + worker.work(0) + File.exist?(output_file).should be_false + end + + it 'can be paused' do + old_wait = Process.method(:wait) + Process.stub(:wait) do |child| + worker.pause_processing # pause the worker after starting the first job + old_wait.call(child) + end + + paused_checks = 0 + old_paused = worker.method(:paused?) + worker.stub(:paused?) do + paused_checks += 1 # count the number of loop iterations + worker.shutdown if paused_checks == 20 # so we don't loop forever + old_paused.call + end + + # a job should only be reserved once because it is paused while processing the first one + reserver.should_receive(:reserve).once { job } + + worker.work(0) + paused_checks.should eq(20) + end + + it 'can be unpaused' do + worker.pause_processing + + paused_checks = 0 + old_paused = worker.method(:paused?) + worker.stub(:paused?) do + paused_checks += 1 # count the number of loop iterations + worker.unpause_processing if paused_checks == 20 # so we don't loop forever + old_paused.call + end + + worker.work(0) + paused_checks.should be >= 20 + end end end end From 77c39989798aecbae8eb89bdb694471b83ec87d9 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 09:45:33 -0700 Subject: [PATCH 21/30] Indicate the worker is paused in the procline. --- lib/qless/worker.rb | 1 + spec/unit/worker_spec.rb | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index 44dd88cf..ca364a33 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -65,6 +65,7 @@ def paused? def pause_processing @paused = true + procline "Paused -- #{@job_reserver.description}" end def unpause_processing diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index e590f6f8..d3145539 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -142,12 +142,15 @@ def self.perform(job) old_wait.call(child) end + paused_procline = nil paused_checks = 0 old_paused = worker.method(:paused?) worker.stub(:paused?) do paused_checks += 1 # count the number of loop iterations worker.shutdown if paused_checks == 20 # so we don't loop forever - old_paused.call + old_paused.call.tap do |paused| + paused_procline = procline if paused + end end # a job should only be reserved once because it is paused while processing the first one @@ -155,6 +158,7 @@ def self.perform(job) worker.work(0) paused_checks.should eq(20) + paused_procline.should include("Paused") end it 'can be unpaused' do From e26f1309606fe62335be32e367ff0a7826f38b38 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 10:34:46 -0700 Subject: [PATCH 22/30] Add Worker.start method. --- lib/qless.rb | 2 +- lib/qless/job_reservers/ordered.rb | 2 + lib/qless/job_reservers/round_robin.rb | 2 + lib/qless/worker.rb | 21 ++++++ spec/spec_helper.rb | 29 +++++-- spec/unit/worker_spec.rb | 100 ++++++++++++++++++++++++- 6 files changed, 147 insertions(+), 9 deletions(-) diff --git a/lib/qless.rb b/lib/qless.rb index 9019c71b..b493afa7 100644 --- a/lib/qless.rb +++ b/lib/qless.rb @@ -35,7 +35,7 @@ class Client def initialize(options = {}) # This is the redis instance we're connected to - @redis = Redis.new(options) + @redis = Redis.connect(options) # use connect so REDIS_URL will be honored @config = Config.new(self) ['cancel', 'complete', 'depends', 'fail', 'failed', 'get', 'getconfig', 'heartbeat', 'jobs', 'peek', 'pop', 'put', 'queues', 'setconfig', 'stats', 'track', 'workers'].each do |f| diff --git a/lib/qless/job_reservers/ordered.rb b/lib/qless/job_reservers/ordered.rb index c5f9bfd5..1f331b8f 100644 --- a/lib/qless/job_reservers/ordered.rb +++ b/lib/qless/job_reservers/ordered.rb @@ -1,6 +1,8 @@ module Qless module JobReservers class Ordered + attr_reader :queues + def initialize(queues) @queues = queues end diff --git a/lib/qless/job_reservers/round_robin.rb b/lib/qless/job_reservers/round_robin.rb index 02d1e1ca..51a408b5 100644 --- a/lib/qless/job_reservers/round_robin.rb +++ b/lib/qless/job_reservers/round_robin.rb @@ -1,6 +1,8 @@ module Qless module JobReservers class RoundRobin + attr_reader :queues + def initialize(queues) @queues = queues @num_queues = queues.size diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index ca364a33..8b6c17ce 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -1,5 +1,7 @@ require 'qless' require 'time' +require 'qless/job_reservers/ordered' +require 'qless/job_reservers/round_robin' module Qless # This is heavily inspired by Resque's excellent worker: @@ -10,6 +12,25 @@ def initialize(client, job_reserver) @shutdown = @paused = false end + # Starts a worker based on ENV vars. Supported ENV vars: + # - REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically) + # - QUEUES=high,medium,low or QUEUE=blah + # - JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin + # - INTERVAL=3.2 + # This is designed to be called from a rake task + def self.start + client = Qless::Client.new + queues = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',').map { |q| client.queue(q.strip) } + if queues.none? + raise "No queues provided. You must pass QUEUE or QUEUES when starting a worker." + end + + reserver = JobReservers.const_get(ENV.fetch('JOB_RESERVER', 'Ordered')).new(queues) + interval = Float(ENV.fetch('INTERVAL', 5.0)) + + new(client, reserver).work(interval) + end + def work(interval = 5.0) procline "Starting #{@job_reserver.description}" register_signal_handlers diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 13709f45..d1381d6f 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -9,22 +9,37 @@ require 'rspec/fire' +module QlessSpecHelpers + def with_env_vars(vars) + original = ENV.to_hash + vars.each { |k, v| ENV[k] = v } + + begin + yield + ensure + ENV.replace(original) + end + end + + def redis_config + @redis_config ||= if File.exist?('./spec/redis.config.yml') + YAML.load_file('./spec/redis.config.yml') + else + {} + end + end +end + RSpec.configure do |c| c.treat_symbols_as_metadata_keys_with_true_values = true c.filter_run :f c.run_all_when_everything_filtered = true c.include RSpec::Fire + c.include QlessSpecHelpers end shared_context "redis integration", :integration do let(:client) { Qless::Client.new(redis_config) } - let(:redis_config) do - if File.exist?('./spec/redis.config.yml') - YAML.load_file('./spec/redis.config.yml') - else - {} - end - end def assert_minimum_redis_version(version) redis_version = Gem::Version.new(@redis.info["redis_version"]) diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index d3145539..c3c8e304 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' +require 'yaml' require 'qless/worker' -require 'qless/job_reservers/ordered' module Qless describe Worker do @@ -176,6 +176,104 @@ def self.perform(job) paused_checks.should be >= 20 end end + + describe ".start" do + def redis_url + return "redis://localhost:6379/4" if redis_config.empty? + "redis://#{redis_config[:host]}:#{redis_config[:port]}/4" + end + + def with_env_vars(vars = {}) + defaults = { + 'REDIS_URL' => redis_url, + 'QUEUE' => 'mock_queue' + } + super(defaults.merge(vars)) { yield } + end + + it 'uses REDIS_URL to make a redis connection' do + with_env_vars "REDIS_URL" => redis_url do + Worker.should_receive(:new) do |client, reserver| + client.redis.client.db.should eq(4) + stub.as_null_object + end + + Worker.start + end + end + + it 'starts working with sleep interval INTERVAL' do + with_env_vars "INTERVAL" => "2.3" do + worker = fire_double("Qless::Worker") + Worker.stub(:new).and_return(worker) + worker.should_receive(:work).with(2.3) + + Worker.start + end + end + + it 'defaults the sleep interval to 5.0' do + with_env_vars do + worker = fire_double("Qless::Worker") + Worker.stub(:new).and_return(worker) + worker.should_receive(:work).with(5.0) + + Worker.start + end + end + + it 'uses the named QUEUE' do + with_env_vars 'QUEUE' => 'normal' do + Worker.should_receive(:new) do |client, reserver| + reserver.queues.map(&:name).should eq(["normal"]) + stub.as_null_object + end + + Worker.start + end + end + + it 'uses the named QUEUES (comma delimited)' do + with_env_vars 'QUEUES' => 'high,normal, low' do + Worker.should_receive(:new) do |client, reserver| + reserver.queues.map(&:name).should eq(["high", "normal", "low"]) + stub.as_null_object + end + + Worker.start + end + end + + it 'raises an error if no queues are provided' do + with_env_vars 'QUEUE' => '', 'QUEUES' => '' do + expect { + Worker.start + }.to raise_error(/must pass QUEUE or QUEUES/) + end + end + + it 'uses the Ordered reserver by default' do + with_env_vars do + Worker.should_receive(:new) do |client, reserver| + reserver.should be_a(JobReservers::Ordered) + stub.as_null_object + end + + Worker.start + end + end + + it 'uses the RoundRobin reserver if so configured' do + with_env_vars 'JOB_RESERVER' => 'RoundRobin' do + Worker.should_receive(:new) do |client, reserver| + reserver.should be_a(JobReservers::RoundRobin) + stub.as_null_object + end + + Worker.start + end + end + end end end From 7d9a53e7a4d6b6fc157be0af91b0082796385598 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 11:08:56 -0700 Subject: [PATCH 23/30] Remove unused method. --- lib/qless/worker.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index 8b6c17ce..e22f07ee 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -101,10 +101,6 @@ def fail_job(job, error) job.fail(group, message) end - def handle_child_from_parent(child) - Process.wait(child) - end - def procline(value) $0 = "Qless: #{value} at #{Time.now.iso8601}" end From 816e737d8f646947cfc5415eca04225482e2e445 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 11:14:43 -0700 Subject: [PATCH 24/30] Add logging to worker. --- lib/qless/worker.rb | 40 ++++++++++++++++++++++++++--- spec/spec_helper.rb | 5 ++++ spec/unit/worker_spec.rb | 54 ++++++++++++++++++++++++++++++++++------ 3 files changed, 89 insertions(+), 10 deletions(-) diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index e22f07ee..77d6f99e 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -7,11 +7,19 @@ module Qless # This is heavily inspired by Resque's excellent worker: # https://github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb class Worker - def initialize(client, job_reserver) + def initialize(client, job_reserver, options = {}) @client, @job_reserver = client, job_reserver @shutdown = @paused = false + self.very_verbose = options[:very_verbose] + self.verbose = options[:verbose] end + # Whether the worker should log basic info to STDOUT + attr_accessor :verbose + + # Whether the worker should log lots of info to STDOUT + attr_accessor :very_verbose + # Starts a worker based on ENV vars. Supported ENV vars: # - REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically) # - QUEUES=high,medium,low or QUEUE=blah @@ -28,7 +36,11 @@ def self.start reserver = JobReservers.const_get(ENV.fetch('JOB_RESERVER', 'Ordered')).new(queues) interval = Float(ENV.fetch('INTERVAL', 5.0)) - new(client, reserver).work(interval) + options = {} + options[:verbose] = !!ENV['VERBOSE'] + options[:very_verbose] = !!ENV['VVERBOSE'] + + new(client, reserver, options).work(interval) end def work(interval = 5.0) @@ -42,10 +54,13 @@ def work(interval = 5.0) unless job = @job_reserver.reserve break if interval.zero? procline "Waiting for #{@job_reserver.description}" + log! "Sleeping for #{interval} seconds" sleep interval next end + log "got: #{job.inspect}" + if @child = fork # We're in the parent process procline "Forked #{@child} for #{job.description}" @@ -85,11 +100,13 @@ def paused? end def pause_processing + log "USR2 received; pausing job processing" @paused = true procline "Paused -- #{@job_reserver.description}" end def unpause_processing + log "CONT received; resuming job processing" @paused = false end @@ -98,11 +115,13 @@ def unpause_processing def fail_job(job, error) group = "#{job.klass}:#{error.class}" message = "#{error.message}\n\n#{error.backtrace.join("\n")}" + log "Got #{group} failure from #{job.inspect}" job.fail(group, message) end def procline(value) - $0 = "Qless: #{value} at #{Time.now.iso8601}" + $0 = "Qless-#{Qless::VERSION}: #{value} at #{Time.now.iso8601}" + log! $0 end def kill_child @@ -133,6 +152,21 @@ def register_signal_handlers warn "Signals QUIT, USR1, USR2, and/or CONT not supported." end end + + # Log a message to STDOUT if we are verbose or very_verbose. + def log(message) + if verbose + puts "*** #{message}" + elsif very_verbose + time = Time.now.strftime('%H:%M:%S %Y-%m-%d') + puts "** [#{time}] #$$: #{message}" + end + end + + # Logs a very verbose message to STDOUT. + def log!(message) + log message if very_verbose + end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d1381d6f..7a14b018 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -28,6 +28,11 @@ def redis_config {} end end + + def redis_url + return "redis://localhost:6379" if redis_config.empty? + "redis://#{redis_config[:host]}:#{redis_config[:port]}" + end end RSpec.configure do |c| diff --git a/spec/unit/worker_spec.rb b/spec/unit/worker_spec.rb index c3c8e304..9eeadd35 100644 --- a/spec/unit/worker_spec.rb +++ b/spec/unit/worker_spec.rb @@ -86,7 +86,7 @@ def self.perform(job) end worker.work(0) - starting_procline.should include("Qless: Starting") + starting_procline.should include("Starting") end it 'sets an appropriate procline for the parent process' do @@ -178,11 +178,6 @@ def self.perform(job) end describe ".start" do - def redis_url - return "redis://localhost:6379/4" if redis_config.empty? - "redis://#{redis_config[:host]}:#{redis_config[:port]}/4" - end - def with_env_vars(vars = {}) defaults = { 'REDIS_URL' => redis_url, @@ -192,7 +187,7 @@ def with_env_vars(vars = {}) end it 'uses REDIS_URL to make a redis connection' do - with_env_vars "REDIS_URL" => redis_url do + with_env_vars "REDIS_URL" => (redis_url + '/4') do Worker.should_receive(:new) do |client, reserver| client.redis.client.db.should eq(4) stub.as_null_object @@ -273,6 +268,51 @@ def with_env_vars(vars = {}) Worker.start end end + + it 'sets verbose and very_verbose to false by default' do + with_env_vars do + orig_new = Worker.method(:new) + Worker.should_receive(:new) do |client, reserver, options = {}| + worker = orig_new.call(client, reserver, options) + worker.verbose.should be_false + worker.very_verbose.should be_false + + stub.as_null_object + end + + Worker.start + end + end + + it 'sets verbose=true when passed VERBOSE' do + with_env_vars 'VERBOSE' => '1' do + orig_new = Worker.method(:new) + Worker.should_receive(:new) do |client, reserver, options = {}| + worker = orig_new.call(client, reserver, options) + worker.verbose.should be_true + worker.very_verbose.should be_false + + stub.as_null_object + end + + Worker.start + end + end + + it 'sets very_verbose=true when passed VVERBOSE' do + with_env_vars 'VVERBOSE' => '1' do + orig_new = Worker.method(:new) + Worker.should_receive(:new) do |client, reserver, options = {}| + worker = orig_new.call(client, reserver, options) + worker.verbose.should be_false + worker.very_verbose.should be_true + + stub.as_null_object + end + + Worker.start + end + end end end end From 4ce26f6acfbd69fa842b4e59429244cd28a3d7f7 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 13:36:07 -0700 Subject: [PATCH 25/30] Add worker integration test. --- spec/integration/worker_spec.rb | 61 +++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 spec/integration/worker_spec.rb diff --git a/spec/integration/worker_spec.rb b/spec/integration/worker_spec.rb new file mode 100644 index 00000000..825ac440 --- /dev/null +++ b/spec/integration/worker_spec.rb @@ -0,0 +1,61 @@ +require 'spec_helper' +require 'redis' +require 'yaml' +require 'qless/worker' +require 'qless' + +class WorkerIntegrationJob + def self.perform(job) + File.open(job['file'], "w") { |f| f.write("done") } + end +end + +describe "Worker integration", :integration do + def start_worker + unless @child = fork + with_env_vars 'REDIS_URL' => redis_url, 'QUEUE' => 'main', 'INTERVAL' => '0.0001' do + Qless::Worker.start + exit! # once the work ends, exit hte worker process + end + end + end + + def output_file(i) + File.join(temp_dir, "job.out.#{Time.now.to_i}.#{i}") + end + + def wait_for_job_completion + 20.times do |i| + sleep 0.1 + return if `ps -p #{@child}` =~ /Waiting/i + end + + raise "Didn't complete: #{`ps -p #{@child}`}" + end + + let(:temp_dir) { "./spec/tmp" } + + before do + FileUtils.rm_rf temp_dir + FileUtils.mkdir_p temp_dir + end + + it 'can start a worker and then shut it down' do + files = 3.times.map { |i| output_file(i) } + files.select { |f| File.exist?(f) }.should eq([]) + + start_worker + + queue = client.queue("main") + files.each do |f| + queue.put(WorkerIntegrationJob, "file" => f) + end + + wait_for_job_completion + Process.kill("QUIT", @child) # send shutdown signal + + contents = files.map { |f| File.read(f) } + contents.should eq(['done'] * files.size) + end +end + From 446e5648b9ce9e41c3ec2367930fad7481512e35 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 13:56:26 -0700 Subject: [PATCH 26/30] Add note about additional ENV vars. --- lib/qless/worker.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/qless/worker.rb b/lib/qless/worker.rb index 77d6f99e..a87c55aa 100644 --- a/lib/qless/worker.rb +++ b/lib/qless/worker.rb @@ -25,6 +25,8 @@ def initialize(client, job_reserver, options = {}) # - QUEUES=high,medium,low or QUEUE=blah # - JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin # - INTERVAL=3.2 + # - VERBOSE=true (to enable logging) + # - VVERBOSE=true (to enable very verbose logging) # This is designed to be called from a rake task def self.start client = Qless::Client.new From 3d63a14e34e84cc1ca4a47649da1bdc8ddb767f1 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 13:59:11 -0700 Subject: [PATCH 27/30] Define rake task. --- Rakefile | 5 +++++ lib/qless/tasks.rb | 10 ++++++++++ 2 files changed, 15 insertions(+) create mode 100644 lib/qless/tasks.rb diff --git a/Rakefile b/Rakefile index 903b5915..c24ae7ab 100644 --- a/Rakefile +++ b/Rakefile @@ -7,3 +7,8 @@ RSpec::Core::RakeTask.new(:spec) do |t| end task :default => :spec + +require 'bundler' +Bundler.setup + +require 'qless/tasks' diff --git a/lib/qless/tasks.rb b/lib/qless/tasks.rb new file mode 100644 index 00000000..3962dc8d --- /dev/null +++ b/lib/qless/tasks.rb @@ -0,0 +1,10 @@ +namespace :qless do + task :setup # no-op; users should define their own setup + + desc "Start a Qless worker using env vars: QUEUES, JOB_RESERVER, REDIS_URL, INTERVAL, VERBOSE, VVERBOSE" + task :work => :setup do + require 'qless/worker' + Qless::Worker.start + end +end + From 1b08d352ecfade707a2529ca024ee8ea2296b4d1 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 14:21:23 -0700 Subject: [PATCH 28/30] Remove unnecessary dependencies. --- qless.gemspec | 2 -- 1 file changed, 2 deletions(-) diff --git a/qless.gemspec b/qless.gemspec index 886286ed..112929d6 100644 --- a/qless.gemspec +++ b/qless.gemspec @@ -34,9 +34,7 @@ Gem::Specification.new do |s| s.add_dependency "sinatra", "~> 1.3.2" s.add_dependency "vegas" , "~> 0.1.11" - s.add_dependency "hiredis", "~> 0.4.5" s.add_dependency "redis" , "~> 2.2.2" - s.add_dependency "json" , "~> 1.6.6" s.add_development_dependency "rspec" , "~> 2.6" s.add_development_dependency "rspec-fire", "~> 0.4" From f72a25205311f54721c44a11f42f63aabce2e484 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 14:40:51 -0700 Subject: [PATCH 29/30] Add capybara dependency. --- qless.gemspec | 2 ++ 1 file changed, 2 insertions(+) diff --git a/qless.gemspec b/qless.gemspec index 112929d6..b43187bf 100644 --- a/qless.gemspec +++ b/qless.gemspec @@ -39,4 +39,6 @@ Gem::Specification.new do |s| s.add_development_dependency "rspec" , "~> 2.6" s.add_development_dependency "rspec-fire", "~> 0.4" s.add_development_dependency "rake" , "~> 0.9.2.2" + s.add_development_dependency 'capybara', '~> 1.1.2' + s.add_development_dependency 'launchy', '~> 2.1.0' end From 5cd643d06394af58c979c8d00cc61b59baa6c2f7 Mon Sep 17 00:00:00 2001 From: Myron Marston Date: Thu, 19 Apr 2012 17:01:17 -0700 Subject: [PATCH 30/30] Add a basic server spec that clicks each top-level tab. --- lib/qless/server/views/_job.erb | 2 +- spec/integration/server_spec.rb | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 spec/integration/server_spec.rb diff --git a/lib/qless/server/views/_job.erb b/lib/qless/server/views/_job.erb index 78489422..ca500a15 100644 --- a/lib/qless/server/views/_job.erb +++ b/lib/qless/server/views/_job.erb @@ -3,7 +3,7 @@

- <%= job.jid %> | <%= job.state %> / <%= job.queue %><%= job.worker.nil? ? "/ #{job.worker}" : "" %> + <%= job.jid %> | <%= job.state %> / <%= job.queue %><%= job.worker_name.nil? ? "/ #{job.worker_name}" : "" %>

diff --git a/spec/integration/server_spec.rb b/spec/integration/server_spec.rb new file mode 100644 index 00000000..0644b85c --- /dev/null +++ b/spec/integration/server_spec.rb @@ -0,0 +1,24 @@ +ENV['RACK_ENV'] = 'test' +require 'spec_helper' +require 'yaml' +require 'qless/server' +require 'capybara/rspec' + +module Qless + describe Server, :type => :request do + before(:all) do + Qless::Server.client = Qless::Client.new(redis_config) + Capybara.app = Qless::Server.new + end + + it 'can visit each top-nav tab' do + visit '/' + + links = all('ul.nav a') + links.should have_at_least(6).links + links.each do |link| + click_link link.text + end + end + end +end