Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TcpStream `write` silently loses one message #2234

Closed
ufoscout opened this issue Feb 12, 2020 · 4 comments
Closed

TcpStream `write` silently loses one message #2234

ufoscout opened this issue Feb 12, 2020 · 4 comments

Comments

@ufoscout
Copy link

@ufoscout ufoscout commented Feb 12, 2020

Version

tokio = 0.2.11

Platform

Linux 4.15.0-76-generic #86-Ubuntu SMP Fri Jan 17 17:24:28 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

Description

The issue: Given a TcpStream connected to a remote TCP server, when the server goes down, it is still possible to call the write methods on the stream without being notified of the failure.

Below you can find a complete reproducer of the issue.

This is the tested flow:

  1. Start a TCP server
  2. a TcpStream connects to the server
  3. the TcpStream sends 'message_1' -> the message is received by the server
  4. the TcpStream sends 'shutdown' -> the server receives the message and stops itself
  5. the TcpStream sends 'message_2' -> HERE THE ISSUE, in fact, both the write and the flush methods complete successfully, but the message is lost because the server is not available
  6. the TcpStream sends 'message_3' -> at this point the write fails, but it should have failed already on 'message_2'

Is this a bug or a desired behaviour? And, if not a bug, how can we know whether a message was really sent to the server?

Issue reproducer

use tokio::io;
use tokio::io::*;
use tokio::net::{TcpListener, TcpStream};
use tokio::time;

#[tokio::test]
async fn should_not_lose_tcp_requests_1() {

    // start a TCP server
    tokio::spawn(start_server());
    time::delay_until(time::Instant::now() + time::Duration::new(1, 0)).await;

    // Create a TcpStream connected to the server
    let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();

    let (_, mut send) = io::split(stream);

    // send `message_1' -> Ok
    assert!(send_bytes(&mut send, "message_1\n").await.is_ok());
    time::delay_until(time::Instant::now() + time::Duration::new(1, 0)).await;

    // send 'shutdown' -> Ok, the server is stopped
    assert!(send_bytes(&mut send, "shutdown\n").await.is_ok());
    time::delay_until(time::Instant::now() + time::Duration::new(1, 0)).await;

    // As expected, a new connection fails as the server is not available
    assert!(TcpStream::connect("127.0.0.1:8080").await.is_err());

    // HERE THE ISSUE
    // THIS ASSERT FAILS: the `write` returns `Ok` instead of an error even if the server
    // is not available. Consequently, the message is silently lost.
    assert!(send_bytes(&mut send, "message_2\n").await.is_err());

    // Sending a second message, after the server went down, correctly fails.
    // Anyway, we expected the failure to happen on the previous message.
    assert!(send_bytes(&mut send, "message_3\n").await.is_err());
}

async fn send_bytes<R: AsyncWrite + Unpin>(send: &mut R, message: &str) -> Result<()> {
    send.write_all(message.as_bytes()).await?;
    send.flush().await?;
    println!("message sent: {:?}", message);
    Ok(())
}

async fn start_server() {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();

    println!("TCP listener ready");

    let (socket, _) = listener.accept().await.unwrap();
    let (recv, _) = io::split(socket);
    let mut reader = tokio::io::BufReader::new(recv);

    loop {
        let mut response = String::new();
        reader.read_line(&mut response).await.unwrap();

        println!("received message: {}", response);

        if response.trim().eq("shutdown") {
            break;
        }
    }

    println!("Shutdown TCP server");
}
@Matthias247

This comment has been minimized.

Copy link

@Matthias247 Matthias247 commented Feb 13, 2020

This is more or less a race condition. The state of the client and server is fully decoupled. When one peer observes something it doesn't mean it is visible to the other. E.g. the close might not have yet propagated to the client, and therefore the next send still succeeds. Only a while later it fails. The fact that the listener rejects the next connection attempt but the send still succeeds seems a bit weird. But given that those are 2 different sockets with different states it is still not unexpected.

Regarding flush: Keep in mind that the only guarantee a flush gives you is that something was written into the OS local socket buffer, from where it is transferred asynchronously to the peer. It doesn't give you any indication whether it arrived at the peer. Since a socket has a buffer of >= 64kB, the flushes in your example will always succeed without blocking.

And, if not a bug, how can we know whether a message was really sent to the server?

The only way you can really know is by sending a confirmation on application level. That's e.g. the response in HTTP, or replies in all the RPC protocols. Without those, the client would neither have an idea whether something was delivered nor processed by the server.

Oh, and an interesting fact: Even with a reply, the server won't have any confirmation whether the client received it.

@ufoscout

This comment has been minimized.

Copy link
Author

@ufoscout ufoscout commented Feb 13, 2020

@Matthias247

This is more or less a race condition.

I could be wrong, but I don't think this is a race condition. In fact, this bug happens even you send a message hours after the server went down; this is exactly what happened in our production environment.
You can simulate it by adding a sleep of two minutes, after the server shutdown, in the provided test (i.e. time::delay_until(time::Instant::now() + time::Duration::new(120, 0)).await; ) and you'll notice that it still behaves incorrectly.

I am rewriting a Perl application in Rust and the original application does not have this problem and I guess that Perl uses the same OS socket buffers under the hood.

@gralpli

This comment has been minimized.

Copy link
Contributor

@gralpli gralpli commented Feb 15, 2020

I agree to @Matthias247 that this is a race condition that cannot be fixed at the TCP layer.


@ufoscout try the following: Put the server on a different machine, establish the connection and unplug the network cable between the two. Then on the sender side you can still successfully write messages into the OS-own transmit buffer until it is full (then writes will block). On Linux it takes about 15 minutes until the sender notices that the connection was dropped (the OS tries with growing intervals to resend the transmit buffer but it never receives an ACK anymore and goes into a timeout after about 15 minutes). The receiver will never (I repeat: never) notice that the connection was dropped.

Server:

use std::io::Read;

fn main() {
    let listener = std::net::TcpListener::bind("0.0.0.0:9090").unwrap();

    let (mut stream, _) = listener.accept().unwrap();

    let mut buf = [0; 1024];

    loop {
        match stream.read(&mut buf) {
            Ok(count) if count == 0 => { println!("client closed connection"); break; },
            Ok(count) => println!("read {} bytes", count),
            Err(err) => { println!("error: {}", err); break; },
        }
    }
}

Client:

use std::io::Write;

fn main() {
    let mut stream = std::net::TcpStream::connect("192.168.1.115:9090").unwrap();

    let buf = [b'A'; 1024];

    stream.write(&buf[..]).unwrap();
    println!("sent 1024 bytes");

    println!("now unplug the cable and press enter");

    let mut input = String::new();
    std::io::stdin().read_line(&mut input).unwrap();

    // Let's fill up the TCP send buffer.
    // At some point write() will block and return an error 15 minutes later.

    // Btw, you can replug the cable at any time and the connection will still be alive
    // (it might take a couple seconds/minutes until the TCP stack notices, because of the growing retry intervals).

    let mut total = 0;

    loop {
        stream.write(&buf[..]).unwrap();
        total += 1;
        println!("sent {} KiB", total);
    }
}
@ufoscout

This comment has been minimized.

Copy link
Author

@ufoscout ufoscout commented Feb 17, 2020

@gralpli @Matthias247
thanks for your explanations; I understood it now.
Closing as it is not a bug but the expected TCP behaviour.

@ufoscout ufoscout closed this Feb 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked pull requests

Successfully merging a pull request may close this issue.

None yet
3 participants
You can’t perform that action at this time.