Skip to content

Commit

Permalink
Rewrite CLI container exec websocket client (#2599)
Browse files Browse the repository at this point in the history
* remove bundled Kontena::Websocket::Client

* require kontena-websocket-client from github

* rewrite cli exec helper using Kontena::Websocket::Client

* rewrite cli container exec command using container_exec()

* fix container exec missing cmd_list

* refactor websocket exec helper connect/read/write logging

* websocket logging with DEBUG=websocket

* tweak

* fix container id

* rewrite service exec command

* update service exec spec to just spec container_exec()

* cli container exec command spec

* cli spec client helpers: make master_url customizable

* cli exec helper: spec websocket_url, container_exec

* fix and spec read_stdin

* fix non-interactive exec to not send, spec websocket_exec

* spec SSL_IGNORE_ERRORS -> verify_mode

* join write thread to fix specs

* cli: refactor Kontena::Cli::Config::Server ssl_cert_path, ssl_cert, ssl_subject_cn for Kontena::Client

* cli websocket exec: use server SSL cert, subject cn

* cli websocket exec: rescue websocket errors to exit_with_error

* cli websocket exec client timeouts

* drop gemfile kontena-websocket-client from git, use the 0.1.x releases

* comment services ExecCommand#maybe_spinner

* fix e2e container exec 404 spec

* e2e spec container exec exit errors

* e2e spec service exec error exits
  • Loading branch information
SpComb committed Jul 27, 2017
1 parent bd88fcf commit a7e05ed
Show file tree
Hide file tree
Showing 17 changed files with 787 additions and 480 deletions.
2 changes: 1 addition & 1 deletion cli/kontena-cli.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency "semantic", "~> 1.5"
spec.add_runtime_dependency "liquid", "~> 4.0.0"
spec.add_runtime_dependency "tty-table", "~> 0.8.0"
spec.add_runtime_dependency "websocket-driver-kontena", "0.6.5"
spec.add_runtime_dependency "kontena-websocket-client", "~> 0.1.0"
end
9 changes: 7 additions & 2 deletions cli/lib/kontena/cli/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ def use_refresh_token(server)
return unless server.token
return unless server.token.refresh_token
return if server.token.expired?
client = Kontena::Client.new(server.url, server.token)
client = Kontena::Client.new(server.url, server.token,
ssl_cert_path: server.ssl_cert_path,
ssl_subject_cn: server.ssl_subject_cn,
)
logger.debug "Trying to invalidate refresh token on #{server.name}"
client.refresh_token
rescue => ex
Expand Down Expand Up @@ -205,7 +208,9 @@ def client(token = nil, api_url = nil)

@client = Kontena::Client.new(
api_url || require_current_master.url,
token || require_current_master.token
token || require_current_master.token,
ssl_cert_path: require_current_master.ssl_cert_path,
ssl_subject_cn: require_current_master.ssl_subject_cn,
)
end

Expand Down
33 changes: 33 additions & 0 deletions cli/lib/kontena/cli/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,39 @@ def initialize(*args)
super
@table[:account] ||= 'master'
end

def uri
@uri ||= URI.parse(self.url)
end

# @return [String, nil] path to ~/.kontena/certs/*.pem
def ssl_cert_path
path = File.join(Dir.home, '.kontena', 'certs', "#{self.uri.host}.pem")

if File.exist?(path) && File.readable?(path)
return path
else
return nil
end
end

# @return [OpenSSL::X509::Certificate, nil]
def ssl_cert
if path = self.ssl_cert_path
return OpenSSL::X509::Certificate.new(File.read(path))
else
return nil
end
end

# @return [String, nil] ssl cert subject CN=
def ssl_subject_cn
if cert = self.ssl_cert
return cert.subject.to_a.select{|name, data, type| name == 'CN' }.map{|name, data, type| data }.first
else
nil
end
end
end

class Token < OpenStruct
Expand Down
45 changes: 12 additions & 33 deletions cli/lib/kontena/cli/containers/exec_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,21 @@ class ExecCommand < Kontena::Command
parameter "CONTAINER_ID", "Container id"
parameter "CMD ...", "Command"

option ["--shell"], :flag, "Execute as a shell command"
option ["-i", "--interactive"], :flag, "Keep stdin open"
option ["-t", "--tty"], :flag, "Allocate a pseudo-TTY"
option ["--shell"], :flag, "Execute as a shell command", default: false
option ["-i", "--interactive"], :flag, "Keep stdin open", default: false
option ["-t", "--tty"], :flag, "Allocate a pseudo-TTY", default: false

requires_current_master
requires_current_grid

def execute
exit_with_error "the input device is not a TTY" if tty? && !STDIN.tty?
exit_status = container_exec("#{current_grid}/#{self.container_id}", self.cmd_list,
interactive: interactive?,
shell: shell?,
tty: tty?,
)

require_api_url
token = require_token
cmd = JSON.dump({cmd: cmd_list})
queue = Queue.new
stdin_reader = nil
url = ws_url("#{current_grid}/#{container_id}", interactive: interactive?, shell: shell?, tty: tty?)
ws = connect(url, token)
ws.on :message do |msg|
data = parse_message(msg)
queue << data if data.is_a?(Hash)
end
ws.on :open do
ws.text(cmd)
stdin_reader = self.stream_stdin_to_ws(ws, tty: self.tty?) if self.interactive?
end
ws.on :close do |e|
if e.reason.include?('code: 404')
queue << {'exit' => 1, 'message' => 'Not found'}
else
queue << {'exit' => 1}
end
end
ws.connect
while msg = queue.pop
self.handle_message(msg)
end
rescue SystemExit
stdin_reader.kill if stdin_reader
raise
exit exit_status unless exit_status.zero?
end
end
end
227 changes: 165 additions & 62 deletions cli/lib/kontena/cli/helpers/exec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,87 +1,190 @@
require_relative '../../websocket/client'
require 'io/console'
require 'kontena-websocket-client'

module Kontena::Cli::Helpers
module ExecHelper

# @param [WebSocket::Client::Simple] ws
# @return [Thread]
def stream_stdin_to_ws(ws, tty: nil)
require 'io/console'
Thread.new {
if tty
STDIN.raw {
while char = STDIN.readpartial(1024)
ws.text(JSON.dump({ stdin: char }))
end
}
else
while char = STDIN.gets
ws.text(JSON.dump({ stdin: char }))
websocket_log_level = if ENV["DEBUG"] == 'websocket'
Logger::DEBUG
elsif ENV["DEBUG"]
Logger::INFO
else
Logger::WARN
end

Kontena::Websocket::Logging.initialize_logger(STDERR, websocket_log_level)

WEBSOCKET_CLIENT_OPTIONS = {
connect_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
open_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
ping_interval: ENV["EXCON_READ_TIMEOUT"] ? ENV["EXCON_READ_TIMEOUT"].to_f : 30.0,
ping_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
close_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
write_timeout: ENV["EXCON_WRITE_TIMEOUT"] ? ENV["EXCON_WRITE_TIMEOUT"].to_f : 5.0,
}

# @param ws [Kontena::Websocket::Client]
# @param tty [Boolean] read stdin in raw mode, sending tty escapes for remote pty
# @raise [ArgumentError] not a tty
# @yield [data]
# @yieldparam data [String] data from stdin
# @raise [ArgumentError] not a tty
# @return EOF on stdin (!tty)
def read_stdin(tty: nil)
if tty
raise ArgumentError, "the input device is not a TTY" unless STDIN.tty?

STDIN.raw { |io|
# we do not expect EOF on a TTY, ^D sends a tty escape to close the pty instead
loop do
# raises EOFError, SyscallError or IOError
yield io.readpartial(1024)
end
ws.text(JSON.dump({ stdin: nil }))
}
else
# line-buffered
while line = STDIN.gets
yield line
end
}
end
end

# @param [Hash] msg
def handle_message(msg)
if msg.has_key?('exit')
if msg['message']
exit_with_error(msg['message'])
else
exit msg['exit'].to_i
end
elsif msg.has_key?('stream')
if msg['stream'] == 'stdout'
$stdout << msg['chunk']
else
$stderr << msg['chunk']
# @return [String]
def websocket_url(path, query = nil)
url = URI.parse(require_current_master.url)
url.scheme = url.scheme.sub('http', 'ws')
url.path = '/v1/' + path
url.query = (query && !query.empty?) ? URI.encode_www_form(query) : nil
url.to_s
end

# @param ws [Kontena::Websocket::Client]
# @return [Integer] exit code
def websocket_exec_read(ws)
ws.read do |msg|
msg = JSON.parse(msg)

logger.debug "websocket exec read: #{msg.inspect}"

if msg.has_key?('exit')
# breaks the read loop
return msg['exit'].to_i
elsif msg.has_key?('stream')
if msg['stream'] == 'stdout'
$stdout << msg['chunk']
else
$stderr << msg['chunk']
end
end
end
end

# @param [Websocket::Frame::Incoming] msg
def parse_message(msg)
JSON.parse(msg.data)
rescue JSON::ParserError
nil
# @param ws [Kontena::Websocket::Client]
# @param msg [Hash]
def websocket_exec_write(ws, msg)
logger.debug "websocket exec write: #{msg.inspect}"

ws.send(JSON.dump(msg))
end

# Start thread to read from stdin, and write to websocket.
# Closes websocket on stdin read errors.
#
# @param ws [Kontena::Websocket::Client]
# @param tty [Boolean]
# @return [Thread]
def websocket_exec_write_thread(ws, tty: nil)
Thread.new do
begin
read_stdin(tty: tty) do |stdin|
websocket_exec_write(ws, 'stdin' => stdin)
end
websocket_exec_write(ws, 'stdin' => nil) # EOF
rescue => exc
logger.error exc
ws.close(1001, "stdin read #{exc.class}: #{exc}")
end
end
end

# @param container_id [String] The container id
# Connect to server websocket, send from stdin, and write out messages
#
# @param paths [String]
# @param options [Hash] @see Kontena::Websocket::Client
# @param cmd [Array<String>] command to execute
# @param interactive [Boolean] Interactive TTY on/off
# @param shell [Boolean] Shell on/of
# @param tty [Boolean] TTY on/of
# @return [String]
def ws_url(container_id, interactive: false, shell: false, tty: false)
require 'uri' unless Object.const_defined?(:URI)
extend Kontena::Cli::Common unless self.respond_to?(:require_current_master)
# @return [Integer] exit code
def websocket_exec(path, cmd, interactive: false, shell: false, tty: false)
exit_status = nil
write_thread = nil

url = URI.parse(require_current_master.url)
url.scheme = url.scheme.sub('http', 'ws')
url.path = "/v1/containers/#{container_id}/exec"
if shell || interactive || tty
query = {}
query.merge!(interactive: true) if interactive
query.merge!(shell: true) if shell
query.merge!(tty: true) if tty
url.query = URI.encode_www_form(query)
end
url.to_s
end
query = {}
query[:interactive] = interactive if interactive
query[:shell] = shell if shell
query[:tty] = tty if tty

# @param [String] url
# @param [String] token
# @return [WebSocket::Client::Simple]
def connect(url, token)
options = {
headers: {
server = require_current_master
url = websocket_url(path, query)
token = require_token
options = WEBSOCKET_CLIENT_OPTIONS.dup
options[:headers] = {
'Authorization' => "Bearer #{token.access_token}"
}
}
if ENV['SSL_IGNORE_ERRORS'].to_s == 'true'
options[:verify_mode] = ::OpenSSL::SSL::VERIFY_NONE
options[:ssl_params] = {
verify_mode: ENV['SSL_IGNORE_ERRORS'].to_s == 'true' ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER,
ca_file: server.ssl_cert_path,
}
options[:ssl_hostname] = server.ssl_subject_cn

logger.debug { "websocket exec connect... #{url}" }

# we do not expect CloseError, because the server will send an 'exit' message first,
# and we return before seeing the close frame
# TODO: handle HTTP 404 errors
Kontena::Websocket::Client.connect(url, **options) do |ws|
logger.debug { "websocket exec open" }

# first frame contains exec command
websocket_exec_write(ws, 'cmd' => cmd)

if interactive
# start new thread to write from stdin to websocket
write_thread = websocket_exec_write_thread(ws, tty: tty)
end

# blocks reading from websocket, returns with exec exit code
exit_status = websocket_exec_read(ws)

fail ws.close_reason unless exit_status
end

rescue Kontena::Websocket::Error => exc
exit_with_error(exc)

rescue => exc
logger.error { "websocket exec error: #{exc}" }
raise

else
logger.debug { "websocket exec exit: #{exit_status}"}
return exit_status

ensure
if write_thread
write_thread.kill
write_thread.join
end
Kontena::Websocket::Client.new(url, options)
end

# Execute command on container using websocket API.
#
# @param id [String] Container ID (grid/host/name)
# @param cmd [Array<String>] command to execute
# @return [Integer] exit code
def container_exec(id, cmd, **exec_options)
websocket_exec("containers/#{id}/exec", cmd, **exec_options)
end
end
end
Loading

0 comments on commit a7e05ed

Please sign in to comment.