Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite agent websocket client #2560

Merged
merged 7 commits into from
Jul 26, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN apk update && apk --update add tzdata ruby ruby-irb ruby-bigdecimal \
ADD Gemfile /app/
ADD Gemfile.lock /app/

RUN apk --update add --virtual build-dependencies ruby-dev build-base openssl-dev && \
RUN apk --update add --virtual build-dependencies ruby-dev build-base openssl-dev git && \
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The git is required for bundle install when testing changes to kontena-websocket-client using gems from git.

gem install bundler --no-ri --no-rdoc && \
cd /app ; bundle install --without development test && \
apk del build-dependencies
Expand Down
3 changes: 1 addition & 2 deletions agent/Gemfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
source 'https://rubygems.org'

gem 'kontena-websocket-client', '~> 0.1.0'
gem 'docker-api', '~> 1.32.0'
gem 'eventmachine', '~> 1.2.3'
gem 'faye-websocket', '~> 0.10.7'
gem 'msgpack', '~> 1.0.3'
gem 'activesupport', '~> 4.2.0'
gem 'celluloid', '~> 0.17.3'
Expand Down
11 changes: 4 additions & 7 deletions agent/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,14 @@ GEM
dotenv (1.0.2)
etcd (0.3.0)
mixlib-log
eventmachine (1.2.3)
excon (0.54.0)
faye-websocket (0.10.7)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
fluent-logger (0.6.2)
msgpack (>= 0.5.6, < 2)
hitimes (1.2.3)
i18n (0.7.0)
json (1.8.3)
kontena-websocket-client (0.1.0)
websocket-driver (~> 0.6.5)
minitest (5.5.0)
mixlib-log (1.6.0)
msgpack (1.0.3)
Expand Down Expand Up @@ -90,9 +88,8 @@ DEPENDENCIES
docker-api (~> 1.32.0)
dotenv
etcd
eventmachine (~> 1.2.3)
faye-websocket (~> 0.10.7)
fluent-logger (~> 0.6.2)
kontena-websocket-client (~> 0.1.0)
msgpack (~> 1.0.3)
rake
rspec
Expand All @@ -102,4 +99,4 @@ DEPENDENCIES
webmock

BUNDLED WITH
1.14.3
1.15.3
5 changes: 4 additions & 1 deletion agent/bin/kontena-agent
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ else
end
Kontena::Logging.initialize_logger(STDOUT, log_level)

