Skip to content

Commit

Permalink
Allow TCPConnectionNotify to cause connection to yield while receiving
Browse files Browse the repository at this point in the history
This comes from direct experience using the existing `TCPConnection`
functionality at Sendence. We are heavy users of `expect` on `TCPConnection` in
order to support framed protocols. Our `received` methods on notifiers are
generally of the following form:

```pony
  fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
    if _header then
      // convert the 4 byte header into a value for expect, aka payload length
      let expect = Bytes.to_u32(data(0), data(1), data(2), data(3)).usize()

      conn.expect(expect)
      _header = false
    else
      // do something with payload
      ...

      // reset expect for next 4 byte header
        conn.expect(4)
        _header = true
    end
```

This short of usage is why `expect` was initially added to `TCPConnection`.
Upon usage, we found a serious drawback with this approach. `TCPConnection`
will read up to 4k of data on a single behavior run and if there is still data
available, it will then send itself a `_read_again` message to trigger more
reading of additional data. It does this so that it doesn't hogged the
scheduler while reading from the socket. This can work reasonably well in some
scenarios but not others.

In the framed protocol example above, if the message payloads are small then
4k of data can result in a lot of messages being sent from our `received`
method to other actors in the application. In an application that is
continously receiving data, this results in a very bursty scheduling experience.

After consulting with Sylvan, we changed `received` and `TCPConnection` to
allow `received` to return a Boolean to indicate whether `TCPConnection`
should continue sending more data on this behavior run.

We've found that for some workloads, we are able to get equal performance
while greatly lowering latency by having `TCPConnection` call `_read_again`
earlier than it otherwise would.

Closes RFC #19
Closes #1343
  • Loading branch information
SeanTAllen committed Oct 21, 2016
1 parent 79f8e88 commit 102c43f
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to the Pony compiler and standard library will be documented
### Added

- TCP read and write backpressure hooks in `TCPConnection` (issue #1311)
- Allow TCP notifiers to cause connections to yield while receiving (issue #1343)

### Changed

Expand Down
18 changes: 12 additions & 6 deletions packages/net/_test.pony
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class _TestTCPExpectNotify is TCPConnectionNotify
_h.complete_action("expect received")
qty

fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
if _frame then
_frame = false
_expect = 0
Expand All @@ -250,6 +250,7 @@ class _TestTCPExpectNotify is TCPConnectionNotify
end

conn.expect(_expect)
true

fun ref _send(conn: TCPConnection ref, data: String) =>
let len = data.size()
Expand Down Expand Up @@ -301,7 +302,7 @@ class _TestTCPWritevNotifyServer is TCPConnectionNotify
new iso create(h: TestHelper) =>
_h = h

fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
_buffer.append(consume data)

let expected = "hello, hello (from client)"
Expand All @@ -311,6 +312,7 @@ class _TestTCPWritevNotifyServer is TCPConnectionNotify
_h.assert_eq[String](expected, consume buffer)
_h.complete_action("server receive")
end
true

class iso _TestTCPMute is UnitTest
"""
Expand Down Expand Up @@ -360,8 +362,9 @@ class _TestTCPMuteReceiveNotify is TCPConnectionNotify
_h.complete_action("receiver asks for data")
_h.dispose_when_done(conn)

fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
_h.complete(false)
true

class _TestTCPMuteSendNotify is TCPConnectionNotify
"""
Expand All @@ -383,9 +386,10 @@ class _TestTCPMuteSendNotify is TCPConnectionNotify
fun ref connect_failed(conn: TCPConnection ref) =>
_h.fail_action("sender connected")

fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
conn.write("it's sad that you won't ever read this")
_h.complete_action("sender sent data")
true

class iso _TestTCPUnmute is UnitTest
"""
Expand Down Expand Up @@ -434,8 +438,9 @@ class _TestTCPUnmuteReceiveNotify is TCPConnectionNotify
conn.unmute()
_h.complete_action("receiver unmuted")

fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
_h.complete(true)
true

class iso _TestTCPThrottle is UnitTest
"""
Expand Down Expand Up @@ -505,9 +510,10 @@ class _TestTCPThrottleSendNotify is TCPConnectionNotify
fun ref connect_failed(conn: TCPConnection ref) =>
_h.fail_action("sender connected")

fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
conn.write("it's sad that you won't ever read this")
_h.complete_action("sender sent data")
true

fun ref throttled(conn: TCPConnection ref) =>
_throttled_yet = true
Expand Down
3 changes: 2 additions & 1 deletion packages/net/http/_request_builder.pony
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class _RequestBuilder is TCPConnectionNotify

_server = _ServerConnection(_handler, _logger, conn, host)

fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
"""
Assemble chunks of data into a request. When we have a whole request,
dispatch it.
Expand All @@ -56,3 +56,4 @@ class _RequestBuilder is TCPConnectionNotify
break
end
end
true
3 changes: 2 additions & 1 deletion packages/net/http/_response_builder.pony
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ class _ResponseBuilder is TCPConnectionNotify
"""
_client._auth_failed(conn)

fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
"""
Assemble chunks of data into a response. When we have a whole response,
give it to the client and start a new one.
"""
// TODO: inactivity timer
_buffer.append(consume data)
_dispatch(conn)
true

fun ref closed(conn: TCPConnection ref) =>
"""
Expand Down
3 changes: 2 additions & 1 deletion packages/net/ssl/ssl_connection.pony
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ class SSLConnection is TCPConnectionNotify

recover val Array[ByteSeq] end

fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
"""
Pass the data to the SSL session and check for both new application data
and new destination data.
"""
_ssl.receive(consume data)
_poll(conn)
true

fun ref expect(conn: TCPConnection ref, qty: USize): USize =>
"""
Expand Down
9 changes: 7 additions & 2 deletions packages/net/tcp_connection.pony
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,13 @@ actor TCPConnection
data.truncate(_read_len)
_read_len = 0

_notify.received(this, consume data)
_read_buf_size()
if not _notify.received(this, consume data) then
_read_buf_size()
_read_again()
return
else
_read_buf_size()
end
end

sum = sum + len
Expand Down
9 changes: 6 additions & 3 deletions packages/net/tcp_connection_notify.pony
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ interface TCPConnectionNotify
"""
data

fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
"""
Called when new data is received on the connection.
Called when new data is received on the connection. Return true if you
want to continue receiving messages without yielding until you read
max_size on the TCPConnection. Return false to cause the TCPConnection
to yield now.
"""
None
true

fun ref expect(conn: TCPConnection ref, qty: USize): USize =>
"""
Expand Down

0 comments on commit 102c43f

Please sign in to comment.