forked from wr0ngway/resque-concurrent-restriction
/
spec_helper.rb
194 lines (153 loc) · 4.05 KB
/
spec_helper.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
require 'rspec'
require 'ap'
require 'resque-concurrent-restriction'
# No need to start redis when running in Travis
unless ENV['CI']
  begin
    # Connectivity probe: a refused connection means no redis is listening,
    # in which case a throwaway server is started below.
    Resque.queues
  rescue Errno::ECONNREFUSED
    spec_dir = File.dirname(File.expand_path(__FILE__))
    REDIS_CMD = "redis-server #{spec_dir}/redis-test.conf"
    puts "Starting redis for testing at localhost..."
    puts `cd #{spec_dir}; #{REDIS_CMD}`

    # Schedule the redis server for shutdown when tests are all finished.
    at_exit do
      puts 'Stopping redis'

      # The pid file is expected next to this spec helper (presumably written
      # by redis per redis-test.conf -- confirm against that config). A missing
      # or unreadable file simply means there is nothing to kill.
      pid = begin
        File.read("#{spec_dir}/redis.pid").to_i
      rescue SystemCallError
        0
      end

      # Signal the process directly rather than shelling out to `kill -9`.
      # Only positive pids are signalled so a corrupt pid file can never
      # address a process group.
      if pid > 0
        begin
          Process.kill('KILL', pid)
        rescue SystemCallError
          # Best effort: the server may already be gone.
        end
      end

      # Remove the artifacts the test server leaves behind; absence is fine.
      %w[redis.pid redis-server.log dump.rdb].each do |leftover|
        begin
          File.delete("#{spec_dir}/#{leftover}")
        rescue SystemCallError
          # Best effort: the file may never have been created.
        end
      end
    end
  end
end
##
# Helper to perform job classes
#
module PerformJob
  # Enqueues job_class with job_args and then works the queue it landed on.
  #
  # A trailing Hash in job_args is removed and treated as options:
  #   :queue   - queue name to use instead of Resque.queue_from_class(job_class)
  #   :verbose, :inline - passed through to run_resque_queue
  def run_resque_job(job_class, *job_args)
    options = {}
    options = job_args.pop if job_args.last.is_a?(Hash)

    target_queue = options[:queue] || Resque.queue_from_class(job_class)
    Resque::Job.create(target_queue, job_class, *job_args)
    run_resque_queue(target_queue, options)
  end

  # Builds a worker for queue and lets it process work.
  #
  # opts[:verbose] - turn on the worker's very_verbose flag
  # opts[:inline]  - reserve one job and perform it directly instead of
  #                  calling worker.work(0)
  def run_resque_queue(queue, opts={})
    worker = Resque::Worker.new(queue)
    worker.very_verbose = true if opts[:verbose]

    # do a single job then shutdown: override done_working on this one
    # worker instance so it requests shutdown after the normal bookkeeping.
    class << worker
      def done_working
        super
        shutdown
      end
    end

    return worker.work(0) unless opts[:inline]

    worker.perform(worker.reserve)
  end

  # Returns a Hash of every redis key mapped to its contents: the value for
  # strings, all elements for lists, all members for sets, and just the type
  # name for any other redis type.
  def dump_redis
    redis = Resque.redis
    redis.keys("*").each_with_object({}) do |key, snapshot|
      kind = redis.type(key)
      snapshot[key] =
        case kind
        when 'string' then redis.get(key)
        when 'list' then redis.lrange(key, 0, -1)
        when 'set' then redis.smembers(key)
        else kind
        end
    end
  end
