Skip to content
This repository
Browse code

Merge pull request #90 from seomoz/harden_worker

Harden worker and improve script caching
  • Loading branch information...
commit 7502f0e0777ad860ff1672e568e70da64d648873 2 parents 8e2917b + 57d264f
Myron Marston authored April 05, 2013
8  lib/qless.rb
@@ -14,11 +14,11 @@ module Qless
14 14
   USING_LEGACY_REDIS_VERSION = ::Redis::VERSION.to_f < 3.0
15 15
 end
16 16
 
17  
-require "qless/lua_script_cache"
18 17
 require "qless/version"
19 18
 require "qless/config"
20 19
 require "qless/queue"
21 20
 require "qless/job"
  21
+require "qless/lua_script"
22 22
 
23 23
 module Qless
24 24
   extend self
@@ -40,10 +40,6 @@ def worker_name
40 40
     @worker_name ||= [Socket.gethostname, Process.pid.to_s].join('-')
41 41
   end
42 42
 
43  
-  def lua_script_cache
44  
-    @lua_script_cache ||= LuaScriptCache.new
45  
-  end
46  
-
47 43
   class ClientJobs
48 44
     def initialize(client)
49 45
       @client = client
@@ -163,7 +159,7 @@ def initialize(options = {})
163 159
       ['cancel', 'config', 'complete', 'depends', 'fail', 'failed', 'get', 'heartbeat', 'jobs', 'peek', 'pop',
164 160
         'priority', 'put', 'queues', 'recur', 'retry', 'stats', 'tag', 'track', 'workers', 'pause', 'unpause',
165 161
         'deregister_workers'].each do |f|
166  
-        self.instance_variable_set("@_#{f}", Qless.lua_script_cache.script_for(f, @redis))
  162
+        self.instance_variable_set("@_#{f}", Qless::LuaScript.new(f, @redis))
167 163
       end
168 164
 
169 165
       @jobs    = ClientJobs.new(self)
13  lib/qless/lua_script.rb
... ...
@@ -1,18 +1,19 @@
  1
+require 'digest/sha1'
  2
+
1 3
 module Qless
2 4
   class LuaScript
3 5
     LUA_SCRIPT_DIR = File.expand_path("../qless-core/", __FILE__)
4 6
 
5  
-    def initialize(name, redis, sha = nil)
6  
-      @sha   = sha
  7
+    def initialize(name, redis)
7 8
       @name  = name
8 9
       @redis = redis
9  
-      reload() unless sha
  10
+      @sha   = Digest::SHA1.hexdigest(script_contents)
10 11
     end
11 12
 
12 13
     attr_reader :name, :redis, :sha
13 14
 
14 15
     def reload()
15  
-      @sha = @redis.script(:load, File.read(File.join(LUA_SCRIPT_DIR, "#{@name}.lua")))
  16
+      @sha = @redis.script(:load, script_contents)
16 17
     end
17 18
 
18 19
     def call(keys, argv)
@@ -33,5 +34,9 @@ def _call(keys, argv)
33 34
         @redis.evalsha(@sha, keys: keys, argv: argv)
34 35
       end
35 36
     end
  37
+
  38
+    def script_contents
  39
+      @script_contents ||= File.read(File.join(LUA_SCRIPT_DIR, "#{@name}.lua"))
  40
+    end
36 41
   end
37 42
 end
22  lib/qless/lua_script_cache.rb
... ...
@@ -1,22 +0,0 @@
1  
-require 'qless/lua_script'
2  
-
3  
-module Qless
4  
-  class LuaScriptCache
5  
-    def initialize
6  
-      @sha_cache = {}
7  
-    end
8  
-
9  
-    def script_for(script_name, redis_connection)
10  
-      key = CacheKey.new(script_name, redis_connection.id)
11  
-
12  
-      sha = @sha_cache.fetch(key) do
13  
-        @sha_cache[key] = LuaScript.new(script_name, redis_connection).sha
14  
-      end
15  
-
16  
-      LuaScript.new(script_name, redis_connection, sha)
17  
-    end
18  
-
19  
-    CacheKey = Struct.new(:script_name, :redis_server_url)
20  
-  end
21  
-end
22  
-
11  lib/qless/worker.rb
@@ -78,7 +78,7 @@ def work(interval = 5.0)
78 78
           next
