Skip to content

Commit

Permalink
Implement TCPListener.
Browse files Browse the repository at this point in the history
  • Loading branch information
jemc committed Nov 13, 2019
1 parent e868ea8 commit 01aad46
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 22 deletions.
9 changes: 9 additions & 0 deletions packages/net/_os_socket.mare
Expand Up @@ -4,6 +4,8 @@
:fun pony_os_sockopt_level (I32) I32
:fun pony_os_sockopt_option (I32) I32
:fun pony_os_connect_tcp (AsioEventNotify, CPointer(U8), CPointer(U8), CPointer(U8), U32) U32
:fun pony_os_listen_tcp (AsioEventNotify, CPointer(U8), CPointer(U8)) CPointer(AsioEvent)
:fun pony_os_accept (CPointer(AsioEvent)) U32
:fun pony_os_socket_close (U32) None
:fun pony_os_socket_shutdown (U32) None
:fun pony_os_writev (CPointer(AsioEvent), CPointer(CPointer(U8)), USize) USize
Expand All @@ -21,6 +23,13 @@
LibPonyOs.pony_os_connect_tcp(
owner, host.cstring, service.cstring, from.cstring, asio_flags)

:fun listen_tcp (owner, host String, service String)
event = LibPonyOs.pony_os_listen_tcp(owner, host.cstring, service.cstring)
if event.is_not_null (event | LibPonyOs.pony_os_errno)

:fun accept (event) U32
LibPonyOs.pony_os_accept(event)

:fun close (fd U32)
LibPonyOs.pony_os_socket_close(fd)

Expand Down
39 changes: 33 additions & 6 deletions packages/net/tcp.mare
@@ -1,25 +1,23 @@
:import "os/error"

