From 7f5fb927228dfebf08eed47d8a73b17f21ef548f Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 11:28:51 -0600 Subject: [PATCH 01/10] Prototype binary support --- data.bin | Bin 0 -> 211 bytes examples/stdin.rs | 2 +- src/lib.rs | 40 ++++++++++++++++++++++++---------------- 3 files changed, 25 insertions(+), 17 deletions(-) create mode 100644 data.bin diff --git a/data.bin b/data.bin new file mode 100644 index 0000000000000000000000000000000000000000..7c5c95b29db9e8fb42e97018c76ac8463814ff60 GIT binary patch literal 211 zcmeZB&B@7ENGrDJJwq=u`$M5S~(3l3HA%V5m@;nwZgl`**lW z!G<0+sI1V63b3pZSXLn~wOrxW;wk(;s-JhcK$X0ntC|6}rX(Y^C{-aNzg!^^NLS{U t&g${8w=VN?NQbICDPm`6WNh?!=E3-P?Y|~MMK2|LAc-!=D!LaT3IO4_N8SJc literal 0 HcmV?d00001 diff --git a/examples/stdin.rs b/examples/stdin.rs index bf04e55..31b553c 100644 --- a/examples/stdin.rs +++ b/examples/stdin.rs @@ -1,6 +1,6 @@ use stdin_nonblocking::get_stdin_or_default; fn main() { - let input = get_stdin_or_default(Some("backup_value")); + let input = get_stdin_or_default(Some(b"backup_value")); println!("Final input: {:?}", input); } diff --git a/src/lib.rs b/src/lib.rs index ff3f55a..b148d93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,9 @@ #[cfg(doctest)] doc_comment::doctest!("../README.md"); -use std::io::{self, BufRead, IsTerminal}; +use std::io::{self, BufRead, IsTerminal, Read}; use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; -use std::time::Duration; /// Spawns a background thread that continuously reads from stdin as a stream. /// @@ -95,22 +94,31 @@ pub fn spawn_stdin_stream() -> Receiver { /// /// assert_eq!(input, Some("fallback_value".to_string())); /// ``` -pub fn get_stdin_or_default(default: Option<&str>) -> Option { - let stdin_channel = spawn_stdin_stream(); - let mut input = String::new(); - - // Give the reader thread a short time to capture any available input - thread::sleep(Duration::from_millis(50)); - - while let Ok(line) = stdin_channel.try_recv() { - input.push_str(&line); // Collect all lines - input.push('\n'); // Add a newline between lines +/// Reads from stdin if available, otherwise returns a default value. +/// - Works with **both binary and text data**. +/// - Uses blocking mode to capture full piped input. +/// - Returns a `Vec` to avoid UTF-8 errors. +/// +/// # Arguments +/// * `default` - An optional fallback value (used if stdin is empty). +/// +/// # Returns +/// * `Vec` - The full stdin input (or default value as bytes). +pub fn get_stdin_or_default(default: Option<&[u8]>) -> Vec { + // If running interactively, return the default value (to avoid blocking). + if io::stdin().is_terminal() { + return default.unwrap_or(b"").to_vec(); } - // If input was collected, return it. Otherwise, return the default value. - if !input.trim().is_empty() { - Some(input.trim().to_string()) + // Read the entire stdin into a byte buffer + let mut buffer = Vec::new(); + io::stdin() + .read_to_end(&mut buffer) + .expect("Failed to read stdin"); + + if !buffer.is_empty() { + buffer } else { - default.map(|s| s.to_string()) + default.unwrap_or(b"").to_vec() } } From c609d65b02dbdb2fb22b78b5dce80954ff469d35 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 11:34:21 -0600 Subject: [PATCH 02/10] Ignore *.bin files --- .gitignore | 1 + data.bin | Bin 211 -> 0 bytes 2 files changed, 1 insertion(+) delete mode 100644 data.bin diff --git a/.gitignore b/.gitignore index ea8c4bf..a0e7a17 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +*.bin diff --git a/data.bin b/data.bin deleted file mode 100644 index 7c5c95b29db9e8fb42e97018c76ac8463814ff60..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 211 zcmeZB&B@7ENGrDJJwq=u`$M5S~(3l3HA%V5m@;nwZgl`**lW z!G<0+sI1V63b3pZSXLn~wOrxW;wk(;s-JhcK$X0ntC|6}rX(Y^C{-aNzg!^^NLS{U t&g${8w=VN?NQbICDPm`6WNh?!=E3-P?Y|~MMK2|LAc-!=D!LaT3IO4_N8SJc From 2f116c5d6334c036f93163ad42612330d2d46617 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 11:34:34 -0600 Subject: [PATCH 03/10] Prototype binary support --- src/bin/test_binary.rs | 2 +- src/lib.rs | 103 +++++++++-------------------------------- 2 files changed, 24 insertions(+), 81 deletions(-) diff --git a/src/bin/test_binary.rs b/src/bin/test_binary.rs index 996baef..390596e 100644 --- a/src/bin/test_binary.rs +++ b/src/bin/test_binary.rs @@ -2,6 +2,6 @@ use stdin_nonblocking::get_stdin_or_default; // Used for integration testing fn main() { - let input = get_stdin_or_default(Some("fallback_value")); + let input = get_stdin_or_default(Some(b"fallback_value")); println!("Received input: {:?}", input); } diff --git a/src/lib.rs b/src/lib.rs index b148d93..6ab4e79 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,69 +1,36 @@ -#[cfg(doctest)] -doc_comment::doctest!("../README.md"); - -use std::io::{self, BufRead, IsTerminal, Read}; +use std::io::{self, IsTerminal, Read}; use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; +use std::time::Duration; -/// Spawns a background thread that continuously reads from stdin as a stream. -/// -/// This function returns an `mpsc Receiver`, allowing non-blocking polling -/// of stdin input just like `spawn_stdin_channel`. +/// Spawns a background thread that continuously reads from stdin as a binary stream. /// /// **Handling Interactive Mode:** /// - If stdin is a terminal (interactive mode), this function immediately returns an empty receiver. /// - This prevents blocking behavior when running interactively. -/// - When reading from a file or pipe, the background thread captures input line by line. +/// - When reading from a file or pipe, the background thread captures input **as raw bytes**. /// /// # Returns -/// A `Receiver` that emits lines from stdin. -/// -/// # Example -/// ``` -/// use stdin_nonblocking::spawn_stdin_stream; -/// use std::sync::mpsc::TryRecvError; -/// use std::time::Duration; -/// -/// let stdin_stream = spawn_stdin_stream(); -/// -/// loop { -/// match stdin_stream.try_recv() { -/// Ok(line) => println!("Received: {}", line), -/// Err(TryRecvError::Empty) => { -/// // No input yet; continue execution -/// } -/// Err(TryRecvError::Disconnected) => { -/// println!("Input stream closed. Exiting..."); -/// break; -/// } -/// } -/// std::thread::sleep(Duration::from_millis(500)); -/// } -/// ``` -pub fn spawn_stdin_stream() -> Receiver { - let (tx, rx): (Sender, Receiver) = mpsc::channel(); +/// A `Receiver>` that emits **binary data** from stdin. +pub fn spawn_stdin_stream() -> Receiver> { + let (tx, rx): (Sender>, Receiver>) = mpsc::channel(); - // If stdin is a terminal, return early (don't block). This check prevents potential blocking - // if the program is running interactively (i.e. the user is typing in the terminal). + // If stdin is a terminal, return early (no blocking). if io::stdin().is_terminal() { return rx; } thread::spawn(move || { + let mut buffer = Vec::new(); let stdin = io::stdin(); let mut stdin_lock = stdin.lock(); - loop { - let mut buffer = String::new(); - match stdin_lock.read_line(&mut buffer) { - Ok(0) => break, // EOF detected, exit thread - Ok(_) => { - if tx.send(buffer.trim().to_string()).is_err() { - break; // Exit if receiver is dropped - } - } - Err(_) => break, // Read failure + match stdin_lock.read_to_end(&mut buffer) { + Ok(0) => return, // EOF, no data + Ok(_) => { + let _ = tx.send(buffer); // Send full binary data } + Err(_) => return, // Read failure } }); @@ -78,47 +45,23 @@ pub fn spawn_stdin_stream() -> Receiver { /// **Handling Interactive Mode:** /// - If running interactively (stdin is a terminal), this function returns the default value immediately. /// - This prevents hanging on waiting for user input in interactive sessions. -/// - When used with redirected input (e.g., from a file or pipe), it collects available input. +/// - When used with redirected input (e.g., from a file or pipe), it collects available **binary** input. /// /// # Arguments /// * `default` - An optional fallback value returned if no input is available. /// /// # Returns -/// * `Option` - The trimmed `stdin` input as a `String` if available, or the provided `default` as a `String` if no input is received. -/// -/// # Example -/// ``` -/// use stdin_nonblocking::get_stdin_or_default; -/// -/// let input = get_stdin_or_default(Some("fallback_value")); -/// -/// assert_eq!(input, Some("fallback_value".to_string())); -/// ``` -/// Reads from stdin if available, otherwise returns a default value. -/// - Works with **both binary and text data**. -/// - Uses blocking mode to capture full piped input. -/// - Returns a `Vec` to avoid UTF-8 errors. -/// -/// # Arguments -/// * `default` - An optional fallback value (used if stdin is empty). -/// -/// # Returns /// * `Vec` - The full stdin input (or default value as bytes). pub fn get_stdin_or_default(default: Option<&[u8]>) -> Vec { - // If running interactively, return the default value (to avoid blocking). - if io::stdin().is_terminal() { - return default.unwrap_or(b"").to_vec(); - } + let stdin_channel = spawn_stdin_stream(); - // Read the entire stdin into a byte buffer - let mut buffer = Vec::new(); - io::stdin() - .read_to_end(&mut buffer) - .expect("Failed to read stdin"); + // Give the reader thread a short time to capture any available input + thread::sleep(Duration::from_millis(50)); - if !buffer.is_empty() { - buffer - } else { - default.unwrap_or(b"").to_vec() + if let Ok(data) = stdin_channel.try_recv() { + return data; } + + // No input available, return the default value + default.unwrap_or(b"").to_vec() } From 7c3569b64e94038f1b63c41deeac07045a2156cd Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 12:00:36 -0600 Subject: [PATCH 04/10] Update tests to handle binary support --- src/bin/test_binary.rs | 7 +- tests/stdin_tests.rs | 119 +++++++++++++++++++++------------- tokio-example-app/src/main.rs | 23 +++---- 3 files changed, 93 insertions(+), 56 deletions(-) diff --git a/src/bin/test_binary.rs b/src/bin/test_binary.rs index 390596e..776215c 100644 --- a/src/bin/test_binary.rs +++ b/src/bin/test_binary.rs @@ -1,7 +1,12 @@ +use std::io::{self, Write}; use stdin_nonblocking::get_stdin_or_default; // Used for integration testing fn main() { let input = get_stdin_or_default(Some(b"fallback_value")); - println!("Received input: {:?}", input); + + // Print raw binary data instead of Debug format + io::stdout() + .write_all(&input) + .expect("Failed to write output"); } diff --git a/tests/stdin_tests.rs b/tests/stdin_tests.rs index 400e134..ee9e039 100644 --- a/tests/stdin_tests.rs +++ b/tests/stdin_tests.rs @@ -1,10 +1,10 @@ use std::io::Write; use std::process::{Command, Stdio}; -/// Runs an actual workspace binary (`test_binary` or `tokio-example-app`) and -/// provides input via stdin to observe how the binaries respond, rather than -/// mocking the inputs. -fn run_binary(binary: &str, input: &str) -> String { +/// Runs an actual workspace binary (`test_binary` or `tokio-example-app`) +/// and provides **binary input** via stdin. +/// Returns **raw bytes** instead of assuming UTF-8. +fn run_binary(binary: &str, input: &[u8]) -> Vec { // Ensure all binaries are built first let build_status = Command::new("cargo") .args(["build", "--workspace"]) @@ -35,65 +35,96 @@ fn run_binary(binary: &str, input: &str) -> String { .spawn() .unwrap_or_else(|_| panic!("Failed to spawn process: {}", binary)); - // Pass the input to the binary if needed + // Pass binary input to the process if needed if let Some(mut stdin) = child.stdin.take() { if !input.is_empty() { - writeln!(stdin, "{}", input).expect("Failed to write to stdin"); + stdin + .write_all(input) + .expect("Failed to write binary to stdin"); } drop(stdin); // Close stdin explicitly to send EOF } - // Capture and return the output + // Capture binary output let output = child.wait_with_output().expect("Failed to read stdout"); - String::from_utf8(output.stdout).expect("Invalid UTF-8 output") + output.stdout // Return raw binary output } -/// Test `test_binary` with piped input -#[test] -fn test_piped_input_sync_app() { - let output = run_binary("test_binary", "test input"); - assert!(output.contains("Received input: Some(\"test input\")")); +/// **Helper**: Convert binary output to a UTF-8 string safely (lossy conversion). +fn decode_output(output: &[u8]) -> String { + String::from_utf8_lossy(output).to_string() } -/// Test `test_binary` with empty input (should use fallback) +/// **Test binary input handling with raw data** #[test] -fn test_empty_input_sync_app() { - let output = run_binary("test_binary", ""); - assert!( - output.contains("fallback_value"), - "Expected fallback value but got: {}", - output - ); -} +fn test_binary_input_handling() { + { + let binary_input: &[u8] = b"\xDE\xAD\xBE\xEF"; // Arbitrary binary data + let output_bytes = run_binary("test_binary", binary_input); -/// Test `tokio-example-app` with piped input -#[test] -fn test_piped_input_tokio_app() { - let output = run_binary("tokio-example-app", "test input"); - assert!(output.contains("Received input: Some(\"test input\")")); + assert_eq!( + output_bytes, binary_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } + + { + let binary_input: &[u8] = b"\xDE\xAD\xBE\xEF"; // Same binary input for tokio-example-app + let output_bytes = run_binary("tokio-example-app", binary_input); + + assert_eq!( + output_bytes, binary_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } } -/// Test `tokio-example-app` with empty input (should use fallback) #[test] -fn test_empty_input_tokio_app() { - let output = run_binary("tokio-example-app", ""); - assert!( - output.contains("fallback_value"), - "Expected fallback value but got: {}", - output - ); +fn test_text_input_handling() { + { + let text_input = b"Hello, binary world!"; + let output_bytes = run_binary("test_binary", text_input); + + assert_eq!( + output_bytes, text_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } + + { + let text_input = b"Hello, binary world!"; + let output_bytes = run_binary("tokio-example-app", text_input); + + assert_eq!( + output_bytes, text_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } } -/// Test reading the README.md file and ensure all content is captured #[test] -fn test_multi_line_content() { - let input = "line1\nline2\r\nline3\rline4\nline5"; +fn test_empty_input() { + { + let output_bytes = run_binary("test_binary", b""); - // Run the binary or command and capture the output - let output = run_binary("test_binary", input); + assert_eq!( + output_bytes, b"fallback_value", + "Expected fallback value but got: {:?}", + output_bytes + ); + } + + { + let output_bytes = run_binary("tokio-example-app", b""); - assert_eq!( - output, - "Received input: Some(\"line1\\nline2\\nline3\\rline4\\nline5\")\n" - ); + assert_eq!( + output_bytes, b"fallback_value", + "Expected fallback value but got: {:?}", + output_bytes + ); + } } diff --git a/tokio-example-app/src/main.rs b/tokio-example-app/src/main.rs index b9884ca..537a49f 100644 --- a/tokio-example-app/src/main.rs +++ b/tokio-example-app/src/main.rs @@ -1,23 +1,24 @@ +use std::io::{self, Write}; use stdin_nonblocking::get_stdin_or_default; use tokio::runtime::Runtime; +use tokio::time::{sleep, Duration}; fn main() { - // Step 1: Call `get_stdin_or_default` synchronously to get stdin input - let input = get_stdin_or_default(Some("fallback_value")); - - println!("Received input: {:?}", input); + // Step 1: Read stdin input (binary-safe) + let input = get_stdin_or_default(Some(b"fallback_value")); // Step 2: Start the Tokio runtime and pass the input to async code let rt = Runtime::new().unwrap(); rt.block_on(async_main(input)); } -// Step 3: Define an async function to process the input -async fn async_main(input: Option) { - println!("Async processing input: {:?}", input); - - // Simulating async work - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; +// Step 3: Define an async function to process binary input +async fn async_main(input: Vec) { + // Simulate async work + sleep(Duration::from_secs(1)).await; - println!("Async work complete."); + // Print raw binary data instead of Debug format + io::stdout() + .write_all(&input) + .expect("Failed to write output"); } From 84512f57bffe2cc74459d47bf44a8160c9753065 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 12:01:11 -0600 Subject: [PATCH 05/10] Remove non-utilized helper --- tests/stdin_tests.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/stdin_tests.rs b/tests/stdin_tests.rs index ee9e039..8b5b34e 100644 --- a/tests/stdin_tests.rs +++ b/tests/stdin_tests.rs @@ -50,11 +50,6 @@ fn run_binary(binary: &str, input: &[u8]) -> Vec { output.stdout // Return raw binary output } -/// **Helper**: Convert binary output to a UTF-8 string safely (lossy conversion). -fn decode_output(output: &[u8]) -> String { - String::from_utf8_lossy(output).to_string() -} - /// **Test binary input handling with raw data** #[test] fn test_binary_input_handling() { From 3d32cd4c8df510b072fdc7f7c5d58bc448d1a258 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 12:03:23 -0600 Subject: [PATCH 06/10] Update README --- README.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c232e1a..53413f2 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ Dependency-less non-blocking `stdin` reader using background threads. Supports streaming and immediate fallback defaults. +Supports **binary data**, streaming, and immediate fallback defaults. + ## Install ```sh @@ -29,9 +31,10 @@ use stdin_nonblocking::get_stdin_or_default; // If running in interactive mode (stdin is a terminal), // `get_stdin_or_default` returns the default value immediately. -let input = get_stdin_or_default(Some("fallback_value")); +let input = get_stdin_or_default(Some(b"fallback_value")); -assert_eq!(input, Some("fallback_value".to_string())); +// Input is always `Vec`, ensuring binary safety. +assert_eq!(input, b"fallback_value".to_vec()); ``` ### Read `stdin` as Stream @@ -47,7 +50,7 @@ let stdin_stream = spawn_stdin_stream(); loop { match stdin_stream.try_recv() { - Ok(line) => println!("Received: {}", line), + Ok(bytes) => println!("Received: {:?}", bytes), // Always raw bytes Err(TryRecvError::Empty) => { // No input yet; continue execution } From d085e8b1c6819bb5f59caf07b8ca192cb4b33139 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 12:06:27 -0600 Subject: [PATCH 07/10] Update docs --- src/lib.rs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 6ab4e79..393d3e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,9 @@ use std::time::Duration; /// Spawns a background thread that continuously reads from stdin as a binary stream. /// +/// This function returns an `mpsc Receiver`, allowing non-blocking polling +/// of stdin input just like `spawn_stdin_channel`. +/// /// **Handling Interactive Mode:** /// - If stdin is a terminal (interactive mode), this function immediately returns an empty receiver. /// - This prevents blocking behavior when running interactively. @@ -12,6 +15,29 @@ use std::time::Duration; /// /// # Returns /// A `Receiver>` that emits **binary data** from stdin. +/// +/// # Example +/// ``` +/// use stdin_nonblocking::spawn_stdin_stream; +/// use std::sync::mpsc::TryRecvError; +/// use std::time::Duration; +/// +/// let stdin_stream = spawn_stdin_stream(); +/// +/// loop { +/// match stdin_stream.try_recv() { +/// Ok(bytes) => println!("Received: {:?}", bytes), // Always raw bytes +/// Err(TryRecvError::Empty) => { +/// // No input yet; continue execution +/// } +/// Err(TryRecvError::Disconnected) => { +/// println!("Input stream closed. Exiting..."); +/// break; +/// } +/// } +/// std::thread::sleep(Duration::from_millis(500)); +/// } +/// ``` pub fn spawn_stdin_stream() -> Receiver> { let (tx, rx): (Sender>, Receiver>) = mpsc::channel(); @@ -44,7 +70,7 @@ pub fn spawn_stdin_stream() -> Receiver> { /// /// **Handling Interactive Mode:** /// - If running interactively (stdin is a terminal), this function returns the default value immediately. -/// - This prevents hanging on waiting for user input in interactive sessions. +/// - This prevents hanging while waiting for user input in interactive sessions. /// - When used with redirected input (e.g., from a file or pipe), it collects available **binary** input. /// /// # Arguments @@ -52,6 +78,15 @@ pub fn spawn_stdin_stream() -> Receiver> { /// /// # Returns /// * `Vec` - The full stdin input (or default value as bytes). +/// +/// # Example +/// ``` +/// use stdin_nonblocking::get_stdin_or_default; +/// +/// let input = get_stdin_or_default(Some(b"fallback_value")); +/// +/// assert_eq!(input, b"fallback_value".to_vec()); +/// ``` pub fn get_stdin_or_default(default: Option<&[u8]>) -> Vec { let stdin_channel = spawn_stdin_stream(); From 27d357f4e2b213cfb4d20c586e3c82a33581ae2c Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 12:07:03 -0600 Subject: [PATCH 08/10] Bump to v0.3.0 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c5b926..ec81851 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,7 +94,7 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "stdin-nonblocking" -version = "0.2.1" +version = "0.3.0" dependencies = [ "doc-comment", ] diff --git a/Cargo.toml b/Cargo.toml index 90a7c74..b9c0475 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stdin-nonblocking" -version = "0.2.1" +version = "0.3.0" authors = ["Jeremy Harris "] description = "Dependency-less non-blocking stdin reader using background threads. Supports streaming and immediate fallback defaults." repository = "https://github.com/jzombie/rust-stdin-nonblocking" From 025ce3019bdc3b89b4e7d398f3ed91303cb04b7a Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 12:08:16 -0600 Subject: [PATCH 09/10] Fix Clippy warning --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 393d3e2..9f3f1b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,11 +52,11 @@ pub fn spawn_stdin_stream() -> Receiver> { let mut stdin_lock = stdin.lock(); match stdin_lock.read_to_end(&mut buffer) { - Ok(0) => return, // EOF, no data + Ok(0) => (), // EOF, no data Ok(_) => { let _ = tx.send(buffer); // Send full binary data } - Err(_) => return, // Read failure + Err(_) => (), // Read failure } }); From ec6264ee0b7b38509648aef8dbd846225b16473a Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 11 Mar 2025 12:23:05 -0600 Subject: [PATCH 10/10] Update tokio example to use streaming version --- Cargo.lock | 171 ++++++++++++++++++++++++++++++++++ tokio-example-app/Cargo.toml | 2 +- tokio-example-app/src/main.rs | 59 ++++++++---- 3 files changed, 214 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec81851..f960839 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "backtrace" version = "0.3.74" @@ -32,6 +38,18 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "bitflags" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + [[package]] name = "cfg-if" version = "1.0.0" @@ -56,6 +74,16 @@ version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "memchr" version = "2.7.4" @@ -71,6 +99,17 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + [[package]] name = "object" version = "0.36.7" @@ -80,18 +119,99 @@ dependencies = [ "memchr", ] +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "pin-project-lite" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "proc-macro2" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +dependencies = [ + "bitflags", +] + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "stdin-nonblocking" version = "0.3.0" @@ -99,6 +219,17 @@ dependencies = [ "doc-comment", ] +[[package]] +name = "syn" +version = "2.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "tokio" version = "1.43.0" @@ -106,7 +237,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", ] [[package]] @@ -117,6 +256,38 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-targets" version = "0.52.6" diff --git a/tokio-example-app/Cargo.toml b/tokio-example-app/Cargo.toml index 15c6d3e..35e2a30 100644 --- a/tokio-example-app/Cargo.toml +++ b/tokio-example-app/Cargo.toml @@ -5,6 +5,6 @@ edition = "2021" publish = false [dependencies] -tokio = { version = "1.43.0", features = ["rt-multi-thread", "time"] } +tokio = { version = "1.43.0", features = ["full"] } stdin-nonblocking = { path = "../" } diff --git a/tokio-example-app/src/main.rs b/tokio-example-app/src/main.rs index 537a49f..b73ad0c 100644 --- a/tokio-example-app/src/main.rs +++ b/tokio-example-app/src/main.rs @@ -1,24 +1,49 @@ use std::io::{self, Write}; -use stdin_nonblocking::get_stdin_or_default; -use tokio::runtime::Runtime; +use std::thread; +use stdin_nonblocking::spawn_stdin_stream; +use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; -fn main() { - // Step 1: Read stdin input (binary-safe) - let input = get_stdin_or_default(Some(b"fallback_value")); +/// Maximum buffer size for async channel +const BUFFER_SIZE: usize = 10; +const FALLBACK_VALUE: &[u8] = b"fallback_value"; - // Step 2: Start the Tokio runtime and pass the input to async code - let rt = Runtime::new().unwrap(); - rt.block_on(async_main(input)); -} +#[tokio::main] +async fn main() { + // Step 1: Start the blocking stdin reader + let blocking_stdin_stream = spawn_stdin_stream(); // std::sync::mpsc::Receiver> + + // Step 2: Create an async Tokio channel + let (tx, mut rx) = mpsc::channel::>(BUFFER_SIZE); + + // Step 3: Spawn a thread to forward data from std::sync::mpsc to Tokio mpsc + thread::spawn(move || { + while let Ok(chunk) = blocking_stdin_stream.recv() { + if tx.blocking_send(chunk).is_err() { + break; // If the receiver is closed, stop forwarding + } + } + }); + + // Step 4: Process the async stream of binary input + let mut received_any = false; + + while let Some(chunk) = rx.recv().await { + received_any = true; + + // Simulate async work per chunk + sleep(Duration::from_millis(100)).await; -// Step 3: Define an async function to process binary input -async fn async_main(input: Vec) { - // Simulate async work - sleep(Duration::from_secs(1)).await; + // Print raw binary data as it arrives + io::stdout() + .write_all(&chunk) + .expect("Failed to write output"); + } - // Print raw binary data instead of Debug format - io::stdout() - .write_all(&input) - .expect("Failed to write output"); + // Step 5: If no input was received, print the fallback value + if !received_any { + io::stdout() + .write_all(FALLBACK_VALUE) + .expect("Failed to write fallback value"); + } }