79 79
         end
80 80
 
81  
-        unless job = @job_reserver.reserve
  81
+        unless job = reserve_job
82 82
           break if interval.zero?
83 83
           procline "Waiting for #{@job_reserver.description}"
84 84
           log! "Sleeping for #{interval} seconds"
@@ -117,6 +117,15 @@ def perform(job)
117 117
       try_complete(job)
118 118
     end
119 119
 
  120
+    def reserve_job
  121
+      @job_reserver.reserve
  122
+    rescue Exception => error
  123
+      # We want workers to durably stay up, so we don't want errors
  124
+      # during job reserving (e.g. network timeouts, etc) to kill
  125
+      # the worker.
  126
+      log "Got an error while reserving a job: #{error.class}: #{error.message}"
  127
+    end
  128
+
120 129
     def shutdown
121 130
       @shutdown = true
122 131
     end
35  spec/integration/lua_script_cache_spec.rb
... ...
@@ -1,35 +0,0 @@
1  
-require 'spec_helper'
2  
-require "qless"
3  
-require 'yaml'
4  
-
5  
-module Qless
6  
-  describe LuaScriptCache, :integration do
7  
-    def ensure_put_script_loaded(client)
8  
-      client.queues["q1"].put(Qless::Job, {})
9  
-    end
10  
-
11  
-    let(:redis_1)  { Redis.new(url: redis_url) }
12  
-    let(:redis_2)  { Redis.new(url: redis_url) }
13  
-    let(:client_1) { Qless::Client.new(redis: redis_1) }
14  
-    let(:client_2) { Qless::Client.new(redis: redis_2) }
15  
-
16  
-    before { stub_const("SomeJob", Class.new) }
17  
-
18  
-    it 'does not prevent watch-multi-exec blocks from working properly' do
19  
-      ensure_put_script_loaded(client_1)
20  
-      ensure_put_script_loaded(client_2)
21  
-
22  
-      redis_2.watch("some_key") do
23  
-        response = redis_2.multi do
24  
-          redis_1.set("some_key", "some_value") # force the multi block to no-op
25  
-          client_2.queues["some_job"].put(SomeJob, {})
26  
-        end
27  
-
28  
-        expect(response).to be_nil
29  
-      end
30  
-
31  
-      expect(client_2.queues["some_job"].length).to eq(0)
32  
-    end
33  
-  end
34  
-end
35  
-
39  spec/integration/lua_script_spec.rb
... ...
@@ -0,0 +1,39 @@
  1
+require 'spec_helper'
  2
+require "qless"
  3
+require 'yaml'
  4
+
  5
+module Qless
  6
+  describe LuaScript, :integration do
  7
+    let(:redis) { client.redis }
  8
+
  9
+    it 'does not make any redis requests upon initialization' do
  10
+      redis = double("Redis")
  11
+
  12
+      expect {
  13
+        LuaScript.new("config", redis)
  14
+      }.not_to raise_error # e.g. MockExpectationError
  15
+    end
  16
+
  17
+    it 'can issue the command without loading the script if it is already loaded' do
  18
+      script = LuaScript.new("config", redis)
  19
+      redis.script(:load, script.send(:script_contents)) # to ensure its loaded
  20
+      redis.should_not_receive(:script)
  21
+
  22
+      expect {
  23
+        script.call([], ['set', 'key', 3])
  24
+      }.to change { redis.keys.size }.by(1)
  25
+    end
  26
+
  27
+    it 'loads the script as needed if the command fails' do
  28
+      script = LuaScript.new("config", redis)
  29
+      redis.script(:flush)
  30
+
  31
+      redis.should_receive(:script).and_call_original
  32
+
  33
+      expect {
  34
+        script.call([], ['set', 'key', 3])
  35
+      }.to change { redis.keys.size }.by(1)
  36
+    end
  37
+  end
  38
+end
  39
