From ae0b30cd4aa60019fa2c7ae99b03ba4dc82e7a89 Mon Sep 17 00:00:00 2001 From: Sean T Allen Date: Wed, 29 Mar 2017 18:31:34 -0400 Subject: [PATCH] Pass # of times called to TCPConnectionNotify.received On non-Windows platforms, TCPConnection will read data off of a socket until: - there's no more data to read - a max size is hit - `TCPConnectionNotify.received` returns false The last option was introduced via [RFC to give the programmer more control of when to yield the scheduler. This was a noble goal but is weakly implemented. In order to exercise better control, the programmer needs an additional bit of information: the number of times during *this scheduler run* that `received` has been called. As we began to use RFC #19 at Sendence it became clear that is wasn't doing what we wanted. What we hoped to be able to do was read up to X number of messages off the socket, inject them into our application and then give up the scheduler. Our initial implementation was to keep a counter of messages received in our `TCPConnectionNotify` instances and when it hit a number such as 25 or 50, return false to give up the scheduler. This, however, didn't accomplish what we wanted. The following scenario was possible: Scheduler run results in 24 calls to `received`. When the next scheduler run would occur, we'd get 1 more `received` call and return false. What we really wanted was to *read no more than 25 messages per scheduler run*. In order to accomplish this, we added an additional parameter to `TCPConnectionNotify.received`: the number of times during this scheduler run that `received` has been called (inclusive of the existing call). This gives much more fine-grained control over when to "prematurely" give up the scheduler and play nice with other sockets in the system. You might think, "why not lower the max read size"? And this certainly is something you could do, but lowering the max read size, lowers how large of a chunk we read from the socket during a given system call. In the case of a high-throughput system, that will greatly increase the number of system calls thereby lowering performance. --- packages/net/_test.pony | 24 +++++++++++++++------ packages/net/http/_client_conn_handler.pony | 4 +++- packages/net/http/_server_conn_handler.pony | 6 ++++-- packages/net/ssl/ssl_connection.pony | 11 ++++++++-- packages/net/tcp_connection.pony | 6 ++++-- packages/net/tcp_connection_notify.pony | 7 +++++- 6 files changed, 44 insertions(+), 14 deletions(-) diff --git a/packages/net/_test.pony b/packages/net/_test.pony index d9c51bddc30..d7d573d01af 100644 --- a/packages/net/_test.pony +++ b/packages/net/_test.pony @@ -225,7 +225,9 @@ class _TestTCPExpectNotify is TCPConnectionNotify _h.complete_action("expect received") qty - fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] val, + times: USize): Bool + => if _frame then _frame = false _expect = 0 @@ -302,7 +304,9 @@ class _TestTCPWritevNotifyServer is TCPConnectionNotify new iso create(h: TestHelper) => _h = h - fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] iso, + times: USize): Bool + => _buffer.append(consume data) let expected = "hello, hello (from client)" @@ -360,7 +364,9 @@ class _TestTCPMuteReceiveNotify is TCPConnectionNotify _h.complete_action("receiver asks for data") _h.dispose_when_done(conn) - fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] val, + times: USize): Bool + => _h.complete(false) true @@ -384,7 +390,9 @@ class _TestTCPMuteSendNotify is TCPConnectionNotify fun ref connect_failed(conn: TCPConnection ref) => _h.fail_action("sender connected") - fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] val, + times: USize): Bool + => conn.write("it's sad that you won't ever read this") _h.complete_action("sender sent data") true @@ -434,7 +442,9 @@ class _TestTCPUnmuteReceiveNotify is TCPConnectionNotify conn.unmute() _h.complete_action("receiver unmuted") - fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] val, + times: USize): Bool + => _h.complete(true) true @@ -504,7 +514,9 @@ class _TestTCPThrottleSendNotify is TCPConnectionNotify fun ref connect_failed(conn: TCPConnection ref) => _h.fail_action("sender connected") - fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] val, + times: USize): Bool + => conn.write("it's sad that you won't ever read this") _h.complete_action("sender sent data") true diff --git a/packages/net/http/_client_conn_handler.pony b/packages/net/http/_client_conn_handler.pony index 6a252b23aa7..e1840a7b8d6 100644 --- a/packages/net/http/_client_conn_handler.pony +++ b/packages/net/http/_client_conn_handler.pony @@ -37,7 +37,9 @@ class _ClientConnHandler is TCPConnectionNotify """ _session._auth_failed(conn) - fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] iso, + times: USize): Bool + => """ Pass a received chunk of data to the `_HTTPParser`. """ diff --git a/packages/net/http/_server_conn_handler.pony b/packages/net/http/_server_conn_handler.pony index 7f5966c6256..35b99e0e0ff 100644 --- a/packages/net/http/_server_conn_handler.pony +++ b/packages/net/http/_server_conn_handler.pony @@ -15,7 +15,7 @@ class _ServerConnHandler is TCPConnectionNotify var _parser: (_HTTPParser | None) = None var _session: (_ServerConnection | None) = None let _registry: HTTPServer tag - + new iso create(handlermaker: HandlerFactory val, logger: Logger, reversedns: (DNSLookupAuth | None), registry: HTTPServer) => @@ -44,7 +44,9 @@ class _ServerConnHandler is TCPConnectionNotify _parser = _HTTPParser.request(_session as _ServerConnection) end - fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] iso, + times: USize): Bool + => """ Pass chunks of data to the `HTTPParser` for this session. It will then pass completed information on the the `HTTPSession`. diff --git a/packages/net/ssl/ssl_connection.pony b/packages/net/ssl/ssl_connection.pony index 7505544e56d..2e1a75670dc 100644 --- a/packages/net/ssl/ssl_connection.pony +++ b/packages/net/ssl/ssl_connection.pony @@ -69,7 +69,9 @@ class SSLConnection is TCPConnectionNotify recover val Array[ByteSeq] end - fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] iso, + times: USize): Bool + => """ Pass the data to the SSL session and check for both new application data and new destination data. @@ -133,11 +135,16 @@ class SSLConnection is TCPConnectionNotify end try + var received_called: USize = 0 + while true do let r = _ssl.read(_expect) if r isnt None then - _notify.received(conn, (consume r) as Array[U8] iso^) + received_called = received_called + 1 + _notify.received(conn, + (consume r) as Array[U8] iso^, + received_called) else break end diff --git a/packages/net/tcp_connection.pony b/packages/net/tcp_connection.pony index b2b7de9f4f1..2fb213d2094 100644 --- a/packages/net/tcp_connection.pony +++ b/packages/net/tcp_connection.pony @@ -667,7 +667,7 @@ actor TCPConnection data.truncate(_read_len) _read_len = 0 - _notify.received(this, consume data) + _notify.received(this, consume data, 1) _read_buf_size() end @@ -708,6 +708,7 @@ actor TCPConnection ifdef not windows then try var sum: USize = 0 + var received_called: USize = 0 while _readable and not _shutdown_peer do if _muted then @@ -741,7 +742,8 @@ actor TCPConnection data.truncate(_read_len) _read_len = 0 - if not _notify.received(this, consume data) then + received_called = received_called + 1 + if not _notify.received(this, consume data, received_called) then _read_buf_size() _read_again() return diff --git a/packages/net/tcp_connection_notify.pony b/packages/net/tcp_connection_notify.pony index bd5703f933a..28a636cdd9b 100644 --- a/packages/net/tcp_connection_notify.pony +++ b/packages/net/tcp_connection_notify.pony @@ -58,12 +58,17 @@ interface TCPConnectionNotify """ data - fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool => + fun ref received(conn: TCPConnection ref, data: Array[U8] iso, + times: USize): Bool + => """ Called when new data is received on the connection. Return true if you want to continue receiving messages without yielding until you read max_size on the TCPConnection. Return false to cause the TCPConnection to yield now. + + Includes the number of times during the current behavior, that received has + been called. This allows the notifier to end reads on a regular basis. """ true