Skip to content

Commit

Permalink
Merge 2e6478d into d606979
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Apr 4, 2019
2 parents d606979 + 2e6478d commit 6224c5f
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 113 deletions.
94 changes: 78 additions & 16 deletions lib/async/io/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

module Async
module IO
# The default block size for IO buffers.
# BLOCK_SIZE = ENV.fetch('BLOCK_SIZE', 1024*16).to_i
BLOCK_SIZE = 1024*8

# Convert a Ruby ::IO object to a wrapped instance:
def self.try_convert(io, &block)
if wrapper_class = Generic::WRAPPERS[io.class]
Expand All @@ -42,17 +46,20 @@ class << self
# @!macro [attach] wrap_blocking_method
# @method $1
# Invokes `$2` on the underlying {io}. If the operation would block, the current task is paused until the operation can succeed, at which point it's resumed and the operation is completed.
def wrap_blocking_method(new_name, method_name, invert: true)
define_method(new_name) do |*args|
async_send(method_name, *args)
def wrap_blocking_method(new_name, method_name, invert: true, &block)
if block_given?
define_method(new_name, &block)
else
define_method(new_name) do |*args|
async_send(method_name, *args)
end
end

if invert
# We define the original _nonblock method to call the async variant. We ignore options.
# define_method(method_name) do |*args, **options|
# self.__send__(new_name, *args)
# end
def_delegators :@io, method_name
# We wrap the original _nonblock method, ignoring options.
define_method(method_name) do |*args, **options|
async_send(method_name, *args)
end
end
end

Expand Down Expand Up @@ -85,17 +92,72 @@ def wrap(*args)

wraps ::IO, :external_encoding, :internal_encoding, :autoclose?, :autoclose=, :pid, :stat, :binmode, :flush, :set_encoding, :to_io, :to_i, :reopen, :fileno, :fsync, :fdatasync, :sync, :sync=, :tell, :seek, :rewind, :pos, :pos=, :eof, :eof?, :close_on_exec?, :close_on_exec=, :closed?, :close_read, :close_write, :isatty, :tty?, :binmode?, :sysseek, :advise, :ioctl, :fcntl, :nread, :ready?, :pread, :pwrite, :pathconf

# Read the specified number of bytes from the input stream. This is fast path.
# @example
# data = io.read(512)
wrap_blocking_method :read, :read_nonblock
alias sysread read
alias readpartial read
# data = io.sysread(512)
wrap_blocking_method :sysread, :read_nonblock

def read(length = nil, buffer = nil)
if buffer
buffer.clear
else
buffer = String.new
end

if length
return "" if length <= 0

# Fast path:
buffer = self.sysread(length, buffer)

while buffer.bytesize < length
# Slow path:
if chunk = self.sysread(length - buffer.bytesize)
buffer << chunk
else
break
end
end

return buffer.empty? ? nil : buffer
else
buffer = self.sysread(BLOCK_SIZE, buffer)

while chunk = self.sysread(BLOCK_SIZE)
buffer << chunk
end

return buffer
end
end

# Write entire buffer to output stream. This is fast path.
# @example
# io.write("Hello World")
wrap_blocking_method :write, :write_nonblock
alias syswrite write
alias << write
# io.syswrite("Hello World")
wrap_blocking_method :syswrite, :write_nonblock

alias readpartial read_nonblock

def write(buffer)
# Fast path:
written = self.syswrite(buffer)
remaining = buffer.bytesize - written

while remaining > 0
# Slow path:
length = self.syswrite(buffer.byteslice(written, remaining))

remaining -= length
written += length
end

return written
end

def << buffer
write(buffer)
return self
end

def dup
super.tap do |copy|
Expand Down
2 changes: 1 addition & 1 deletion lib/async/io/ssl_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'endpoint'
require_relative 'host_endpoint'
require_relative 'ssl_socket'

module Async
Expand Down
32 changes: 21 additions & 11 deletions lib/async/io/ssl_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ class SSLSocket < Generic
wrap_blocking_method :accept, :accept_nonblock
wrap_blocking_method :connect, :connect_nonblock

alias syswrite write
alias sysread read

def self.connect(socket, context, hostname = nil, &block)
client = self.new(socket, context)

Expand All @@ -62,14 +59,6 @@ def self.connect(socket, context, hostname = nil, &block)
end
end

def local_address
@io.to_io.local_address
end

def remote_address
@io.to_io.remote_address
end

include Peer

def initialize(socket, context)
Expand All @@ -90,6 +79,27 @@ def initialize(socket, context)
super(io, socket.reactor)
end
end

def local_address
@io.to_io.local_address
end

def remote_address
@io.to_io.remote_address
end

def close_write
self.shutdown(Socket::SHUT_WR)
end

def close_read
self.shutdown(Socket::SHUT_RD)
end

