Tokio in current_thread not releasing open file handles once the limit is reached. #4782

Closed
nohupped opened this issue Jun 21, 2022 · 12 comments · Fixed by #6221
Assignees
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. E-help-wanted Call for participation: Help is requested to fix this issue. E-medium Call for participation: Experience needed to fix: Medium / intermediate M-coop Module: tokio/coop M-net Module: tokio/net

Comments

@nohupped

nohupped commented Jun 21, 2022

Version

cargo tree | grep tokio

└── tokio v1.19.2
    └── tokio-macros v1.8.0 (proc-macro)

Platform
The output of uname -a (UNIX), or version and 32 or 64-bit (Windows)

Linux pdebian 5.10.0-10-amd64 #1 SMP Debian 5.10.84-1 (2021-12-08) x86_64 GNU/Linux

Description

While reading from a Unix socket over a buffered stream, the Tokio executor in current_thread mode does not release open file handles once the OS open-file limit is hit, and it stops accepting incoming requests. If the number of open files stays within the OS limit, the file handles are released as expected.

If the executor is switched to the multi_thread one, this issue doesn't happen: although the open-file limit is hit, the runtime frees the open files and the program continues to work.

I am not sure whether I am using BufStream incorrectly or have overlooked something.

I tried this code:

A minimal version of the server code that accepts connections over a Unix socket and logs each request

src/main.rs

use anyhow::{Context, Result};
use clap::{App, Arg};
use env_logger::{Builder, Env};
use log::{error, info};
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufStream},
    net::{UnixListener, UnixStream},
    runtime,
    // time::{timeout, Duration},
};

pub static SOCK_PATH: &str = "/var/run/sock.sock";

fn main() -> Result<()> {
    Builder::from_env(Env::default().default_filter_or("info")).init();
    let clap_app = App::new("openfiles")
        .version(env!("CARGO_PKG_VERSION"))
        .author(env!("CARGO_PKG_AUTHORS"))
        .about("checking tokio openfiles")
        .arg(
            Arg::new("worker-threads")
                .long("worker-threads")
                .takes_value(true)
                .help("number of worker threads. 0 = current_thread. >0 = worker_threads")
                .default_value("1")
                .global(true),
        )
        .get_matches();
    let threads = clap_app
        .value_of("worker-threads")
        .unwrap()
        .parse::<usize>()
        .unwrap();
    let rt = match threads {
        0 => {
            info!("running in current thread");
            runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .context("cannot create runtime")?
        }
        multi => {
            info!("worker_threads: {}", multi);
            runtime::Builder::new_multi_thread()
                .worker_threads(multi)
                .enable_all()
                .thread_name("foobar")
                .build()
                .context("cannot create runtime")?
        }
    };

    let handle = rt.handle();
    let _enter_guard = handle.enter();
    let _ = std::fs::remove_file(SOCK_PATH);
    let listener = UnixListener::bind(SOCK_PATH).unwrap();
    rt.block_on(async move { run_listener(listener).await });
    Ok(())
}

pub async fn run_listener(listener: UnixListener) {
    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                info!("Received incoming");
                tokio::task::spawn(async move {
                    match handle_client(stream).await {
                        Ok(_) => (),
                        Err(err) => error!("error handling client, error: {}", err),
                    }
                });
            }
            Err(err) => {
                error!("error accepting connection, error: {}", err);
            }
        }
    }
}

async fn handle_client(stream: UnixStream) -> Result<()> {
    let mut buf_stream = BufStream::new(stream);
    let mut line = String::new();
    buf_stream.read_line(&mut line).await?;

    info!("Received request: {}", line);
    buf_stream.write_all(b"END\r\n").await?;
    buf_stream.shutdown().await?;
    drop(buf_stream);
    Ok(())
}

Client code that generates parallel requests

src/client/main.rs

use anyhow::{Context, Result};
use tokio::{
    io::{AsyncWriteExt, BufStream},
    net::UnixStream,
    runtime,
};

pub static SOCK_PATH: &str = "/var/run/sock.sock";

fn main() -> Result<()> {
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_name("resolver-core")
        .build()
        .context("cannot create runtime")?;
    rt.block_on(run_client())?;

    Ok(())
}

async fn run_client() -> Result<()> {
    loop {
        let stream = UnixStream::connect(SOCK_PATH).await?;
        let mut buf_stream = BufStream::new(stream);
        tokio::spawn(async move {
            match buf_stream.write_all(b"foobar\r\n").await {
                Ok(_) => (),
                Err(err) => {
                    println!("write_all error:: {}", err);
                }
            };

            match buf_stream.flush().await {
                Ok(_) => (),
                Err(err) => {
                    println!("flush error:: {}", err);
                }
            };
        });
    }
}

Cargo.toml

[package]
name = "foobar"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]]
name = "server"
path = "src/main.rs"

[[bin]]
name = "client"
path = "src/client/main.rs"

[dependencies]
tokio = { version = "1.15.0", features = ["full"] }
clap = "3.0.13"
anyhow = "1.0.54"
log = "0.4.17"
env_logger = "0.9.0"

I expected to see this happen:

When the server is run as

./server --worker-threads=0

and when the client is run as

./client

I expect to see

[2022-06-21T19:55:57Z ERROR server] error accepting connection, error: Too many open files (os error 24)

in the server's output once the open-files ulimit is hit, but I also expect it to recover after the client is stopped.

Instead, this happened:

With the above code, the server keeps on printing

[2022-06-21T19:55:57Z ERROR server] error accepting connection, error: Too many open files (os error 24)

even after the client has exited. I watched the output of lsof for some time, and it stayed as below:

sudo lsof -p 2704488 | grep -ic sock
1015

The number of open file handles to the socket never comes down because they are never released.

Note:

This happens only when running in current_thread. If the executor is switched to multi_thread by running the server as ./server --worker-threads=1, the server recovers even after hitting the open-file limit, and the lsof output shows the number of open file handles to the socket coming down.
I tried to reproduce this in a Docker container running on my Mac, but it didn't occur. On bare-metal Linux and on Linux running under VMware Fusion, I was able to reproduce it.
I have added this code to my repo in case anybody wants to try it locally on a Linux machine (https://github.com/nohupped/buggy).

@nohupped nohupped added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Jun 21, 2022
@Noah-Kennedy Noah-Kennedy self-assigned this Jun 21, 2022
@Noah-Kennedy
Contributor

@nohupped this reproduces more reliably:
bin/server.rs

use std::fs::remove_file;
use tokio::net::{UnixListener, UnixStream};
use ulimit_echo::SOCK;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let _ = remove_file(SOCK);

    let sock = UnixListener::bind(SOCK).unwrap();

    loop {
        match sock.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(handle_stream(stream));
            },
            Err(e) => eprintln!("Failed to accept: {:?}", e),
        }
    }
}

async fn handle_stream(mut stream: UnixStream) {
    let (mut rx, mut tx) = stream.split();
    tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}

bin/client.rs

use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use ulimit_echo::SOCK;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    loop {
        match UnixStream::connect(SOCK).await {
            Ok(stream) => {
                tokio::spawn(handle_stream(stream));
            }
            Err(e) => eprintln!("Error in creating stream: {:?}", e),
        }
    }
}

async fn handle_stream(mut stream: UnixStream) {
    stream.write_all(b"UNLIMITED FILES!!!").await.unwrap();

    let (mut rx, mut tx) = stream.split();
    tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}

lib.rs

pub const SOCK: &str = "sock";

@Noah-Kennedy
Contributor

Adding tokio::task::yield_now().await to the error branch in the server fixes it. It appears that we are busy looping in the listener task, preventing the connections from closing properly.

@Noah-Kennedy
Contributor

Might be a budgeting issue?

@Noah-Kennedy
Contributor

I'll look at this more later tonight.

@Noah-Kennedy
Contributor

Using the unstable tokio::task::consume_budget also fixes this.

@Darksonn Darksonn added M-net Module: tokio/net M-coop Module: tokio/coop labels Jun 22, 2022
@Darksonn
Contributor

Darksonn commented Jun 22, 2022

Yes, this does appear to be due to connect not consuming budget when it returns an error. Though in a production application you should probably use a limit on the number of connections (probably a Semaphore) to avoid running out of fds in the first place.

@Noah-Kennedy
Contributor

@Darksonn I think the issue is in accept, not connect.

@Darksonn
Contributor

Right, good point. That was a typo.

@Noah-Kennedy
Contributor

@nohupped workaround for now is to use yield_now when accept returns that error. If you are using --cfg tokio_unstable, you also have the option of consume_budget.

@sfackler
Contributor

More generally though, you probably don't want to be hot-looping over accept errors. Sleeping for a bit of time after an error comes out of accept would keep the CPU from needlessly getting slammed.

@Noah-Kennedy
Contributor

@sfackler I agree completely, especially with a ulimit issue; however, I do think it is problematic that we just busy-loop here.

@nohupped
Author

nohupped commented Jun 22, 2022

> @nohupped workaround for now is to use yield_now when accept returns that error. If you are using --cfg tokio_unstable, you also have the option of consume_budget.

Thank you. I tried the yield_now, and that's working for me.

> More generally though, you probably don't want to be hot-looping over accept errors. Sleeping for a bit of time after an error comes out of accept would keep the CPU from needlessly getting slammed.

Thank you. I'm planning to add that. Before using a match, I was using an if let Ok((stream, _)) = listener.accept().await, so with that, it was busy looping by default.

@Darksonn Darksonn added E-help-wanted Call for participation: Help is requested to fix this issue. E-medium Call for participation: Experience needed to fix: Medium / intermediate labels Apr 16, 2023
Noah-Kennedy pushed a commit that referenced this issue Dec 16, 2023
Fixes #5946, #4782.

This change adds budgeting to most of the remaining unbudgeted IO operations which can complete instantly, including datagram send/recv operations and listener socket accepts.

This is particularly significant for scenarios in which resource limits are hit, as it can be common for things like listener tasks to spin when receiving errors and just log them, busy looping worker threads which might otherwise be handling existing connections and closing them.

This can also sometimes lead to complex failure scenarios within datagram systems experiencing resource exhaustion.
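The effect of budgeting an operation can be pictured with a toy model. The sketch below is not Tokio's implementation: `Budget`, `try_consume`, and `accepts_in_one_poll` are hypothetical names, and the budget size passed in is an illustrative value. It only shows the idea that an operation which would otherwise complete instantly forever is forced to stop after a fixed number of completions, so the scheduler can run other tasks.

```rust
// Toy model only: this is NOT how Tokio implements its cooperative budget.
pub struct Budget {
    remaining: u32,
}

impl Budget {
    pub fn new(units: u32) -> Self {
        Budget { remaining: units }
    }

    // Spends one unit. Returns false once the budget is exhausted, which in
    // this model means "report Pending and let other tasks run".
    pub fn try_consume(&mut self) -> bool {
        if self.remaining == 0 {
            return false;
        }
        self.remaining -= 1;
        true
    }
}

// Counts how many instantly failing accepts a listener task performs in one
// uninterrupted poll. A `budget` of `None` models an unbudgeted accept.
pub fn accepts_in_one_poll(pending_errors: u32, budget: Option<u32>) -> u32 {
    let mut budget = budget.map(Budget::new);
    let mut attempts = 0;
    while attempts < pending_errors {
        if let Some(b) = budget.as_mut() {
            if !b.try_consume() {
                // Budget exhausted: the task yields here.
                break;
            }
        }
        attempts += 1;
    }
    attempts
}
```

In this model an unbudgeted listener works through every pending error without a break, while a budgeted one stops after spending its allowance, giving connection tasks a chance to close their sockets.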