:trait TCPConnectionNotify
:fun ref accepted (conn TCPConnection'ref): None
:fun ref connecting (conn TCPConnection'ref, count U32): None
:fun ref connect_failed (conn TCPConnection'ref) None
:fun ref connected (conn TCPConnection'ref): None
:fun ref sent (conn TCPConnection'ref, data String): data // TODO: ByteSeq instead of String
:fun ref received (conn TCPConnection'ref, data String, call_count USize) Bool // TODO: ByteSeq instead of String
:fun ref closed (conn TCPConnection'ref): None

:actor TCPListener
:be _conn_closed

:actor TCPConnection
:prop listen (TCPListener | None): TCPListener.new // TODO: None
:prop listen (TCPListener | None): None
:prop notify TCPConnectionNotify
:prop connect_error OSError: OSErrorNone

:prop _connect_count U32
:prop _read_buffer Array(U8) // TODO: iso
:prop _read_buffer_size USize
:prop _read_buffer_offset USize: 0
:prop _read_buffer_size USize
:prop _yield_after_reading USize
:prop _yield_after_writing USize

Expand Down Expand Up @@ -56,12 +54,41 @@
@notify = --notify // TODO: is it possible to use param assign sugar for this?
@_read_buffer = Array(U8).new(@_read_buffer_size)
@_read_buffer.reserve_undefined(@_read_buffer_size)

asio_flags =
if Platform.windows (AsioEvent.read_write | AsioEvent.read_write_oneshot)

@_connect_count = _OSSocket.connect_tcp(@, host, service, from, asio_flags)
@_notify_connecting

:new _accept (
// TODO: TCPConnectionAuth, rather than ambient authority.
listen TCPListener
notify TCPConnectionNotify'iso
@_fd
@_read_buffer_size
@_yield_after_reading
@_yield_after_writing
)
@listen = listen // TODO: is it possible to use param assign sugar for this?
@_connect_count = 0
@_connected = True
@_writeable = True

@notify = --notify // TODO: is it possible to use param assign sugar for this?
@_read_buffer = Array(U8).new(@_read_buffer_size)
@_read_buffer.reserve_undefined(@_read_buffer_size)

asio_flags =
if Platform.windows (AsioEvent.read_write | AsioEvent.read_write_oneshot)
@_event = AsioEvent.create(@, @_fd, asio_flags, 0, True)
@_notify_accepted // TODO: shouldn't need this indirection

@_readable = True
@_windows_queue_read
@_pending_reads

:fun ref _notify_accepted: @notify.accepted(@)

:fun ref _notify_connecting
if (@_connect_count > 0) (
// When nonzero, we know we're connecting, so we call the connecting hook.
Expand Down
120 changes: 120 additions & 0 deletions packages/net/tcp_listener.mare
@@ -0,0 +1,120 @@
:import "os/error"

:trait TCPListenerNotify
:fun ref listening (listen TCPListener'ref): None
:fun ref not_listening (listen TCPListener'ref) None
:fun ref closed (listen TCPListener'ref): None
:fun ref connected! (listen TCPListener'ref) TCPConnectionNotify'iso

:actor TCPListener
:prop notify TCPListenerNotify
:prop listen_error OSError: EINVAL

:prop _fd U32
:prop _event CPointer(AsioEvent): CPointer(AsioEvent).null

:prop _count USize: 0
:prop _limit USize
:prop _read_buffer_size USize
:prop _yield_after_reading USize
:prop _yield_after_writing USize

:prop _closed Bool: False
:prop _paused Bool: False

:new (
// TODO: TCPListenerAuth, rather than ambient authority.
notify TCPListenerNotify'iso
host String = ""
service String = "0"
@_limit = 0
@_read_buffer_size = 16384
@_yield_after_reading = 16384
@_yield_after_writing = 16384
)
@_fd = -1 // TODO: Move default value to prop declaration
@notify = --notify // TODO: is it possible to use param assign sugar for this?
event = _OSSocket.listen_tcp(@, host, service)
case (
| event <: CPointer(AsioEvent) | @_event = event
| event <: OSError | @listen_error = event
)
@_fd = AsioEvent.fd(@_event)
@_notify_listening

:fun ref _notify_listening
if @_event.is_not_null (
@notify.listening(@)
|
@_closed = True
@notify.not_listening(@)
)

:: This is a special behaviour that hooks into the AsioEventNotify runtime,
:: called whenever an event handle we're subscribed to receives an event.
:be _event_notify (event CPointer(AsioEvent), flags U32, arg U32)
if (@_event is event) (
if AsioEvent.is_readable(flags) (
@_accept(arg)
)
if AsioEvent.is_disposable(flags) (
AsioEvent.destroy(@_event)
@_event = CPointer(AsioEvent).null
)
)

:be _accept (ns U32 = 0)
if Platform.windows (
// TODO
|
if @_closed.not (
try (
while (@_limit == 0 || @_count < @_limit) (
conn_fd = _OSSocket.accept(@_event)
case (
| conn_fd == 0 | error! // EWOULDBLOCK, don't try again
| conn_fd == -1 | None // Some other error, so we can try again
| @_spawn(conn_fd)
)
)
@_paused = True
)
)
)

:fun ref _spawn (conn_fd U32)
try (
TCPConnection._accept(
@
@notify.connected!(@)
conn_fd
@_read_buffer_size
@_yield_after_reading
@_yield_after_writing
)
@_count += 1
|
_OSSocket.close(conn_fd)
)

:be _conn_closed
@_count -= 1

// If releasing this connection takes us below the limit,
// unpause acceptance and try to accept more connections.
if (@_paused && @_count < @_limit) (
@_paused = False
@_accept
)

:be dispose: @close
:fun ref close
if (@_closed.not && @_event.is_not_null) (
// When not on windows, unsubscribe immediately here instead of later.
if Platform.windows.not AsioEvent.unsubscribe(@_event)

_OSSocket.close(@_fd)
@_fd = -1

@notify.closed(@)
)
80 changes: 71 additions & 9 deletions packages/net/test/tcp_spec.mare
@@ -1,32 +1,94 @@
:import "spec"
:import ".." (TCPConnection, TCPConnectionNotify)
:import ".." (
TCPConnection
TCPConnectionNotify
TCPListener
TCPListenerNotify
)

:class iso TCPSpecListenerNotify
:is TCPListenerNotify
:prop env Env
:new (@env)

:fun ref listening (listen TCPListener'ref)
// TODO: get port dynamically instead of hard-coding it.
TCPConnection.new(TCPSpecNotify.new(@env), "localhost", "12345")
@env.err.print("[Listener] Listening")

:fun ref not_listening (listen TCPListener'ref) None
@env.err.print("[Listener] Not listening:")
@env.err.print(listen.listen_error.name)

:fun ref closed (listen TCPListener'ref): None
@env.err.print("[Listener] Stopped listening")

:fun ref connected! (listen TCPListener'ref) TCPConnectionNotify'iso
TCPSpecEchoNotify.new(@env)

:class iso TCPSpecEchoNotify
:is TCPConnectionNotify
:prop env Env
:new (@env)

:fun ref accepted (conn TCPConnection'ref)
@env.err.print("[Echoer] Accepted")

:fun ref connecting (conn TCPConnection'ref, count U32)
@env.err.print("[Echoer] Connecting")

:fun ref connected (conn TCPConnection'ref)
@env.err.print("[Echoer] Connected")

:fun ref connect_failed (conn TCPConnection'ref)
@env.err.print("[Echoer] Failed to connect:")
@env.err.print(conn.connect_error.name)

:fun ref closed (conn TCPConnection'ref)
@env.err.print("[Echoer] Closed")
try conn.listen.as!(TCPListener).dispose

:fun ref sent (conn TCPConnection'ref, data String)
@env.err.print("[Echoer] Sent:")
@env.err.print(data)
data

:fun ref received (conn TCPConnection'ref, data String, call_count USize)
@env.err.print("[Echoer] Received:")
@env.err.print(data)
conn.write(data)
conn.dispose
True

:class iso TCPSpecNotify
:is TCPConnectionNotify
:prop env Env
:new (@env)

:fun ref accepted (conn TCPConnection'ref)
@env.err.print("[Sender] Accepted")

:fun ref connecting (conn TCPConnection'ref, count U32)
@env.err.print("Connecting")
@env.err.print("[Sender] Connecting")

:fun ref connected (conn TCPConnection'ref)
@env.err.print("Connected")
@env.err.print("[Sender] Connected")
conn.write("Hello, World!")

:fun ref connect_failed (conn TCPConnection'ref)
@env.err.print("Failed to connect:")
@env.err.print("[Sender] Failed to connect:")
@env.err.print(conn.connect_error.name)

:fun ref closed (conn TCPConnection'ref)
@env.err.print("Closed")
@env.err.print("[Sender] Closed")

:fun ref sent (conn TCPConnection'ref, data String)
@env.err.print("Sent:")
@env.err.print("[Sender] Sent:")
@env.err.print(data)
data

:fun ref received (conn TCPConnection'ref, data String, call_count USize)
@env.err.print("Received:")
@env.err.print("[Sender] Received:")
@env.err.print(data)
conn.dispose
True
Expand All @@ -35,5 +97,5 @@
:is Spec
:const describes: "TCP"

:it "can connect"
conn = TCPConnection.new(TCPSpecNotify.new(@env), "localhost", "8080")
:it "can listen, connect, send, respond, disconnect, and stop listening"
TCPListener.new(TCPSpecListenerNotify.new(@env), "", "12345")
17 changes: 13 additions & 4 deletions src/mare/compiler/code_gen.cr
Expand Up @@ -1900,10 +1900,19 @@ class Mare::Compiler::CodeGen
value = gen_unboxed(value, gtype_of(from_expr.not_nil!))
gen_assign_cast(value, to_type, from_expr)
when LLVM::Type::Kind::Pointer
# If we're going from pointer to non-pointer, we're unboxing,
# so we have to do that first before the LLVM bit cast.
value = gen_boxed(value, gtype_of(from_expr.not_nil!)) \
if value.type.kind != LLVM::Type::Kind::Pointer
from_expr_type = type_of(from_expr.not_nil!)
if value.type.kind != LLVM::Type::Kind::Pointer
# If we're going from non-pointer to pointer, we're boxing.
value = gen_boxed(value, gtype_of(from_expr_type))
elsif from_expr_type.singular? && from_expr_type.single_def!(ctx).is_cpointer?
if value.type != @obj_ptr && to_type == @obj_ptr
# If going from non-object cpointer to object pointer, we're boxing.
value = gen_boxed(value, gtype_of(from_expr_type))
elsif value.type == @obj_ptr && to_type != @obj_ptr
# If going from object pointer to non-object cpointer, we're unboxing.
value = gen_unboxed(value, gtype_of(from_expr_type))
end
end

# Do the LLVM bitcast.
@builder.bit_cast(value, to_type, "#{value.name}.CAST")
Expand Down
6 changes: 3 additions & 3 deletions src/prelude/string.mare
Expand Up @@ -55,14 +55,14 @@
:: Call the cpointer method instead when you don't need a null terminator
:: (that is, when the FFI function you are calling has a size argument).

:fun cstring
:fun cstring CPointer(U8)
if @is_null_terminated (
@_ptr
ret tag = @_ptr // TODO: remove this type hint - it shouldn't be needed
|
ptr = @_ptr._alloc(@_size + 1)
@_ptr._copy_to(ptr._unsafe, @_size)
ptr._assign_at(@_size, 0)
ptr
ret tag = ptr // TODO: remove this type hint - it shouldn't be needed
)

:: Ensure enough capacity is allocated for the given space, in bytes.
Expand Down

0 comments on commit 01aad46

Please sign in to comment.