/
consumer_work_pool.rb
107 lines (84 loc) · 1.78 KB
/
consumer_work_pool.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
require "thread"
module Bunny
# Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels
# or threads.
#
# Every channel its own consumer pool.
#
# @private
class ConsumerWorkPool
#
# API
#
attr_reader :threads
attr_reader :size
attr_reader :abort_on_exception
def initialize(size = 1, abort_on_exception = false)
@size = size
@abort_on_exception = abort_on_exception
@queue = ::Queue.new
@paused = false
end
def submit(callable = nil, &block)
@queue.push(callable || block)
end
def start
@threads = []
@size.times do
t = Thread.new(&method(:run_loop))
t.abort_on_exception = true if abort_on_exception
@threads << t
end
@running = true
end
def running?
@running
end
def backlog
@queue.length
end
def busy?
!@queue.empty?
end
def shutdown
@running = false
@size.times do
submit do |*args|
throw :terminate
end
end
end
def join(timeout = nil)
@threads.each { |t| t.join(timeout) }
end
def pause
@running = false
@paused = true
end
def resume
@running = true
@paused = false
@threads.each { |t| t.run }
end
def kill
@running = false
@threads.each { |t| t.kill }
end
protected
def run_loop
catch(:terminate) do
loop do
Thread.stop if @paused
callable = @queue.pop
begin
callable.call
rescue ::StandardError => e
# TODO: use connection logger
$stderr.puts e.class.name
$stderr.puts e.message
end
end
end
end
end
end