+
56  spec/unit/lua_script_cache_spec.rb
... ...
@@ -1,56 +0,0 @@
1  
-require 'spec_helper'
2  
-require 'qless'
3  
-require 'qless/lua_script_cache'
4  
-
5  
-module Qless
6  
-  describe LuaScriptCache do
7  
-    def redis_double(db_num)
8  
-      fire_double("Redis", id: "redis://foo:1234/#{db_num}").tap do |redis|
9  
-        redis.stub(:script) do |script_contents|
10  
-          script_contents
11  
-        end
12  
-      end
13  
-    end
14  
-
15  
-    let(:redis_1a) { redis_double(1) }
16  
-    let(:redis_1b) { redis_double(1) }
17  
-    let(:redis_2)  { redis_double(2) }
18  
-    let(:cache)    { LuaScriptCache.new }
19  
-
20  
-    before do
21  
-      File.stub(:read) do |file_name|
22  
-        file_name.split('/').last
23  
-      end
24  
-    end
25  
-
26  
-    it 'loads each different script' do
27  
-      redis_1a.should_receive(:script).twice
28  
-
29  
-      script_1 = cache.script_for("foo", redis_1a)
30  
-      script_2 = cache.script_for("bar", redis_1a)
31  
-
32  
-      expect(script_1).to be_a(LuaScript)
33  
-      expect(script_2).to be_a(LuaScript)
34  
-
35  
-      expect(script_1.name).to eq("foo")
36  
-      expect(script_2.name).to eq("bar")
37  
-    end
38  
-
39  
-    it 'loads the same script each time it is needed in a different redis server' do
40  
-      redis_1a.should_receive(:script).once
41  
-      redis_2.should_receive(:script).once
42  
-
43  
-      cache.script_for("foo", redis_1a)
44  
-      cache.script_for("foo", redis_2)
45  
-    end
46  
-
47  
-    it 'loads a script only once when it is needed by multiple connections to the same redis server' do
48  
-      redis_1a.should_receive(:script).once
49  
-      redis_1b.should_not_receive(:script)
50  
-
51  
-      cache.script_for("foo", redis_1a)
52  
-      cache.script_for("foo", redis_1b)
53  
-    end
54  
-  end
55  
-end
56  
-
18  spec/unit/qless_spec.rb
@@ -18,13 +18,6 @@
18 18
     end
19 19
   end
20 20
 
21  
-  describe ".lua_script_cache" do
22  
-    it 'returns a memoized script cache instance' do
23  
-      expect(Qless.lua_script_cache).to be_a(Qless::LuaScriptCache)
24  
-      expect(Qless.lua_script_cache).to be(Qless.lua_script_cache)
25  
-    end
26  
-  end
27  
-
28 21
   context 'when instantiated' do
29 22
     let(:redis) { fire_double("Redis", id: "redis://foo:1/1", info: { "redis_version" => "2.6.0" }) }
30 23
     let(:redis_class) { fire_replaced_class_double("Redis") }
@@ -55,17 +48,6 @@
55 48
       client = Qless::Client.new(redis: redis)
56 49
       client.redis.should be(redis)
57 50
     end
58  
-
59  
-    it 'loads the lua scripts from the cache so that the scripts are not unnecessarily loaded multiple times' do
60  
-      cache = fire_double("Qless::LuaScriptCache")
61  
-      Qless.stub(lua_script_cache: cache)
62  
-
63  
-      loaded_scripts = []
64  
-      cache.stub(:script_for) { |name, redis| loaded_scripts << name }
65  
-
66  
-      client = Qless::Client.new(redis: redis)
67  
-      expect(loaded_scripts).to include("put", "complete")
68  
-    end
69 51
   end
70 52
 end
71 53
 
13  spec/unit/worker_spec.rb
@@ -164,6 +164,19 @@ def around_perform(job)
164 164
           worker.work(0)
165 165
           paused_checks.should be >= 20
166 166
         end
  167
+
  168
+        context 'when an error occurs while reserving a job' do
  169
+          before { reserver.stub(:reserve) { raise "redis error" } }
  170
+
  171
+          it 'does not kill the worker' do
  172
+            expect { worker.work(0) }.not_to raise_error
  173
+          end
  174
+
  175
+          it 'logs the error' do
  176
+            worker.work(0)
  177
+            expect(log_output.string).to include("redis error")
  178
+          end
  179
+        end
167 180
       end
168 181
     end
169 182
 

0 notes on commit 7502f0e

Please sign in to comment.
Something went wrong with that request. Please try again.