require 'multi_json'
require 'thread'
require 'celluloid'
require 'core_ext/hash/compact'
require 'travis/support'
require 'travis/worker/factory'
require 'travis/worker/virtual_machine'
require 'travis/worker/reporter'
require 'travis/worker/utils/hard_timeout'
require 'travis/worker/utils/serialization'
require 'travis/worker/job/runner'

module Travis
  module Worker
    class Instance
      include Celluloid
      include Logging

      log_header { "#{name}:worker:instance" }

      def self.create(name, config, broker_connection)
        Factory.new(name, config, broker_connection).worker
      end

      STATES = [:created, :starting, :ready, :working, :stopping, :stopped, :errored]

      attr_accessor :state

      attr_reader :name, :vm, :broker_connection, :queue, :queue_name,
                  :subscription, :config, :payload, :last_error, :observers, :reporter
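
      # Sets up a worker bound to a single VM and a single AMQP build queue.
      # Observers are notified on every state change (see #set).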
      def initialize(name, vm, broker_connection, queue_name, config, observers = [])
        raise ArgumentError, "worker name cannot be nil!" if name.nil?
        raise ArgumentError, "VM cannot be nil!" if vm.nil?
        raise ArgumentError, "broker connection cannot be nil!" if broker_connection.nil?
        raise ArgumentError, "config cannot be nil!" if config.nil?

        @name              = name
        @vm                = vm
        @queue_name        = queue_name
        @broker_connection = broker_connection
        @config            = config
        @observers         = Array(observers)

        # create the reporter early so it is not created within the `process` callback
        @reporter = Reporter.new(name, broker_connection.create_channel, broker_connection.create_channel)
      end
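
      # Boots the worker: prepares the VM, opens the AMQP channels, declares
      # the queues, and subscribes to the build queue. Transitions the state
      # from :starting to :ready.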
      def start
        set :starting
        vm.prepare
        open_channels
        declare_queues
        subscribe
        set :ready
      end
      log :start

      # We need to revisit this method: it feels wrong to report a worker as
      # stopping while it is still working.
      def stop(options = {})
        # set :stopping
        info "stopping job"
        unsubscribe
        kill if options[:force]
      end
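
      # Cancels the current job. If the runner has not started yet, the job is
      # only marked for cancellation and the flag is picked up in #run_job.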
      def cancel
        if @runner
          info "cancelling job"
          @runner.cancel
        else
          @job_canceled = true
          info "marked job for cancellation as it is not running yet"
        end
        # vm.shell.terminate("Worker #{name} was stopped forcefully.")
      end
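
      # Entry point for queue deliveries (see #subscribe): runs the job,
      # reports errors unless the job was cancelled, and always resets the
      # reporter and the cancellation flag afterwards.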
      def process(message, payload)
        work(message, payload)
      rescue => error
        error_build(error, message) unless @job_canceled
      ensure
        reporter.reset
        @job_canceled = false
      end
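
      # Decodes the payload, runs the job, and acknowledges the message.
      # VM and runner connection failures requeue the job; invalid JSON drops
      # the message. The payload is JSON roughly of this shape (inferred from
      # the accessors used below, so the exact fields are an assumption):
      #
      #   { "uuid": "...",
      #     "repository": { "slug": "owner/repo" },
      #     "job":        { "id": 123 },
      #     "config":     { "language": "ruby", "dist": "...", "group": "...", "osx_image": "..." },
      #     "timeouts":   { "hard_limit": 1800, "log_silence": 600 } }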
      def work(message, payload)
        prepare(payload)
        info "starting job slug:#{self.payload['repository']['slug']} id:#{self.payload['job']['id']}"
        info "this is a requeued message" if message.redelivered?
        notify_job_received
        run_job
        finish(message)
      rescue VirtualMachine::VmFatalError => e
        error "the job (slug:#{self.payload['repository']['slug']} id:#{self.payload['job']['id']}) was requeued as the vm had a fatal error"
        finish(message, :restart => true)
      rescue Job::Runner::ConnectionError => e
        error "the job (slug:#{self.payload['repository']['slug']} id:#{self.payload['job']['id']}) was requeued as the runner had a connection error"
        finish(message, :restart => true)
      rescue MultiJson::DecodeError => e
        error "invalid JSON for a job, dropping message: #{e.message}"
        finish(message)
      end

      def report
        { :name => name, :host => host, :state => state, :last_error => last_error, :payload => payload }
      end

      def shutdown
        info "shutting down"
        stop
      end

      def working?
        @state == :working
      end

      def stopping?
        @state == :stopping
      end

      protected

      def open_channels
        # error handling happens on a per-channel basis, so using one channel
        # per type of operation is a highly recommended practice. MK.
        build_channel
        reporting_channel
      end

      def close_channels
        # channels may be nil in some tests that mock out #start and #stop. MK.
        build_channel.close if build_channel.open?
        reporting_channel.close if reporting_channel && reporting_channel.open?
      end

      def build_channel
        # technically there is no need to use one channel per consumer, but with the
        # RabbitMQ version on Heroku (2.5) this is the only way to go :/ 2.6 and 2.7
        # on my local network work just fine. But hey, Heroku gods, we must obey.
        # For now. MK.
        @build_channel ||= begin
          channel = broker_connection.create_channel
          channel.prefetch = 1
          channel
        end
      end

      def reporting_channel
        @reporting_channel ||= broker_connection.create_channel
      end

      def declare_queues
        @queue = build_channel.queue(queue_name, :durable => true)

        # these are declared here mostly to aid development purposes. MK.
        # (local renamed from `reporting_channel` to avoid shadowing the method above)
        channel = broker_connection.create_channel
        channel.queue("reporting.jobs.builds", :durable => true)
        channel.queue("reporting.jobs.logs", :durable => true)
      end
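
      # Subscribes to the build queue with manual acknowledgements; messages
      # are only acked in #finish, so unacknowledged jobs are redelivered if
      # the worker dies.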
      def subscribe
        @subscription = queue.subscribe(:ack => true, :blocking => false, &method(:process))
      end

      def unsubscribe
        # due to how the RabbitMQ Java client and the MarchHare consumer implementation
        # (which uses thread pools, i.e. JDK executor services) work, we need to shut down
        # consumers manually to guarantee that after disconnect we leave no active non-daemon
        # threads (they are pretty much harmless, but the JVM won't exit as long as they are
        # running). MK.
        return if subscription.nil? || subscription.cancelled?

        if working?
          graceful_shutdown
        else
          info "unsubscribing from #{queue_name} right now"
          subscription.cancel
          sleep 2
          set :stopped
        end
      rescue StandardError => e
        puts e.inspect
        info "subscription is still active"
        graceful_shutdown
      end

      def graceful_shutdown
        info "unsubscribing from #{queue_name} once the current job has finished"
        @shutdown = true
      end
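
      # Transitions the worker to the given state and notifies all observers
      # with the current #report.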
      def set(state)
        @state = state
        observers.each { |observer| observer.notify(report) }
        state
      end

      def prepare(payload)
        @last_error = nil
        @payload = decode(payload)
        Travis.uuid = @payload.delete(:uuid)
        set :working
      end
      log :prepare, :as => :debug
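
      # Acks the message and, depending on flags, requeues the job and/or
      # moves the worker to :stopped (when a graceful shutdown was requested)
      # or back to :ready.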
      def finish(message, opts = {})
        if @shutdown
          set :stopping
          stop
        end
        restart_job if opts[:restart]
        message.ack
        @payload = nil
        if working?
          set :ready
        elsif stopping?
          set :stopped
        end
      end
      log :finish, :params => false

      def error_build(error, message)
        log_errored_build(error)
        finish(message, restart: true)
        # stop
        set :errored
        sleep 10
        set :ready
      end
      log :error_build, :as => :debug

      def log_errored_build(error)
        @last_error = [error.message, error.backtrace].flatten.join("\n")
        log_exception(error)
        Raven.capture_exception(error)
      rescue => error
        $stderr.puts "ERROR: failed to log error: #{error}"
      end

      def host
        Travis::Worker.config.host
      end

      def decode(payload)
        Hashr.new(MultiJson.decode(payload))
      end
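
      # Runs the job inside a sandboxed VM. If the job was cancelled before
      # the VM was ready, only the cancellation is reported; otherwise a
      # Job::Runner drives the build over the VM's session.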
      def run_job
        @runner = nil
        vm_opts = {
          language:     job_language,
          job_id:       payload.job.id,
          custom_image: job_image,
          dist:         job_dist,
          group:        job_group
        }

        vm.sandboxed(vm_opts) do
          if @job_canceled
            reporter.send_log(payload.job.id, "\n\nDone: Job Cancelled\n")
            reporter.notify_job_finished(payload.job.id, 'canceled')
          else
            @runner = Job::Runner.new(self.payload, vm.session, reporter, vm.full_name, timeouts, name)
            @runner.run
          end
        end
      ensure
        # @runner.terminate if @runner && @runner.alive?
        @runner = nil
      end
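
      # Per-job timeouts from the payload override the configured defaults.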
      def timeouts
        { hard_limit: timeout(:hard_limit), log_silence: timeout(:log_silence) }
      end

      def timeout(type)
        timeout = payload.timeouts && payload.timeouts.send(type) || config.timeouts.send(type)
        timeout.to_i
      end

      def notify_job_received
        reporter.notify_job_received(self.payload['job']['id'])
      end
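
      # Requeues the current job via the reporter and marks the requeue in
      # Metriks so requeue rates can be tracked.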
      def restart_job
        if reporter && payload['job']['id']
          info "requeuing job"
          Metriks.meter('worker.job.requeue').mark
          reporter.restart(payload['job']['id'])
        end
      end

      def job_language
        payload['config']['language']
      end

      def job_dist
        payload['config']['dist']
      end

      def job_group
        payload['config']['group']
      end

      def job_image
        payload['config']['osx_image']
      end
    end
  end
end