Skip to content

Commit

Permalink
Pass # of times called to TCPConnectionNotify.received
Browse files Browse the repository at this point in the history
On non-Windows platforms, TCPConnection will read data off of a socket
until:

- there's no more data to read
- a max size is hit
- `TCPConnectionNotify.received` returns false

The last option was introduced via [RFC
to give the programmer more control of when to yield the scheduler. This
was a noble goal but is weakly implemented. In order to exercise better
control, the programmer needs an additional bit of information: the
number of times during *this scheduler run* that `received` has been
called.

As we began to use RFC #19 at Sendence it became clear that is wasn't
doing what we wanted. What we hoped to be able to do was read up to X
number of messages off the socket, inject them into our application and
then give up the scheduler.

Our initial implementation was to keep a counter of messages received in
our `TCPConnectionNotify` instances and when it hit a number such as 25
or 50, return false to give up the scheduler. This, however, didn't
accomplish what we wanted. The following scenario was possible:

Scheduler run results in 24 calls to `received`. When the next scheduler
run would occur, we'd get 1 more `received` call and return false. What
we really wanted was to *read no more than 25 messages per scheduler
run*.

In order to accomplish this, we added an additional parameter to
`TCPConnectionNotify.received`: the number of times during this
scheduler run that `received` has been called (inclusive of the existing
call). This gives much more fine-grained control over when to
"prematurely" give up the scheduler and play nice with other sockets in
the system.

You might think, "why not lower the max read size"? And this certainly
is something you could do, but lowering the max read size, lowers how
large of a chunk we read from the socket during a given system call. In
the case of a high-throughput system, that will greatly increase the
number of system calls thereby lowering performance.
  • Loading branch information
SeanTAllen committed Mar 30, 2017
1 parent 51c11a3 commit 344b6b5
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 17 deletions.
4 changes: 3 additions & 1 deletion examples/echo/echo.pony
Expand Up @@ -41,7 +41,9 @@ class Server is TCPConnectionNotify
fun ref accepted(conn: TCPConnection ref) =>
_out.print("connection accepted")

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
_out.print("data received, looping it back")
conn.write("server says: ")
conn.write(consume data)
Expand Down
4 changes: 3 additions & 1 deletion examples/net/client.pony
Expand Up @@ -21,7 +21,9 @@ class ClientSide is TCPConnectionNotify
fun ref connect_failed(conn: TCPConnection ref) =>
_env.out.print("connect failed")

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
_env.out.print(consume data)
true

Expand Down
4 changes: 3 additions & 1 deletion examples/net/server.pony
Expand Up @@ -13,7 +13,9 @@ class ServerSide is TCPConnectionNotify
conn.write("server says hi")
end

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
_env.out.print(consume data)
conn.dispose()
true
Expand Down
24 changes: 18 additions & 6 deletions packages/net/_test.pony
Expand Up @@ -225,7 +225,9 @@ class _TestTCPExpectNotify is TCPConnectionNotify
_h.complete_action("expect received")
qty

fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val,
times: USize): Bool
=>
if _frame then
_frame = false
_expect = 0
Expand Down Expand Up @@ -302,7 +304,9 @@ class _TestTCPWritevNotifyServer is TCPConnectionNotify
new iso create(h: TestHelper) =>
_h = h

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
_buffer.append(consume data)

let expected = "hello, hello (from client)"
Expand Down Expand Up @@ -360,7 +364,9 @@ class _TestTCPMuteReceiveNotify is TCPConnectionNotify
_h.complete_action("receiver asks for data")
_h.dispose_when_done(conn)

fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val,
times: USize): Bool
=>
_h.complete(false)
true

Expand All @@ -384,7 +390,9 @@ class _TestTCPMuteSendNotify is TCPConnectionNotify
fun ref connect_failed(conn: TCPConnection ref) =>
_h.fail_action("sender connected")

fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val,
times: USize): Bool
=>
conn.write("it's sad that you won't ever read this")
_h.complete_action("sender sent data")
true
Expand Down Expand Up @@ -434,7 +442,9 @@ class _TestTCPUnmuteReceiveNotify is TCPConnectionNotify
conn.unmute()
_h.complete_action("receiver unmuted")

fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val,
times: USize): Bool
=>
_h.complete(true)
true

Expand Down Expand Up @@ -504,7 +514,9 @@ class _TestTCPThrottleSendNotify is TCPConnectionNotify
fun ref connect_failed(conn: TCPConnection ref) =>
_h.fail_action("sender connected")

fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val,
times: USize): Bool
=>
conn.write("it's sad that you won't ever read this")
_h.complete_action("sender sent data")
true
Expand Down
4 changes: 3 additions & 1 deletion packages/net/http/_client_conn_handler.pony
Expand Up @@ -37,7 +37,9 @@ class _ClientConnHandler is TCPConnectionNotify
"""
_session._auth_failed(conn)

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
"""
Pass a received chunk of data to the `_HTTPParser`.
"""
Expand Down
6 changes: 4 additions & 2 deletions packages/net/http/_server_conn_handler.pony
Expand Up @@ -15,7 +15,7 @@ class _ServerConnHandler is TCPConnectionNotify
var _parser: (_HTTPParser | None) = None
var _session: (_ServerConnection | None) = None
let _registry: HTTPServer tag

new iso create(handlermaker: HandlerFactory val, logger: Logger,
reversedns: (DNSLookupAuth | None), registry: HTTPServer)
=>
Expand Down Expand Up @@ -44,7 +44,9 @@ class _ServerConnHandler is TCPConnectionNotify
_parser = _HTTPParser.request(_session as _ServerConnection)
end

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
"""
Pass chunks of data to the `HTTPParser` for this session. It will
then pass completed information on the the `HTTPSession`.
Expand Down
11 changes: 9 additions & 2 deletions packages/net/ssl/ssl_connection.pony
Expand Up @@ -69,7 +69,9 @@ class SSLConnection is TCPConnectionNotify

recover val Array[ByteSeq] end

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
"""
Pass the data to the SSL session and check for both new application data
and new destination data.
Expand Down Expand Up @@ -133,11 +135,16 @@ class SSLConnection is TCPConnectionNotify
end

try
var received_called: USize = 0

while true do
let r = _ssl.read(_expect)

if r isnt None then
_notify.received(conn, (consume r) as Array[U8] iso^)
received_called = received_called + 1
_notify.received(conn,
(consume r) as Array[U8] iso^,
received_called)
else
break
end
Expand Down
6 changes: 4 additions & 2 deletions packages/net/tcp_connection.pony
Expand Up @@ -667,7 +667,7 @@ actor TCPConnection
data.truncate(_read_len)
_read_len = 0

_notify.received(this, consume data)
_notify.received(this, consume data, 1)
_read_buf_size()
end

Expand Down Expand Up @@ -708,6 +708,7 @@ actor TCPConnection
ifdef not windows then
try
var sum: USize = 0
var received_called: USize = 0

while _readable and not _shutdown_peer do
if _muted then
Expand Down Expand Up @@ -741,7 +742,8 @@ actor TCPConnection
data.truncate(_read_len)
_read_len = 0

if not _notify.received(this, consume data) then
received_called = received_called + 1
if not _notify.received(this, consume data, received_called) then
_read_buf_size()
_read_again()
return
Expand Down
7 changes: 6 additions & 1 deletion packages/net/tcp_connection_notify.pony
Expand Up @@ -58,12 +58,17 @@ interface TCPConnectionNotify
"""
data

fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso,
times: USize): Bool
=>
"""
Called when new data is received on the connection. Return true if you
want to continue receiving messages without yielding until you read
max_size on the TCPConnection. Return false to cause the TCPConnection
to yield now.
Includes the number of times during the current behavior, that received has
been called. This allows the notifier to end reads on a regular basis.
"""
true

Expand Down

0 comments on commit 344b6b5

Please sign in to comment.