end
# Mixin for test job classes (used via `extend`) that records lifecycle
# markers in redis so specs can check whether, and how often, a job ran.
#
# All keys have the form "restricted_job_<kind>:<class>:<args as JSON>".
module RunCountHelper
  # Wraps the job body (intended to be picked up as a Resque around_perform
  # hook): marks the job as started, bumps its run counter, yields, and marks
  # it as ended even when the job body raises.
  def around_perform(*args)
    # Flags are stored as the explicit string 'true'; that is also what the
    # query methods below compare against.
    Resque.redis.set(run_count_helper_key('started', args), 'true')
    Resque.redis.incr(run_count_helper_key('run_count', args))
    yield
  ensure
    Resque.redis.set(run_count_helper_key('ended', args), 'true')
  end

  # Default no-op job body; job classes override this when they need to do
  # something observable.
  def perform(*args)
    #puts "Running job #{self}:#{args}"
  end

  # Number of times this class was run with exactly these args (0 if never).
  def run_count(*args)
    Resque.redis.get(run_count_helper_key('run_count', args)).to_i
  end

  # Sum of the run counters for this class across all argument lists.
  def total_run_count
    keys = Resque.redis.keys("restricted_job_run_count:#{self}:*")
    keys.inject(0) {|sum, k| sum + Resque.redis.get(k).to_i }
  end

  # True once around_perform has begun for this class with these args.
  #
  # Fixed: the key previously lacked the ':' separator written by
  # around_perform, and the reply was compared to the boolean true although
  # redis hands back a string, so this could never return true.
  def started?(*args)
    Resque.redis.get(run_count_helper_key('started', args)) == 'true'
  end

  # True once around_perform has finished (normally or via an exception) for
  # this class with these args. Same key/comparison fix as started?.
  def ended?(*args)
    Resque.redis.get(run_count_helper_key('ended', args)) == 'true'
  end

  private

  # Single place that builds the redis key for a marker kind and an argument
  # list, so writers and readers cannot drift apart again.
  def run_count_helper_key(kind, args)
    "restricted_job_#{kind}:#{self}:#{args.to_json}"
  end
end
# Baseline job on the 'normal' queue: it gets the RunCountHelper bookkeeping
# but does not extend the concurrent-restriction plugin.
class NoRestrictionJob
  @queue = 'normal'

  extend RunCountHelper
end
# Job on the 'normal' queue that extends the concurrent-restriction plugin
# and declares `concurrent 1` (presumably a limit of one running instance --
# see the plugin for the exact semantics).
class RestrictionJob
  @queue = 'normal'

  extend RunCountHelper
  extend Resque::Plugins::ConcurrentRestriction

  concurrent 1
end
# Namespace holding a restricted job, so there is a job class whose name
# contains a module prefix (Jobs::NestedRestrictionJob).
module Jobs
  # Same declarations as a plain restricted job: 'normal' queue and
  # `concurrent 1`, only nested inside a module.
  class NestedRestrictionJob
    @queue = 'normal'

    extend RunCountHelper
    extend Resque::Plugins::ConcurrentRestriction

    concurrent 1
  end
end
# Restricted job on the 'normal' queue that supplies its own
# concurrent_identifier, derived from the first job argument.
class IdentifiedRestrictionJob
  @queue = 'normal'

  extend RunCountHelper
  extend Resque::Plugins::ConcurrentRestriction

  concurrent 1

  # Returns the first job argument as a String ("" when there are no
  # arguments or the first one is nil).
  def self.concurrent_identifier(*args)
    args[0].to_s
  end
end
# Restricted job (`concurrent 1`, 'normal' queue) whose body either fails on
# demand or takes a short while to finish.
class ConcurrentRestrictionJob
  @queue = 'normal'

  extend RunCountHelper
  extend Resque::Plugins::ConcurrentRestriction

  concurrent 1

  # Raises the first argument when it is truthy (so a spec can force a
  # failure); otherwise sleeps for 0.2 seconds.
  def self.perform(*args)
    failure = args.first
    raise failure if failure

    sleep 0.2
  end
end
# Restricted job on the 'normal' queue declared with `concurrent 4`; each run
# takes half a second.
class MultipleConcurrentRestrictionJob
  @queue = 'normal'

  extend RunCountHelper
  extend Resque::Plugins::ConcurrentRestriction

  concurrent 4

  # Ignores its arguments and sleeps for 0.5 seconds.
  def self.perform(*_args)
    sleep 0.5
  end
end
# Restricted job on the 'normal' queue declared with `concurrent 1`; each run
# takes half a second.
class OneConcurrentRestrictionJob
  @queue = 'normal'

  extend RunCountHelper
  extend Resque::Plugins::ConcurrentRestriction

  concurrent 1

  # Ignores its arguments and sleeps for 0.5 seconds.
  def self.perform(*_args)
    sleep 0.5
  end
end
# Restricted job on the 'normal' queue declared with `concurrent 2`; each run
# takes half a second.
class TwoConcurrentRestrictionJob
  @queue = 'normal'

  extend RunCountHelper
  extend Resque::Plugins::ConcurrentRestriction

  concurrent 2

  # Ignores its arguments and sleeps for 0.5 seconds.
  def self.perform(*_args)
    sleep 0.5
  end
end