agent = Kontena::Agent.new(
agent = Kontena::Agent.instance
agent.configure(
api_uri: api_uri,
grid_token: grid_token,
node_token: node_token,
ssl_verify: ENV['KONTENA_SSL_VERIFY'],
ssl_hostname: ENV['KONTENA_SSL_HOSTNAME'],
)
agent.run!
4 changes: 1 addition & 3 deletions agent/lib/kontena-agent.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
require 'docker'
require 'faye/websocket'
require 'eventmachine'
require 'thread'

require 'statsd'
require 'celluloid/current'
require 'celluloid/autostart'
require 'active_support/core_ext/time'
require 'active_support/core_ext/module/delegation'
require 'kontena-websocket-client'

require_relative 'ipaddr_helpers'

Expand Down
146 changes: 95 additions & 51 deletions agent/lib/kontena/agent.rb
Original file line number Diff line number Diff line change
@@ -1,81 +1,134 @@
require 'singleton'
require_relative 'logging'

module Kontena
class Agent
include Singleton
include Logging

VERSION = File.read('./VERSION').strip

def initialize(opts)
# Called from other actors
def self.shutdown
instance.write_signal('shutdown')
end

def initialize
info "initializing agent (version #{VERSION})"

@read_pipe, @write_pipe = IO.pipe
end

def configure(opts)
@opts = opts
@client = Kontena::WebsocketClient.new(@opts[:api_uri],
grid_token: @opts[:grid_token],
node_token: @opts[:node_token],
)
@supervisor = Celluloid::Supervision::Container.run!
self.supervise_state
self.supervise_launchers
self.supervise_network_adapter
self.supervise_lb
self.supervise_workers
end

# Connect to master server
def connect!
start_em
@client.ensure_connect
def ssl_verify?
return false if @opts[:ssl_verify].nil?
return false if @opts[:ssl_verify].empty?
return true
end

def run!
self_read, self_write = IO.pipe
def ssl_params
{
verify_mode: self.ssl_verify? ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE,
}
end

%w(TERM TTIN).each do |sig|
trap sig do
self_write.puts(sig)
end
def ssl_hostname
@opts[:ssl_hostname]
end

def write_signal(sig)
@write_pipe.puts(sig)
end

def run!
trap 'TERM' do
write_signal('shutdown')
end
trap 'TTIN' do
write_signal('trace')
end

begin
connect!
self.supervise

while readable_io = IO.select([self_read])
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
exit(0)
while line = @read_pipe.gets
handle_signal line.strip
end
rescue Interrupt
exit(0)
end

# @param [String] signal
def handle_signal(signal)
info "Got signal #{signal}"
case signal
when 'TERM'
info "Shutting down..."
EM.stop
@supervisor.shutdown
raise Interrupt
when 'TTIN'
Thread.list.each do |thread|
warn "Thread #{thread.object_id.to_s(36)} #{thread['label']}"
if thread.backtrace
warn thread.backtrace.join("\n")
else
warn "no backtrace available"
end
when 'shutdown'
self.handle_shutdown
when 'trace'
self.handle_trace
end
end

def handle_shutdown
info "Shutting down..."
@supervisor.shutdown # shutdown all actors
@write_pipe.close # let run! break and return
end

def handle_trace
info "Dump thread trace..."

Thread.list.each do |thread|
warn "Thread #{thread.object_id.to_s(36)} #{thread['label']}"
if thread.backtrace
warn thread.backtrace.join("\n")
else
warn "no backtrace available"
end
end
end

def supervise
@supervisor = Celluloid::Supervision::Container.run!

self.supervise_state
self.supervise_rpc
self.supervise_launchers
self.supervise_network_adapter
self.supervise_lb
self.supervise_workers
end

def supervise_state
@supervisor.supervise(
type: Kontena::Workers::NodeInfoWorker,
as: :node_info_worker
)
end

def supervise_rpc
@supervisor.supervise(
type: Kontena::RpcServer,
as: :rpc_server,
)
@supervisor.supervise(
type: Kontena::RpcClient,
as: :rpc_client,
)
@supervisor.supervise(
type: Kontena::WebsocketClient,
as: :websocket_client,
args: [@opts[:api_uri],
grid_token: @opts[:grid_token],
node_token: @opts[:node_token],
ssl_params: self.ssl_params,
ssl_hostname: self.ssl_hostname,
],
)
end

def supervise_launchers
@supervisor.supervise(
type: Kontena::Launchers::IpamPlugin,
Expand Down Expand Up @@ -159,14 +212,5 @@ def supervise_lb
as: :lb_registrator
)
end

def start_em
EM.epoll
Thread.new {
Thread.current.abort_on_exception = true
EventMachine.run
} unless EventMachine.reactor_running?
sleep 0.01 until EventMachine.reactor_running?
end
end
end
8 changes: 1 addition & 7 deletions agent/lib/kontena/helpers/rpc_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,7 @@ def rpc_client
# @raise [Kontena::RpcClient::Error]
# @return [Object]
def rpc_request(method, params)
response, error = rpc_client.request_with_error(method, params)

if error
raise error
else
return response
end
rpc_client.request(method, params)
end
end
end
Expand Down
51 changes: 20 additions & 31 deletions agent/lib/kontena/rpc_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,72 +23,61 @@ def initialize(code, message)

attr_reader :requests

# @param [Kontena::WebsocketClient] client
def initialize(client)
def initialize
@requests = {}
@client = client
info 'initialized'
end

def websocket_client
Celluloid::Actor[:websocket_client]
end

def connected?
@client.connected?
websocket_client && websocket_client.connected?
end

# @param [String] method
# @param [Array] params
def notification(method, params)
@client.send_notification(method, params)
websocket_client.async.send_notification(method, params)
rescue => exc
logger.error exc.message
end

# This method should not raise, or the Actor will crash, and terminate any other pending requests.
# Aborts caller on errors.
#
# @param [String] method
# @param [Array] params
# @param [Fixnum] timeout seconds
# @return [Object, Exception]
def request_with_error(method, params, timeout: 30)
# @raise abort
# @return [Object]
def request(method, params, timeout: 30)
id = request_id
@requests[id] = nil

if !wait_until("websocket client is connected", timeout: timeout, threshold: 10.0, interval: 0.1) { @client.connected? }
return nil, TimeoutError.new(500, 'WebsocketClient is not connected')
if !wait_until("websocket client is connected", timeout: timeout, threshold: 10.0, interval: 0.1) { connected? }
raise TimeoutError.new(500, 'WebsocketClient is not connected')
end

@client.send_request(id, method, params)
websocket_client.send_request(id, method, params)

if !wait_until("request #{method} has response wth id=#{id}", timeout: timeout, interval: 0.01) { @requests[id] }
return nil, TimeoutError.new(500, 'Request timed out')
raise TimeoutError.new(500, 'Request timed out')
end

result, error = @requests.delete(id)

if error
return result, Error.new(error['code'], error['message'])
else
return result, nil
end
end

# Async request wrapper.
#
# Logs a warning and returns nil on errors.
# Use Kontena::Helpers::RpcError.rpc_request to get a raised error instead.
#
# @return [Object, nil]
def request(method, params, **opts)
result, error = request_with_error(method, params, **opts)

if error
warn "RPC request #{method} failed: #{error}"
return nil
raise Error.new(error['code'], error['message'])
else
return result
end
rescue => exc
warn exc
abort exc
end

# Called from Kontena::WebsocketClient in the EM thread
# Sent by the Kontena::WebsocketClient actor
def handle_response(response)
type, msgid, error, result = response
@requests[msgid] = [result, error]
Expand Down