-
Notifications
You must be signed in to change notification settings - Fork 3
/
worker.rb
105 lines (85 loc) · 2.48 KB
/
worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# frozen_string_literal: true
require "yabeda/datadog/worker/send"
require "yabeda/datadog/worker/register"
module Yabeda
module Datadog
# = Perform actions async
#
# Producers hand actions to #enqueue. A pool of threads pops them from the
# queue, groups them by action name and invokes the constant of that name
# (resolved on this class, without consulting ancestors) once per group,
# passing the array of collected payloads.
#
# NOTE(review): the action constants are presumably callables defined on
# this class by the worker/send and worker/register requires - confirm.
class Worker
  # Seconds #stop waits, by default, for the threads to drain the queue.
  STOP_TIMEOUT = 5

  # Creates a worker backed by a SizedQueue and starts its threads.
  #
  # @param config [#queue_size, #num_threads] worker settings
  # @return [Worker] the started worker
  def self.start(config)
    queue_size = config.queue_size
    num_threads = config.num_threads

    Logging.instance.info("start worker; queue size: #{queue_size}; threads #{num_threads} ")

    instance = new(SizedQueue.new(queue_size))
    instance.spawn_threads(num_threads)
    instance
  end

  # @param queue [Queue, SizedQueue] queue shared by producers and threads
  def initialize(queue)
    @queue = queue
    @threads = []
  end

  # Schedules an action for asynchronous execution.
  #
  # With the SizedQueue created by .start this blocks the caller while the
  # queue is full and raises ClosedQueueError once #stop has been called.
  #
  # @param action [Symbol, String] name of the constant to invoke
  # @param payload [Object] argument collected into the action's batch
  def enqueue(action, payload)
    queue.push([action, payload])
  end

  # Starts +num_threads+ threads that consume the queue.
  #
  # @param num_threads [Integer]
  # @return [true]
  def spawn_threads(num_threads)
    num_threads.times { threads << Thread.new { process_queue } }
    true
  end

  # @return [Integer] number of threads currently tracked by the worker
  def spawned_threads_count
    threads.size
  end

  # Closes the queue and shuts the threads down.
  #
  # Threads get up to +timeout+ seconds in total to finish the actions that
  # are still queued; threads that are still running afterwards are killed.
  # Killing them right away (as +timeout: 0+ still does) drops every action
  # that has not been processed yet.
  #
  # @param timeout [Numeric, nil] seconds to wait; nil waits indefinitely
  # @return [true]
  def stop(timeout: STOP_TIMEOUT)
    Logging.instance.info("stop worker")
    queue.close # wakes threads blocked in #wait_for_action

    deadline = timeout && (monotonic_now + timeout)
    threads.each do |thread|
      remaining = deadline && [deadline - monotonic_now, 0].max
      finish_thread(thread, remaining)
    end
    threads.clear
    true
  end

  private

  attr_reader :queue, :threads

  # Thread body: keeps processing batches until the queue is closed and
  # fully drained.
  def process_queue
    dispatch(next_batch) while running? || actions_left?
  end

  # Collects up to the configured batch size of actions, grouped by name.
  #
  # @return [Hash{Object => Array}] payloads keyed by action name; empty
  #   when the queue was closed while waiting
  def next_batch
    batch = Hash.new { |hash, key| hash[key] = [] }

    # Blocks until an action arrives; yields nil once the queue is closed
    # and empty, in which case there is nothing left to collect.
    action_key, payload = wait_for_action
    return batch unless action_key

    batch[action_key] << payload
    collected = 1
    limit = Yabeda::Datadog.config.batch_size

    while collected < limit
      begin
        action_key, payload = dequeue_action
      rescue ThreadError
        break # queue drained before the batch filled up
      end
      batch[action_key] << payload
      collected += 1
    end

    batch
  end

  # Invokes each action once with all payloads collected for it.
  #
  # A failing action is logged and skipped: letting the error escape would
  # terminate the thread, and once every thread is gone producers block
  # forever on the full queue.
  def dispatch(batch)
    batch.each_pair do |action_key, payloads|
      begin
        self.class.const_get(action_key, false).call(payloads)
      rescue StandardError => e
        # NOTE(review): assumes Logging.instance is Logger-compatible and
        # responds to #error - confirm.
        Logging.instance.error("action #{action_key} failed: #{e.class}: #{e.message}")
      end
    end
  end

  # Waits up to +limit+ seconds for +thread+ to finish, then kills it.
  def finish_thread(thread, limit)
    thread.exit unless thread.join(limit)
  rescue StandardError => e
    # Thread#join re-raises the exception that terminated the thread;
    # shutting down must not fail because a thread had already died.
    Logging.instance.info("worker thread terminated by #{e.class}: #{e.message}")
  end

  # Monotonic clock reading in seconds, unaffected by wall-clock changes.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def actions_left?
    !queue.empty?
  end

  def no_actions?
    queue.empty?
  end
  # Original (misspelled) name, kept so existing callers keep working.
  alias no_acitons? no_actions?

  # Non-blocking pop; raises ThreadError when the queue is empty.
  def dequeue_action
    queue.pop(true)
  end

  # Blocking pop; returns nil once the queue is closed and empty.
  def wait_for_action
    queue.pop(false)
  end

  def running?
    !queue.closed?
  end
end
end
end