Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

207 lines (163 sloc) 5.334 kb
require "socket"
require "thread"
require "bunny/exceptions"
require "bunny/socket"
module Bunny
  # Socket-level transport used by a session: opens the (optionally TLS-wrapped)
  # client socket, writes raw data and encoded frames to it and reads frames
  # back from it.
  class Transport
    #
    # API
    #

    # Connection timeout used when none of the :connect_timeout,
    # :connection_timeout or :timeout options is given.
    DEFAULT_CONNECTION_TIMEOUT = 5.0

    attr_reader :session, :host, :port, :socket, :connect_timeout, :read_write_timeout, :disconnect_timeout

    # Stores the connection settings and immediately opens the socket.
    #
    # @param session [Object] owner of this transport; it is notified via
    #   #handle_network_failure when a write fails
    # @param host [String] host to connect to
    # @param port [Integer] port to connect to
    # @param opts [Hash] options. Keys read here: :ssl, :ssl_cert, :ssl_key,
    #   :ssl_cert_string, :ssl_key_string, :verify_ssl, :socket_timeout,
    #   :connect_timeout, :connection_timeout, :timeout and :keepalive
    # @raise [Bunny::TCPConnectionFailed] if the socket cannot be opened
    def initialize(session, host, port, opts)
      @session = session
      @host    = host
      @port    = port
      @opts    = opts

      @ssl             = opts[:ssl] || false
      @ssl_cert        = opts[:ssl_cert]
      @ssl_key         = opts[:ssl_key]
      @ssl_cert_string = opts[:ssl_cert_string]
      @ssl_key_string  = opts[:ssl_key_string]
      # Peer verification is on unless :verify_ssl is explicitly set to false.
      @verify_ssl      = opts[:verify_ssl].nil? || opts[:verify_ssl]

      # A timeout of 0 means "no timeout" and is normalised to nil.
      @read_write_timeout = opts[:socket_timeout] || 1
      @read_write_timeout = nil if @read_write_timeout == 0
      @connect_timeout    = timeout_from(opts)
      @connect_timeout    = nil if @connect_timeout == 0
      @disconnect_timeout = @read_write_timeout || @connect_timeout

      # NOTE(review): this default block returns a fresh Array without storing it
      # under the key; @frames is not used anywhere in this class -- confirm intent.
      @frames = Hash.new { Array.new }

      initialize_socket
    end

    # @return [String] the host this transport connects to
    def hostname
      @host
    end

    # @return [Boolean] true if the :ssl option was given
    def uses_tls?
      @ssl
    end
    alias tls? uses_tls?

    # @return [Boolean] true if the :ssl option was given
    def uses_ssl?
      @ssl
    end
    alias ssl? uses_ssl?

    # Writes data to the socket. If a read/write timeout was specified, the
    # write is aborted with Bunny::ClientTimeout when it takes too long.
    #
    # Any socket-level failure (every Errno::* error, not only EPIPE/EAGAIN),
    # a timeout, a missing socket or an IOError closes the transport and is
    # reported to the session via #handle_network_failure instead of being
    # raised to the caller.
    #
    # @param args arguments passed through to the socket's #write
    def write(*args)
      raise Bunny::ConnectionError.new("No connection: socket is nil. ", @host, @port) unless @socket

      if @read_write_timeout
        Bunny::Timer.timeout(@read_write_timeout, Bunny::ClientTimeout) do
          @socket.write(*args) if open?
        end
      else
        @socket.write(*args) if open?
      end
    rescue SystemCallError, Bunny::ClientTimeout, Bunny::ConnectionError, IOError => e
      # SystemCallError is the parent of all Errno::* classes, so connection
      # resets and similar OS-level errors take the same recovery path.
      close
      @session.handle_network_failure(e)
    end
    alias send_raw write

    # Closes the socket (if there is one and it is still open) and forgets it.
    #
    # @param reason ignored; accepted for interface compatibility
    def close(reason = nil)
      @socket.close if @socket && !@socket.closed?
      @socket = nil
    end

    # @return [Boolean] true if there is a socket and it is not closed
    def open?
      !@socket.nil? && !@socket.closed?
    end

    # @return [Boolean] the opposite of #open?
    def closed?
      !open?
    end

    # Flushes the socket, if there is one.
    def flush
      @socket.flush if @socket
    end

    # Delegates to the socket's #read_fully.
    def read_fully(*args)
      @socket.read_fully(*args)
    end

    # Waits with IO.select until the socket becomes readable.
    #
    # @param timeout [Numeric, nil] seconds to wait, nil to wait indefinitely
    # @return [Boolean, nil] truthy if the socket is ready for reading
    def read_ready?(timeout = nil)
      io = IO.select([@socket].compact, nil, nil, timeout)
      io && io[0].include?(@socket)
    end

    # Reads one frame from the socket: a 7 byte header, the payload whose size
    # is given by the header, and a single frame-end octet.
    #
    # Exposed primarily for Bunny::Channel
    #
    # @param opts [Hash] currently unused
    # @raise [BadLengthError] if fewer/more payload bytes than announced were read
    # @raise [NoFinalOctetError] if the frame does not end with FINAL_OCTET
    # @private
    def read_next_frame(opts = {})
      header              = @socket.read_fully(7)
      type, channel, size = AMQ::Protocol::Frame.decode_header(header)
      payload             = @socket.read_fully(size)
      frame_end           = @socket.read_fully(1)

      # 1) the size is miscalculated
      raise BadLengthError.new(size, payload.bytesize) if payload.bytesize != size

      # 2) the size is OK, but the string doesn't end with FINAL_OCTET
      raise NoFinalOctetError.new if frame_end != AMQ::Protocol::Frame::FINAL_OCTET

      AMQ::Protocol::Frame.new(type, payload, channel)
    end

    # Sends frame to the peer. If the transport is closed, the session is
    # notified with a ConnectionClosedError instead.
    #
    # @private
    def send_frame(frame)
      if closed?
        @session.handle_network_failure(ConnectionClosedError.new(frame))
      else
        frame.encode_to_array.each do |component|
          send_raw(component)
        end
      end
    end

    # Checks whether a socket to host:port can be opened within the timeout.
    # The probe socket is always closed again.
    #
    # @return [Boolean] false if opening fails with a SocketError, any
    #   OS-level Errno::* error (e.g. connection refused) or a timeout
    def self.reacheable?(host, port, timeout)
      begin
        s = Bunny::Socket.open(host, port,
                               :socket_timeout => timeout)
        true
      rescue SocketError, SystemCallError, Timeout::Error
        # A refused or unroutable connection means "not reachable"; a predicate
        # should report that rather than raise.
        false
      ensure
        s.close if s
      end
    end

    # @raise [ConnectionTimeout] if host:port is not reachable
    def self.ping!(host, port, timeout)
      raise ConnectionTimeout.new("#{host}:#{port} is unreachable") unless reacheable?(host, port, timeout)
    end

    protected

    # Opens the socket and, when :ssl was requested, wraps it in an
    # OpenSSL::SSL::SSLSocket and performs the handshake.
    #
    # @return [Object] the connected socket
    # @raise [Bunny::TCPConnectionFailed] on any failure while connecting
    def initialize_socket
      begin
        @socket = Bunny::Timer.timeout(@connect_timeout, ConnectionTimeout) do
          Bunny::Socket.open(@host, @port,
                             :keepalive      => @opts[:keepalive],
                             :socket_timeout => @connect_timeout)
        end

        if @ssl
          # Loaded lazily so that plain connections do not need OpenSSL.
          require 'openssl' unless defined? OpenSSL::SSL
          sslctx = OpenSSL::SSL::SSLContext.new
          initialize_client_pair(sslctx)
          @socket = OpenSSL::SSL::SSLSocket.new(@socket, sslctx)
          # Closing the TLS socket also closes the underlying socket.
          @socket.sync_close = true
          @socket.connect
          @socket.post_connection_check(host) if @verify_ssl
          @socket
        end
      rescue StandardError, ConnectionTimeout => e
        @status = :not_connected
        # Do not leak an already connected socket when a later step
        # (e.g. the TLS handshake or the post-connection check) fails.
        begin
          close
        rescue StandardError
          nil # best-effort cleanup; the original failure is reported below
        end
        raise Bunny::TCPConnectionFailed.new(e, self.hostname, self.port)
      end

      @socket
    end

    # Configures the client certificate and key on the given SSL context.
    # The :ssl_cert / :ssl_key file paths, when given, take precedence over
    # the :ssl_cert_string / :ssl_key_string options.
    #
    # @param sslctx [OpenSSL::SSL::SSLContext]
    # @return [OpenSSL::SSL::SSLContext] the same context
    def initialize_client_pair(sslctx)
      @ssl_cert_string = File.read(@ssl_cert) if @ssl_cert
      @ssl_key_string  = File.read(@ssl_key) if @ssl_key

      sslctx.cert = OpenSSL::X509::Certificate.new(@ssl_cert_string) if @ssl_cert_string
      sslctx.key  = OpenSSL::PKey::RSA.new(@ssl_key_string) if @ssl_key_string

      sslctx
    end

    # Picks the connection timeout from the options, trying :connect_timeout,
    # :connection_timeout and :timeout in that order.
    #
    # @param options [Hash]
    # @return [Numeric] the chosen timeout or DEFAULT_CONNECTION_TIMEOUT
    def timeout_from(options)
      options[:connect_timeout] || options[:connection_timeout] || options[:timeout] || DEFAULT_CONNECTION_TIMEOUT
    end
  end
end
Jump to Line
Something went wrong with that request. Please try again.