diff --git a/Cargo.lock b/Cargo.lock index 49ac3b56a23..1389b0f6a97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,7 +37,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", "cipher", - "cpufeatures 0.2.17", + "cpufeatures", ] [[package]] @@ -114,6 +114,21 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse 0.2.7", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstream" version = "1.0.0" @@ -121,7 +136,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" dependencies = [ "anstyle", - "anstyle-parse", + "anstyle-parse 1.0.0", "anstyle-query", "anstyle-wincon", "colorchoice", @@ -135,6 +150,15 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + [[package]] name = "anstyle-parse" version = "1.0.0" @@ -646,9 +670,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.39.1" +version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83a25cf98105baa966497416dbd42565ce3a8cf8dbfd59803ec9ad46f3126399" +checksum = "1fa7e52a4c5c547c741610a2c6f123f3881e409b714cd27e6798ef020c514f0a" dependencies = [ "cc", "cmake", @@ -1355,16 +1379,16 @@ dependencies = [ [[package]] name = "blake3" -version = "1.8.4" +version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d2d5991425dfd0785aed03aedcf0b321d61975c9b5b3689c774a2610ae0b51e" +checksum = "2468ef7d57b3fb7e16b576e8377cdbde2320c60e1491e961d11da40fc4f02a2d" dependencies = [ "arrayref", "arrayvec", "cc", "cfg-if", "constant_time_eq", - "cpufeatures 0.3.0", + "cpufeatures", ] [[package]] @@ -1495,9 +1519,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.58" +version = "1.2.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e928d4b69e3077709075a938a05ffbedfa53a84c8f766efbf8220bb1ff60e1" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" dependencies = [ "find-msvc-tools", "jobserver", @@ -1609,7 +1633,7 @@ version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ - "anstream", + "anstream 1.0.0", "anstyle", "clap_lex", "strsim 0.11.1", @@ -1635,9 +1659,9 @@ checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "cmake" -version = "0.1.58" +version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" dependencies = [ "cc", ] @@ -1788,15 +1812,6 @@ dependencies = [ "libc", ] -[[package]] -name = "cpufeatures" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" -dependencies = [ - "libc", -] - [[package]] name = "crc" version = "3.3.0" @@ -3091,9 +3106,9 @@ dependencies = [ [[package]] name = "env_filter" -version = "1.0.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" dependencies = [ "log", "regex", @@ -3101,11 +3116,11 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.10" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" dependencies = [ - "anstream", + "anstream 0.6.21", "anstyle", "env_filter", "jiff", @@ -4350,6 +4365,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd7bddefd0a8833b88a4b68f90dae22c7450d11b354198baee3874fd811b344" +dependencies = [ + "bitflags 2.11.0", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -4358,9 +4384,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "iri-string" -version = "0.7.12" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" +checksum = "d8e7418f59cc01c88316161279a7f665217ae316b388e58a0d10e29f54f1e5eb" dependencies = [ "memchr", "serde", @@ -4510,21 +4536,19 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.92" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc4c90f45aa2e6eacbe8645f77fdea542ac97a494bcd117a67df9ff4d611f995" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" dependencies = [ - "cfg-if", - "futures-util", "once_cell", "wasm-bindgen", ] [[package]] name = "jsonb" -version = "0.5.6" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb98fb29636087c40ad0d1274d9a30c0c1e83e03ae93f6e7e89247b37fcc6953" +checksum = "2a901f06163d352fbe41c3c2ff5e08b75330a003cc941e988fb501022f5421e6" dependencies = [ "byteorder", "ethnum", @@ -4533,11 +4557,11 @@ dependencies = [ "jiff", "nom 8.0.0", "num-traits", - "ordered-float 5.3.0", + "ordered-float 5.1.0", "rand 0.9.2", + "ryu", "serde", "serde_json", - "zmij", ] [[package]] @@ -5034,12 +5058,15 @@ dependencies = [ "deepsize", "futures", "http 1.4.0", + "io-uring", "lance-arrow", "lance-core", "lance-namespace", + "libc", "log", "mock_instant", "mockall", + "moka", "object_store", "object_store_opendal", "opendal", @@ -5388,9 +5415,9 @@ checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libredox" -version = "0.1.15" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08" +checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ "bitflags 2.11.0", "libc", @@ -5766,9 +5793,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.2.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", "wasi 0.11.1+wasi-snapshot-preview1", @@ -5969,9 +5996,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" +checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" [[package]] name = "num-format" @@ -6279,9 +6306,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "5.3.0" +version = "5.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7d950ca161dc355eaf28f82b11345ed76c6e1f6eb1f4f4479e0323b9e2fbd0e" +checksum = "7f4779c6901a562440c3786d08192c6fbda7c1c2060edd10006b05ee35d10f2d" dependencies = [ "num-traits", ] @@ -6827,9 +6854,9 @@ dependencies = [ [[package]] name = "proptest" -version = "1.11.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" +checksum = "37566cb3fdacef14c0737f9546df7cfeadbfbc9fef10991038bf5015d0c80532" dependencies = [ "bit-set", "bit-vec", @@ -7532,9 +7559,9 @@ checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d" [[package]] name = "rustc-hash" -version = "2.1.2" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" [[package]] name = "rustc_version" @@ -7939,7 +7966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures 0.2.17", + "cpufeatures", "digest", ] @@ -7950,7 +7977,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures 0.2.17", + "cpufeatures", "digest", ] @@ -7991,9 +8018,9 @@ dependencies = [ [[package]] name = "simd-adler32" -version = "0.3.9" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" [[package]] name = "simdutf8" @@ -8090,11 +8117,11 @@ dependencies = [ [[package]] name = "spade" -version = "2.15.1" +version = "2.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9699399fd9349b00b184f5635b074f9ec93afffef30c853f8c875b32c0f8c7fa" +checksum = "fb313e1c8afee5b5647e00ee0fe6855e3d529eb863a0fdae1d60006c4d1e9990" dependencies = [ - "hashbrown 0.16.1", + "hashbrown 0.15.5", "num-traits", "robust", "smallvec", @@ -8827,9 +8854,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.25.8+spec-1.1.0" +version = "0.25.7+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16bff38f1d86c47f9ff0647e6838d7bb362522bdf44006c7068c2b1e606f1f3c" +checksum = "d15b06e6c39068c203e7c1d0bc3944796d867449e7668ef7fa5ea43727cb846e" dependencies = [ "indexmap", "toml_datetime", @@ -9128,9 +9155,9 @@ dependencies = [ [[package]] name = "unicode-segmentation" -version = "1.13.2" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" [[package]] name = "unicode-width" @@ -9227,9 +9254,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.23.0" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" +checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -9333,9 +9360,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.115" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6523d69017b7633e396a89c5efab138161ed5aafcbc8d3e5c5a42ae38f50495a" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" dependencies = [ "cfg-if", "once_cell", @@ -9346,19 +9373,23 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.65" +version = "0.4.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d1faf851e778dfa54db7cd438b70758eba9755cb47403f3496edd7c8fc212f0" +checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" dependencies = [ + "cfg-if", + "futures-util", "js-sys", + "once_cell", "wasm-bindgen", + "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.115" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e3a6c758eb2f701ed3d052ff5737f5bfe6614326ea7f3bbac7156192dc32e67" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -9366,9 +9397,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.115" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "921de2737904886b52bcbb237301552d05969a6f9c40d261eb0533c8b055fedf" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" dependencies = [ "bumpalo", "proc-macro2", @@ -9379,9 +9410,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.115" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a93e946af942b58934c604527337bad9ae33ba1d5c6900bbb41c2c07c2364a93" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" dependencies = [ "unicode-ident", ] @@ -9435,9 +9466,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.92" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84cde8507f4d7cfcb1185b8cb5890c494ffea65edbe1ba82cfd63661c805ed94" +checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" dependencies = [ "js-sys", "wasm-bindgen", @@ -9805,9 +9836,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] name = "winnow" -version = "1.0.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09dac053f1cd375980747450bfc7250c264eaae0583872e845c0c7cd578872b5" +checksum = "a90e88e4667264a994d34e6d1ab2d26d398dcdca8b7f52bec8668957517fc7d8" dependencies = [ "memchr", ] @@ -10022,18 +10053,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.48" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +checksum = "efbb2a062be311f2ba113ce66f697a4dc589f85e78a4aea276200804cea0ed87" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.48" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +checksum = "0e8bc7269b54418e7aeeef514aa68f8690b8c0489a06b0136e5f57c4c5ccab89" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index c922eff6b8b..527ad080889 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -146,6 +146,7 @@ geo-types = "0.7.16" http = "1.1.0" humantime = "2.2.0" hyperloglogplus = { version = "0.4.1", features = ["const-loop"] } +io-uring = "0.7" itertools = "0.13" jieba-rs = { version = "0.8.1", default-features = false } jsonb = { version = "0.5.3", default-features = false, features = ["databend"] } diff --git a/python/Cargo.lock b/python/Cargo.lock index 4507a617872..a090f245765 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -3667,6 +3667,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd7bddefd0a8833b88a4b68f90dae22c7450d11b354198baee3874fd811b344" +dependencies = [ + "bitflags 2.11.0", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -4212,10 +4223,13 @@ dependencies = [ "deepsize", "futures", "http 1.4.0", + "io-uring", "lance-arrow", "lance-core", "lance-namespace", + "libc", "log", + "moka", "object_store", "object_store_opendal", "opendal", diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index 3aa0e1dab96..e1cbfd70f2b 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -38,6 +38,7 @@ deepsize.workspace = true futures.workspace = true http.workspace = true log.workspace = true +moka.workspace = true pin-project.workspace = true prost.workspace = true serde.workspace = true @@ -49,6 +50,10 @@ path_abs.workspace = true rand.workspace = true tempfile.workspace = true +[target.'cfg(target_os = "linux")'.dependencies] +libc = { workspace = true } +io-uring = { workspace = true } + [dev-dependencies] criterion.workspace = true test-log.workspace = true diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs index e1729db73be..f3c772e9b01 100644 --- a/rust/lance-io/src/lib.rs +++ b/rust/lance-io/src/lib.rs @@ -21,6 +21,8 @@ pub mod stream; #[cfg(test)] pub mod testing; pub mod traits; +#[cfg(target_os = "linux")] +pub mod uring; pub mod utils; pub use scheduler::{bytes_read_counter, iops_counter}; diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 4eb1e4ff403..6fe5c152046 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -32,6 +32,8 @@ use tokio::io::AsyncWriteExt; use url::Url; use super::local::LocalObjectReader; +#[cfg(target_os = "linux")] +use crate::uring::{UringCurrentThreadReader, UringReader}; mod list_retry; pub mod providers; pub mod storage_options; @@ -597,6 +599,31 @@ impl ObjectStore { ) .await } + #[cfg(target_os = "linux")] + "file+uring" => { + // Check if current-thread mode enabled + let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD") + .map(|v| str_is_truthy(&v)) + .unwrap_or(false); + + if use_current_thread { + UringCurrentThreadReader::open( + path, + self.block_size, + None, + Arc::new(self.io_tracker.clone()), + ) + .await + } else { + UringReader::open( + path, + self.block_size, + None, + Arc::new(self.io_tracker.clone()), + ) + .await + } + } _ => Ok(Box::new(CloudObjectReader::new( self.inner.clone(), path.clone(), @@ -634,6 +661,31 @@ impl ObjectStore { ) .await } + #[cfg(target_os = "linux")] + "file+uring" => { + // Check if current-thread mode enabled + let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD") + .map(|v| str_is_truthy(&v)) + .unwrap_or(false); + + if use_current_thread { + UringCurrentThreadReader::open( + path, + self.block_size, + Some(known_size), + Arc::new(self.io_tracker.clone()), + ) + .await + } else { + UringReader::open( + path, + self.block_size, + Some(known_size), + Arc::new(self.io_tracker.clone()), + ) + .await + } + } _ => Ok(Box::new(CloudObjectReader::new( self.inner.clone(), path.clone(), diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 3583b27b288..03efa281e01 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -89,6 +89,7 @@ pub struct ObjectStoreRegistryStats { /// - `file`: A local file object store, with optimized code paths. /// - `file-object-store`: A local file object store that uses the ObjectStore API, /// for all operations. Used for testing with ObjectStore wrappers. +/// - `file+uring`: A local file object store using io_uring (Linux only). /// - `s3`: An S3 object store. /// - `s3+ddb`: An S3 object store with DynamoDB for metadata. /// - `az`: An Azure Blob Storage object store. @@ -301,6 +302,8 @@ impl Default for ObjectStoreRegistry { "file-object-store".into(), Arc::new(local::FileStoreProvider), ); + #[cfg(target_os = "linux")] + providers.insert("file+uring".into(), Arc::new(local::FileStoreProvider)); #[cfg(feature = "aws")] { diff --git a/rust/lance-io/src/uring.rs b/rust/lance-io/src/uring.rs new file mode 100644 index 00000000000..06786178164 --- /dev/null +++ b/rust/lance-io/src/uring.rs @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! io_uring-based I/O for disks with high IOPS capacity (e.g. NVMe) +//! +//! This module provides two implementations of the [`Reader`](crate::traits::Reader) trait +//! using Linux's io_uring interface for asynchronous I/O. +//! +//! One of these uses a pool of dedicated background threads which each own an io_uring instance. +//! Read requests are submitted to a background thread's pool. +//! +//! The other implementation uses a thread-local io_uring instance. This only works if the future +//! is polled by the same thread that submitted the request. This means that the runtime must be +//! a single-threaded runtime. +//! +//! # Configuration +//! +//! The io_uring reader is enabled by using the `file+uring://` URI scheme instead of `file://`. +//! Additional tuning parameters are controlled by environment variables: +//! +//! - `LANCE_URING_CURRENT_THREAD` - Use thread-local io_uring (default: false) +//! - `LANCE_URING_BLOCK_SIZE` - Block size in bytes (default: 4KB) +//! - `LANCE_URING_IO_PARALLELISM` - Max concurrent operations (default: 128) +//! - `LANCE_URING_QUEUE_DEPTH` - io_uring queue depth (default: 16K) +//! - `LANCE_URING_THREAD_COUNT` - Number of io_uring threads to use (default: 2) +//! - `LANCE_URING_SUBMIT_BATCH_SIZE` - Number of requests to batch before submitting (default: 128) +//! - `LANCE_URING_POLL_TIMEOUT_MS` - Thread poll timeout in milliseconds (default: 10) +//! +//! Note: the block size and io parallelism are not actually used by the io_uring implementation. These +//! variables just control what the filesystem reports up to Lance. +//! +//! # Platform Support +//! +//! This module is only available on Linux and requires kernel 5.1 or newer. +//! On other platforms, the code falls back to [`LocalObjectReader`](crate::local::LocalObjectReader). +//! +//! # Example +//! +//! ```no_run +//! # use lance_io::object_store::ObjectStore; +//! # async fn example() -> lance_core::Result<()> { +//! // Enable io_uring by using the file+uring:// scheme +//! let uri = "file+uring:///path/to/file.dat"; +//! let (store, path) = ObjectStore::from_uri(uri).await?; +//! let reader = store.open(&path).await?; +//! +//! // Reader will use io_uring +//! let data = reader.get_range(0..1024).await?; +//! # Ok(()) +//! # } +//! ``` + +mod future; +mod reader; +mod requests; +mod thread; + +// Thread-local io_uring implementation for current-thread runtimes +pub(crate) mod current_thread; +pub(crate) mod current_thread_future; + +#[cfg(test)] +mod tests; + +use std::sync::LazyLock; + +pub(crate) use current_thread::UringCurrentThreadReader; +pub use reader::UringReader; + +/// Default block size for io_uring reads (4KB) +pub const DEFAULT_URING_BLOCK_SIZE: usize = 4 * 1024; + +/// Default I/O parallelism for io_uring (128 concurrent operations) +pub const DEFAULT_URING_IO_PARALLELISM: usize = 128; + +/// Default io_uring queue depth (16K entries) +pub const DEFAULT_URING_QUEUE_DEPTH: usize = 16 * 1024; + +/// Cached `LANCE_URING_BLOCK_SIZE` env var, read once at first access. +pub(crate) static URING_BLOCK_SIZE: LazyLock> = LazyLock::new(|| { + std::env::var("LANCE_URING_BLOCK_SIZE") + .ok() + .and_then(|s| s.parse().ok()) +}); diff --git a/rust/lance-io/src/uring/current_thread.rs b/rust/lance-io/src/uring/current_thread.rs new file mode 100644 index 00000000000..bc09af058e6 --- /dev/null +++ b/rust/lance-io/src/uring/current_thread.rs @@ -0,0 +1,421 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Thread-local io_uring implementation for current-thread runtimes. +//! +//! This implementation creates a thread-local IoUring instance per thread +//! and directly processes completions during future polling, eliminating +//! the need for background threads and MPSC channels. + +use super::requests::{IoRequest, RequestState}; +use super::{DEFAULT_URING_BLOCK_SIZE, DEFAULT_URING_IO_PARALLELISM, URING_BLOCK_SIZE}; +use crate::local::to_local_path; +use crate::traits::Reader; +use crate::uring::DEFAULT_URING_QUEUE_DEPTH; +use crate::utils::tracking_store::IOTracker; +use bytes::{Bytes, BytesMut}; +use deepsize::DeepSizeOf; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use io_uring::{IoUring, opcode, types}; +use lance_core::{Error, Result}; +use object_store::path::Path; + +use std::cell::{LazyCell, RefCell}; +use std::collections::HashMap; +use std::fs::File; +use std::future::Future; +use std::io::{self, ErrorKind}; +use std::ops::Range; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use tracing::instrument; + +// Re-use file handle types from reader.rs +use super::reader::{CacheKey, CachedReaderData, HANDLE_CACHE, UringFileHandle}; + +/// Global counter for generating unique user_data values +static USER_DATA_COUNTER: AtomicU64 = AtomicU64::new(1); + +/// Thread-local io_uring instance with pending requests +struct ThreadLocalUring { + ring: IoUring, + pending: HashMap>, +} + +thread_local! { + static URING: LazyCell> = LazyCell::new(|| { + let queue_depth = std::env::var("LANCE_URING_QUEUE_DEPTH") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_URING_QUEUE_DEPTH); + + let ring = IoUring::builder() + // Ensures work is only done in submit_and_wait + .setup_defer_taskrun() + // Enable perf. optimization when there is only one issuer thread + .setup_single_issuer() + .build(queue_depth as u32) + .expect("Failed to create io_uring"); + + log::debug!( + "Created thread-local io_uring with queue depth {}", + queue_depth + ); + + RefCell::new(ThreadLocalUring { + ring, + pending: HashMap::new(), + }) + }); +} + +/// Push request to thread-local submission queue +pub(super) fn push_request(request: Arc) -> io::Result<()> { + URING.with(|cell| { + let mut uring = cell.borrow_mut(); + + // Generate unique user_data + let user_data = USER_DATA_COUNTER.fetch_add(1, Ordering::Relaxed); + + // Get buffer pointer, adjusting for any bytes already read (short read retry) + let (buffer_ptr, read_offset, read_length) = { + let state = request.state.lock().unwrap(); + let br = state.bytes_read; + ( + unsafe { state.buffer.as_ptr().add(br) as *mut u8 }, + request.offset + br as u64, + (request.length - br) as u32, + ) + }; + + // Prepare read operation + let read_op = + opcode::Read::new(types::Fd(request.fd), buffer_ptr, read_length).offset(read_offset); + + // Get submission queue + let mut sq = uring.ring.submission(); + + // Check if SQ has space + if sq.is_full() { + drop(sq); + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "io_uring submission queue full", + )); + } + + // Push to SQ + unsafe { + sq.push(&read_op.build().user_data(user_data)) + .map_err(|_| io::Error::other("Failed to push to SQ"))?; + } + drop(sq); + + // Track request in pending map + uring.pending.insert(user_data, request); + + // Don't submit here - let the future handle submission + + Ok(()) + }) +} + +/// Process completions from thread-local IoUring +pub(super) fn process_thread_local_completions() -> io::Result { + URING.with(|cell| { + let mut uring = cell.borrow_mut(); + let mut completed = 0; + let mut retries: Vec> = Vec::new(); + + // Collect completions first to avoid borrowing ring and pending simultaneously + let cqes: Vec<_> = uring + .ring + .completion() + .map(|cqe| (cqe.user_data(), cqe.result())) + .collect(); + + for (user_data, result) in cqes { + if let Some(request) = uring.pending.remove(&user_data) { + let mut state = request.state.lock().unwrap(); + + if result < 0 { + // Kernel error + state.err = Some(io::Error::from_raw_os_error(-result)); + state.completed = true; + } else if result == 0 { + // EOF before full read completed + let br = state.bytes_read; + state.err = Some(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("unexpected EOF: read {} of {} bytes", br, request.length), + )); + state.buffer.truncate(br); + state.completed = true; + } else { + // Positive result: n bytes read + let n = result as usize; + state.bytes_read += n; + let br = state.bytes_read; + + if br >= request.length { + // Full read complete + state.buffer.truncate(br); + state.completed = true; + } else { + // Short read — need retry; don't mark completed or wake + drop(state); + retries.push(request); + + continue; + } + } + + // Wake waiting future + if let Some(waker) = state.waker.take() { + drop(state); + waker.wake(); + } + + completed += 1; + } else { + log::warn!("Received completion for unknown user_data: {}", user_data); + } + } + + // Resubmit short-read retries + for request in retries { + // Generate unique user_data + let user_data = USER_DATA_COUNTER.fetch_add(1, Ordering::Relaxed); + + let (buffer_ptr, read_offset, read_length) = { + let state = request.state.lock().unwrap(); + let br = state.bytes_read; + ( + unsafe { state.buffer.as_ptr().add(br) as *mut u8 }, + request.offset + br as u64, + (request.length - br) as u32, + ) + }; + + let read_op = opcode::Read::new(types::Fd(request.fd), buffer_ptr, read_length) + .offset(read_offset); + + let mut sq = uring.ring.submission(); + if sq.is_full() { + drop(sq); + request.fail(io::Error::new( + io::ErrorKind::WouldBlock, + "io_uring submission queue full during retry", + )); + continue; + } + + unsafe { + if sq.push(&read_op.build().user_data(user_data)).is_err() { + request.fail(io::Error::other("Failed to push short-read retry to SQ")); + continue; + } + } + drop(sq); + + uring.pending.insert(user_data, request); + } + + if completed > 0 { + log::trace!("Processed {} completions", completed); + } + + Ok(completed) + }) +} + +/// Submit all pending requests and wait with timeout 0 (non-blocking) +pub(super) fn submit_and_wait_thread_local() -> io::Result<()> { + URING.with(|cell| { + let uring = cell.borrow_mut(); + // Submit with wait=1 (do at least some work) + uring.ring.submit_and_wait(1)?; + Ok(()) + }) +} + +/// Thread-local io_uring-based reader for current-thread runtimes +#[derive(Debug)] +pub struct UringCurrentThreadReader { + /// File handle + handle: Arc, + + /// Block size for I/O operations + block_size: usize, + + /// File size (determined at open time) + size: usize, + + /// I/O tracker for monitoring operations + io_tracker: Arc, +} + +impl DeepSizeOf for UringCurrentThreadReader { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + // Skip file handle (just a system resource) + // Only count the path's deep size + self.handle.path.as_ref().deep_size_of_children(context) + } +} + +impl UringCurrentThreadReader { + /// Open a file with thread-local io_uring + /// + /// This reuses the file handle caching infrastructure from UringReader + #[instrument(level = "debug")] + pub(crate) async fn open( + path: &Path, + block_size: usize, + known_size: Option, + io_tracker: Arc, + ) -> Result> { + // Determine block size with environment variable override + let block_size = URING_BLOCK_SIZE.unwrap_or(block_size.max(DEFAULT_URING_BLOCK_SIZE)); + + let cache_key = CacheKey::new(path, block_size); + + // Try to get from cache first + if let Some(data) = HANDLE_CACHE.get(&cache_key).await { + // Use known_size if provided, otherwise use cached size + let size = known_size.unwrap_or(data.size); + return Ok(Box::new(Self { + handle: data.handle, + block_size, + size, + io_tracker, + }) as Box); + } + + // Cache miss - open file and get size + let path_clone = path.clone(); + let local_path = to_local_path(path); + + let data = tokio::task::spawn_blocking(move || { + let file = File::open(&local_path).map_err(|e| match e.kind() { + ErrorKind::NotFound => Error::not_found(path_clone.to_string()), + _ => e.into(), + })?; + + // Get size from known_size or file metadata + let size = match known_size { + Some(s) => s, + None => file.metadata()?.len() as usize, + }; + + Ok::<_, Error>(CachedReaderData { + handle: Arc::new(UringFileHandle::new(file, path_clone)), + size, + }) + }) + .await??; + + // Insert into cache + HANDLE_CACHE.insert(cache_key, data.clone()).await; + + // Return new reader instance + Ok(Box::new(Self { + handle: data.handle.clone(), + block_size, + size: data.size, + io_tracker, + }) as Box) + } + + /// Submit a read request and return a future + fn submit_read( + &self, + offset: u64, + length: usize, + ) -> Pin> + Send>> { + let mut buffer = BytesMut::with_capacity(length); + unsafe { + buffer.set_len(length); + } + + let request = Arc::new(IoRequest { + fd: self.handle.fd, + offset, + length, + thread_id: std::thread::current().id(), + state: Mutex::new(RequestState { + completed: false, + waker: None, + err: None, + buffer, + bytes_read: 0, + }), + }); + + match push_request(request.clone()) { + Ok(()) => Box::pin(super::current_thread_future::UringCurrentThreadFuture::new( + request, + )), + Err(e) => Box::pin(async move { + Err(object_store::Error::Generic { + store: "io_uring_ct", + source: Box::new(e), + }) + }), + } + } +} + +impl Reader for UringCurrentThreadReader { + fn path(&self) -> &Path { + &self.handle.path + } + + fn block_size(&self) -> usize { + self.block_size + } + + fn io_parallelism(&self) -> usize { + std::env::var("LANCE_URING_IO_PARALLELISM") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_URING_IO_PARALLELISM) + } + + /// Returns the file size + fn size(&self) -> BoxFuture<'_, object_store::Result> { + Box::pin(async move { Ok(self.size) }) + } + + /// Read a range of bytes using thread-local io_uring + #[instrument(level = "debug", skip(self))] + fn get_range(&self, range: Range) -> BoxFuture<'static, object_store::Result> { + let io_tracker = self.io_tracker.clone(); + let path = self.handle.path.clone(); + let num_bytes = range.len() as u64; + let range_u64 = (range.start as u64)..(range.end as u64); + + self.submit_read(range.start as u64, range.len()) + .map_ok(move |bytes| { + io_tracker.record_read("get_range", path, num_bytes, Some(range_u64)); + bytes + }) + .boxed() + } + + /// Read the entire file using thread-local io_uring + #[instrument(level = "debug", skip(self))] + fn get_all(&self) -> BoxFuture<'static, object_store::Result> { + let size = self.size; + let io_tracker = self.io_tracker.clone(); + let path = self.handle.path.clone(); + + self.submit_read(0, size) + .map_ok(move |bytes| { + io_tracker.record_read("get_all", path, bytes.len() as u64, None); + bytes + }) + .boxed() + } +} diff --git a/rust/lance-io/src/uring/current_thread_future.rs b/rust/lance-io/src/uring/current_thread_future.rs new file mode 100644 index 00000000000..dbcf2242673 --- /dev/null +++ b/rust/lance-io/src/uring/current_thread_future.rs @@ -0,0 +1,102 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Future implementation for thread-local io_uring operations. +//! +//! This future actively processes completions during polling instead of +//! relying on background tasks. + +use super::current_thread::{process_thread_local_completions, submit_and_wait_thread_local}; +use super::requests::IoRequest; +use bytes::Bytes; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Future that awaits completion of a thread-local io_uring read operation +pub struct UringCurrentThreadFuture { + request: Arc, +} + +impl UringCurrentThreadFuture { + pub(super) fn new(request: Arc) -> Self { + Self { request } + } +} + +impl Future for UringCurrentThreadFuture { + type Output = object_store::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Check thread safety + if self.request.thread_id != std::thread::current().id() { + panic!("Request thread ID does not match current thread ID"); + } + + // First, check if we've been completed by some other future polling for completions. + let mut state = self.request.state.lock().unwrap(); + + if state.completed { + // Take result and return Ready + match state.err.take() { + Some(err) => { + return Poll::Ready(Err(object_store::Error::Generic { + store: "io_uring_ct", + source: Box::new(err), + })); + } + None => { + let br = state.bytes_read; + state.buffer.truncate(br); + let bytes = std::mem::take(&mut state.buffer).freeze(); + return Poll::Ready(Ok(bytes)); + } + } + } + + drop(state); + + // If not, then we should do any available work and then process completions. + if let Err(e) = submit_and_wait_thread_local() { + log::debug!("Submit and wait error: {:?}", e); + } + + if let Err(e) = process_thread_local_completions() { + log::warn!("Error processing completions: {:?}", e); + } + + // Check if our request completed + let mut state = self.request.state.lock().unwrap(); + + if state.completed { + // Take result and return Ready + match state.err.take() { + Some(err) => { + return Poll::Ready(Err(object_store::Error::Generic { + store: "io_uring_ct", + source: Box::new(err), + })); + } + None => { + let br = state.bytes_read; + state.buffer.truncate(br); + let bytes = std::mem::take(&mut state.buffer).freeze(); + return Poll::Ready(Ok(bytes)); + } + } + } + + // Not done yet - immediately wake and return Pending (don't store waker) + // which will force the future to be polled again. This is intentionally + // a busy loop. io_uring is intended for fast disks where read latency is + // so small that the cost of a true context switch (parking and unparking) + // would be too high. + // + // We are effectively doing a "yield" here while we wait for + // the io_uring thread to complete the request. + drop(state); + cx.waker().wake_by_ref(); + Poll::Pending + } +} diff --git a/rust/lance-io/src/uring/future.rs b/rust/lance-io/src/uring/future.rs new file mode 100644 index 00000000000..64b38993683 --- /dev/null +++ b/rust/lance-io/src/uring/future.rs @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Future implementation for io_uring read operations. + +use super::requests::IoRequest; +use bytes::Bytes; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Future that awaits completion of an io_uring read operation. +/// +/// This future is woken by the io_uring thread when the operation completes. +pub(super) struct UringReadFuture { + pub(super) request: Arc, +} + +impl Future for UringReadFuture { + type Output = object_store::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut state = self.request.state.lock().unwrap(); + + if state.completed { + // Operation completed - take the result + match state.err.take() { + Some(err) => Poll::Ready(Err(object_store::Error::Generic { + store: "io_uring", + source: Box::new(err), + })), + None => { + let br = state.bytes_read; + state.buffer.truncate(br); + let bytes = std::mem::take(&mut state.buffer).freeze(); + Poll::Ready(Ok(bytes)) + } + } + } else { + // Operation not yet complete - store waker and return Pending + state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} diff --git a/rust/lance-io/src/uring/reader.rs b/rust/lance-io/src/uring/reader.rs new file mode 100644 index 00000000000..0e7b0101ba8 --- /dev/null +++ b/rust/lance-io/src/uring/reader.rs @@ -0,0 +1,292 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! UringReader implementation. + +use super::future::UringReadFuture; +use super::requests::IoRequest; +use super::thread::{SUBMITTED_COUNTER, THREAD_SELECTOR, URING_THREADS}; +use super::{DEFAULT_URING_BLOCK_SIZE, DEFAULT_URING_IO_PARALLELISM, URING_BLOCK_SIZE}; +use crate::local::to_local_path; +use crate::traits::Reader; +use crate::uring::requests::RequestState; +use crate::utils::tracking_store::IOTracker; +use bytes::{Bytes, BytesMut}; +use deepsize::DeepSizeOf; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use lance_core::{Error, Result}; +use object_store::path::Path; +use std::fs::File; +use std::future::Future; +use std::io::{self, ErrorKind}; +use std::ops::Range; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::sync::{Arc, LazyLock, Mutex}; +use std::time::Duration; +use tracing::instrument; + +/// Cache key for UringReader instances. +/// We cache by (path, block_size) because block_size affects reader behavior. +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub(super) struct CacheKey { + path: String, + block_size: usize, +} + +impl CacheKey { + pub(super) fn new(path: &Path, block_size: usize) -> Self { + Self { + path: path.to_string(), + block_size, + } + } +} + +/// Data stored in the cache for each opened file. +#[derive(Clone)] +pub(super) struct CachedReaderData { + pub(super) handle: Arc, + pub(super) size: usize, +} + +/// Global cache of open file handles. +/// Entries expire after 60 seconds to ensure files are eventually closed. +pub(super) static HANDLE_CACHE: LazyLock> = + LazyLock::new(|| { + moka::future::Cache::builder() + .time_to_live(Duration::from_secs(60)) + .max_capacity(10_000) + .build() + }); + +/// File handle for io_uring operations. +/// +/// Keeps the file alive and provides the raw file descriptor. +#[derive(Debug)] +pub(super) struct UringFileHandle { + /// The file (kept alive via Arc) + #[allow(unused)] + file: Arc, + + /// Raw file descriptor for io_uring + pub(super) fd: RawFd, + + /// Object store path + pub(super) path: Path, +} + +impl UringFileHandle { + pub(super) fn new(file: File, path: Path) -> Self { + let fd = file.as_raw_fd(); + Self { + file: Arc::new(file), + fd, + path, + } + } +} + +/// io_uring-based reader for local files. +/// +/// This reader uses a dedicated process-wide thread running an io_uring event loop +/// for high-performance asynchronous I/O. +#[derive(Debug)] +pub struct UringReader { + /// File handle + handle: Arc, + + /// Block size for I/O operations + block_size: usize, + + /// File size (determined at open time) + size: usize, + + /// I/O tracker for monitoring operations + io_tracker: Arc, +} + +impl DeepSizeOf for UringReader { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + // Skip file handle (just a system resource) + // Only count the path's deep size + self.handle.path.as_ref().deep_size_of_children(context) + } +} + +impl UringReader { + /// Open a file with io_uring. + /// + /// This is the internal constructor used by ObjectStore. + #[instrument(level = "debug")] + pub(crate) async fn open( + path: &Path, + block_size: usize, + known_size: Option, + io_tracker: Arc, + ) -> Result> { + // Determine block size with environment variable override + let block_size = URING_BLOCK_SIZE.unwrap_or(block_size.max(DEFAULT_URING_BLOCK_SIZE)); + + let cache_key = CacheKey::new(path, block_size); + + // Try to get from cache first + if let Some(data) = HANDLE_CACHE.get(&cache_key).await { + // Use known_size if provided, otherwise use cached size + let size = known_size.unwrap_or(data.size); + return Ok(Box::new(Self { + handle: data.handle, + block_size, + size, + io_tracker, + }) as Box); + } + + // Cache miss - open file and get size + let path_clone = path.clone(); + let local_path = to_local_path(path); + + let data = tokio::task::spawn_blocking(move || { + let file = File::open(&local_path).map_err(|e| match e.kind() { + ErrorKind::NotFound => Error::not_found(path_clone.to_string()), + _ => e.into(), + })?; + + // Get size from known_size or file metadata + let size = match known_size { + Some(s) => s, + None => file.metadata()?.len() as usize, + }; + + Ok::<_, Error>(CachedReaderData { + handle: Arc::new(UringFileHandle::new(file, path_clone)), + size, + }) + }) + .await??; + + // Insert into cache + HANDLE_CACHE.insert(cache_key, data.clone()).await; + + // Return new reader instance + Ok(Box::new(Self { + handle: data.handle.clone(), + block_size, + size: data.size, + io_tracker, + }) as Box) + } + + /// Submit a read request to the io_uring thread via channel and return a future. + fn submit_read( + &self, + offset: u64, + length: usize, + ) -> Pin> + Send>> { + let mut buffer = BytesMut::with_capacity(length); + unsafe { + buffer.set_len(length); + } + + // Create IoRequest with all data + let request = Arc::new(IoRequest { + fd: self.handle.fd, + offset, + length, + thread_id: std::thread::current().id(), + state: Mutex::new(RequestState { + completed: false, + waker: None, + err: None, + buffer, + bytes_read: 0, + }), + }); + + // Increment submitted counter before sending to channel + SUBMITTED_COUNTER.fetch_add(1, Ordering::Relaxed); + + // Select thread in round-robin fashion + let thread_idx = + (THREAD_SELECTOR.fetch_add(1, Ordering::Relaxed) as usize) % URING_THREADS.len(); + + // Send to selected thread via channel + match URING_THREADS[thread_idx] + .request_tx + .send(Arc::clone(&request)) + { + Ok(()) => { + // Return future that will be woken when operation completes + Box::pin(UringReadFuture { request }) + } + Err(_) => { + // Thread died - decrement counter and return error future + SUBMITTED_COUNTER.fetch_sub(1, Ordering::Relaxed); + Box::pin(async move { + Err(object_store::Error::Generic { + store: "UringReader", + source: Box::new(io::Error::new( + io::ErrorKind::BrokenPipe, + "io_uring thread died", + )), + }) + }) + } + } + } +} + +impl Reader for UringReader { + fn path(&self) -> &Path { + &self.handle.path + } + + fn block_size(&self) -> usize { + self.block_size + } + + fn io_parallelism(&self) -> usize { + std::env::var("LANCE_URING_IO_PARALLELISM") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_URING_IO_PARALLELISM) + } + + /// Returns the file size. + fn size(&self) -> BoxFuture<'_, object_store::Result> { + Box::pin(async move { Ok(self.size) }) + } + + /// Read a range of bytes using io_uring. + #[instrument(level = "debug", skip(self))] + fn get_range(&self, range: Range) -> BoxFuture<'static, object_store::Result> { + let io_tracker = self.io_tracker.clone(); + let path = self.handle.path.clone(); + let num_bytes = range.len() as u64; + let range_u64 = (range.start as u64)..(range.end as u64); + + self.submit_read(range.start as u64, range.len()) + .map_ok(move |bytes| { + io_tracker.record_read("get_range", path, num_bytes, Some(range_u64)); + bytes + }) + .boxed() + } + + /// Read the entire file using io_uring. + #[instrument(level = "debug", skip(self))] + fn get_all(&self) -> BoxFuture<'static, object_store::Result> { + let size = self.size; + let io_tracker = self.io_tracker.clone(); + let path = self.handle.path.clone(); + + self.submit_read(0, size) + .map_ok(move |bytes| { + io_tracker.record_read("get_all", path, bytes.len() as u64, None); + bytes + }) + .boxed() + } +} diff --git a/rust/lance-io/src/uring/requests.rs b/rust/lance-io/src/uring/requests.rs new file mode 100644 index 00000000000..fa257507dc1 --- /dev/null +++ b/rust/lance-io/src/uring/requests.rs @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Protocol types for communication between UringReader and the io_uring thread. + +use bytes::BytesMut; +use std::io; +use std::os::unix::io::RawFd; +use std::sync::Mutex; +use std::task::Waker; +use std::thread::ThreadId; + +pub(super) struct RequestState { + pub completed: bool, + pub waker: Option, + pub err: Option, + pub buffer: BytesMut, + /// Accumulated bytes read across retries (for handling short reads). + pub bytes_read: usize, +} + +/// I/O request object that contains all state for a single read operation. +/// This is shared between the submitter, uring thread, and future via Arc. +pub(super) struct IoRequest { + /// File descriptor to read from. + pub fd: RawFd, + + /// Byte offset to start reading from. + pub offset: u64, + + /// Number of bytes to read. + pub length: usize, + + pub thread_id: ThreadId, + + /// Completion flag - set to true when operation completes. + pub state: Mutex, +} + +impl IoRequest { + /// Mark this request as failed with the given error. + /// + /// Sets the error, marks completed, and wakes any waiting future. + /// Used when a request cannot be submitted (e.g. SQ full). + pub(super) fn fail(&self, err: io::Error) { + let mut state = self.state.lock().unwrap(); + state.err = Some(err); + state.completed = true; + if let Some(waker) = state.waker.take() { + drop(state); + waker.wake(); + } + } +} diff --git a/rust/lance-io/src/uring/tests.rs b/rust/lance-io/src/uring/tests.rs new file mode 100644 index 00000000000..19d931da629 --- /dev/null +++ b/rust/lance-io/src/uring/tests.rs @@ -0,0 +1,392 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Tests for io_uring reader implementation. + +use crate::object_store::ObjectStore; +use lance_core::Result; +use std::io::Write; +use std::time::Duration; +use tempfile::NamedTempFile; + +/// Helper to create a temporary file with test data +fn create_test_file(size: usize) -> Result<(NamedTempFile, Vec)> { + let mut file = NamedTempFile::new()?; + let data: Vec = (0..size).map(|i| (i % 256) as u8).collect(); + file.write_all(&data)?; + file.flush()?; + Ok((file, data)) +} + +#[tokio::test] +async fn test_read_small_file() -> Result<()> { + let (file, expected_data) = create_test_file(1024)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Read entire file + let data = reader.get_all().await.unwrap(); + assert_eq!(data.as_ref(), expected_data.as_slice()); + + Ok(()) +} + +#[tokio::test] +async fn test_read_range() -> Result<()> { + let (file, expected_data) = create_test_file(4096)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Read a range in the middle + let range = 1000..2000; + let data = reader.get_range(range.clone()).await.unwrap(); + assert_eq!(data.as_ref(), &expected_data[range]); + + Ok(()) +} + +#[tokio::test] +async fn test_read_multiple_ranges() -> Result<()> { + let (file, expected_data) = create_test_file(8192)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Read multiple ranges + let ranges = vec![0..100, 500..600, 2000..3000]; + for range in ranges { + let data = reader.get_range(range.clone()).await.unwrap(); + assert_eq!(data.as_ref(), &expected_data[range]); + } + + Ok(()) +} + +#[tokio::test] +async fn test_file_size() -> Result<()> { + let size = 5000; + let (file, _) = create_test_file(size)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + assert_eq!(reader.size().await.unwrap(), size); + + Ok(()) +} + +#[tokio::test] +async fn test_concurrent_reads() -> Result<()> { + let (file, expected_data) = create_test_file(16384)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + + // Perform multiple concurrent reads + let mut tasks = vec![]; + for i in 0..10 { + let reader_clone = store.open(&path).await?; + let expected = expected_data.clone(); + tasks.push(tokio::spawn(async move { + let range = (i * 1000)..((i + 1) * 1000); + let data = reader_clone.get_range(range.clone()).await.unwrap(); + assert_eq!(data.as_ref(), &expected[range]); + })); + } + + // Wait for all tasks + for task in tasks { + task.await.unwrap(); + } + + Ok(()) +} + +#[tokio::test] +async fn test_large_file_read() -> Result<()> { + // Test with a larger file (1MB) + let size = 1024 * 1024; + let (file, expected_data) = create_test_file(size)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Read entire file + let data = reader.get_all().await.unwrap(); + assert_eq!(data.len(), size); + assert_eq!(data.as_ref(), expected_data.as_slice()); + + Ok(()) +} + +#[tokio::test] +async fn test_read_edge_cases() -> Result<()> { + let (file, expected_data) = create_test_file(4096)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Read from start + let data = reader.get_range(0..100).await.unwrap(); + assert_eq!(data.as_ref(), &expected_data[0..100]); + + // Read to end + let data = reader.get_range(4000..4096).await.unwrap(); + assert_eq!(data.as_ref(), &expected_data[4000..4096]); + + // Read single byte + let data = reader.get_range(2000..2001).await.unwrap(); + assert_eq!(data.as_ref(), &expected_data[2000..2001]); + + Ok(()) +} + +#[tokio::test] +async fn test_file_not_found() { + let uri = "file+uring:///nonexistent/file.dat"; + let (store, path) = ObjectStore::from_uri(uri).await.unwrap(); + + // Should fail to open non-existent file + let result = store.open(&path).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn test_block_size_and_parallelism() -> Result<()> { + let (file, _) = create_test_file(1024)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Check default values (or configured values) + assert!(reader.block_size() > 0); + assert!(reader.io_parallelism() > 0); + + Ok(()) +} + +#[tokio::test] +async fn test_path() -> Result<()> { + let (file, _) = create_test_file(1024)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Verify path is preserved + assert_eq!(reader.path(), &path); + + Ok(()) +} + +/// Test that reading past EOF returns an error. +/// +/// This exercises the case where `known_size` passed to `open_with_size` is larger +/// than the actual file, causing io_uring to hit EOF before the full read completes. +#[tokio::test] +async fn test_short_read_get_all() -> Result<()> { + let actual_size: usize = 8192; + let (file, _expected_data) = create_test_file(actual_size)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + + // Open with inflated known_size — the reader will think the file is 2x its real size + let inflated_size = actual_size * 2; + let reader = store.open_with_size(&path, inflated_size).await?; + + // get_all() will submit a read for inflated_size bytes from an actual_size file. + // The kernel reads actual_size bytes then returns 0 (EOF) — this should be an error. + let result = reader.get_all().await; + assert!(result.is_err(), "reading past EOF should return an error"); + + Ok(()) +} + +/// Test that a range read extending past EOF returns an error. +#[tokio::test] +async fn test_short_read_get_range_past_eof() -> Result<()> { + let actual_size: usize = 8192; + let (file, _expected_data) = create_test_file(actual_size)?; + let file_path = file.path().to_str().unwrap(); + let uri = format!("file+uring://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Request a range that starts inside the file but extends past EOF. + // File is 8192 bytes; reading 4096..16384 hits EOF — this should be an error. + let range_start = 4096; + let range_end = actual_size * 2; // 16384, well past EOF + let result = reader.get_range(range_start..range_end).await; + assert!( + result.is_err(), + "range extending past EOF should return an error" + ); + + Ok(()) +} + +/// Test that when push_to_sq fails (SQ full), the request's future returns +/// an error instead of hanging forever. +/// +/// This directly tests the thread-path scenario: create an IoUring with +/// queue_depth=2, fill the SQ, then try to push a 3rd request. The 3rd +/// request's future should return an error within the timeout. +/// +/// BUG: currently the failed push silently drops the request, so the +/// future hangs and the timeout fires. +#[tokio::test] +async fn test_retry_sq_full_thread() -> Result<()> { + use super::future::UringReadFuture; + use super::requests::{IoRequest, RequestState}; + use super::thread::push_to_sq; + use bytes::BytesMut; + use io_uring::IoUring; + use std::collections::HashMap; + use std::os::unix::io::AsRawFd; + use std::sync::{Arc, Mutex}; + + let (file, _) = create_test_file(4096)?; + let fd = file.as_file().as_raw_fd(); + + // Create a tiny ring with queue_depth=2 + let mut ring = IoUring::new(2).unwrap(); + let mut pending: HashMap> = HashMap::new(); + + // Helper to create a request + let make_request = || { + Arc::new(IoRequest { + fd, + offset: 0, + length: 4096, + thread_id: std::thread::current().id(), + state: Mutex::new(RequestState { + completed: false, + waker: None, + err: None, + buffer: BytesMut::zeroed(4096), + bytes_read: 0, + }), + }) + }; + + // Fill the SQ (capacity=2) + let _r1 = make_request(); + let _r2 = make_request(); + push_to_sq(&mut ring, &mut pending, _r1).unwrap(); + push_to_sq(&mut ring, &mut pending, _r2).unwrap(); + + // 3rd push should fail — SQ is full + let r3 = make_request(); + let push_result = push_to_sq(&mut ring, &mut pending, r3.clone()); + assert!(push_result.is_err(), "3rd push should fail (SQ full)"); + + // r3's future should return an error, not hang forever. + // BUG: currently nobody sets completed=true or err on r3, so the future hangs. + let future = UringReadFuture { request: r3 }; + let result = tokio::time::timeout(Duration::from_secs(2), future).await; + assert!( + result.is_ok(), + "future timed out — request was dropped without error on SQ-full push failure" + ); + + Ok(()) +} + +/// Test that when push_to_sq fails (SQ full) on the current-thread path, +/// the request's future returns an error instead of hanging forever. +/// +/// Uses UringCurrentThreadFuture (which will be a no-op poller since the +/// thread-local URING has no knowledge of this request) after push_to_sq +/// has already completed the request with an error. +#[tokio::test(flavor = "current_thread")] +async fn test_retry_sq_full_current_thread() -> Result<()> { + use super::current_thread_future::UringCurrentThreadFuture; + use super::requests::{IoRequest, RequestState}; + use super::thread::push_to_sq; + use bytes::BytesMut; + use io_uring::IoUring; + use std::collections::HashMap; + use std::os::unix::io::AsRawFd; + use std::sync::{Arc, Mutex}; + + let (file, _) = create_test_file(4096)?; + let fd = file.as_file().as_raw_fd(); + + // Create a tiny ring with queue_depth=2 + let mut ring = IoUring::new(2).unwrap(); + let mut pending: HashMap> = HashMap::new(); + + let make_request = || { + Arc::new(IoRequest { + fd, + offset: 0, + length: 4096, + thread_id: std::thread::current().id(), + state: Mutex::new(RequestState { + completed: false, + waker: None, + err: None, + buffer: BytesMut::zeroed(4096), + bytes_read: 0, + }), + }) + }; + + // Fill the SQ (capacity=2) + push_to_sq(&mut ring, &mut pending, make_request()).unwrap(); + push_to_sq(&mut ring, &mut pending, make_request()).unwrap(); + + // 3rd push should fail — SQ is full + let r3 = make_request(); + let push_result = push_to_sq(&mut ring, &mut pending, r3.clone()); + assert!(push_result.is_err(), "3rd push should fail (SQ full)"); + + // r3's future should return an error, not hang forever. + let future = UringCurrentThreadFuture::new(r3); + let result = tokio::time::timeout(Duration::from_secs(2), future).await; + assert!( + result.is_ok(), + "future timed out — request was dropped without error on SQ-full push failure" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_uring_not_enabled_with_file_scheme() -> Result<()> { + // Verify that files opened with file:// don't use uring + let (file, expected_data) = create_test_file(1024)?; + let file_path = file.path().to_str().unwrap(); + // Use regular file:// scheme, should NOT use uring + let uri = format!("file://{}", file_path); + + let (store, path) = ObjectStore::from_uri(&uri).await?; + let reader = store.open(&path).await?; + + // Should still be able to read, just won't use uring + let data = reader.get_all().await.unwrap(); + assert_eq!(data.as_ref(), expected_data.as_slice()); + + Ok(()) +} diff --git a/rust/lance-io/src/uring/thread.rs b/rust/lance-io/src/uring/thread.rs new file mode 100644 index 00000000000..d2ef197947d --- /dev/null +++ b/rust/lance-io/src/uring/thread.rs @@ -0,0 +1,396 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Dedicated thread for io_uring operations. +//! +//! This module provides a background thread that owns an io_uring instance +//! and processes read requests from a channel. Readers send requests via +//! an MPSC channel, and the thread handles submission and completion processing. + +use super::DEFAULT_URING_QUEUE_DEPTH; +use super::requests::IoRequest; +use io_uring::{IoUring, opcode, types}; +use std::collections::HashMap; +use std::io; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender, sync_channel}; +use std::sync::{Arc, LazyLock}; +use std::time::{Duration, Instant}; + +/// Handle to the io_uring background thread. +/// +/// This provides a channel sender for submitting read requests to the thread. +pub(super) struct UringThreadHandle { + pub request_tx: SyncSender>, +} + +/// Lazy-initialized io_uring thread pool. +/// +/// Multiple threads are spawned on first access and run until process exit. +pub(super) static URING_THREADS: LazyLock> = LazyLock::new(|| { + let queue_depth = get_queue_depth(); + let thread_count = get_thread_count(); + + let mut threads = Vec::with_capacity(thread_count); + + for i in 0..thread_count { + let (tx, rx) = sync_channel(queue_depth); + + std::thread::Builder::new() + .name(format!("lance-uring-{}", i)) + .spawn(move || run_uring_thread(rx, queue_depth, i)) + .expect("Failed to spawn io_uring thread"); + + threads.push(UringThreadHandle { request_tx: tx }); + } + + log::info!( + "io_uring thread pool spawned ({} threads, queue_depth={})", + thread_count, + queue_depth + ); + + threads +}); + +/// Atomic counter for round-robin thread selection. +pub(super) static THREAD_SELECTOR: AtomicU64 = AtomicU64::new(0); + +/// Counter for generating unique user_data values. +/// +/// Each io_uring operation needs a unique user_data ID to match completions +/// with their corresponding requests. +static USER_DATA_COUNTER: AtomicU64 = AtomicU64::new(1); + +/// Counter for requests that have been submitted to the thread but not yet received. +/// +/// This tracks requests sitting in the channel queue waiting to be received by the thread. +pub(super) static SUBMITTED_COUNTER: AtomicU64 = AtomicU64::new(0); + +/// Default batch size for submission - how many requests to batch before calling submit(). +const DEFAULT_SUBMIT_BATCH_SIZE: usize = 128; + +/// Default number of io_uring threads. +const DEFAULT_URING_THREAD_COUNT: usize = 2; + +/// Get the configured queue depth from environment variable. +fn get_queue_depth() -> usize { + std::env::var("LANCE_URING_QUEUE_DEPTH") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_URING_QUEUE_DEPTH) +} + +/// Get the configured poll timeout from environment variable. +fn get_poll_timeout() -> Duration { + let timeout_ms = std::env::var("LANCE_URING_POLL_TIMEOUT_MS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(10); + + Duration::from_millis(timeout_ms) +} + +/// Get the configured submit batch size from environment variable. +fn get_submit_batch_size() -> usize { + std::env::var("LANCE_URING_SUBMIT_BATCH_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_SUBMIT_BATCH_SIZE) +} + +/// Get the configured number of uring threads from environment variable. +fn get_thread_count() -> usize { + std::env::var("LANCE_URING_THREAD_COUNT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_URING_THREAD_COUNT) +} + +/// Main loop for the io_uring thread. +/// +/// This thread: +/// 1. Receives requests from the channel +/// 2. Submits them to io_uring +/// 3. Processes completions +/// 4. Wakes futures via their wakers +fn run_uring_thread(request_rx: Receiver>, queue_depth: usize, thread_id: usize) { + // Create local io_uring instance + let mut ring = IoUring::builder() + // .setup_sqpoll(100) + .build(queue_depth as u32) + .expect("Failed to create io_uring"); + + let mut pending: HashMap> = HashMap::with_capacity(queue_depth); + let poll_timeout = get_poll_timeout(); + let submit_batch_size = get_submit_batch_size(); + let mut last_log = Instant::now(); + let log_interval = Duration::from_millis(100); + let mut completed_iops = 0usize; + let mut completed_sectors = 0usize; + let mut min_in_flight = usize::MAX; + + loop { + // Track minimum in-flight count + let in_flight = pending.len(); + min_in_flight = min_in_flight.min(in_flight); + + // Log in-flight requests every 100ms + let now = Instant::now(); + if now.duration_since(last_log) >= log_interval { + let submitted = SUBMITTED_COUNTER.load(Ordering::Relaxed); + log::info!( + "io_uring[{}]: {} submitted, {} in flight (min {}), {} iops completed, {} sectors completed", + thread_id, + submitted, + in_flight, + min_in_flight, + completed_iops, + completed_sectors + ); + last_log = now; + completed_iops = 0; // Reset counter after logging + completed_sectors = 0; // Reset counter after logging + min_in_flight = usize::MAX; // Reset min tracker + } + + // Process all available completions first + let mut needs_submit = false; + let completions = process_completions(&mut ring, &mut pending); + match completions { + Ok(result) => { + completed_iops += result.iops; + completed_sectors += result.sectors; + + // Resubmit any short-read retries + for request in result.retries { + if let Err(e) = push_to_sq(&mut ring, &mut pending, request) { + log::error!("Failed to resubmit short read: {}", e); + } else { + needs_submit = true; + } + } + } + Err(e) => { + log::error!("Error processing io_uring completions: {}", e); + } + } + + min_in_flight = min_in_flight.min(pending.len()); + + // Batch submit requests - keep pulling from channel and pushing to SQ + // until we hit batch size or channel is empty + let mut batch_count = 0; + loop { + // Try to receive new request + // Use recv_timeout only when pending is empty, otherwise use try_recv + let recv_result = if pending.is_empty() && batch_count == 0 { + // No operations in flight and no batch started - we can afford to wait with timeout + request_rx.recv_timeout(poll_timeout).map_err(|e| match e { + RecvTimeoutError::Timeout => std::sync::mpsc::TryRecvError::Empty, + RecvTimeoutError::Disconnected => std::sync::mpsc::TryRecvError::Disconnected, + }) + } else { + // Operations in flight or batch in progress - busy loop with try_recv + request_rx.try_recv() + }; + + match recv_result { + Ok(request) => { + // Decrement submitted counter when we receive the request from channel + SUBMITTED_COUNTER.fetch_sub(1, Ordering::Relaxed); + + // Push to submission queue (but don't submit yet) + if let Err(e) = push_to_sq(&mut ring, &mut pending, request) { + log::error!("Failed to push to io_uring SQ: {}", e); + } else { + batch_count += 1; + } + + // Break if we've hit the batch size limit + if batch_count >= submit_batch_size { + break; + } + } + Err(std::sync::mpsc::TryRecvError::Empty) => { + // No more requests in channel - break to submit the batch + break; + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => { + // All senders dropped - submit batch and shutdown + if batch_count > 0 + && let Err(e) = ring.submit() + { + log::error!( + "io_uring[{}]: Failed to submit io_uring batch: {}", + thread_id, + e + ); + } + log::info!( + "io_uring thread {} shutting down (channel disconnected)", + thread_id + ); + return; + } + } + } + + // Submit if we have any requests (from channel or retries) + if (batch_count > 0 || needs_submit) + && let Err(e) = ring.submit() + { + log::error!( + "Failed to submit io_uring batch of {} requests: {}", + batch_count, + e + ); + } + } +} + +/// Push a read request to the io_uring submission queue (without submitting). +/// +/// This generates a unique user_data ID, prepares the read operation, +/// and pushes it to the SQ. The caller is responsible for calling ring.submit(). +pub(super) fn push_to_sq( + ring: &mut IoUring, + pending: &mut HashMap>, + request: Arc, +) -> io::Result<()> { + // Generate unique user_data + let user_data = USER_DATA_COUNTER.fetch_add(1, Ordering::Relaxed); + + // Get buffer pointer, adjusting for any bytes already read (short read retry) + let (buffer_ptr, read_offset, read_length) = { + let state = request.state.lock().unwrap(); + let br = state.bytes_read; + ( + unsafe { state.buffer.as_ptr().add(br) as *mut u8 }, + request.offset + br as u64, + (request.length - br) as u32, + ) + }; + + // Prepare read operation + let read_op = + opcode::Read::new(types::Fd(request.fd), buffer_ptr, read_length).offset(read_offset); + + // Get submission queue + let mut sq = ring.submission(); + + // Check if SQ has space + if sq.is_full() { + drop(sq); + request.fail(io::Error::new( + io::ErrorKind::WouldBlock, + "io_uring submission queue full", + )); + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "io_uring submission queue full", + )); + } + + // Push to SQ + unsafe { + if sq.push(&read_op.build().user_data(user_data)).is_err() { + drop(sq); + request.fail(io::Error::other("Failed to push to SQ")); + return Err(io::Error::other("Failed to push to SQ")); + } + } + drop(sq); + + // Track request in pending map + pending.insert(user_data, request); + + Ok(()) +} + +struct CompletionResult { + iops: usize, + sectors: usize, + retries: Vec>, +} + +/// Process all available completions from the io_uring. +/// +/// This iterates through the completion queue, matches completions to requests, +/// updates their state, and wakes any waiting futures. Short reads are collected +/// into `retries` for resubmission; EOF before a full read is an error. +/// +/// Returns completion stats and a list of requests needing resubmission. +fn process_completions( + ring: &mut IoUring, + pending: &mut HashMap>, +) -> io::Result { + let mut iops = 0; + let mut sectors = 0; + let mut retries = Vec::new(); + + // Process all available completions + for cqe in ring.completion() { + let user_data = cqe.user_data(); + let result = cqe.result(); + + // Look up request + if let Some(request) = pending.remove(&user_data) { + let mut state = request.state.lock().unwrap(); + + if result < 0 { + // Kernel error + state.err = Some(io::Error::from_raw_os_error(-result)); + state.completed = true; + } else if result == 0 { + // EOF before full read completed + let br = state.bytes_read; + state.err = Some(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("unexpected EOF: read {} of {} bytes", br, request.length), + )); + state.buffer.truncate(br); + state.completed = true; + } else { + // Positive result: n bytes read + let n = result as usize; + state.bytes_read += n; + let br = state.bytes_read; + + if br >= request.length { + // Full read complete + state.buffer.truncate(br); + state.completed = true; + + if request.length > 0 { + let first_sector = request.offset / 4096; + let last_sector = (request.offset + request.length as u64 - 1) / 4096; + let num_sectors = (last_sector - first_sector + 1) as usize; + sectors += num_sectors; + } + } else { + // Short read — need retry; don't mark completed or wake + drop(state); + retries.push(request); + continue; + } + } + + // Wake the future if it's waiting + if let Some(waker) = state.waker.take() { + drop(state); // Release lock before waking + waker.wake(); + } + + iops += 1; + } else { + log::warn!("Received completion for unknown user_data: {}", user_data); + } + } + + Ok(CompletionResult { + iops, + sectors, + retries, + }) +} diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index c2d6427b251..54fa9f6ac96 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -259,7 +259,7 @@ impl IvfIndexBuilder progress .stage_start("train_ivf", max_iters, "iterations") .await?; - self.with_ivf(self.load_or_build_ivf().await?); + self.with_ivf(self.load_or_build_ivf().boxed().await?); progress.stage_complete("train_ivf").await?; progress.stage_start("train_quantizer", None, "").await?; @@ -270,7 +270,7 @@ impl IvfIndexBuilder if self.shuffle_reader.is_none() { let num_rows = self.num_rows_to_shuffle().await?; progress.stage_start("shuffle", num_rows, "rows").await?; - self.shuffle_dataset().await?; + self.shuffle_dataset().boxed().await?; progress.stage_complete("shuffle").await?; }