Skip to content

Commit

Permalink
Merge pull request ManageIQ#13805 from jrafanie/stopping_worker
Browse files Browse the repository at this point in the history
Kill workers that don't stop after a configurable time
  • Loading branch information
gtanzillo committed Feb 14, 2017
2 parents dd178c7 + b60a5f0 commit 9764870
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 3 deletions.
8 changes: 6 additions & 2 deletions app/models/miq_server/worker_management/monitor.rb
Expand Up @@ -89,7 +89,7 @@ def check_not_responding(class_name = nil)
processed_workers = []
miq_workers.each do |w|
next unless class_name.nil? || (w.type == class_name)
next unless [:not_responding, :memory_exceeded].include?(worker_get_monitor_reason(w.pid))
next unless monitor_reason_not_responding?(w)
next unless [:waiting_for_stop_before_restart, :waiting_for_stop].include?(worker_get_monitor_status(w.pid))
processed_workers << w
worker_not_responding(w)
Expand All @@ -99,6 +99,10 @@ def check_not_responding(class_name = nil)
processed_workers.collect(&:id)
end

# True when the worker should be treated as unresponsive: the monitor
# recorded a not-responding / memory-exceeded reason for its pid, or the
# worker has been stuck in the 'stopping' state past its timeout.
def monitor_reason_not_responding?(w)
  recorded_reason = worker_get_monitor_reason(w.pid)
  [Reason::NOT_RESPONDING, Reason::MEMORY_EXCEEDED].include?(recorded_reason) || w.stopping_for_too_long?
end

def do_system_limit_exceeded
self.class.monitor_class_names_in_kill_order.each do |class_name|
workers = class_name.constantize.find_current.to_a
Expand All @@ -109,7 +113,7 @@ def do_system_limit_exceeded
msg = "#{w.format_full_log_msg} is being stopped because system resources exceeded threshold, it will be restarted once memory has freed up"
_log.warn(msg)
MiqEvent.raise_evm_event_queue_in_region(w.miq_server, "evm_server_memory_exceeded", :event_details => msg, :type => w.class.name)
restart_worker(w, :memory_exceeded)
restart_worker(w, Reason::MEMORY_EXCEEDED)
break
end
end
Expand Down
3 changes: 3 additions & 0 deletions app/models/miq_server/worker_management/monitor/reason.rb
@@ -1,6 +1,9 @@
module MiqServer::WorkerManagement::Monitor::Reason
extend ActiveSupport::Concern

MEMORY_EXCEEDED = :memory_exceeded
NOT_RESPONDING = :not_responding

def worker_set_monitor_reason(pid, reason)
@workers_lock.synchronize(:EX) do
@workers[pid][:monitor_reason] = reason if @workers.key?(pid)
Expand Down
Expand Up @@ -14,7 +14,7 @@ def validate_worker(w)
msg = "#{w.format_full_log_msg} has not responded in #{Time.now.utc - w.last_heartbeat} seconds, restarting worker"
_log.error(msg)
MiqEvent.raise_evm_event_queue(w.miq_server, "evm_worker_not_responding", :event_details => msg, :type => w.class.name)
restart_worker(w, :not_responding)
restart_worker(w, Reason::NOT_RESPONDING)
return false
end

Expand Down
8 changes: 8 additions & 0 deletions app/models/miq_worker.rb
Expand Up @@ -411,6 +411,14 @@ def enabled_or_running?
!is_stopped? || actually_running?
end

# A 'stopping' worker heartbeats over DRb but NOT to the database, so
# last_heartbeat tells us how long it has been in the 'stopping' state.
# Returns true when that exceeds the configured :stopping_timeout
# (defaults to 10 minutes).
def stopping_for_too_long?
  return false unless status == MiqWorker::STATUS_STOPPING

  timeout = self.class.worker_settings[:stopping_timeout] || 10.minutes
  last_heartbeat < timeout.seconds.ago
end

def validate_active_messages
active_messages.each { |msg| msg.check_for_timeout(_log.prefix) }
end
Expand Down
1 change: 1 addition & 0 deletions config/settings.yml
Expand Up @@ -1194,6 +1194,7 @@
:poll_method: :normal
:restart_interval: 0.hours
:starting_timeout: 10.minutes
:stopping_timeout: 10.minutes
:embedded_ansible_worker:
:poll: 10.seconds
:memory_threshold: 0.megabytes
Expand Down
34 changes: 34 additions & 0 deletions spec/models/miq_server/worker_management/monitor_spec.rb
@@ -0,0 +1,34 @@
describe MiqServer::WorkerManagement::Monitor do
  context "#check_not_responding" do
    let(:server) { EvmSpecHelper.local_miq_server }
    let(:worker) do
      FactoryGirl.create(:miq_worker, :type           => "MiqGenericWorker",
                                      :miq_server     => server,
                                      :pid            => 12345,
                                      :last_heartbeat => 5.minutes.ago)
    end

    before do
      server.setup_drb_variables
      server.worker_add(worker.pid)
    end

    it "destroys an unresponsive 'stopping' worker" do
      # Stale heartbeat: well past the 10 minute stopping timeout.
      worker.update(:last_heartbeat => 20.minutes.ago)
      server.stop_worker(worker)

      server.check_not_responding
      server.reload

      expect { worker.reload }.to raise_error(ActiveRecord::RecordNotFound)
      expect(server.miq_workers).to be_empty
    end

    it "monitors recently heartbeated 'stopping' workers" do
      # Fresh heartbeat: the worker is still shutting down cleanly.
      worker.update(:last_heartbeat => 1.minute.ago)
      server.stop_worker(worker)

      server.check_not_responding
      server.reload

      expect(server.miq_workers.first.id).to eq(worker.id)
    end
  end
end
21 changes: 21 additions & 0 deletions spec/models/miq_worker_spec.rb
Expand Up @@ -331,6 +331,27 @@ def check_has_required_role(worker_role_names, expected_result)
expect(@worker.worker_options).to eq(:guid => @worker.guid)
end

describe "#stopping_for_too_long?" do
  subject { @worker.stopping_for_too_long? }

  it "false if started" do
    @worker.update(:status => described_class::STATUS_STARTED)
    is_expected.to be_falsey
  end

  it "true if stopping and not heartbeated recently" do
    @worker.update(:status => described_class::STATUS_STOPPING, :last_heartbeat => 30.minutes.ago)
    is_expected.to be_truthy
  end

  it "false if stopping and heartbeated recently" do
    @worker.update(:status => described_class::STATUS_STOPPING, :last_heartbeat => 1.minute.ago)
    is_expected.to be_falsey
  end
end

it "is_current? false when starting" do
@worker.update_attribute(:status, described_class::STATUS_STARTING)
expect(@worker.is_current?).not_to be_truthy
Expand Down

0 comments on commit 9764870

Please sign in to comment.