Skip to content

Commit

Permalink
Support SIGNAL handling for ECS Task stopping
Browse files Browse the repository at this point in the history
  • Loading branch information
joker1007 committed Feb 9, 2018
1 parent 15d46b0 commit 85a350a
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 14 deletions.
2 changes: 1 addition & 1 deletion lib/wrapbox/log_fetcher/papertrail.rb
Expand Up @@ -24,7 +24,7 @@ def run

def stop
@stop = true
@loop_thread.join(STOP_WAIT_TIMELIMIT)
@loop_thread&.join(STOP_WAIT_TIMELIMIT)
end

def main_loop
Expand Down
34 changes: 29 additions & 5 deletions lib/wrapbox/runner/docker.rb
Expand Up @@ -17,6 +17,7 @@ class ExecutionError < StandardError; end
def initialize(options)
@name = options[:name]
@container_definitions = options[:container_definition] ? [options[:container_definition]] : options[:container_definitions]
@logger = Logger.new($stdout)

if @container_definitions.size >= 2
raise "Docker runner does not support multi container currently"
Expand All @@ -37,15 +38,16 @@ def run(class_name, method_name, args, container_definition_overrides: {}, envir
exec_docker(definition: definition, cmd: ["bundle", "exec", "rake", "wrapbox:run"], environments: envs)
end

def run_cmd(cmds, container_definition_overrides: {}, environments: [])
def run_cmd(cmds, container_definition_overrides: {}, environments: [], ignore_signal: false)
ths = []
definition = container_definition
.merge(container_definition_overrides)

environments = extract_environments(environments)

cmds << nil if cmds.empty?
ths = cmds.map.with_index do |cmd, idx|
Thread.new(cmd, idx) do |c, i|
cmds.each_with_index do |cmd, idx|
ths << Thread.new(cmd, idx) do |c, i|
envs = environments + ["WRAPBOX_CMD_INDEX=#{idx}"]
exec_docker(
definition: definition,
Expand All @@ -54,7 +56,23 @@ def run_cmd(cmds, container_definition_overrides: {}, environments: [])
)
end
end
ths.each(&:join)
ths.each { |th| th&.join }

true
rescue SignalException => e
sig = e.is_a?(Interrupt) ? "SIGINT" : e.signm
if ignore_signal
@logger.info("Receive #{sig} signal. But Docker container continue running")
else
@logger.info("Receive #{sig} signal. Stop All tasks")
ths.each do |th|
th.report_on_exception = false
th.raise(e)
end
thread_timeout = 15
ths.each { |th| th.join(thread_timeout) }
end
nil
end

private
Expand Down Expand Up @@ -93,6 +111,9 @@ def exec_docker(definition:, cmd:, environments: [])
unless resp["StatusCode"].zero?
raise ExecutionError, "exit_code=#{resp["StatusCode"]}"
end
rescue SignalException => e
sig = e.is_a?(Interrupt) ? "SIGINT" : e.signm
container&.kill(signal: sig)
ensure
container.remove(force: true) if container && !keep_container
end
Expand All @@ -116,6 +137,7 @@ class Cli < Thor
method_option :cpu, type: :numeric
method_option :memory, type: :numeric
method_option :environments, aliases: "-e"
method_option :ignore_signal, type: :boolean, default: false, desc: "Even if receive a signal (like TERM, INT, QUIT), Docker container continue running"
def run_cmd(*args)
repo = Wrapbox::ConfigRepository.new.tap { |r| r.load_yaml(options[:config]) }
config = repo.get(options[:config_name])
Expand All @@ -129,7 +151,9 @@ def run_cmd(*args)
else
container_definition_overrides = {}
end
runner.run_cmd(args, environments: environments, container_definition_overrides: container_definition_overrides)
unless runner.run_cmd(args, environments: environments, container_definition_overrides: container_definition_overrides, ignore_signal: options[:ignore_signal])
exit 1
end
end
end
end
Expand Down
56 changes: 50 additions & 6 deletions lib/wrapbox/runner/ecs.rb
Expand Up @@ -22,6 +22,7 @@ class LackResource < StandardError; end

EXECUTION_RETRY_INTERVAL = 3
WAIT_DELAY = 5
TERM_TIMEOUT = 120
HOST_TERMINATED_REASON_REGEXP = /Host EC2.*terminated/

attr_reader \
Expand Down Expand Up @@ -128,12 +129,14 @@ def run(class_name, method_name, args, container_definition_overrides: {}, **par
)
end

def run_cmd(cmds, container_definition_overrides: {}, **parameters)
def run_cmd(cmds, container_definition_overrides: {}, ignore_signal: false, **parameters)
ths = []

task_definition = prepare_task_definition(container_definition_overrides)

cmds << nil if cmds.empty?
ths = cmds.map.with_index do |cmd, idx|
Thread.new(cmd, idx) do |c, i|
cmds.each_with_index do |cmd, idx|
ths << Thread.new(cmd, idx) do |c, i|
Thread.current[:cmd_index] = i
envs = (parameters[:environments] || []) + [{name: "WRAPBOX_CMD_INDEX", value: i.to_s}]
run_task(
Expand All @@ -143,7 +146,23 @@ def run_cmd(cmds, container_definition_overrides: {}, **parameters)
)
end
end
ths.each(&:join)
ths.each { |th| th&.join }

true
rescue SignalException => e
sig = e.is_a?(Interrupt) ? "SIGINT" : e.signm
if ignore_signal
@logger.info("Receive #{sig} signal. But ECS Tasks continue running")
else
@logger.info("Receive #{sig} signal. Stop All tasks")
ths.each do |th|
th.report_on_exception = false
th.raise(e)
end
thread_timeout_buffer = 15
ths.each { |th| th.join(TERM_TIMEOUT + thread_timeout_buffer) }
end
nil
end

private
Expand All @@ -158,6 +177,8 @@ def run_task(task_definition_arn, class_name, method_name, args, command, parame

begin
task = create_task(task_definition_arn, class_name, method_name, args, command, parameter)
return unless task # only Task creation aborted by SignalException

@log_fetcher.run if @log_fetcher

@logger.debug("Launch Task: #{task.task_arn}")
Expand Down Expand Up @@ -188,6 +209,14 @@ def run_task(task_definition_arn, class_name, method_name, args, command, parame
sleep EXECUTION_RETRY_INTERVAL
retry
end
rescue SignalException
client.stop_task(
cluster: cl,
task: task.task_arn,
reason: "signal interrupted"
)
wait_task_stopped(cl, task.task_arn, TERM_TIMEOUT)
@logger.debug("Stop Task: #{task.task_arn}")
ensure
if @log_fetcher
begin
Expand Down Expand Up @@ -270,6 +299,17 @@ def create_task(task_definition_arn, class_name, method_name, args, command, par
current_retry_interval = [current_retry_interval * parameter.retry_interval_multiplier, parameter.max_retry_interval].min
retry
end
rescue SignalException
if task
client.stop_task(
cluster: cl,
task: task.task_arn,
reason: "signal interrupted"
)
wait_task_stopped(cl, task.task_arn, TERM_TIMEOUT)
@logger.debug("Stop Task: #{task.task_arn}")
nil
end
end
end

Expand Down Expand Up @@ -449,6 +489,7 @@ class Cli < Thor
method_option :launch_retry, type: :numeric
method_option :execution_retry, type: :numeric
method_option :max_retry_interval, type: :numeric
method_option :ignore_signal, type: :boolean, default: false, desc: "Even if receive a signal (like TERM, INT, QUIT), ECS Tasks continue running"
def run_cmd(*args)
repo = Wrapbox::ConfigRepository.new.tap { |r| r.load_yaml(options[:config]) }
config = repo.get(options[:config_name])
Expand All @@ -464,14 +505,17 @@ def run_cmd(*args)
launch_timeout: options[:launch_timeout],
launch_retry: options[:launch_retry],
execution_retry: options[:execution_retry],
max_retry_interval: options[:max_retry_interval]
max_retry_interval: options[:max_retry_interval],
ignore_signal: options[:ignore_signal],
}.reject { |_, v| v.nil? }
if options[:cpu] || options[:memory]
container_definition_overrides = {cpu: options[:cpu], memory: options[:memory]}.reject { |_, v| v.nil? }
else
container_definition_overrides = {}
end
runner.run_cmd(args, environments: environments, container_definition_overrides: container_definition_overrides, **run_options)
unless runner.run_cmd(args, environments: environments, container_definition_overrides: container_definition_overrides, **run_options)
exit 1
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/config.yml
@@ -1,5 +1,5 @@
default:
cluster: ecsr-test
cluster: <%= ENV["ECS_CLUSTER"] %>
runner: ecs
region: ap-northeast-1
container_definition:
Expand Down
44 changes: 43 additions & 1 deletion spec/wrapbox_spec.rb
Expand Up @@ -3,7 +3,7 @@
describe Wrapbox do
it "can load yaml" do
config = Wrapbox.configs[:default]
expect(config.cluster).to eq("ecsr-test")
expect(config.cluster).to eq(ENV["ECS_CLUSTER"])
expect(config.region).to eq("ap-northeast-1")
expect(config.container_definition[:cpu]).to be_a(Integer)
end
Expand All @@ -23,6 +23,27 @@
Wrapbox.run_cmd(["ls ."], environments: [{name: "RAILS_ENV", value: "development"}])
end

specify "executable on ECS and kill task", aws: true do
r, w = IO.pipe
pid = fork do
puts "exec on child process"
r.close
unless Wrapbox.run_cmd(["ruby -e 'sleep 120'"], environments: [{name: "RAILS_ENV", value: "development"}])
w.write("ok")
w.flush
end
end

if pid
w.close
sleep 15
puts "send SIGTERM to child process"
Process.kill("SIGTERM", pid)
sleep 1
expect(r.read).to eq("ok")
end
end

specify "executable on ECS with error", aws: true do
expect {
Wrapbox.run_cmd(["ls no_dir"], environments: [{name: "RAILS_ENV", value: "development"}])
Expand All @@ -38,5 +59,26 @@
specify "executable on Docker" do
Wrapbox.run_cmd(["ls ."], config_name: :docker, environments: [{name: "RAILS_ENV", value: "development"}])
end

specify "executable on Docker and kill task" do
r, w = IO.pipe
pid = fork do
puts "exec on child process"
r.close
unless Wrapbox.run_cmd(["sleep 30"], config_name: :docker, environments: [{name: "RAILS_ENV", value: "development"}])
w.write("ok")
w.flush
end
end

if pid
w.close
sleep 10
puts "send SIGTERM to child process"
Process.kill("SIGTERM", pid)
sleep 1
expect(r.read).to eq("ok")
end
end
end
end

0 comments on commit 85a350a

Please sign in to comment.