diff --git a/.gitignore b/.gitignore index ea8c4bf..a0e7a17 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +*.bin diff --git a/Cargo.lock b/Cargo.lock index 4c5b926..f960839 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "backtrace" version = "0.3.74" @@ -32,6 +38,18 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "bitflags" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + [[package]] name = "cfg-if" version = "1.0.0" @@ -56,6 +74,16 @@ version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "memchr" version = "2.7.4" @@ -71,6 +99,17 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + [[package]] name = "object" version = "0.36.7" @@ -80,25 +119,117 @@ dependencies = [ "memchr", ] +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "pin-project-lite" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "proc-macro2" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +dependencies = [ + "bitflags", +] + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "stdin-nonblocking" -version = "0.2.1" +version = "0.3.0" dependencies = [ "doc-comment", ] +[[package]] +name = "syn" +version = "2.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "tokio" version = "1.43.0" @@ -106,7 +237,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", ] [[package]] @@ -117,6 +256,38 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-targets" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 90a7c74..b9c0475 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stdin-nonblocking" -version = "0.2.1" +version = "0.3.0" authors = ["Jeremy Harris "] description = "Dependency-less non-blocking stdin reader using background threads. Supports streaming and immediate fallback defaults." repository = "https://github.com/jzombie/rust-stdin-nonblocking" diff --git a/README.md b/README.md index c232e1a..53413f2 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ Dependency-less non-blocking `stdin` reader using background threads. Supports streaming and immediate fallback defaults. +Supports **binary data**, streaming, and immediate fallback defaults. + ## Install ```sh @@ -29,9 +31,10 @@ use stdin_nonblocking::get_stdin_or_default; // If running in interactive mode (stdin is a terminal), // `get_stdin_or_default` returns the default value immediately. -let input = get_stdin_or_default(Some("fallback_value")); +let input = get_stdin_or_default(Some(b"fallback_value")); -assert_eq!(input, Some("fallback_value".to_string())); +// Input is always `Vec`, ensuring binary safety. +assert_eq!(input, b"fallback_value".to_vec()); ``` ### Read `stdin` as Stream @@ -47,7 +50,7 @@ let stdin_stream = spawn_stdin_stream(); loop { match stdin_stream.try_recv() { - Ok(line) => println!("Received: {}", line), + Ok(bytes) => println!("Received: {:?}", bytes), // Always raw bytes Err(TryRecvError::Empty) => { // No input yet; continue execution } diff --git a/examples/stdin.rs b/examples/stdin.rs index bf04e55..31b553c 100644 --- a/examples/stdin.rs +++ b/examples/stdin.rs @@ -1,6 +1,6 @@ use stdin_nonblocking::get_stdin_or_default; fn main() { - let input = get_stdin_or_default(Some("backup_value")); + let input = get_stdin_or_default(Some(b"backup_value")); println!("Final input: {:?}", input); } diff --git a/src/bin/test_binary.rs b/src/bin/test_binary.rs index 996baef..776215c 100644 --- a/src/bin/test_binary.rs +++ b/src/bin/test_binary.rs @@ -1,7 +1,12 @@ +use std::io::{self, Write}; use stdin_nonblocking::get_stdin_or_default; // Used for integration testing fn main() { - let input = get_stdin_or_default(Some("fallback_value")); - println!("Received input: {:?}", input); + let input = get_stdin_or_default(Some(b"fallback_value")); + + // Print raw binary data instead of Debug format + io::stdout() + .write_all(&input) + .expect("Failed to write output"); } diff --git a/src/lib.rs b/src/lib.rs index ff3f55a..9f3f1b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,9 @@ -#[cfg(doctest)] -doc_comment::doctest!("../README.md"); - -use std::io::{self, BufRead, IsTerminal}; +use std::io::{self, IsTerminal, Read}; use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; use std::time::Duration; -/// Spawns a background thread that continuously reads from stdin as a stream. +/// Spawns a background thread that continuously reads from stdin as a binary stream. /// /// This function returns an `mpsc Receiver`, allowing non-blocking polling /// of stdin input just like `spawn_stdin_channel`. @@ -14,10 +11,10 @@ use std::time::Duration; /// **Handling Interactive Mode:** /// - If stdin is a terminal (interactive mode), this function immediately returns an empty receiver. /// - This prevents blocking behavior when running interactively. -/// - When reading from a file or pipe, the background thread captures input line by line. +/// - When reading from a file or pipe, the background thread captures input **as raw bytes**. /// /// # Returns -/// A `Receiver` that emits lines from stdin. +/// A `Receiver>` that emits **binary data** from stdin. /// /// # Example /// ``` @@ -29,7 +26,7 @@ use std::time::Duration; /// /// loop { /// match stdin_stream.try_recv() { -/// Ok(line) => println!("Received: {}", line), +/// Ok(bytes) => println!("Received: {:?}", bytes), // Always raw bytes /// Err(TryRecvError::Empty) => { /// // No input yet; continue execution /// } @@ -41,30 +38,25 @@ use std::time::Duration; /// std::thread::sleep(Duration::from_millis(500)); /// } /// ``` -pub fn spawn_stdin_stream() -> Receiver { - let (tx, rx): (Sender, Receiver) = mpsc::channel(); +pub fn spawn_stdin_stream() -> Receiver> { + let (tx, rx): (Sender>, Receiver>) = mpsc::channel(); - // If stdin is a terminal, return early (don't block). This check prevents potential blocking - // if the program is running interactively (i.e. the user is typing in the terminal). + // If stdin is a terminal, return early (no blocking). if io::stdin().is_terminal() { return rx; } thread::spawn(move || { + let mut buffer = Vec::new(); let stdin = io::stdin(); let mut stdin_lock = stdin.lock(); - loop { - let mut buffer = String::new(); - match stdin_lock.read_line(&mut buffer) { - Ok(0) => break, // EOF detected, exit thread - Ok(_) => { - if tx.send(buffer.trim().to_string()).is_err() { - break; // Exit if receiver is dropped - } - } - Err(_) => break, // Read failure + match stdin_lock.read_to_end(&mut buffer) { + Ok(0) => (), // EOF, no data + Ok(_) => { + let _ = tx.send(buffer); // Send full binary data } + Err(_) => (), // Read failure } }); @@ -78,39 +70,33 @@ pub fn spawn_stdin_stream() -> Receiver { /// /// **Handling Interactive Mode:** /// - If running interactively (stdin is a terminal), this function returns the default value immediately. -/// - This prevents hanging on waiting for user input in interactive sessions. -/// - When used with redirected input (e.g., from a file or pipe), it collects available input. +/// - This prevents hanging while waiting for user input in interactive sessions. +/// - When used with redirected input (e.g., from a file or pipe), it collects available **binary** input. /// /// # Arguments /// * `default` - An optional fallback value returned if no input is available. /// /// # Returns -/// * `Option` - The trimmed `stdin` input as a `String` if available, or the provided `default` as a `String` if no input is received. +/// * `Vec` - The full stdin input (or default value as bytes). /// /// # Example /// ``` /// use stdin_nonblocking::get_stdin_or_default; /// -/// let input = get_stdin_or_default(Some("fallback_value")); +/// let input = get_stdin_or_default(Some(b"fallback_value")); /// -/// assert_eq!(input, Some("fallback_value".to_string())); +/// assert_eq!(input, b"fallback_value".to_vec()); /// ``` -pub fn get_stdin_or_default(default: Option<&str>) -> Option { +pub fn get_stdin_or_default(default: Option<&[u8]>) -> Vec { let stdin_channel = spawn_stdin_stream(); - let mut input = String::new(); // Give the reader thread a short time to capture any available input thread::sleep(Duration::from_millis(50)); - while let Ok(line) = stdin_channel.try_recv() { - input.push_str(&line); // Collect all lines - input.push('\n'); // Add a newline between lines + if let Ok(data) = stdin_channel.try_recv() { + return data; } - // If input was collected, return it. Otherwise, return the default value. - if !input.trim().is_empty() { - Some(input.trim().to_string()) - } else { - default.map(|s| s.to_string()) - } + // No input available, return the default value + default.unwrap_or(b"").to_vec() } diff --git a/tests/stdin_tests.rs b/tests/stdin_tests.rs index 400e134..8b5b34e 100644 --- a/tests/stdin_tests.rs +++ b/tests/stdin_tests.rs @@ -1,10 +1,10 @@ use std::io::Write; use std::process::{Command, Stdio}; -/// Runs an actual workspace binary (`test_binary` or `tokio-example-app`) and -/// provides input via stdin to observe how the binaries respond, rather than -/// mocking the inputs. -fn run_binary(binary: &str, input: &str) -> String { +/// Runs an actual workspace binary (`test_binary` or `tokio-example-app`) +/// and provides **binary input** via stdin. +/// Returns **raw bytes** instead of assuming UTF-8. +fn run_binary(binary: &str, input: &[u8]) -> Vec { // Ensure all binaries are built first let build_status = Command::new("cargo") .args(["build", "--workspace"]) @@ -35,65 +35,91 @@ fn run_binary(binary: &str, input: &str) -> String { .spawn() .unwrap_or_else(|_| panic!("Failed to spawn process: {}", binary)); - // Pass the input to the binary if needed + // Pass binary input to the process if needed if let Some(mut stdin) = child.stdin.take() { if !input.is_empty() { - writeln!(stdin, "{}", input).expect("Failed to write to stdin"); + stdin + .write_all(input) + .expect("Failed to write binary to stdin"); } drop(stdin); // Close stdin explicitly to send EOF } - // Capture and return the output + // Capture binary output let output = child.wait_with_output().expect("Failed to read stdout"); - String::from_utf8(output.stdout).expect("Invalid UTF-8 output") + output.stdout // Return raw binary output } -/// Test `test_binary` with piped input +/// **Test binary input handling with raw data** #[test] -fn test_piped_input_sync_app() { - let output = run_binary("test_binary", "test input"); - assert!(output.contains("Received input: Some(\"test input\")")); -} +fn test_binary_input_handling() { + { + let binary_input: &[u8] = b"\xDE\xAD\xBE\xEF"; // Arbitrary binary data + let output_bytes = run_binary("test_binary", binary_input); -/// Test `test_binary` with empty input (should use fallback) -#[test] -fn test_empty_input_sync_app() { - let output = run_binary("test_binary", ""); - assert!( - output.contains("fallback_value"), - "Expected fallback value but got: {}", - output - ); -} + assert_eq!( + output_bytes, binary_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } -/// Test `tokio-example-app` with piped input -#[test] -fn test_piped_input_tokio_app() { - let output = run_binary("tokio-example-app", "test input"); - assert!(output.contains("Received input: Some(\"test input\")")); + { + let binary_input: &[u8] = b"\xDE\xAD\xBE\xEF"; // Same binary input for tokio-example-app + let output_bytes = run_binary("tokio-example-app", binary_input); + + assert_eq!( + output_bytes, binary_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } } -/// Test `tokio-example-app` with empty input (should use fallback) #[test] -fn test_empty_input_tokio_app() { - let output = run_binary("tokio-example-app", ""); - assert!( - output.contains("fallback_value"), - "Expected fallback value but got: {}", - output - ); +fn test_text_input_handling() { + { + let text_input = b"Hello, binary world!"; + let output_bytes = run_binary("test_binary", text_input); + + assert_eq!( + output_bytes, text_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } + + { + let text_input = b"Hello, binary world!"; + let output_bytes = run_binary("tokio-example-app", text_input); + + assert_eq!( + output_bytes, text_input, + "Expected output to match input, but got: {:?}", + output_bytes + ); + } } -/// Test reading the README.md file and ensure all content is captured #[test] -fn test_multi_line_content() { - let input = "line1\nline2\r\nline3\rline4\nline5"; +fn test_empty_input() { + { + let output_bytes = run_binary("test_binary", b""); - // Run the binary or command and capture the output - let output = run_binary("test_binary", input); + assert_eq!( + output_bytes, b"fallback_value", + "Expected fallback value but got: {:?}", + output_bytes + ); + } + + { + let output_bytes = run_binary("tokio-example-app", b""); - assert_eq!( - output, - "Received input: Some(\"line1\\nline2\\nline3\\rline4\\nline5\")\n" - ); + assert_eq!( + output_bytes, b"fallback_value", + "Expected fallback value but got: {:?}", + output_bytes + ); + } } diff --git a/tokio-example-app/Cargo.toml b/tokio-example-app/Cargo.toml index 15c6d3e..35e2a30 100644 --- a/tokio-example-app/Cargo.toml +++ b/tokio-example-app/Cargo.toml @@ -5,6 +5,6 @@ edition = "2021" publish = false [dependencies] -tokio = { version = "1.43.0", features = ["rt-multi-thread", "time"] } +tokio = { version = "1.43.0", features = ["full"] } stdin-nonblocking = { path = "../" } diff --git a/tokio-example-app/src/main.rs b/tokio-example-app/src/main.rs index b9884ca..b73ad0c 100644 --- a/tokio-example-app/src/main.rs +++ b/tokio-example-app/src/main.rs @@ -1,23 +1,49 @@ -use stdin_nonblocking::get_stdin_or_default; -use tokio::runtime::Runtime; +use std::io::{self, Write}; +use std::thread; +use stdin_nonblocking::spawn_stdin_stream; +use tokio::sync::mpsc; +use tokio::time::{sleep, Duration}; -fn main() { - // Step 1: Call `get_stdin_or_default` synchronously to get stdin input - let input = get_stdin_or_default(Some("fallback_value")); +/// Maximum buffer size for async channel +const BUFFER_SIZE: usize = 10; +const FALLBACK_VALUE: &[u8] = b"fallback_value"; - println!("Received input: {:?}", input); +#[tokio::main] +async fn main() { + // Step 1: Start the blocking stdin reader + let blocking_stdin_stream = spawn_stdin_stream(); // std::sync::mpsc::Receiver> - // Step 2: Start the Tokio runtime and pass the input to async code - let rt = Runtime::new().unwrap(); - rt.block_on(async_main(input)); -} + // Step 2: Create an async Tokio channel + let (tx, mut rx) = mpsc::channel::>(BUFFER_SIZE); + + // Step 3: Spawn a thread to forward data from std::sync::mpsc to Tokio mpsc + thread::spawn(move || { + while let Ok(chunk) = blocking_stdin_stream.recv() { + if tx.blocking_send(chunk).is_err() { + break; // If the receiver is closed, stop forwarding + } + } + }); + + // Step 4: Process the async stream of binary input + let mut received_any = false; + + while let Some(chunk) = rx.recv().await { + received_any = true; -// Step 3: Define an async function to process the input -async fn async_main(input: Option) { - println!("Async processing input: {:?}", input); + // Simulate async work per chunk + sleep(Duration::from_millis(100)).await; - // Simulating async work - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + // Print raw binary data as it arrives + io::stdout() + .write_all(&chunk) + .expect("Failed to write output"); + } - println!("Async work complete."); + // Step 5: If no input was received, print the fallback value + if !received_any { + io::stdout() + .write_all(FALLBACK_VALUE) + .expect("Failed to write fallback value"); + } }