Skip to content

Commit

Permalink
Major refactor
Browse files Browse the repository at this point in the history
The earlier code was ignoring Ruby's internal IO buffering.
  • Loading branch information
lann committed May 12, 2013
1 parent a2a2c4e commit a167299
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 61 deletions.
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
**The current version has major problems. Don't use it.**

# TCPTimeout

A simple wrapper around Ruby Sockets providing timeouts for connect, write,
and read using IO.select instead of Timeout.timeout.
A wrapper around Ruby Sockets providing timeouts for connect, write, and read
operations using `Socket#*_nonblock` methods and `IO.select` instead of
`Timeout.timeout`.

## Usage

`gem install tcp_timeout`

Pass one or more of `:connect_timeout`, `:write_timeout`, or `:read_timeout`
Pass one or more of `:connect_timeout`, `:write_timeout`, and `:read_timeout`
as options to TCPTimeout::TCPSocket.new. If a timeout is omitted or nil, that
operation will behave as a normal Socket would. On timeout, a
`TCPTimeout::SocketTimeout` (a subclass of `SocketError`) will be raised.
`TCPTimeout::SocketTimeout` (subclass of `SocketError`) will be raised.

TCPTimeout::TCPSocket implements only a small subset of Socket methods:
When calling `#read` with a byte length it is possible for it to read some data
before timing out. If you need to avoid losing this data you can pass a buffer
string which will receive the data even after a timeout.

```
#connect
#write
#read
#readbyte
#close
#closed?
#setsockopt
```
Other options:

`:family` - set the address family for the connection, e.g. `:INET` or `:INET6`
`:local_host` and `:local_port` - the host and port to bind to

TCPTimeout::TCPSocket supports only a subset of IO methods, including:

```close closed? read read_nonblock readbyte readpartial write write_nonblock```

**Example:**

Expand Down
170 changes: 125 additions & 45 deletions lib/tcp_timeout.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
require 'socket'

module TCPTimeout
VERSION = "0.0.3"
VERSION = "0.1.0"

DELEGATED_METHODS = %w[close closed? setsockopt]
WRITE_METHODS = %w[write]
READ_METHODS = %w[read readbyte]
DELEGATED_METHODS = %w[
close closed?
getsockopt setsockopt
local_address remote_address
read_nonblock wrote_nonblock
fileno
]

class SocketTimeout < SocketError; end

class TCPSocket
DELEGATED_METHODS.each do |method|
class_eval(<<-EVAL, __FILE__, __LINE__)
Expand All @@ -18,66 +22,142 @@ def #{method}(*args)
EVAL
end

WRITE_METHODS.each do |method|
class_eval(<<-EVAL, __FILE__, __LINE__)
def #{method}(*args)
select_timeout(:write) if @timeouts[:write]
@socket.__send__(:#{method}, *args)
end
EVAL
end
def initialize(host, port, opts = {})
@connect_timeout = opts[:connect_timeout]
@write_timeout = opts[:write_timeout]
@read_timeout = opts[:read_timeout]

READ_METHODS.each do |method|
class_eval(<<-EVAL, __FILE__, __LINE__)
def #{method}(*args)
select_timeout(:read) if @timeouts[:read]
@socket.__send__(:#{method}, *args)
end
EVAL
end
family = opts[:family] || Socket::AF_INET
address = Socket.getaddrinfo(host, nil, family).first[3]
@sockaddr = Socket.pack_sockaddr_in(port, address)

def initialize(host, port, opts = {})
@timeouts = {
:connect => opts[:connect_timeout],
:write => opts[:write_timeout],
:read => opts[:read_timeout],
}

@family = opts[:family] || Socket::AF_INET
@address = Socket.getaddrinfo(host, nil, @family).first[3]
@port = port

@socket_address = Socket.pack_sockaddr_in(@port, @address)
@socket = Socket.new(@family, Socket::SOCK_STREAM, 0)
@socket = Socket.new(family, Socket::SOCK_STREAM, 0)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

local_host = opts[:local_host]
local_port = opts[:local_port]
if local_host || local_port
local_host ||= ''
local_address = Socket.getaddrinfo(local_host, nil, family).first[3]
local_sockaddr = Socket.pack_sockaddr_in(local_port, local_address)
@socket.bind(local_sockaddr)
end

connect
end

def connect
return @socket.connect(@socket_address) unless @timeouts[:connect]
return @socket.connect(@sockaddr) unless @connect_timeout

begin
@socket.connect_nonblock(@socket_address)
rescue Errno::EINPROGRESS
select_timeout(:connect)
select_timeout(:connect, @connect_timeout)
# If there was a failure this will raise an Error
begin
@socket.connect_nonblock(@sockaddr)
rescue Errno::EISCONN
# Successfully connected
end
end
end

def write(data, timeout = nil)
timeout ||= @write_timeout
return @socket.write(data) unless timeout

length = data.bytesize

total_count = 0
loop do
begin
count = @socket.write_nonblock(data)
rescue Errno::EWOULDBLOCK
timeout = select_timeout(:write, timeout)
retry
end

total_count += count
return total_count if total_count >= length
data = data.byteslice(count..-1)
end
end

def read(length = nil, *args)
raise ArgumentError, 'too many arguments' if args.length > 2

timeout = (args.length > 1) ? args.pop : @read_timeout
return @socket.read(length, *args) unless length > 0 && timeout

buffer = args.first || ''.force_encoding(Encoding::ASCII_8BIT)

# If there was a failure this will raise an Error
begin
@socket.connect_nonblock(@socket_address)
rescue Errno::EISCONN
# Successfully connected
# Drain internal buffers
@socket.read_nonblock(length, buffer)
return buffer if buffer.bytesize >= length
rescue Errno::EWOULDBLOCK
# Internal buffers were empty
buffer.clear
rescue EOFError
return nil
end

@chunk ||= ''.force_encoding(Encoding::ASCII_8BIT)

loop do
timeout = select_timeout(:read, timeout)

begin
@socket.read_nonblock(length, @chunk)
rescue Errno::EWOULDBLOCK
retry
rescue EOFError
return buffer.empty? ? nil : buffer
end
buffer << @chunk

if length
length -= @chunk.bytesize
return buffer if length <= 0
end
end
end

def readpartial(length, *args)
raise ArgumentError, 'too many arguments' if args.length > 2

timeout = (args.length > 1) ? args.pop : @read_timeout
return @socket.readpartial(length, *args) unless length > 0 && timeout

begin
@socket.read_nonblock(length, *args)
rescue Errno::EWOULDBLOCK
timeout = select_timeout(:read, timeout)
retry
end
end

def readbyte
readpartial(1).ord
end

private

def select_timeout(type)
type == :read ? read_array = [@socket] : write_array = [@socket]
unless IO.select(read_array, write_array, [@socket], @timeouts[type])
raise SocketTimeout, "#{type} timeout"
def select_timeout(type, timeout)
if timeout >= 0
if type == :read
read_array = [@socket]
else
write_array = [@socket]
end

start = Time.now
if IO.select(read_array, write_array, [@socket], timeout)
waited = Time.now - start
return timeline - waited
end
end
raise SocketTimeout, "#{type} timeout"
end
end
end

0 comments on commit a167299

Please sign in to comment.