From fe06ff26410100bb9b62873fc6072726971f0fb5 Mon Sep 17 00:00:00 2001 From: Tero Marttila Date: Wed, 19 Jul 2017 16:31:45 +0300 Subject: [PATCH 1/6] require kontena-websocket-client gem from github --- agent/Dockerfile | 4 ++-- agent/Gemfile | 3 +-- agent/Gemfile.lock | 16 +++++++++------- agent/lib/kontena-agent.rb | 4 +--- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/agent/Dockerfile b/agent/Dockerfile index 4d3d60aa76..3f7bdf3883 100644 --- a/agent/Dockerfile +++ b/agent/Dockerfile @@ -7,7 +7,7 @@ RUN apk update && apk --update add tzdata ruby ruby-irb ruby-bigdecimal \ ADD Gemfile /app/ ADD Gemfile.lock /app/ -RUN apk --update add --virtual build-dependencies ruby-dev build-base openssl-dev && \ +RUN apk --update add --virtual build-dependencies ruby-dev build-base openssl-dev git && \ gem install bundler --no-ri --no-rdoc && \ cd /app ; bundle install --without development test && \ apk del build-dependencies @@ -15,4 +15,4 @@ RUN apk --update add --virtual build-dependencies ruby-dev build-base openssl-de WORKDIR /app ADD . /app -CMD ["/app/bin/kontena-agent"] +CMD ["bundle", "exec", "/app/bin/kontena-agent"] diff --git a/agent/Gemfile b/agent/Gemfile index df3169ab7b..397918fcc2 100644 --- a/agent/Gemfile +++ b/agent/Gemfile @@ -1,8 +1,7 @@ source 'https://rubygems.org' +gem 'kontena-websocket-client', git: 'https://github.com/kontena/kontena-websocket-client.git' gem 'docker-api', '~> 1.32.0' -gem 'eventmachine', '~> 1.2.3' -gem 'faye-websocket', '~> 0.10.7' gem 'msgpack', '~> 1.0.3' gem 'activesupport', '~> 4.2.0' gem 'celluloid', '~> 0.17.3' diff --git a/agent/Gemfile.lock b/agent/Gemfile.lock index a9c9d34f64..a062807d50 100644 --- a/agent/Gemfile.lock +++ b/agent/Gemfile.lock @@ -1,3 +1,10 @@ +GIT + remote: https://github.com/kontena/kontena-websocket-client.git + revision: e70d1435cdce9d2979e5b9914cbdf05d6e0e1f57 + specs: + kontena-websocket-client (0.1.0) + websocket-driver (~> 0.6.5) + GEM remote: https://rubygems.org/ specs: @@ -35,11 +42,7 @@ GEM dotenv (1.0.2) etcd (0.3.0) mixlib-log - eventmachine (1.2.3) excon (0.54.0) - faye-websocket (0.10.7) - eventmachine (>= 0.12.0) - websocket-driver (>= 0.5.1) fluent-logger (0.6.2) msgpack (>= 0.5.6, < 2) hitimes (1.2.3) @@ -90,9 +93,8 @@ DEPENDENCIES docker-api (~> 1.32.0) dotenv etcd - eventmachine (~> 1.2.3) - faye-websocket (~> 0.10.7) fluent-logger (~> 0.6.2) + kontena-websocket-client! msgpack (~> 1.0.3) rake rspec @@ -102,4 +104,4 @@ DEPENDENCIES webmock BUNDLED WITH - 1.14.3 + 1.15.1 diff --git a/agent/lib/kontena-agent.rb b/agent/lib/kontena-agent.rb index 3450070db3..42733bae7a 100644 --- a/agent/lib/kontena-agent.rb +++ b/agent/lib/kontena-agent.rb @@ -1,13 +1,11 @@ require 'docker' -require 'faye/websocket' -require 'eventmachine' require 'thread' - require 'statsd' require 'celluloid/current' require 'celluloid/autostart' require 'active_support/core_ext/time' require 'active_support/core_ext/module/delegation' +require 'kontena-websocket-client' require_relative 'ipaddr_helpers' From 3b699fd2ad440dca34ba4d8abccdf8b480d00dc3 Mon Sep 17 00:00:00 2001 From: Tero Marttila Date: Wed, 19 Jul 2017 16:32:50 +0300 Subject: [PATCH 2/6] rewrite agent websocket client as a celluloid actor using kontena-websocket-client --- agent/lib/kontena/websocket_client.rb | 334 +++++---- .../spec/lib/kontena/websocket_client_spec.rb | 670 +++++++++++++----- 2 files changed, 707 insertions(+), 297 deletions(-) diff --git a/agent/lib/kontena/websocket_client.rb b/agent/lib/kontena/websocket_client.rb index d59e016142..2cc7545343 100644 --- a/agent/lib/kontena/websocket_client.rb +++ b/agent/lib/kontena/websocket_client.rb @@ -3,43 +3,44 @@ require_relative 'rpc_server' require_relative 'rpc_client' -module Faye::WebSocket::Client::Connection - # Workaround https://github.com/faye/faye-websocket-ruby/issues/103 - # force connection to close without waiting if the send buffer is full - def close_connection_after_writing - close_connection - end -end - +# Celluloid::Notifications: +# websocket:connect [nil] connecting, not yet connected +# websocket:open [nil] connected, websocket open +# websocket:disconnect [nil] websocket closing +# websocket:close [nil] websocket closed module Kontena class WebsocketClient + include Celluloid + include Celluloid::Notifications include Kontena::Logging STRFTIME = '%F %T.%NZ' - KEEPALIVE_INTERVAL = 30.0 # seconds - PING_TIMEOUT = Kernel::Float(ENV['WEBSOCKET_TIMEOUT'] || 5) + + CONNECT_TIMEOUT = 10.0 + OPEN_TIMEOUT = 10.0 + PING_INTERVAL = 30.0 # seconds + PING_TIMEOUT = Kernel::Float(ENV['WEBSOCKET_TIMEOUT'] || 5.0) + CLOSE_TIMEOUT = 10.0 + WRITE_TIMEOUT = 10.0 # this one is a little odd attr_reader :api_uri, :ws, :rpc_server, :ping_timer - delegate :on, to: :ws - # @param [String] api_uri # @param [String] grid_token # @param [String] node_token - def initialize(api_uri, grid_token: nil, node_token: nil) + # @param [Boolean] ssl_verify + def initialize(api_uri, grid_token: nil, node_token: nil, ssl_params: {}, ssl_hostname: nil, autostart: true) @api_uri = api_uri + @ssl_params = ssl_params + @ssl_hostname = ssl_hostname @grid_token = grid_token @node_token = node_token - @rpc_server = Kontena::RpcServer.supervise(as: :rpc_server) - @rpc_client = Kontena::RpcClient.supervise(as: :rpc_client, args: [self]) - @abort = false @connected = false @connecting = false - @ping_timer = nil if @node_token info "initialized with node token #{@node_token[0..8]}..., node ID #{host_id}" @@ -48,17 +49,8 @@ def initialize(api_uri, grid_token: nil, node_token: nil) else fail "Missing grid, node token" end - end - def ensure_connect - EM::PeriodicTimer.new(1) { - connect unless connected? - } - EM::PeriodicTimer.new(KEEPALIVE_INTERVAL) { - if connected? - EM.next_tick { verify_connection } - end - } + async.start if autostart end # @return [Boolean] @@ -71,17 +63,30 @@ def connecting? @connecting end + def rpc_server + Celluloid::Actor[:rpc_server] + end + def rpc_client + Celluloid::Actor[:rpc_client] + end + + def start + every(1.0) do + connect if !connected? unless connecting? + end + end + def connect - return if connecting? - @connected = false @connecting = true + info "connecting to master at #{api_uri}" headers = { 'Kontena-Node-Id' => host_id.to_s, 'Kontena-Version' => Kontena::Agent::VERSION, - 'Kontena-Node-Labels' => labels, + 'Kontena-Node-Labels' => labels.join(','), 'Kontena-Connected-At' => Time.now.utc.strftime(STRFTIME), } + if @node_token headers['Kontena-Node-Token'] = @node_token.to_s elsif @grid_token @@ -90,130 +95,222 @@ def connect fail "Missing grid, node token" end - @ws = Faye::WebSocket::Client.new(self.api_uri, nil, {headers: headers}) + @ws = Kontena::Websocket::Client.new(@api_uri, + headers: headers, + ssl_params: @ssl_params, + ssl_hostname: @ssl_hostname, + connect_timeout: CONNECT_TIMEOUT, + open_timeout: OPEN_TIMEOUT, + ping_interval: PING_INTERVAL, + ping_timeout: PING_TIMEOUT, + close_timeout: CLOSE_TIMEOUT, + ) - notify_actors('websocket:connect', self) + async.connect_client @ws - @ws.on :open do |event| - on_open(event) - end - @ws.on :message do |event| - on_message(event) - end - @ws.on :close do |event| - on_close(event) + publish('websocket:connect', nil) + + rescue => exc + error exc + + # abort connect, allow re-connecting + @connecting = false + end + + # Connect the websocket client, and read messages. + # + # Keeps running as a separate defer thread as long as the websocket client is connected. + # + # @param ws [Kontena::Websocket::Client] + def connect_client(ws) + actor = Actor.current + + # run the blocking websocket client connect+read in a separate thread + defer { + ws.on_pong do |delay| + actor.on_pong(delay) + end + + # blocks until open, raises on errors + ws.connect + + # These are called from the read_ws -> defer thread, proxy back to actor + actor.on_open + + ws.read do |message| + actor.on_message(message) + end + } + + rescue Kontena::Websocket::CloseError => exc + # handle known errors, will reconnect + on_close(exc.code, exc.reason) + + rescue Kontena::Websocket::Error => exc + # handle known errors, will reconnect + on_error exc + + rescue => exc + # XXX: crash instead of reconnecting on unknown errors? + error exc + + else + on_close(ws.close_code, ws.close_reason) + + ensure + @connected = false + @connecting = false + @ws = nil + + ws.disconnect + end + + def on_open + @connected = true + @connecting = false + + ssl_verify = ws.ssl_verify? + + begin + ssl_cert = ws.ssl_cert! + ssl_error = nil + rescue Kontena::Websocket::SSLVerifyError => exc + ssl_cert = exc.cert + ssl_error = exc end - @ws.on :error do |event| - on_error(event) + + if ssl_error + if ssl_cert + warn "insecure connection established with SSL errors: #{ssl_error}: #{ssl_cert.subject} (issuer #{ssl_cert.issuer})" + else + warn "insecure connection established with SSL errors: #{ssl_error}" + end + elsif ssl_cert + if !ssl_verify + warn "secure connection established without KONTENA_SSL_VERIFY=true: #{ssl_cert.subject} (issuer #{ssl_cert.issuer})" + else + info "secure connection established with KONTENA_SSL_VERIFY: #{ssl_cert.subject} (issuer #{ssl_cert.issuer})" + end + else + info "unsecure connection established without SSL" end + + publish('websocket:open', nil) end - ## + def ws + fail "not connected" unless @ws + + @ws + end + + # Called from RpcServer, does not crash the Actor on errors. + # # @param [String, Array] msg + # @raise [RuntimeError] not connected def send_message(msg) - EM.next_tick { - begin - @ws.send(msg) if @ws - rescue - error "failed to send message" - end - } + ws.send(msg) rescue => exc - error "failed to send message: #{exc.message}" + warn exc + abort exc end + # Called from RpcClient, does not crash the Actor on errors. + # # @param [String] method # @param [Array] params + # @raise [RuntimeError] not connected def send_notification(method, params) data = MessagePack.dump([2, method, params]).bytes - send_message(data) + ws.send(data) rescue => exc - error "failed to send notification: #{exc.message}" + warn exc + abort exc end + # Called from RpcClient, does not crash the Actor on errors. + # # @param [Integer] id # @param [String] method # @param [Array] params + # @raise [RuntimeError] not connected def send_request(id, method, params) data = MessagePack.dump([0, id, method, params]).bytes - send_message(data) + ws.send(data) rescue => exc - error "failed to send request: #{exc.message}" + warn exc + abort exc end - # @param [Faye::WebSocket::API::Event] event - def on_open(event) - ping_timer.cancel if ping_timer - info 'connection established' - @connected = true - @connecting = false - notify_actors('websocket:open', event) - end - - # @param [Faye::WebSocket::API::Event] event - def on_message(event) - data = MessagePack.unpack(event.data.pack('c*')) + # @param [String] message + def on_message(message) + data = MessagePack.unpack(message.pack('c*')) if request_message?(data) - Celluloid::Actor[:rpc_server].async.handle_request(self, data) + rpc_server.async.handle_request(Actor.current, data) elsif response_message?(data) - Celluloid::Actor[:rpc_client].async.handle_response(data) + rpc_client.async.handle_response(data) elsif notification_message?(data) - Celluloid::Actor[:rpc_server].async.handle_notification(data) + rpc_server.async.handle_notification(data) end - rescue => exc - error exc.message end - def on_error(event) - debug event.message.inspect + # @param exc [Exception] + def on_error(exc) + case exc + when Kontena::Websocket::SSLVerifyError + if exc.cert + error "unable to connect to SSL server with KONTENA_SSL_VERIFY=true: #{exc} (subject #{exc.subject}, issuer #{exc.issuer})" + else + error "unable to connect to SSL server with KONTENA_SSL_VERIFY=true: #{exc}" + end + + when Kontena::Websocket::SSLConnectError + error "unable to connect to SSL server: #{exc}" + + when Kontena::Websocket::ConnectError + error "unable to connect to server: #{exc}" + + when Kontena::Websocket::ProtocolError + error "unexpected response from server, check url: #{exc}" - if event.message == Errno::EINVAL - error "invalid URI: #{api_uri}" - elsif event.message == Errno::ECONNREFUSED - error "connection refused: #{api_uri}" - elsif event.message == Errno::EPROTO - error "protocol error, check ws/wss: #{api_uri}" else - error "connection error: #{event.message}" + error "websocket error: #{exc}" end end - # @param [Faye::WebSocket::API::Event] event - def on_close(event) - @ping_timer = nil - @connected = false - @connecting = false - @ws = nil + # @param code [Integer] + # @param reason [String] + def on_close(code, reason) + debug "Server closed connection with code #{code}: #{reason}" - case event.code + case code when 4001 handle_invalid_token when 4010 - handle_invalid_version(event.reason) + handle_invalid_version(reason) when 4040, 4041 - handle_invalid_connection(event.reason) + handle_invalid_connection(reason) else - warn "connection closed with code #{event.code}: #{event.reason}" + warn "connection closed with code #{code}: #{reason}" end - notify_actors('websocket:close', nil) - rescue => exc - error exc.message + + publish('websocket:close', nil) end def handle_invalid_token error 'master does not accept our token, shutting down ...' - EM.next_tick { abort('Shutting down ...') } + Kontena::Agent.shutdown end def handle_invalid_version(reason) agent_version = Kontena::Agent::VERSION error "master does not accept our version (#{agent_version}): #{reason}" - EM.next_tick { abort("Shutting down ...") } + Kontena::Agent.shutdown end def handle_invalid_connection(reason) error "master indicates that this agent should not reconnect: #{reason}" - EM.next_tick { abort("Shutting down ...") } + Kontena::Agent.shutdown end # @param [Array] msg @@ -241,49 +338,24 @@ def host_id # @return [Array] def labels - Docker.info['Labels'].to_a.join(',') + Docker.info['Labels'].to_a end - def verify_connection - return if @ping_timer - - ping_time = Time.now - @ping_timer = EM::Timer.new(PING_TIMEOUT) do - delay = Time.now - ping_time - - # @ping_timer remains nil until re-connected to prevent further keepalives while closing - if connected? - error 'keepalive ping %.2fs timeout, closing connection' % [delay] - close - end + def on_pong(delay) + if delay > PING_TIMEOUT / 2 + warn "server ping %.2fs of %.2fs timeout" % [delay, PING_TIMEOUT] + else + debug "server ping %.2fs of %.2fs timeout" % [delay, PING_TIMEOUT] end - ws.ping { - @ping_timer.cancel - @ping_timer = nil - - delay = Time.now - ping_time - - if delay > PING_TIMEOUT / 2 - warn "keepalive ping %.2fs of %.2fs timeout" % [delay, PING_TIMEOUT] - else - debug "keepalive ping %.2fs of %.2fs timeout" % [delay, PING_TIMEOUT] - end - } - rescue => exc - error exc.message end # Abort the connection, closing the websocket, with a timeout def close # stop sending messages, queue them up until reconnected - notify_actors('websocket:disconnect', nil) - - # send close frame; this has a 30s timeout - ws.close - end + publish('websocket:disconnect', nil) - def notify_actors(event, value) - Celluloid::Notifications.publish(event, value) + # send close frame with CLOSE_TIMEOUT + @ws.close end end end diff --git a/agent/spec/lib/kontena/websocket_client_spec.rb b/agent/spec/lib/kontena/websocket_client_spec.rb index 6e18fa0e29..7c74aa490e 100644 --- a/agent/spec/lib/kontena/websocket_client_spec.rb +++ b/agent/spec/lib/kontena/websocket_client_spec.rb @@ -1,248 +1,595 @@ - -describe Kontena::WebsocketClient do - let(:api_uri) { 'http://test' } - let(:grid_token) { 'test' } +describe Kontena::WebsocketClient, :celluloid => true do + let(:url) { 'ws://socket.example.com' } + let(:grid_token) { 'secret' } let(:node_token) { nil } + let(:ssl_params) { {} } + let(:options) { {} } + + let(:node_id) { 'ABCD' } + let(:labels) { ['region=test'] } - let(:subject) { described_class.new(api_uri, grid_token: grid_token, node_token: node_token)} + before do + allow_any_instance_of(described_class).to receive(:host_id).and_return(node_id) + allow_any_instance_of(described_class).to receive(:labels).and_return(labels) + end - before(:each) { - Celluloid.boot - allow_any_instance_of(described_class).to receive(:host_id).and_return('ABCD') - allow_any_instance_of(described_class).to receive(:labels).and_return(['region=test']) + let(:actor) { + described_class.new(url, + grid_token: grid_token, + node_token: node_token, + ssl_params: ssl_params, + autostart: false, + **options + ) } - after(:each) { Celluloid.shutdown } + subject { actor.wrapped_object } + let(:async) { instance_double(described_class) } - around(:each, :em => true) do |example| - EM.run { - example.run - EM.stop - } + before do + allow(subject).to receive(:async).and_return(async) + end + + before do + # run timers immediately, once + allow(subject.wrapped_object).to receive(:every) do |&block| + block.call + end end - describe '#connected?' do - it 'returns false by default' do - expect(subject.connected?).to eq(false) + describe '#initialize' do + it 'is not connected' do + expect(subject.connected?).to be false end - it 'returns true if connection is established' do - subject.on_open(spy(:event)) - expect(subject.connected?).to eq(true) + it 'is not connecting' do + expect(subject.connecting?).to be false + end + end + + describe '#start' do + it 'connects' do + expect(subject).to receive(:connect) + + actor.start end end describe '#connect' do - it 'sets connecting to true' do - expect { - subject.connect - }.to change{ subject.connecting? }.from(false).to(true) + it 'creates a websocket client and calls run_websocket' do + expect(async).to receive(:connect_client) + expect(subject).to receive(:publish).with('websocket:connect', nil) + + actor.connect + + expect(subject).to be_connecting + expect(subject).to_not be_connected + expect(subject.ws).to be_a Kontena::Websocket::Client + expect(subject.ws.url).to eq 'ws://socket.example.com' + expect(subject.ws.ssl?).to be false + expect(subject.ws.ssl_verify?).to be false + expect(subject.ws.instance_variable_get('@headers')).to match( + 'Kontena-Grid-Token' => 'secret', + 'Kontena-Node-Id' => 'ABCD', + 'Kontena-Version' => Kontena::Agent::VERSION, + 'Kontena-Node-Labels' => 'region=test', + 'Kontena-Connected-At' => String, + ) end - it 'sets connected to false' do - subject.on_open(spy) - expect { - subject.connect - }.to change{ subject.connected? }.from(true).to(false) + context 'with a node token' do + let(:grid_token) { nil } + let(:node_token) { 'node-secret' } + it 'creates a websocket client with Kontena-Node-Token header' do + expect(async).to receive(:connect_client) + + actor.connect + + expect(subject).to be_connecting + expect(subject).to_not be_connected + expect(subject.ws).to be_a Kontena::Websocket::Client + expect(subject.ws.url).to eq 'ws://socket.example.com' + expect(subject.ws.ssl?).to be false + expect(subject.ws.ssl_verify?).to be false + expect(subject.ws.instance_variable_get('@headers')).to match( + 'Kontena-Node-Token' => 'node-secret', + 'Kontena-Node-Id' => 'ABCD', + 'Kontena-Version' => Kontena::Agent::VERSION, + 'Kontena-Node-Labels' => 'region=test', + 'Kontena-Connected-At' => String, + ) + end end end - describe '#on_open' do - it 'sets connected to true' do - expect(subject.connected?).to be_falsey - subject.on_open(spy) - expect(subject.connected?).to be_truthy + describe '#ws' do + it 'fails when not connected' do + expect{subject.ws}.to raise_error(RuntimeError, "not connected") end + end - it 'sets connecting to false' do - subject.connect - expect(subject.connecting?).to be_truthy - subject.on_open(spy) - expect(subject.connecting?).to be_falsey + describe '#send_message' do + it 'fails when not connected' do + expect{actor.send_message('asdf')}.to raise_error(RuntimeError, "not connected") end - - it 'cancels ping timer' do - timer = spy(:timer) - allow(subject).to receive(:ping_timer).and_return(timer) - expect(timer).to receive(:cancel) - subject.on_open(spy) + end + describe '#send_notification' do + it 'fails when not connected' do + expect{actor.send_notification('/test', [])}.to raise_error(RuntimeError, "not connected") + end + end + describe '#send_request' do + it 'fails when not connected' do + expect{actor.send_request(1, '/test', [])}.to raise_error(RuntimeError, "not connected") end end - describe '#on_error' do - context "For a server that is ECONNREFUSED" do - subject do - described_class.new('ws://127.0.0.1:1337', grid_token: 'test-token') - end + context 'with an invalid URL' do + let(:url) { 'http://api.example.com' } - it "logs an error", :em => false do - expect(subject).to receive(:error).with(/connection refused/) do |event| - EM.stop + describe '#connect' do + it 'logs error and does not set connecting' do + expect(subject).to receive(:error).with(ArgumentError) do |err| + expect(err.message).to eq 'Invalid websocket URL: http://api.example.com' end - # XXX: EM will segfault if the mock raises - # => https://github.com/eventmachine/eventmachine/issues/765 - EM.run { - subject.connect - } + actor.connect + + expect(subject.connecting?).to be false end end end - describe '#on_close' do - let(:event) { Faye::WebSocket::API::CloseEvent.new('close', {}) } + context 'for a wss:// URL with defaults' do + let(:url) { 'wss://socket.example.com' } - before do - allow(EM).to receive(:next_tick) do |&block| block.call end - end + describe '#connect' do + it 'creates a websocket client with ssl, and ssl_verify' do + expect(async).to receive(:connect_client) - it 'sets connected to false' do - subject.on_open(spy) - expect { - subject.on_close(event) - }.to change{ subject.connected? }.from(true).to(false) - end + actor.connect - it 'sets connecting to false' do - subject.instance_variable_set('@connecting', true) - expect { - subject.on_close(event) - }.to change{ subject.connecting? }.from(true).to(false) + expect(subject.ws).to be_a Kontena::Websocket::Client + expect(subject.ws.url).to eq 'wss://socket.example.com' + expect(subject.ws.ssl?).to be true + expect(subject.ws.ssl_verify?).to be true + end end + end - it 'aborts on 4001 error code' do - event = Faye::WebSocket::API::CloseEvent.new('close', code: 4001) - expect(subject).to receive(:handle_invalid_token).and_call_original - expect(subject).to receive(:abort) - subject.on_close(event) + context 'for a wss:// URL without ssl verify' do + let(:url) { 'wss://socket.example.com' } + let(:ssl_params) { { verify_mode: OpenSSL::SSL::VERIFY_NONE }} + + describe '#connect' do + it 'creates a websocket client with ssl and no ssl_verify' do + expect(async).to receive(:connect_client) + + actor.connect + + expect(subject.ws).to be_a Kontena::Websocket::Client + expect(subject.ws.url).to eq 'wss://socket.example.com' + expect(subject.ws.ssl?).to be true + expect(subject.ws.ssl_verify?).to be false + end end + end - it 'aborts on 4010 error code' do - event = Faye::WebSocket::API::CloseEvent.new('close', code: 4010) - expect(subject).to receive(:handle_invalid_version).and_call_original - expect(subject).to receive(:abort) - subject.on_close(event) + context 'for a wss:// URL with ssl_hostname' do + let(:url) { 'wss://socket.example.com' } + let(:options) { { ssl_hostname: 'test'} } + + describe '#connect' do + it 'creates a websocket client with ssl and ssl_hostname' do + expect(async).to receive(:connect_client) + + actor.connect + + expect(subject.ws).to be_a Kontena::Websocket::Client + expect(subject.ws.url).to eq 'wss://socket.example.com' + expect(subject.ws.ssl?).to be true + expect(subject.ws.ssl_hostname).to eq 'test' + expect(subject.ws.ssl_verify?).to be true + end end + end + + context 'for a connecting websocket client' do + let(:ws_client) { instance_double(Kontena::Websocket::Client) } - it 'disconnects on 4030 error code' do - event = Faye::WebSocket::API::CloseEvent.new('close', code: 4030) - expect(subject).to_not receive(:abort) - subject.on_close(event) + before do + subject.instance_variable_set('@connecting', true) + subject.instance_variable_set('@ws', ws_client) end - it 'aborts on 4040 error code' do - event = Faye::WebSocket::API::CloseEvent.new('close', code: 4040) - expect(subject).to receive(:handle_invalid_connection).and_call_original - expect(subject).to receive(:abort) - subject.on_close(event) + it 'is not connected' do + expect(subject.connected?).to be false end - it 'aborts on 4041 error code' do - event = Faye::WebSocket::API::CloseEvent.new('close', code: 4040) - expect(subject).to receive(:handle_invalid_connection).and_call_original - expect(subject).to receive(:abort) - subject.on_close(event) + it 'is connecting' do + expect(subject.connecting?).to be true end - it 'publishes websocket:close' do - expect(Celluloid::Notifications).to receive(:publish).with('websocket:close', nil) - subject.on_close(event) + describe '#start' do + it 'does not connect' do + expect(subject).not_to receive(:connect) + + actor.start + end end - end - describe '#verify_connection' do - let :ws do - instance_double(Faye::WebSocket::Client) + describe '#connect_client' do + before do + allow(ws_client).to receive(:on_pong) do |&block| + @on_pong = block + end + end + + it 'runs the websocket client in a separate thread, and is no longer connecting after it returns' do + expect(ws_client).to receive(:connect) do + expect(Celluloid.actor?).to be false + end + expect(subject).to receive(:on_open) do + expect(Celluloid.actor?).to be true + expect(Celluloid.current_actor).to eq actor + end + + expect(ws_client).to receive(:read) do |&block| + expect(subject).to receive(:on_message).with('test') + + block.call 'test' + + allow(ws_client).to receive(:close_code).and_return(1000) + allow(ws_client).to receive(:close_reason).and_return '' + end + expect(subject).to_not receive(:on_error) + expect(subject).to receive(:on_close).with(1000, '') + + expect(ws_client).to receive(:disconnect) + + actor.connect_client(ws_client) + + expect(subject.connecting?).to be false + expect(subject.connected?).to be false + end + + it 'handles websocket connect errors' do + expect(ws_client).to receive(:connect) do |&block| + raise Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT), 'certificate verify failed: self signed certificate' + end + expect(ws_client).to receive(:disconnect) + + expect(subject).to receive(:on_error).with(Kontena::Websocket::SSLVerifyError) + + actor.connect_client(ws_client) + + expect(subject.connecting?).to be false + expect(subject.connected?).to be false + end + + it 'handles websocket read errors' do + expect(ws_client).to receive(:connect) + expect(subject).to receive(:on_open) + expect(ws_client).to receive(:read) do |&block| + raise Kontena::Websocket::TimeoutError, 'ping timeout' + end + expect(ws_client).to receive(:disconnect) + + expect(subject).to receive(:on_error).with(Kontena::Websocket::TimeoutError) + + actor.connect_client(ws_client) + + expect(subject.connecting?).to be false + expect(subject.connected?).to be false + end + + it 'handles websocket close errors' do + expect(ws_client).to receive(:connect) + expect(subject).to receive(:on_open) + + expect(ws_client).to receive(:read) do |&block| + raise Kontena::Websocket::CloseError.new(1337, 'testing') + end + expect(subject).to receive(:on_close).with(1337, 'testing') + + expect(ws_client).to receive(:disconnect) + + actor.connect_client(ws_client) + + expect(subject.connecting?).to be false + expect(subject.connected?).to be false + end + + it 'handles websocket close' do + expect(ws_client).to receive(:connect) + expect(subject).to receive(:on_open) + + expect(ws_client).to receive(:read) do |&block| + allow(ws_client).to receive(:close_code).and_return(1337) + allow(ws_client).to receive(:close_reason).and_return 'testing' + end + expect(subject).to receive(:on_close).with(1337, 'testing') + + expect(ws_client).to receive(:disconnect) + + actor.connect_client(ws_client) + + expect(subject.connecting?).to be false + expect(subject.connected?).to be false + end + + it 'handles unkonwn errors' do + expect(ws_client).to receive(:connect) do |&block| + fail 'test' + end + expect(ws_client).to receive(:disconnect) + + expect(subject).not_to receive(:on_error) + expect(subject.logger).to receive(:error) + + actor.connect_client(ws_client) + + expect(subject.connecting?).to be false + expect(subject.connected?).to be false + end end - before do - stub_const("Kontena::WebsocketClient::PING_TIMEOUT", 0.1) - allow(subject).to receive(:ws).and_return(ws) + describe '#ws' do + it 'returns websocket client' do + expect(subject.ws).to eq ws_client + end end - it "logs a warning if delay is over threshold", :em => false do - expect(ws).to receive(:ping) do |&block| - sleep 0.05 + describe '#on_open' do + context 'without ssl' do + before do + allow(ws_client).to receive(:ssl_cert!).and_return(nil) + allow(ws_client).to receive(:ssl_verify?).and_return(false) + end + + it 'updates connecting -> connected and published websocket:open' do + expect(subject).to receive(:info).with('unsecure connection established without SSL') + expect(subject).to receive(:publish).with('websocket:open', nil) + + actor.on_open + + expect(subject.connecting?).to be false + expect(subject.connected?).to be true + end + end + + context 'for a wss:// URL with ssl_verify' do + let(:url) { 'wss://socket.example.com' } + let(:ssl_params) { { verify_mode: OpenSSL::SSL::VERIFY_PEER }} + + before do + allow(ws_client).to receive(:ssl_verify?).and_return(true) + end - block.call + it 'logs the subject and issuer of a verified cert' do + expect(ws_client).to receive(:ssl_cert!).and_return(instance_double(OpenSSL::X509::Certificate, + issuer: '/CN=ca', + subject: '/CN=test', + )) + expect(subject).to receive(:info).with('secure connection established with KONTENA_SSL_VERIFY: /CN=test (issuer /CN=ca)') - EM.stop + actor.on_open + end end - expect(subject).to receive(:warn).with(/keepalive ping \d+.\d+s/) + context 'for a wss:// URL without ssl verify' do + let(:url) { 'wss://socket.example.com' } + let(:ssl_params) { { verify_mode: OpenSSL::SSL::VERIFY_NONE }} + + before do + allow(ws_client).to receive(:ssl_verify?).and_return(false) + end + + it 'logs a warning about unverified cert' do + expect(ws_client).to receive(:ssl_cert!).and_raise(Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT), 'self signed certificate') + expect(subject).to receive(:warn).with('insecure connection established with SSL errors: certificate verify failed: self signed certificate') + + actor.on_open + end + + it 'logs a warning about verified cert' do + expect(ws_client).to receive(:ssl_cert!).and_return(instance_double(OpenSSL::X509::Certificate, + issuer: '/CN=ca', + subject: '/CN=test', + )) - EM.run { - subject.verify_connection - } + expect(subject).to receive(:warn).with('secure connection established without KONTENA_SSL_VERIFY=true: /CN=test (issuer /CN=ca)') + + actor.on_open + end + end end - it "logs an error and closes the connection if over timeout", :em => false do - expect(ws).to receive(:ping) do |&block| - # nothing, let the ping timer expire + describe '#on_error' do + let(:ssl_cert) { instance_double(OpenSSL::X509::Certificate, + subject: OpenSSL::X509::Name.parse('/CN=test'), + issuer: OpenSSL::X509::Name.parse('/CN=test-ca'), + ) } + + it 'logs ssl verify errors with cert details' do + expect(subject).to receive(:error).with("unable to connect to SSL server with KONTENA_SSL_VERIFY=true: certificate verify failed: self signed certificate (subject /CN=test, issuer /CN=test-ca)") + + subject.on_error(Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT, ssl_cert, [], 'self signed certificate')) + end + + it 'logs ssl verify errors' do + expect(subject).to receive(:error).with("unable to connect to SSL server with KONTENA_SSL_VERIFY=true: certificate verify failed: self signed certificate") + + subject.on_error(Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT, nil, [], 'self signed certificate')) + end + + it 'logs ssl errors' do + expect(subject).to receive(:error).with("unable to connect to SSL server: SSL_connect SYSCALL returned=5 errno=0 state=unknown state") + + subject.on_error(Kontena::Websocket::SSLConnectError.new('SSL_connect SYSCALL returned=5 errno=0 state=unknown state')) end - expect(subject).to receive(:connected?).and_return(true) - expect(subject).to receive(:error).with(/keepalive ping \d+.\d+s timeout/) + it 'logs protocol errors' do + expect(subject).to receive(:error).with("unexpected response from server, check url: Error during WebSocket handshake: Unexpected response code: 404") - expect(subject).to receive(:close) do - EM.stop + subject.on_error(Kontena::Websocket::ProtocolError.new('Error during WebSocket handshake: Unexpected response code: 404')) end - EM.run { - subject.verify_connection - } + it 'logs other errors' do + expect(subject).to receive(:error).with("websocket error: testing") + + subject.on_error(Kontena::Websocket::Error.new('testing')) # XXX: other examples? + end end end - describe '#close' do - let :ws do - instance_double(Faye::WebSocket::Client) - end + context 'for a connected websocket client' do + let(:ws_client) { instance_double(Kontena::Websocket::Client) } before do - allow(subject).to receive(:ws).and_return(ws) + subject.instance_variable_set('@connecting', false) + subject.instance_variable_set('@connected', true) + subject.instance_variable_set('@ws', ws_client) end - it 'publishes event', :em => true do - expect(Celluloid::Notifications).to receive(:publish).with('websocket:disconnect', nil) - expect(ws).to receive(:close) - subject.close + it 'is not connected' do + expect(subject.connected?).to be true end - context "for a connected websocket", :em => true do - let :open_event do - double(:open_event) + it 'is not connecting' do + expect(subject.connecting?).to be false + end + + describe '#start' do + it 'does not connect' do + expect(subject).not_to receive(:connect) + + actor.start end - let :close_event do - double(:close_event, code: 1006, reason: "Connection closed") + end + + describe '#ws' do + it 'returns websocket client' do + expect(subject.ws).to eq ws_client end + end - let :close_timer do - instance_double(EM::Timer) + describe '#send_message' do + it 'sends message via websocket client' do + expect(ws_client).to receive(:send).with('asdf') + + subject.send_message('asdf') end + end + describe '#send_notification' do + it 'sends message via websocket client' do + expect(ws_client).to receive(:send).with([147, 2, 165, 47, 116, 101, 115, 116, 145, 164, 97, 115, 100, 102]) + + subject.send_notification('/test', ['asdf']) + end + end + describe '#send_request' do + it 'sends message via websocket client' do + expect(ws_client).to receive(:send).with([148, 0, 1, 165, 47, 116, 101, 115, 116, 145, 164, 97, 115, 100, 102]) + + subject.send_request(1, '/test', ['asdf']) + end + end + + describe '#on_message' do + let(:rpc_request) { [0, 1, '/test', ['foo']] } + let(:rpc_response) { [1, 1, '/test', ['foo']] } + let(:rpc_notification) { [2, '/test', ['foo']] } + + let(:rpc_server) { instance_double(Kontena::RpcServer) } + let(:rpc_server_async) { instance_double(Kontena::RpcServer) } + let(:rpc_client) { instance_double(Kontena::RpcClient) } + let(:rpc_client_async) { instance_double(Kontena::RpcClient) } before do - subject.on_open open_event + allow(subject).to receive(:rpc_server).and_return(rpc_server) + allow(subject).to receive(:rpc_client).and_return(rpc_client) + + allow(rpc_server).to receive(:async).and_return(rpc_server_async) + allow(rpc_client).to receive(:async).and_return(rpc_client_async) + end + + it 'passes an RPC request to the rpc server' do + expect(rpc_server_async).to receive(:handle_request).with(actor, rpc_request) - expect(subject).to be_connected - expect(subject).to_not be_connecting + actor.on_message MessagePack.dump(rpc_request).bytes end - it 'eventually sets connection to closed ' do - expect(Celluloid::Notifications).to receive(:publish).with('websocket:disconnect', nil) + it 'passes an RPC response to the rpc client' do + expect(rpc_client_async).to receive(:handle_response).with(rpc_response) - expect(ws).to receive(:close) { } + actor.on_message MessagePack.dump(rpc_response).bytes + end + + it 'passes an RPC notification to the rpc server' do + expect(rpc_server_async).to receive(:handle_notification).with(rpc_notification) - subject.close + actor.on_message MessagePack.dump(rpc_notification).bytes + end + end - expect(subject).to be_connected - expect(subject).to_not be_connecting + describe '#on_close' do + it 'publishes websocket:close' do + expect(subject).to receive(:publish).with('websocket:close', nil) - expect(Celluloid::Notifications).to receive(:publish).with('websocket:close', nil) + actor.on_close(1000, '') + end - subject.on_close close_event + it 'aborts on 4001 error code', log_celluloid_actor_crashes: false do + expect(subject).to receive(:handle_invalid_token).and_call_original + expect(Kontena::Agent).to receive(:shutdown) - expect(subject).to_not be_connected - expect(subject).to_not be_connecting + actor.on_close(4001, "Invalid token") + end + + it 'aborts on 4010 error code', log_celluloid_actor_crashes: false do + expect(subject).to receive(:handle_invalid_version).and_call_original + expect(Kontena::Agent).to receive(:shutdown) + + actor.on_close(4010, "Invalid version") + end + + it 'disconnects on 4030 error code' do + expect(subject).to_not receive(:abort) + + actor.on_close(4030, "Invalid clock") + end + + it 'aborts on 4040 error code', log_celluloid_actor_crashes: false do + expect(subject).to receive(:handle_invalid_connection).and_call_original + expect(Kontena::Agent).to receive(:shutdown) + + actor.on_close(4040, "Invalid node") + end + + it 'aborts on 4041 error code', log_celluloid_actor_crashes: false do + expect(subject).to receive(:handle_invalid_connection).and_call_original + expect(Kontena::Agent).to receive(:shutdown) + + actor.on_close(4041, "Invalid connection") + end + end + + describe '#on_pong' do + it "logs a warning if delay is over threshold", :em => false do + sleep 0.05 + + expect(subject).to receive(:warn).with(/server ping 3.20s of 5.00s timeout/) + + subject.on_pong(3.2) + end + end + + describe '#close' do + it 'publishes event and closes websocket' do + expect(subject).to receive(:publish).with('websocket:disconnect', nil) + expect(ws_client).to receive(:close) + + actor.close end end end @@ -265,13 +612,4 @@ expect(subject.notification_message?(msg)).to be_truthy end end - - describe '#send_message' do - it 'does not raise error if ws is nil' do - allow(subject).to receive(:@ws).and_return(nil) - expect { - subject.send_message('foo') - }.not_to raise_error - end - end end From 3f671825396e0b17114d62f651d54dcd9bee23ae Mon Sep 17 00:00:00 2001 From: Tero Marttila Date: Wed, 19 Jul 2017 16:34:27 +0300 Subject: [PATCH 3/6] agent: supervise websocket client, refactor Kontena::Agent as singleton for shutdown --- agent/bin/kontena-agent | 5 +- agent/lib/kontena/agent.rb | 146 +++++++++++++++-------- docs/references/environment-variables.md | 4 + 3 files changed, 103 insertions(+), 52 deletions(-) diff --git a/agent/bin/kontena-agent b/agent/bin/kontena-agent index c3d871b84c..e4ee7e7e71 100755 --- a/agent/bin/kontena-agent +++ b/agent/bin/kontena-agent @@ -31,9 +31,12 @@ else end Kontena::Logging.initialize_logger(STDOUT, log_level) -agent = Kontena::Agent.new( +agent = Kontena::Agent.instance +agent.configure( api_uri: api_uri, grid_token: grid_token, node_token: node_token, + ssl_verify: ENV['KONTENA_SSL_VERIFY'], + ssl_hostname: ENV['KONTENA_SSL_HOSTNAME'], ) agent.run! diff --git a/agent/lib/kontena/agent.rb b/agent/lib/kontena/agent.rb index f128925c63..9926f3ac5a 100644 --- a/agent/lib/kontena/agent.rb +++ b/agent/lib/kontena/agent.rb @@ -1,74 +1,106 @@ +require 'singleton' require_relative 'logging' module Kontena class Agent + include Singleton include Logging VERSION = File.read('./VERSION').strip - def initialize(opts) + # Called from other actors + def self.shutdown + instance.write_signal('shutdown') + end + + def initialize info "initializing agent (version #{VERSION})" + + @read_pipe, @write_pipe = IO.pipe + end + + def configure(opts) @opts = opts - @client = Kontena::WebsocketClient.new(@opts[:api_uri], - grid_token: @opts[:grid_token], - node_token: @opts[:node_token], - ) - @supervisor = Celluloid::Supervision::Container.run! - self.supervise_state - self.supervise_launchers - self.supervise_network_adapter - self.supervise_lb - self.supervise_workers end - # Connect to master server - def connect! - start_em - @client.ensure_connect + def ssl_verify? + return false if @opts[:ssl_verify].nil? + return false if @opts[:ssl_verify].empty? + return true end - def run! - self_read, self_write = IO.pipe + def ssl_params + { + verify_mode: self.ssl_verify? ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE, + } + end - %w(TERM TTIN).each do |sig| - trap sig do - self_write.puts(sig) - end + def ssl_hostname + @opts[:ssl_hostname] + end + + def write_signal(sig) + @write_pipe.puts(sig) + end + + def run! + trap 'TERM' do + write_signal('shutdown') + end + trap 'TTIN' do + write_signal('trace') end - begin - connect! + self.supervise - while readable_io = IO.select([self_read]) - signal = readable_io.first[0].gets.strip - handle_signal(signal) - end - rescue Interrupt - exit(0) + while line = @read_pipe.gets + handle_signal line.strip end + rescue Interrupt + exit(0) end # @param [String] signal def handle_signal(signal) info "Got signal #{signal}" case signal - when 'TERM' - info "Shutting down..." - EM.stop - @supervisor.shutdown - raise Interrupt - when 'TTIN' - Thread.list.each do |thread| - warn "Thread #{thread.object_id.to_s(36)} #{thread['label']}" - if thread.backtrace - warn thread.backtrace.join("\n") - else - warn "no backtrace available" - end + when 'shutdown' + self.handle_shutdown + when 'trace' + self.handle_trace + end + end + + def handle_shutdown + info "Shutting down..." + @supervisor.shutdown # shutdown all actors + @write_pipe.close # let run! break and return + end + + def handle_trace + info "Dump thread trace..." + + Thread.list.each do |thread| + warn "Thread #{thread.object_id.to_s(36)} #{thread['label']}" + if thread.backtrace + warn thread.backtrace.join("\n") + else + warn "no backtrace available" end end end + def supervise + @supervisor = Celluloid::Supervision::Container.run! + + self.supervise_state + self.supervise_rpc + self.supervise_launchers + self.supervise_network_adapter + self.supervise_lb + self.supervise_workers + end + def supervise_state @supervisor.supervise( type: Kontena::Workers::NodeInfoWorker, @@ -76,6 +108,27 @@ def supervise_state ) end + def supervise_rpc + @supervisor.supervise( + type: Kontena::RpcServer, + as: :rpc_server, + ) + @supervisor.supervise( + type: Kontena::RpcClient, + as: :rpc_client, + ) + @supervisor.supervise( + type: Kontena::WebsocketClient, + as: :websocket_client, + args: [@opts[:api_uri], + grid_token: @opts[:grid_token], + node_token: @opts[:node_token], + ssl_params: self.ssl_params, + ssl_hostname: self.ssl_hostname, + ], + ) + end + def supervise_launchers @supervisor.supervise( type: Kontena::Launchers::IpamPlugin, @@ -159,14 +212,5 @@ def supervise_lb as: :lb_registrator ) end - - def start_em - EM.epoll - Thread.new { - Thread.current.abort_on_exception = true - EventMachine.run - } unless EventMachine.reactor_running? - sleep 0.01 until EventMachine.reactor_running? - end end end diff --git a/docs/references/environment-variables.md b/docs/references/environment-variables.md index 7b9d23e11b..04e8dad0cf 100644 --- a/docs/references/environment-variables.md +++ b/docs/references/environment-variables.md @@ -29,6 +29,8 @@ toc_order: 2 The `KONTENA_URI` and either of `KONTENA_TOKEN` or `KONTENA_NODE_TOKEN` is required. - `KONTENA_URI`: Kontena Master websocket uri, `ws://...` or `wss://...` (required) +- `KONTENA_SSL_VERIFY`: Verify `wss://` server SSL certificate (default: no verification) +- `KONTENA_SSL_HOSTNAME`: Override hostname for SSL SNI and certificate subject verification - `KONTENA_TOKEN`: Kontena [Grid token](../using-kontena/nodes.md#grid-token) - `KONTENA_NODE_TOKEN`: Kontena [Node token](../using-kontena/nodes.md#node-token) - `KONTENA_PEER_INTERFACE`: network interface for peer/private communication (default: eth1) @@ -44,6 +46,8 @@ The `KONTENA_URI` and either of `KONTENA_TOKEN` or `KONTENA_NODE_TOKEN` is requi - `WEAVEEXEC_IMAGE`: weave exec image (default: weaveworks/weaveexec) - `WEAVE_VERSION`: weave net version - `WEBSOCKET_TIMEOUT`: websocket timeout in seconds (default: 5.0) +- `SSL_CERT_FILE`: path to SSL CA cert bundle file +- `SSL_CERT_PATH`: path to SSL CA cert bundle directory ## Kontena CLI From 655ad88d2c97bc11e3372c1bd4936c7e42c72c59 Mon Sep 17 00:00:00 2001 From: Tero Marttila Date: Wed, 19 Jul 2017 16:35:11 +0300 Subject: [PATCH 4/6] agent rpc server: async send_message for responses to ignore errors --- agent/lib/kontena/rpc_server.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/agent/lib/kontena/rpc_server.rb b/agent/lib/kontena/rpc_server.rb index f550fba86d..201626b7f5 100644 --- a/agent/lib/kontena/rpc_server.rb +++ b/agent/lib/kontena/rpc_server.rb @@ -32,6 +32,7 @@ def initialize(code, message) exclusive :handle_notification ## + # @param ws_client [Kontena::WebsocketClient] celluloid actor proxy # @param [Array] message msgpack-rpc request array # @return [Array] def handle_request(ws_client, message) @@ -58,7 +59,7 @@ def handle_request(ws_client, message) # @param [WebsocketClient] ws_client # @param [Array, Hash] msg def send_message(ws_client, msg) - ws_client.send_message(MessagePack.dump(msg).bytes) + ws_client.async.send_message(MessagePack.dump(msg).bytes) end ## From 3d0a2bdd2f4ac0696d8c4f6bbf59d17eca4f9cc7 Mon Sep 17 00:00:00 2001 From: Tero Marttila Date: Wed, 19 Jul 2017 16:35:54 +0300 Subject: [PATCH 5/6] refactor agent rpc client to use websocket client actor, use Celluloid.abort to safely re-raise errors --- agent/lib/kontena/helpers/rpc_helper.rb | 8 +-- agent/lib/kontena/rpc_client.rb | 51 ++++++++----------- agent/spec/lib/kontena/rpc_client_spec.rb | 41 ++++----------- .../workers/service_pod_manager_spec.rb | 26 ++++------ .../workers/volumes/volume_manager_spec.rb | 14 +++-- 5 files changed, 48 insertions(+), 92 deletions(-) diff --git a/agent/lib/kontena/helpers/rpc_helper.rb b/agent/lib/kontena/helpers/rpc_helper.rb index 252f656853..74357f6884 100644 --- a/agent/lib/kontena/helpers/rpc_helper.rb +++ b/agent/lib/kontena/helpers/rpc_helper.rb @@ -12,13 +12,7 @@ def rpc_client # @raise [Kontena::RpcClient::Error] # @return [Object] def rpc_request(method, params) - response, error = rpc_client.request_with_error(method, params) - - if error - raise error - else - return response - end + rpc_client.request(method, params) end end end diff --git a/agent/lib/kontena/rpc_client.rb b/agent/lib/kontena/rpc_client.rb index 69b3d0f4ed..3c85b41180 100644 --- a/agent/lib/kontena/rpc_client.rb +++ b/agent/lib/kontena/rpc_client.rb @@ -23,72 +23,61 @@ def initialize(code, message) attr_reader :requests - # @param [Kontena::WebsocketClient] client - def initialize(client) + def initialize @requests = {} - @client = client info 'initialized' end + def websocket_client + Celluloid::Actor[:websocket_client] + end + def connected? - @client.connected? + websocket_client && websocket_client.connected? end # @param [String] method # @param [Array] params def notification(method, params) - @client.send_notification(method, params) + websocket_client.async.send_notification(method, params) rescue => exc logger.error exc.message end - # This method should not raise, or the Actor will crash, and terminate any other pending requests. + # Aborts caller on errors. # # @param [String] method # @param [Array] params # @param [Fixnum] timeout seconds - # @return [Object, Exception] - def request_with_error(method, params, timeout: 30) + # @raise abort + # @return [Object] + def request(method, params, timeout: 30) id = request_id @requests[id] = nil - if !wait_until("websocket client is connected", timeout: timeout, threshold: 10.0, interval: 0.1) { @client.connected? } - return nil, TimeoutError.new(500, 'WebsocketClient is not connected') + if !wait_until("websocket client is connected", timeout: timeout, threshold: 10.0, interval: 0.1) { connected? } + raise TimeoutError.new(500, 'WebsocketClient is not connected') end - @client.send_request(id, method, params) + websocket_client.send_request(id, method, params) if !wait_until("request #{method} has response wth id=#{id}", timeout: timeout, interval: 0.01) { @requests[id] } - return nil, TimeoutError.new(500, 'Request timed out') + raise TimeoutError.new(500, 'Request timed out') end result, error = @requests.delete(id) if error - return result, Error.new(error['code'], error['message']) - else - return result, nil - end - end - - # Async request wrapper. - # - # Logs a warning and returns nil on errors. - # Use Kontena::Helpers::RpcError.rpc_request to get a raised error instead. - # - # @return [Object, nil] - def request(method, params, **opts) - result, error = request_with_error(method, params, **opts) - - if error - warn "RPC request #{method} failed: #{error}" - return nil + raise Error.new(error['code'], error['message']) else return result end + rescue => exc + warn exc + abort exc end - # Called from Kontena::WebsocketClient in the EM thread + # Sent by the Kontena::WebsocketClient actor def handle_response(response) type, msgid, error, result = response @requests[msgid] = [result, error] diff --git a/agent/spec/lib/kontena/rpc_client_spec.rb b/agent/spec/lib/kontena/rpc_client_spec.rb index f09f62c593..48fc51c68e 100644 --- a/agent/spec/lib/kontena/rpc_client_spec.rb +++ b/agent/spec/lib/kontena/rpc_client_spec.rb @@ -1,10 +1,11 @@ describe Kontena::RpcClient, :celluloid => true do let(:ws_client) { instance_double(Kontena::WebsocketClient) } - let(:subject) { described_class.new(ws_client) } + let(:subject) { described_class.new() } before do allow(ws_client).to receive(:connected?).and_return(true) + allow(subject.wrapped_object).to receive(:websocket_client).and_return(ws_client) end describe '#request_id' do @@ -24,7 +25,7 @@ end end - describe '#request_with_error' do + describe '#request' do it "returns the response" do expect(ws_client).to receive(:send_request).with(Fixnum, "/test", ["foo"]) do |id, method, params| Celluloid.after(0.0) { @@ -32,7 +33,7 @@ } end - expect(subject.request_with_error("/test", ["foo"], timeout: 1.0)).to eq ["foobar", nil] + expect(subject.request("/test", ["foo"], timeout: 1.0)).to eq "foobar" end it "returns any error" do @@ -42,34 +43,16 @@ } end - expect(subject.request_with_error("/test", ["foo"], timeout: 1.0)).to eq [nil, Kontena::RpcClient::Error.new(500, "test error")] + expect{subject.request("/test", ["foo"], timeout: 1.0)}.to raise_error(Kontena::RpcClient::Error, "test error") end it "returns a timeout error" do expect(ws_client).to receive(:send_request).with(Fixnum, "/test", ["foo"]) # do nothing.. - expect(subject.request_with_error("/test", ["foo"], timeout: 0.01)).to match [nil, Kontena::RpcClient::TimeoutError] - end - end - - describe '#request' do - it "returns the response" do - expect(ws_client).to receive(:send_request).with(Fixnum, "/test", ["foo"]) do |id, method, params| - Celluloid.after(0.0) { - subject.async.handle_response([1, id, nil, "foobar"]) - } - end - - expect(subject.wrapped_object).not_to receive(:warn) - expect(subject.request("/test", ["foo"], timeout: 1.0)).to eq "foobar" - end - - it "returns nil and logs a warning on errors" do - expect(ws_client).to receive(:send_request).with(Fixnum, "/test", ["foo"]) # do nothing.. - expect(subject.wrapped_object).to receive(:warn).with(/timeout after waiting/) - expect(subject.wrapped_object).to receive(:warn).with("RPC request /test failed: Request timed out") - expect(subject.request("/test", ["foo"], timeout: 0.01)).to be_nil + expect(subject.wrapped_object).to receive(:warn).with(Kontena::RpcClient::TimeoutError) + + expect{subject.request("/test", ["foo"], timeout: 0.01)}.to raise_error(Kontena::RpcClient::TimeoutError) end end @@ -93,13 +76,9 @@ expect(subject.wrapped_object).to receive(:sleep).at_least(:once).and_call_original requests = (1..count).map{ |i| - subject.future.request_with_error("/echo", [i]) - } - responses = requests.map{|f| - response, error = f.value - expect(error).to be_nil - response[0] + subject.future.request("/echo", [i]) } + responses = requests.map{|f| f.value[0] } expect(responses).to match_array (1..count).to_a end diff --git a/agent/spec/lib/kontena/workers/service_pod_manager_spec.rb b/agent/spec/lib/kontena/workers/service_pod_manager_spec.rb index fb3a73ebf6..348b38a75f 100644 --- a/agent/spec/lib/kontena/workers/service_pod_manager_spec.rb +++ b/agent/spec/lib/kontena/workers/service_pod_manager_spec.rb @@ -24,25 +24,23 @@ end it 'calls terminate_workers' do - allow(rpc_client).to receive(:request_with_error).with('/node_service_pods/list', [node.id]).and_return([ + allow(rpc_client).to receive(:request).with('/node_service_pods/list', [node.id]).and_return( { 'service_pods' => [ { 'id' => 'a/1', 'instance_number' => 1} ] - }, - nil - ]) + } + ) expect(subject.wrapped_object).to receive(:terminate_workers).with(['a/1']) subject.populate_workers_from_master end it 'does not call terminate_workers if master returns something weird' do - allow(rpc_client).to receive(:request_with_error).with('/node_service_pods/list', [node.id]).and_return([ + allow(rpc_client).to receive(:request).with('/node_service_pods/list', [node.id]).and_return( { 'service_pods' => 'lolwtf' - }, - nil - ]) + } + ) expect(subject.wrapped_object).to receive(:error).with(/Invalid response from master/) expect(subject.wrapped_object).not_to receive(:terminate_workers) expect(subject.wrapped_object).not_to receive(:ensure_service_worker) @@ -51,10 +49,9 @@ end it 'does not call terminate_workers if RPC fails' do - allow(rpc_client).to receive(:request_with_error).with('/node_service_pods/list', [node.id]).and_return([ - nil, + allow(rpc_client).to receive(:request).with('/node_service_pods/list', [node.id]).and_raise( Kontena::RpcClient::Error.new(500, "random failure") - ]) + ) expect(subject.wrapped_object).to receive(:warn).with(/failed to get list of service pods from master/) expect(subject.wrapped_object).not_to receive(:terminate_workers) expect(subject.wrapped_object).not_to receive(:ensure_service_worker) @@ -63,15 +60,14 @@ end it 'calls ensure_service_worker for each service pod' do - allow(rpc_client).to receive(:request_with_error).with('/node_service_pods/list', [node.id]).and_return([ + allow(rpc_client).to receive(:request).with('/node_service_pods/list', [node.id]).and_return( { 'service_pods' => [ { 'id' => 'a/1', 'instance_number' => 1}, { 'id' => 'b/2', 'instance_number' => 2} ] - }, - nil - ]) + } + ) expect(subject.wrapped_object).to receive(:ensure_service_worker) do |s| expect(s.id).to eq('a/1') end diff --git a/agent/spec/lib/kontena/workers/volumes/volume_manager_spec.rb b/agent/spec/lib/kontena/workers/volumes/volume_manager_spec.rb index 7a44950065..c8ea504a21 100644 --- a/agent/spec/lib/kontena/workers/volumes/volume_manager_spec.rb +++ b/agent/spec/lib/kontena/workers/volumes/volume_manager_spec.rb @@ -25,27 +25,25 @@ describe '#populate_volumes_from_master' do it 'fails with a warning if no proper response from master' do - expect(rpc_client).to receive(:request_with_error).with('/node_volumes/list', [node.id]).and_return([ + expect(rpc_client).to receive(:request).with('/node_volumes/list', [node.id]).and_return( { 'volumes' => 'foo' - }, - nil - ]) + } + ) expect(subject.wrapped_object).to receive(:error).with(/Invalid response from master/) expect{subject.populate_volumes_from_master}.not_to raise_error end it 'calls terminate and ensure with volumes from master' do expect(subject.wrapped_object).to receive(:terminate_volumes).with(['123', '456']) - expect(rpc_client).to receive(:request_with_error).with('/node_volumes/list', [node.id]).and_return([ + expect(rpc_client).to receive(:request).with('/node_volumes/list', [node.id]).and_return( { 'volumes' => [ {'name' => 'foo', 'volume_instance_id' => '123'}, {'name' => 'bar', 'volume_instance_id' => '456'} ] - }, - nil - ]) + } + ) expect(subject.wrapped_object).to receive(:ensure_volume).twice subject.populate_volumes_from_master end From 87d24a85409f019efc40b3624b7448e772a57b22 Mon Sep 17 00:00:00 2001 From: Tero Marttila Date: Mon, 24 Jul 2017 11:32:25 +0300 Subject: [PATCH 6/6] use kontena-websocket-client gem --- agent/Dockerfile | 2 +- agent/Gemfile | 2 +- agent/Gemfile.lock | 13 ++++--------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/agent/Dockerfile b/agent/Dockerfile index 3f7bdf3883..37879abe2e 100644 --- a/agent/Dockerfile +++ b/agent/Dockerfile @@ -15,4 +15,4 @@ RUN apk --update add --virtual build-dependencies ruby-dev build-base openssl-de WORKDIR /app ADD . /app -CMD ["bundle", "exec", "/app/bin/kontena-agent"] +CMD ["/app/bin/kontena-agent"] diff --git a/agent/Gemfile b/agent/Gemfile index 397918fcc2..cf58a533f3 100644 --- a/agent/Gemfile +++ b/agent/Gemfile @@ -1,6 +1,6 @@ source 'https://rubygems.org' -gem 'kontena-websocket-client', git: 'https://github.com/kontena/kontena-websocket-client.git' +gem 'kontena-websocket-client', '~> 0.1.0' gem 'docker-api', '~> 1.32.0' gem 'msgpack', '~> 1.0.3' gem 'activesupport', '~> 4.2.0' diff --git a/agent/Gemfile.lock b/agent/Gemfile.lock index a062807d50..0456317a0b 100644 --- a/agent/Gemfile.lock +++ b/agent/Gemfile.lock @@ -1,10 +1,3 @@ -GIT - remote: https://github.com/kontena/kontena-websocket-client.git - revision: e70d1435cdce9d2979e5b9914cbdf05d6e0e1f57 - specs: - kontena-websocket-client (0.1.0) - websocket-driver (~> 0.6.5) - GEM remote: https://rubygems.org/ specs: @@ -48,6 +41,8 @@ GEM hitimes (1.2.3) i18n (0.7.0) json (1.8.3) + kontena-websocket-client (0.1.0) + websocket-driver (~> 0.6.5) minitest (5.5.0) mixlib-log (1.6.0) msgpack (1.0.3) @@ -94,7 +89,7 @@ DEPENDENCIES dotenv etcd fluent-logger (~> 0.6.2) - kontena-websocket-client! + kontena-websocket-client (~> 0.1.0) msgpack (~> 1.0.3) rake rspec @@ -104,4 +99,4 @@ DEPENDENCIES webmock BUNDLED WITH - 1.15.1 + 1.15.3