Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

hm: DEA_EVAC prioritization

Change-Id: I114a4701c051fa278a98036172bcd2a3e071947b
  • Loading branch information...
commit 71f3fe46376c3afd3cc15c62d818784265912a04 1 parent 97c4d9c
@bnugmanov bnugmanov authored Patrick Bozeman committed
View
8 health_manager/config/health_manager.yml
@@ -51,10 +51,10 @@ intervals:
# Time to wait before analyzing the state of an application that has been
# started/restarted
stable_state: 60
- # Minimum time between sending start requests. This is the rate at which start requests
- # are dequed. Default value is 0.02 (20ms)
- # Set this value to 0 to revert to non-queuing behavior
- request_queue: 0.02
+
+#number of start requests sent each second (subject to EM timer limitations)
+#default value is 50. Set this value to 0 to revert to non-queuing behavior.
+dequeueing_rate: 50
# Used for /healthz and /vars endpoints. If not provided random
# values will be generated on component start. Uncomment to use
View
46 health_manager/lib/health_manager.rb
@@ -77,6 +77,10 @@ class HealthManager
RUNNING_STATES = Set.new([STARTING, RUNNING])
RESTART_REASONS = Set.new([CRASHED, DEA_SHUTDOWN, DEA_EVACUATION])
+
+ INFINITE_PRIORITY = 2_000_000_000
+
+
def self.start(options)
health_manager = new(options)
health_manager.run
@@ -95,7 +99,7 @@ def initialize(config)
@restart_timeout = config['intervals']['restart_timeout']
@stable_state = config['intervals']['stable_state']
@nats_ping = config['intervals']['nats_ping'] || 10
- @request_queue_interval = config['intervals']['request_queue'] || 0.02
+ @dequeueing_rate = config['dequeueing_rate'] || 50
@database_environment = config['database_environment']
@droplets = {}
@@ -413,8 +417,11 @@ def process_exited_message(message)
index_entry[:state] = DOWN
index_entry[:state_timestamp] = Time.now.to_i
index_entry[:last_action] = now
+
+ high_priority = (exit_message['reason'] == DEA_EVACUATION)
+
@logger.info("Preparing to start instance (app_id=#{droplet_id}, index=#{index}). Reason: Instance exited with reason '#{exit_message['reason']}'.")
- start_instances(droplet_id, [index])
+ start_instances(droplet_id, [index], high_priority)
end
end
end
@@ -633,7 +640,7 @@ def update_droplet(droplet)
entry_updated
end
- def start_instances(droplet_id, indices)
+ def start_instances(droplet_id, indices, high_priority = false)
droplet_entry = @droplets[droplet_id]
start_message = {
:droplet => droplet_id,
@@ -644,28 +651,25 @@ def start_instances(droplet_id, indices)
}
if queue_requests?
- queue_request(start_message)
+ queue_request(start_message, high_priority)
else
#old behavior: send the message immediately
NATS.publish('cloudcontrollers.hm.requests', start_message.to_json)
@logger.info("Requesting the start of extra instances: #{start_message}")
end
-
-
end
- def queue_request message
+ def queue_request(message, high_priority)
#the priority is higher for older items, to de-prioritize flapping items
priority = Time.now.to_i - message[:last_updated]
priority = 0 if priority < 0 #avoid timezone drama
-
+ priority = INFINITE_PRIORITY if high_priority
key = message.clone
key.delete :last_updated
@logger.info("Queueing priority '#{priority}' request: #{message}, using key: #{key}. Queue size: #{@request_queue.size}")
@request_queue.insert(message, priority, key)
end
-
def stop_instances(droplet_id, instances)
droplet_entry = @droplets[droplet_id]
last_updated = droplet_entry ? droplet_entry[:last_updated] : 0
@@ -708,14 +712,20 @@ def configure_timers
end
if queue_requests?
- EM.add_periodic_timer(@request_queue_interval) do
- unless @request_queue.empty?
- #TODO: if STOP requests are also queued, refactor this to be generic, particularly the log message
- start_message = encode_json(@request_queue.remove)
- NATS.publish('cloudcontrollers.hm.requests', start_message)
- @logger.info("Requesting the start of missing instances: #{start_message}")
- VCAP::Component.varz[:queue_length] = @request_queue.size
- end
+ EM.add_periodic_timer(1) do
+ deque_a_batch_of_requests(@dequeueing_rate)
+ end
+ end
+ end
+
+ def deque_a_batch_of_requests(num_requests)
+ num_requests.times do
+ unless @request_queue.empty?
+ #TODO: if STOP requests are also queued, refactor this to be generic, particularly the log message
+ start_message = encode_json(@request_queue.remove)
+ NATS.publish('cloudcontrollers.hm.requests', start_message)
+ @logger.info("Requesting the start of missing instances: #{start_message}")
+ VCAP::Component.varz[:queue_length] = @request_queue.size
end
end
end
@@ -801,7 +811,7 @@ def subscribe_to_messages
end
def queue_requests?
- @request_queue_interval != 0
+ @dequeueing_rate != 0
end
end
View
15 health_manager/spec/health_manager_spec.rb
@@ -59,9 +59,10 @@ def should_publish_to_nats(message, payload)
'flapping_death' => 3,
'flapping_timeout' => 5,
'restart_timeout' => 2,
- 'stable_state' => 1,
- 'request_queue' => 0
- },
+ 'stable_state' => -1,
+
+ },
+ 'dequeueing_rate' => 0,
'rails_environment' => 'test',
'database_environment' => {
'test' => {
@@ -140,7 +141,7 @@ def make_heartbeat_message(indices, state)
it "should detect instances that are down and send a START request" do
stats = { :frameworks => {}, :runtimes => {}, :down => 0 }
should_publish_to_nats "cloudcontrollers.hm.requests", {
- 'droplet' => 1,
+ 'droplet' => @app.id,
'op' => 'START',
'last_updated' => @app.last_updated.to_i,
'version' => "#{@app.staged_package_hash}-#{@app.run_count}",
@@ -164,7 +165,7 @@ def make_heartbeat_message(indices, state)
3 => { :state => 'RUNNING', :timestamp => timestamp, :last_action => @app.last_updated, :instance => '3' }
}}
should_publish_to_nats "cloudcontrollers.hm.requests", {
- 'droplet' => 1,
+ 'droplet' => @app.id,
'op' => 'STOP',
'last_updated' => @app.last_updated.to_i,
'instances' => [ version_entry[:indices][3][:instance] ]
@@ -193,7 +194,7 @@ def make_heartbeat_message(indices, state)
it "should restart an instance that exits unexpectedly" do
should_publish_to_nats "cloudcontrollers.hm.requests", {
- 'droplet' => 1,
+ 'droplet' => @app.id,
'op' => 'START',
'last_updated' => @app.last_updated.to_i,
'version' => "#{@app.staged_package_hash}-#{@app.run_count}",
@@ -202,7 +203,7 @@ def make_heartbeat_message(indices, state)
@hm.process_heartbeat_message(make_heartbeat_message([0], "RUNNING"))
droplet_entry = @hm.process_exited_message({
- 'droplet' => 1,
+ 'droplet' => @app.id,
'version' => "#{@app.staged_package_hash}-#{@app.run_count}",
'index' => 0,
'instance' => 0,
Please sign in to comment.
Something went wrong with that request. Please try again.