/
worker_loop.rb
90 lines (73 loc) · 2.4 KB
/
worker_loop.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
require 'thread'
require 'newrelic/agent/synchronize'
# A worker loop executes a set of registered tasks on a single thread.
# A task is a proc or block with a specified call period in seconds.
module NewRelic::Agent
class WorkerLoop
include(NewRelic::Agent::Synchronize)
attr_reader :log
def initialize(log = Logger.new(STDERR))
@tasks = []
@log = log
end
# run infinitely, calling the registered tasks at their specified
# call periods. The caller is responsible for creating the thread
# that runs this worker loop
def run
while(true) do
run_next_task
end
end
MIN_CALL_PERIOD = 0.1
# add a task to the worker loop. The task will be called approximately once
# every call_period seconds. The task is passed as a block
def add_task(call_period, &task_proc)
if call_period < MIN_CALL_PERIOD
raise ArgumentError, "Invalid Call Period (must be > #{MIN_CALL_PERIOD}): #{call_period}"
end
synchronize do
@tasks << LoopTask.new(call_period, &task_proc)
end
end
private
def get_next_task
synchronize do
return @tasks.inject do |soonest, task|
(task.next_invocation_time < soonest.next_invocation_time) ? task : soonest
end
end
end
def run_next_task
if @tasks.empty?
sleep 5.0
return
end
# get the next task to be executed, which is the task with the lowest (ie, soonest)
# next invocation time.
task = get_next_task
# sleep until this next task's scheduled invocation time
sleep_time = task.next_invocation_time - Time.now
sleep sleep_time unless sleep_time <= 0
begin
task.execute
rescue Timeout::Error, StandardError => e
log.debug "Error running task in Agent Worker Loop: #{e}"
log.debug e.backtrace.join("\n")
end
end
class LoopTask
def initialize(call_period, &task_proc)
@call_period = call_period
@last_invocation_time = Time.now
@task = task_proc
end
def next_invocation_time
@last_invocation_time + @call_period
end
def execute
@last_invocation_time = Time.now
@task.call
end
end
end
end