-
Notifications
You must be signed in to change notification settings - Fork 527
/
driver.rb
200 lines (167 loc) · 4.26 KB
/
driver.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
require 'monitor'
# ruby 1.9 specific fixes
unless RUBY_VERSION < '1.9'
require 'god/compat19'
end
module God
class TimedEvent
include Comparable
attr_accessor :at
# Instantiate a new TimedEvent that will be triggered after the specified delay
# +delay+ is the number of seconds from now at which to trigger
#
# Returns TimedEvent
def initialize(delay = 0)
self.at = Time.now + delay
end
def due?
Time.now >= self.at
end
def <=>(other)
self.at <=> other.at
end
end # DriverEvent
class DriverEvent < TimedEvent
def initialize(delay, task, condition)
super(delay)
@task = task
@condition = condition
end
def handle_event
@task.handle_poll(@condition)
end
end # DriverEvent
class DriverOperation < TimedEvent
def initialize(task, name, args)
super(0)
@task = task
@name = name
@args = args
end
# Handle the next queued operation that was issued asynchronously
#
# Returns nothing
def handle_event
@task.send(@name, *@args)
end
end
class DriverEventQueue
def initialize
@shutdown = false
@events = []
@monitor = Monitor.new
@resource = @monitor.new_cond
end
#
# Wake any sleeping threads after setting the sentinel
#
def shutdown
@shutdown = true
@monitor.synchronize do
@resource.broadcast
end
end
#
# Sleep until the queue has something due
#
def pop
@monitor.synchronize do
if @events.empty?
raise ThreadError, "queue empty" if @shutdown
@resource.wait
else !@events.first.due?
delay = @events.first.at - Time.now
@resource.wait(delay) if delay > 0
end
@events.shift
end
end
alias shift pop
alias deq pop
#
# Add an event to the queue, wake any waiters if what we added needs to
# happen sooner than the next pending event
#
def push(event)
@monitor.synchronize do
@events << event
@events.sort!
# If we've sorted the events and found the one we're adding is at
# the front, it will likely need to run before the next due date
@resource.signal if @events.first == event
end
end
alias << push
alias enq push
def empty?
@events.empty?
end
def clear
@events.clear
end
def length
@events.length
end
alias size length
end
class Driver
attr_reader :thread
# Instantiate a new Driver and start the scheduler loop to handle events
# +task+ is the Task this Driver belongs to
#
# Returns Driver
def initialize(task)
@task = task
@events = God::DriverEventQueue.new
@thread = Thread.new do
loop do
begin
@events.pop.handle_event
rescue ThreadError => e
# queue is empty
break
rescue Object => e
message = format("Unhandled exception in driver loop - (%s): %s\n%s",
e.class, e.message, e.backtrace.join("\n"))
applog(nil, :fatal, message)
end
end
end
end
# Check if we're in the driver context
#
# Returns true if in driver thread
def in_driver_context?
Thread.current == @thread
end
# Clear all events for this Driver
#
# Returns nothing
def clear_events
@events.clear
end
# Shutdown the DriverEventQueue threads
#
# Returns nothing
def shutdown
@events.shutdown
end
# Queue an asynchronous message
# +name+ is the Symbol name of the operation
# +args+ is an optional Array of arguments
#
# Returns nothing
def message(name, args = [])
@events.push(DriverOperation.new(@task, name, args))
end
# Create and schedule a new DriverEvent
# +condition+ is the Condition
# +delay+ is the number of seconds to delay (default: interval defined in condition)
#
# Returns nothing
def schedule(condition, delay = condition.interval)
applog(nil, :debug, "driver schedule #{condition} in #{delay} seconds")
@events.push(DriverEvent.new(delay, @task, condition))
end
end # Driver
end # God