
Flush pending writes before reading from buffer #12

Merged
ioquatix merged 3 commits into socketry:main from paddor:fix-flush-before-buffered-read
Mar 27, 2026
Conversation

@paddor
Contributor

@paddor paddor commented Mar 25, 2026

Problem

When read-ahead pulls more data than a single read/read_exactly call needs, the excess stays in @read_buffer. A subsequent read(size) finds @read_buffer.bytesize >= size and returns directly from the buffer without entering the fill_read_buffer loop — which means flush is never called.

Any data sitting in the write buffer is silently held back.

Deadlock scenario

In a bidirectional protocol (e.g. ZMTP handshake) where two fibers exchange data over a socket pair under Async:

  1. Fiber A writes "A1" (buffered), calls read_exactly(2) → fill_read_buffer → flushes "A1", sysread yields (:wait_readable)
  2. Fiber B writes "B1" (buffered), calls read_exactly(2) → fill_read_buffer → flushes "B1", sysread reads "A1" → returns
  3. Fiber B writes "B2" (buffered), calls read_exactly(2) → fill_read_buffer → flushes "B2", sysread yields
  4. Fiber A resumes, sysread returns "B1B2" (4 bytes — read-ahead pulled both)
  5. read_exactly(2) consumes "B1" (2 bytes), "B2" remains in buffer
  6. Fiber A writes "A2" (buffered), calls read_exactly(2)
  7. @read_buffer.bytesize >= size → loop skipped → flush never called → "A2" never sent
  8. Fiber B blocks forever waiting for "A2"

Reproduction

require "async"
require "io/stream"
require "socket"

Async do
  a, b = UNIXSocket.pair.map { |s| IO::Stream::Buffered.wrap(s) }

  ta = Async do
    a.write("A1")        # buffered
    a.read_exactly(2)    # flush-on-read flushes A1, reads B1
    a.write("A2")        # buffered
    a.read_exactly(2)    # BUG: buffer has B2 from read-ahead, flush skipped, A2 never sent
  end

  tb = Async do
    b.write("B1")        # buffered
    b.read_exactly(2)    # flush-on-read flushes B1, reads A1
    b.write("B2")        # buffered
    b.read_exactly(2)    # flush-on-read flushes B2, blocks forever waiting for A2
  end

  ta.wait; tb.wait
ensure
  a&.close rescue nil
  b&.close rescue nil
end

Fix

Call flush at the top of #read before checking the buffer, so pending writes are always sent before any read — even when the read buffer already has enough data from a previous read-ahead.

Test plan

  • All 330 existing tests pass
  • The reproduction above deadlocks on main, completes with the fix

@paddor paddor force-pushed the fix-flush-before-buffered-read branch from 221a57b to db27c24 on March 25, 2026 at 18:34
paddor added a commit to zeromq/omq that referenced this pull request Mar 25, 2026
Replace hand-rolled SocketIO wrapper and ZMTP.read_exact with
IO::Stream::Buffered from the io-stream gem. This brings read-ahead
buffering (fewer syscalls for frame parsing), automatic TCP_NODELAY,
and read_exactly for exact-byte reads.

Deleted:
- TCP::SocketIO class
- ZMTP.read_exact / Mechanism module
- String#rsplit_host_port monkey-patch
- TCP keepalive options (tcp_keepalive_*)

TCP throughput improved 20-28% from read-ahead buffering.
Uses minimum_write_size: 0 to work around socketry/io-stream#12.
@paddor paddor force-pushed the fix-flush-before-buffered-read branch from db27c24 to 308b84c on March 26, 2026 at 00:02
When read-ahead pulls more data than requested, subsequent reads can
complete from the buffer without calling fill_read_buffer. This skips
the flush call inside fill_read_buffer, leaving pending writes unsent.

In bidirectional protocols (e.g. ZMTP handshake), this causes deadlock:
fiber A's write sits in the buffer while fiber B blocks waiting for it.

Move the flush to the top of #read so it always runs, regardless of
whether the read buffer already has enough data.
@ioquatix ioquatix force-pushed the fix-flush-before-buffered-read branch from 308b84c to d17a0c4 on March 27, 2026 at 02:55

Copilot AI left a comment


Pull request overview

This PR fixes a deadlock in bidirectional/Async usage by ensuring buffered writes are flushed even when a read(size) can be satisfied entirely from the existing read-ahead buffer.

Changes:

  • Update IO::Stream::Readable#read to flush pending writes even when the read buffer already contains enough data to satisfy the request.
  • Add an Async regression test using Socket.pair to cover the “read-ahead leaves extra bytes in buffer” scenario.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

lib/io/stream/readable.rb: Flushes pending writes on buffer-hit paths in #read(size) to prevent write starvation/deadlocks.
test/io/stream/buffered.rb: Adds a regression test using Sus::Fixtures::Async::ReactorContext to reproduce/guard the deadlock scenario.


# Don't read less than @minimum_read_size to avoid lots of small reads:
fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size)
if @finished or @read_buffer.bytesize >= size

Copilot AI Mar 27, 2026


read(size) now calls flush when @finished is true. That means a buffered read after EOF can raise (e.g. EPIPE/IOError from flushing pending writes) and prevent returning data that’s already in @read_buffer, which is a behavior change from before. If the intent is only to fix the “buffer satisfied size so fill_read_buffer is skipped” case, consider only flushing when @read_buffer.bytesize >= size (or otherwise ensure EOF-buffered reads can still succeed without being interrupted by write-side errors).

Suggested change
if @finished or @read_buffer.bytesize >= size
if @read_buffer.bytesize >= size

Comment on lines +104 to +112
while true
# Compute the amount of data we need to read from the underlying stream:
read_size = size - @read_buffer.bytesize

# Don't read less than @minimum_read_size to avoid lots of small reads:
fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size)

break if @finished or @read_buffer.bytesize >= size
end

Copilot AI Mar 27, 2026


This addresses the flush-skipped path for #read, but other APIs (e.g. read_partial, peek, gets/read_until) can also return purely from @read_buffer without calling fill_read_buffer (which is where flushing currently happens). That can leave the same “pending writes never flushed” hazard for those methods. Consider centralizing “flush before serving data from the read buffer” in a shared helper (or applying the same flush-on-buffer-hit behavior to the other read entry points) to keep semantics consistent.
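One hypothetical way to centralize that (the module, class, and method names here are invented for illustration; this is not io-stream's API) is a small wrapper that every read entry point funnels through:

```ruby
# Hypothetical sketch: one helper owns the flush-before-serving policy,
# so every read entry point gets consistent semantics.
module FlushBeforeRead
  def serving_read
    flush # pending writes always go out before buffered data is served
    yield
  end
end

# Minimal demonstration class; a real stream would also fill @read_buffer
# from an underlying IO.
class DemoStream
  include FlushBeforeRead

  attr_reader :flush_count

  def initialize(buffered)
    @read_buffer = buffered.dup
    @flush_count = 0
  end

  def flush
    @flush_count += 1
  end

  def read(size)
    serving_read { @read_buffer.slice!(0, size) }
  end

  def peek(size)
    serving_read { @read_buffer[0, size] }
  end
end
```

With this shape, adding the policy to read_partial or gets is a one-line change rather than a copy of the flush logic.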

Comment on lines +1047 to +1048
task_a.wait
task_b.wait

Copilot AI Mar 27, 2026


As written, a regression will manifest as a deadlock and this example will hang indefinitely (waiting on task_a/task_b) rather than failing fast. Please wrap the waits in a bounded timeout (using the Async/Sus reactor context utilities) so CI gets a deterministic failure instead of a stuck test run.

Suggested change
task_a.wait
task_b.wait
reactor.with_timeout(1) do
task_a.wait
task_b.wait
end

# flush. This test verifies that pending writes are still flushed.
task_a = reactor.async do
client.write("A1")
data = client.read_exactly(2)

Copilot AI Mar 27, 2026


data = client.read_exactly(2) assigns to data but the value is never used. Consider removing the assignment (or asserting on the returned value) to keep the test focused and avoid unused locals.

Suggested change
data = client.read_exactly(2)
client.read_exactly(2)

@ioquatix ioquatix merged commit b01c4b9 into socketry:main Mar 27, 2026
21 of 24 checks passed
@ioquatix
Member

ioquatix commented Mar 27, 2026

I merged this, but I'm having 2nd thoughts about it.

Implicit flushing adds overhead, and it also messes with interleaved reads and writes across fibers. Also, as Copilot pointed out, the flush would have to go into multiple methods.

  ta = Async do
    a.write("A1")        # buffered
    a.flush              # message finished
    
    a.read_exactly(2)    # flush-on-read flushes A1, reads B1
    a.write("A2")        # buffered
    a.flush              # message finished
    
    a.read_exactly(2)
  end

Honestly, it's probably better to terminate messages with flush explicitly.

@ioquatix
Member

For example, it's entirely possible to have this:

stream = ...

Async do
  stream.read ...
end

Async do
  stream.write ...
end

It seems like a better direction would be to remove the implicit flush to reduce contention.

@paddor
Contributor Author

paddor commented Mar 27, 2026

This was interesting to read. But I wanna apologize for wasting your time, probably. Me and Claude were a little too PR-happy. Turns out all we had to do was flush after queueing a message send, duh. No workaround needed. Huge perf wins. Sorry again.

@ioquatix
Member

You don't need to apologise. I think what we should do is improve the documentation to cover the recommended patterns, if it doesn't already.


@paddor
Contributor Author

paddor commented Mar 29, 2026

Thanks a lot. Great writeup! In my omq gem I've actually implemented proper batch flushing in the meantime: the send pump waits for messages in the queue, greedily drains up to 64, then flushes, so latency is still good. 3-4x better throughput. It's nuts! And not janky!
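That batching pattern can be sketched roughly as follows (assumed shape, not omq's actual code; the real pump runs on Async fibers, while this standalone sketch uses a stdlib thread and Thread::Queue so it runs anywhere):

```ruby
# Rough sketch of a batch-flushing send pump (assumed shape, not omq's
# actual implementation). Drain the queue greedily up to a batch limit,
# then flush once, amortizing syscalls without adding much latency.
def run_send_pump(stream, queue, batch_limit: 64)
  Thread.new do
    loop do
      message = queue.pop            # blocks until a message arrives
      break if message.nil?          # nil is the shutdown signal
      batch = 0
      while message
        stream.write(message)        # buffered write
        batch += 1
        break if batch >= batch_limit || queue.empty?
        message = queue.pop          # greedy drain of already-queued messages
      end
      stream.flush                   # one flush per batch
      break if message.nil?          # shutdown signal seen mid-drain
    end
  end
end
```

A single flush per drained batch is where the throughput win comes from; latency stays low because the pump flushes as soon as the queue runs dry.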