def shutdown(how)
@io.flush
@io.to_io.shutdown(how)
end
end

# We reimplement this from scratch because the native implementation doesn't expose the underlying server/context that we need to implement non-blocking accept.
Expand Down
36 changes: 6 additions & 30 deletions lib/async/io/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
module Async
module IO
class Stream
# The default block size for IO buffers.
# BLOCK_SIZE = ENV.fetch('BLOCK_SIZE', 1024*16).to_i
BLOCK_SIZE = 1024*8
BLOCK_SIZE = IO::BLOCK_SIZE

def initialize(io, block_size: BLOCK_SIZE, sync: true)
@io = io
Expand Down Expand Up @@ -115,12 +113,12 @@ def peek
# @return the number of bytes appended to the buffer.
def write(string)
if @write_buffer.empty? and string.bytesize >= @block_size
syswrite(string)
@io.write(string)
else
@write_buffer << string

if @write_buffer.size >= @block_size
syswrite(@write_buffer)
@io.write(@write_buffer)
@write_buffer.clear
end
end
Expand All @@ -138,7 +136,7 @@ def <<(string)
# Flushes buffered data to the stream.
def flush
unless @write_buffer.empty?
syswrite(@write_buffer)
@io.write(@write_buffer)
@write_buffer.clear
end
end
Expand Down Expand Up @@ -197,9 +195,9 @@ def eof!

# Fills the buffer from the underlying stream.
def fill_read_buffer(size = @block_size)
if @read_buffer.empty? and @io.read(size, @read_buffer)
if @read_buffer.empty? and @io.read_nonblock(size, @read_buffer, exception: false)
return true
elsif chunk = @io.read(size, @input_buffer)
elsif chunk = @io.read_nonblock(size, @input_buffer, exception: false)
@read_buffer << chunk
return true
else
Expand Down Expand Up @@ -235,28 +233,6 @@ def consume_read_buffer(size = nil)

return result
end

# Write a buffer to the underlying stream.
# @param buffer [String] The string to write, any encoding is okay.
def syswrite(buffer)
remaining = buffer.bytesize

# Fast path:
written = @io.write(buffer)
return if written == remaining

# Slow path:
remaining -= written

while remaining > 0
wrote = @io.write(buffer.byteslice(written, remaining))

remaining -= wrote
written += wrote
end

return written
end
end
end
end
54 changes: 14 additions & 40 deletions lib/async/io/tcp_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,6 @@ module IO
class TCPSocket < IPSocket
wraps ::TCPSocket

class StreamWrapper
def initialize(io)
@io = io
end

def sync= value
@io.sync = value
end

def close
@io.close
end

def read(*args)
@io.sysread(*args)
end

def write(*args)
@io.syswrite(*args)
end

def flush
@io.flush
end
end

def initialize(remote_host, remote_port = nil, local_host = nil, local_port = nil)
if remote_host.is_a? ::TCPSocket
super(remote_host)
Expand All @@ -73,33 +47,33 @@ def initialize(remote_host, remote_port = nil, local_host = nil, local_port = ni
# super(::TCPSocket.new(remote_host, remote_port, local_host, local_port))
end

@buffer = Stream.new(StreamWrapper.new(self))
@stream = Stream.new(self)
end

class << self
alias open new
end

include Peer
def close
@stream.flush
super
end

attr :buffer
include Peer

def_delegators :@buffer, :gets, :puts, :flush
attr :stream

def write(*)
@buffer.flush

super
end
# The way this buffering works is pretty atrocious.
def_delegators :@stream, :gets, :puts

def read(size, outbuf = nil)
buffer = @buffer.read_partial(size)
def sysread(size, buffer = nil)
data = @stream.read_partial(size)

if outbuf
outbuf.replace(buffer)
if buffer
buffer.replace(data)
end

return buffer
return data
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/async/io/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@

module Async
module IO
VERSION = "1.20.0"
VERSION = "1.21.0"
end
end
1 change: 1 addition & 0 deletions spec/async/io/c10k_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def echo_client(server_address, data, responses)
begin
Async::IO::Socket.connect(server_address) do |peer|
result = peer.write(data)
peer.close_write

message = peer.read(512)

Expand Down
3 changes: 2 additions & 1 deletion spec/async/io/echo_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ def echo_client(server_address, data, responses)
Async do |task|
Async::IO::Socket.connect(server_address) do |peer|
result = peer.write(data)
peer.close_write

message = peer.read(512)
message = peer.read(data.bytesize)

responses << message
end
Expand Down
5 changes: 2 additions & 3 deletions spec/async/io/generic_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@

output_task = reactor.async do
received = input.read(1024)
input.close
end

reactor.async do
output.write(message)
output.close
end

output_task.wait
expect(received).to be == message

input.close
output.close
end

describe '#wait' do
Expand Down

0 comments on commit 6224c5f

Please sign in to comment.