From 0425402301ee38b488d400621ca877a3b3b56cf3 Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Tue, 19 May 2026 00:51:53 +0900 Subject: [PATCH] update: bump deps across workspace and sync russh forks to upstream stable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bssh deps: lru 0.17→0.18 (lifetime fix), signal-hook 0.3→0.4 (only the unused low_level::pipe API changed), opentelemetry / opentelemetry_sdk / opentelemetry-otlp 0.31→0.32 (source-compatible at our LogExporter / SdkLoggerProvider / Resource builders + LogRecord call sites; the removed ExportConfig / HasExportConfig trio isn't used here), plus nix 0.31.2→0.31.3 and transitive pin-project / tower-http / zerofrom patches via cargo update. bssh-russh 0.60.1→0.60.3: track upstream stable. Bump russh-cryptovec 0.59.0→0.60.3 (carries the CryptoVec hardening half of CVE-2026-46673) and aws-lc-rs 1.16.3→1.17.0. Manually forward-port the agent-frame-length-cap half of CVE-2026-46673 (commit a2d48a7) into src/keys/agent/{client,server}.rs — adds MAX_AGENT_FRAME_LEN = 256 KiB, factors out read_frame(), and rejects oversized frames with Error::AgentProtocolError before allocating. Recorded as patches/agent-frame-length-cap.patch so the next sync-upstream.sh run will auto-skip it via the reverse-apply dry-run check. bssh-russh-sftp 2.1.1→2.1.2: full source sync to upstream 2.1.2. Upstream absorbed the original serde_bytes perf fix in protocol/{write,data}.rs and the matching serialize_bytes implementation; the patch is moved to patches/historical/ for provenance. The fork's remaining value-add is two pipelined File I/O helpers (write_all_pipelined / read_to_writer_pipelined) that drive reader→file and file→writer transfers with bounded in-flight chunks to hide per-request RTT; they were undocumented before and have now been re-ported on top of the new Features API — chunk size derives from features.limits.{write,read}_len or features.max_packet_len.saturating_sub({WRITE,READ}_OVERHEAD_LENGTH) instead of the removed MAX_*_LENGTH constants. Cargo.toml replaces flurry with dashmap, adds serde_bytes as a direct dep, keeps futures (used by our pipelined helpers' FuturesUnordered), and pins versions to upstream's set. The fork description now records both the absorbed perf fix and the still-custom pipelined helpers so future maintainers won't repeat the mistake of dropping the fork without realising the pipelined methods aren't in upstream. Verification: cargo build, cargo check --all-targets, and cargo clippy -p bssh --bins -p bssh-russh-sftp --lib -- -D warnings all clean. Affected-module tests pass — server::audit::otel 5, ssh::config_cache 14, pty:: 195, ssh:: 280. cargo audit reports 0 vulnerabilities and 0 unmaintained/yanked warnings against the 1090-advisory database after the bumps. --- Cargo.lock | 214 ++++---- Cargo.toml | 17 +- crates/bssh-russh-sftp/Cargo.toml | 28 +- .../sftp-serde-bytes-perf.patch | 0 crates/bssh-russh-sftp/src/client/fs/file.rs | 481 ++++++------------ crates/bssh-russh-sftp/src/client/mod.rs | 22 +- .../bssh-russh-sftp/src/client/rawsession.rs | 198 +++---- crates/bssh-russh-sftp/src/client/session.rs | 89 ++-- crates/bssh-russh-sftp/src/de.rs | 2 +- crates/bssh-russh-sftp/src/lib.rs | 3 - crates/bssh-russh-sftp/src/protocol/mod.rs | 103 +--- crates/bssh-russh-sftp/src/ser.rs | 4 +- crates/bssh-russh-sftp/src/server/mod.rs | 32 +- crates/bssh-russh-sftp/src/utils.rs | 10 +- crates/bssh-russh/Cargo.toml | 6 +- .../patches/agent-frame-length-cap.patch | 96 ++++ crates/bssh-russh/src/keys/agent/client.rs | 22 +- crates/bssh-russh/src/keys/agent/server.rs | 28 +- 18 files changed, 616 insertions(+), 739 deletions(-) rename crates/bssh-russh-sftp/patches/{ => historical}/sftp-serde-bytes-perf.patch (100%) create mode 100644 crates/bssh-russh/patches/agent-frame-length-cap.patch diff --git a/Cargo.lock b/Cargo.lock index 014270a2..27e1d0f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,19 +78,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "const-random", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.4" @@ -254,9 +241,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-lc-rs" -version = "1.16.3" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f" +checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" dependencies = [ "aws-lc-sys", "untrusted 0.7.1", @@ -265,9 +252,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.40.0" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f50037ee5e1e41e7b8f9d161680a725bd1626cb6f8c7e901f91f942850852fe7" +checksum = "1a2f9779ce85b93ab6170dd940ad0169b5766ff848247aff13bb788b832fe3f4" dependencies = [ "cc", "cmake", @@ -441,10 +428,10 @@ dependencies = [ "insta", "ipnetwork", "libc", - "lru 0.17.0", + "lru 0.18.0", "mockall", "mockito", - "nix 0.31.2", + "nix 0.31.3", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", @@ -462,7 +449,7 @@ dependencies = [ "serde_yaml", "serial_test", "shell-words", - "signal-hook", + "signal-hook 0.4.4", "smallvec", "socket2", "ssh-key", @@ -484,7 +471,7 @@ dependencies = [ [[package]] name = "bssh-russh" -version = "0.60.1" +version = "0.60.3" dependencies = [ "aes 0.8.4", "async-trait", @@ -551,13 +538,13 @@ dependencies = [ [[package]] name = "bssh-russh-sftp" -version = "2.1.1" +version = "2.1.2" dependencies = [ "async-trait", "bitflags 2.11.1", "bytes", "chrono", - "flurry", + "dashmap", "futures", "log", "serde", @@ -866,26 +853,6 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" -[[package]] -name = "const-random" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" -dependencies = [ - "const-random-macro", -] - -[[package]] -name = "const-random-macro" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" -dependencies = [ - "getrandom 0.2.17", - "once_cell", - "tiny-keccak", -] - [[package]] name = "convert_case" version = "0.10.0" @@ -1017,7 +984,7 @@ dependencies = [ "mio", "parking_lot", "rustix", - "signal-hook", + "signal-hook 0.3.18", "signal-hook-mio", "winapi", ] @@ -1197,6 +1164,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dashmap" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.11.0" @@ -1603,18 +1584,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flurry" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf5efcf77a4da27927d3ab0509dec5b0954bb3bc59da5a1de9e52642ebd4cdf9" -dependencies = [ - "ahash", - "num_cpus", - "parking_lot", - "seize", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1872,6 +1841,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1909,12 +1884,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" - [[package]] name = "hex" version = "0.4.3" @@ -2529,9 +2498,9 @@ dependencies = [ [[package]] name = "lru" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0b564323a0fb6d54b864f625ae139de9612e27edb944dda37c109f05aac531" +checksum = "8a860605968fce16869fd239cf4237a82f3ac470723415db603b0e8b6c8d4fb9" dependencies = [ "hashbrown 0.17.1", ] @@ -2709,9 +2678,9 @@ dependencies = [ [[package]] name = "nix" -version = "0.31.2" +version = "0.31.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" +checksum = "cf20d2fde8ff38632c426f1165ed7436270b44f199fc55284c38276f9db47c3d" dependencies = [ "bitflags 2.11.1", "cfg-if", @@ -2812,16 +2781,6 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "num_threads" version = "0.1.7" @@ -2881,9 +2840,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "opentelemetry" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" dependencies = [ "futures-core", "futures-sink", @@ -2895,9 +2854,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" dependencies = [ "async-trait", "bytes", @@ -2908,9 +2867,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.31.1" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f69cd6acbb9af919df949cd1ec9e5e7fdc2ef15d234b6b795aaa525cc02f71f" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" dependencies = [ "http", "opentelemetry", @@ -2922,14 +2881,14 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tonic", - "tracing", + "tonic-types", ] [[package]] name = "opentelemetry-proto" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -2940,15 +2899,16 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +checksum = "368afaed344110f40b179bb8fbe54bc52d98f9bd2b281799ef32487c2650c956" dependencies = [ "futures-channel", "futures-executor", "futures-util", "opentelemetry", "percent-encoding", + "portable-atomic", "rand 0.9.4", "thiserror 2.0.18", "tokio", @@ -3240,18 +3200,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.12" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf0d9e68100b3a7989b4901972f265cd542e560a3a8a724e1e20322f4d06ce9" +checksum = "2466b2336ed02bcdca6b294417127b90ec92038d1d5c4fbeac971a922e0e0924" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.12" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a990e22f43e84855daf260dded30524ef4a9021cc7541c26540500a50b624389" +checksum = "c96395f0a926bc13b1c17622aaddda1ecb55d49c8f1bf9777e4d877800a43f8b" dependencies = [ "proc-macro2", "quote", @@ -3516,6 +3476,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "prost-types" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.45" @@ -3779,9 +3748,9 @@ checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] name = "reqwest" -version = "0.12.28" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" dependencies = [ "base64", "bytes", @@ -3797,9 +3766,6 @@ dependencies = [ "log", "percent-encoding", "pin-project-lite", - "serde", - "serde_json", - "serde_urlencoded", "sync_wrapper", "tokio", "tower", @@ -3908,12 +3874,12 @@ dependencies = [ [[package]] name = "russh-cryptovec" -version = "0.59.0" +version = "0.60.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36140e8a20297bc2e8338807c3d9ca911f7fa49d7539cbcd6d48d3befd70efd8" +checksum = "37cb4d0360bdd8935392a306d8b5edb539cc455b30e8bf13dd213a0cf7879b40" dependencies = [ "log", - "nix 0.31.2", + "nix 0.31.3", "ssh-encoding", "windows-sys 0.61.2", ] @@ -4040,7 +4006,7 @@ dependencies = [ "libc", "log", "memchr", - "nix 0.31.2", + "nix 0.31.3", "radix_trie", "unicode-segmentation", "unicode-width", @@ -4176,12 +4142,6 @@ dependencies = [ "libc", ] -[[package]] -name = "seize" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "689224d06523904ebcc9b482c6a3f4f7fb396096645c4cd10c0d2ff7371a34d3" - [[package]] name = "semver" version = "1.0.28" @@ -4387,6 +4347,16 @@ dependencies = [ "signal-hook-registry", ] +[[package]] +name = "signal-hook" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a0c28ca5908dbdbcd52e6fdaa00358ab88637f8ab33e1f188dd510eb44b53d" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-mio" version = "0.2.5" @@ -4395,7 +4365,7 @@ checksum = "b75a19a7a740b25bc7944bdee6172368f988763b744e3d4dfe753f6b4ece40cc" dependencies = [ "libc", "mio", - "signal-hook", + "signal-hook 0.3.18", ] [[package]] @@ -4706,7 +4676,7 @@ dependencies = [ "pest_derive", "phf", "sha2 0.10.9", - "signal-hook", + "signal-hook 0.3.18", "siphasher", "terminfo", "termios", @@ -4792,15 +4762,6 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" -[[package]] -name = "tiny-keccak" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" -dependencies = [ - "crunchy", -] - [[package]] name = "tinystr" version = "0.8.3" @@ -4931,6 +4892,17 @@ dependencies = [ "tonic", ] +[[package]] +name = "tonic-types" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ab1b02061f83d519bba3caa167f88f261ef05720ab8ebc954ade70de3348e8" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + [[package]] name = "tower" version = "0.5.3" @@ -4952,9 +4924,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.10" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "bitflags 2.11.1", "bytes", @@ -5786,9 +5758,9 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" dependencies = [ "zerofrom-derive", ] diff --git a/Cargo.toml b/Cargo.toml index fccc92ec..b48b0478 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,9 +23,10 @@ tokio = { version = "1.52.1", features = ["full"] } # Use our internal russh fork with session loop fixes # - Development: uses local path (crates/bssh-russh) # - Publishing: uses crates.io version (path ignored) -russh = { package = "bssh-russh", version = "0.60.1", path = "crates/bssh-russh" } -# Use our internal russh-sftp fork with a serde_bytes perf fix -russh-sftp = { package = "bssh-russh-sftp", version = "2.1.1", path = "crates/bssh-russh-sftp" } +russh = { package = "bssh-russh", version = "0.60.3", path = "crates/bssh-russh" } +# Use our internal russh-sftp fork tracking upstream 2.1.2 +# (adds pipelined File I/O; serde_bytes perf fix is now upstreamed) +russh-sftp = { package = "bssh-russh-sftp", version = "2.1.2", path = "crates/bssh-russh-sftp" } clap = { version = "4.6.1", features = ["derive", "env"] } anyhow = "1.0.102" thiserror = "2.0.18" @@ -50,10 +51,10 @@ rustyline = "18.0.0" crossterm = "0.29" ratatui = "0.30" regex = "1.12.3" -signal-hook = "0.3" +signal-hook = "0.4" nix = { version = "0.31", features = ["fs", "poll", "process", "signal", "term"] } smallvec = "1.15.1" -lru = "0.17.0" +lru = "0.18.0" uuid = { version = "1.23.1", features = ["v4"] } tokio-util = "0.7.18" socket2 = "0.6" @@ -66,9 +67,9 @@ rand = "0.10" ssh-key = { version = "0.6", features = ["std"] } async-compression = { version = "0.4", features = ["tokio", "gzip"] } serde_json = "1.0" -opentelemetry = "0.31" -opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "logs"] } -opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic", "logs"] } +opentelemetry = "0.32" +opentelemetry_sdk = { version = "0.32", features = ["rt-tokio", "logs"] } +opentelemetry-otlp = { version = "0.32", features = ["grpc-tonic", "logs"] } url = "2.5" tokio-rustls = "0.26" rustls-native-certs = "0.8" diff --git a/crates/bssh-russh-sftp/Cargo.toml b/crates/bssh-russh-sftp/Cargo.toml index 9bd21c6c..418cf809 100644 --- a/crates/bssh-russh-sftp/Cargo.toml +++ b/crates/bssh-russh-sftp/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "bssh-russh-sftp" -version = "2.1.1" +version = "2.1.2" authors = ["Jeongkyu Shin "] -description = "Temporary fork of russh-sftp with a serde_bytes performance fix for SFTP Write/Data packets" +description = "Temporary fork of russh-sftp 2.1.2 adding pipelined SFTP File I/O (write_all_pipelined / read_to_writer_pipelined). Note: the serde_bytes perf fix that originally motivated this fork is now upstreamed in russh-sftp 2.1.2; only the pipelined helpers remain as fork value-add." documentation = "https://docs.rs/bssh-russh-sftp" edition = "2021" homepage = "https://github.com/lablup/bssh" @@ -19,18 +19,22 @@ tokio = { version = "1", default-features = false, features = [ "time", "macros", ] } -tokio-util = "0.7" -futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } -serde = { version = "1.0", features = ["derive"] } +tokio-util = "0.7.18" +serde = { version = "1.0.228", features = ["derive"] } serde_bytes = "0.11" -bitflags = { version = "2.9", features = ["serde"] } -async-trait = { version = "0.1", optional = true } +bitflags = { version = "2.11.1", features = ["serde"] } +async-trait = { version = "0.1.89", optional = true } -thiserror = "2.0" -chrono = "0.4" -bytes = "1.10" -log = "0.4" -flurry = "0.5" +# futures: required by our forward-ported pipelined helpers +# (write_all_pipelined / read_to_writer_pipelined use FuturesUnordered). +# Upstream russh-sftp 2.1.2 does not need this dependency. +futures = { version = "0.3.32", default-features = false, features = ["std", "async-await"] } + +thiserror = "2.0.18" +chrono = "0.4.44" +bytes = "1.11.1" +log = "0.4.29" +dashmap = "6.1.0" [features] async-trait = ["dep:async-trait"] diff --git a/crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch b/crates/bssh-russh-sftp/patches/historical/sftp-serde-bytes-perf.patch similarity index 100% rename from crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch rename to crates/bssh-russh-sftp/patches/historical/sftp-serde-bytes-perf.patch diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs index a0e28d44..d143695f 100644 --- a/crates/bssh-russh-sftp/src/client/fs/file.rs +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -1,5 +1,6 @@ use std::{ - future::Future, + collections::VecDeque, + future::{self, Future}, io::{self, SeekFrom}, pin::Pin, sync::Arc, @@ -8,29 +9,28 @@ use std::{ use tokio::{ io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}, runtime::Handle, + sync::oneshot, }; use super::Metadata; use crate::{ - client::{error::Error, rawsession::SftpResult, session::Extensions, RawSftpSession}, - protocol::StatusCode, + client::{error::Error, rawsession::SftpResult, session::Features, RawSftpSession}, + protocol::{Packet, StatusCode}, }; type StateFn = Option> + Send + Sync + 'static>>>; -const MAX_READ_LENGTH: u64 = 261120; -const MAX_WRITE_LENGTH: u64 = 261120; - -fn bounded_chunk_size(limit: Option, default_limit: u64) -> usize { - limit.map_or(default_limit, |n| n.min(default_limit)) as usize -} +// read packet overhead: type(1) + id(4) + data_len(4) +const READ_OVERHEAD_LENGTH: u32 = 9; +// write packet overhead excluding handle: type(1) + id(4) + handle_len(4) + offset(8) + data_len(4) +const WRITE_OVERHEAD_LENGTH: u32 = 21; struct FileState { f_read: StateFn>>, f_seek: StateFn, - f_write: StateFn, f_flush: StateFn<()>, f_shutdown: StateFn<()>, + write_acks: VecDeque>>, } /// Provides high-level methods for interaction with a remote file. @@ -47,28 +47,24 @@ pub struct File { state: FileState, pos: u64, closed: bool, - extensions: Arc, + features: Features, } impl File { - pub(crate) fn new( - session: Arc, - handle: String, - extensions: Arc, - ) -> Self { + pub(crate) fn new(session: Arc, handle: String, features: Features) -> Self { Self { session, handle, state: FileState { f_read: None, f_seek: None, - f_write: None, f_flush: None, f_shutdown: None, + write_acks: VecDeque::with_capacity(features.max_concurrent_writes), }, pos: 0, closed: false, - extensions, + features, } } @@ -90,7 +86,7 @@ impl File { /// If the server does not support `fsync@openssh.com` sending the request will /// be omitted, but will still pseudo-successfully pub async fn sync_all(&self) -> SftpResult<()> { - if !self.extensions.fsync { + if !self.features.fsync { return Ok(()); } @@ -99,16 +95,16 @@ impl File { /// Streams `reader` to this remote file with up to `max_inflight` concurrent /// SFTP `WRITE` requests in flight. Each request carries up to the negotiated - /// `write_len` (or [`MAX_WRITE_LENGTH`] when no limit is advertised). + /// `write_len` (or the per-handle packet ceiling when no limit is advertised). /// - /// The high-level [`AsyncWrite`] impl issues one `WRITE` at a time and waits - /// for its `STATUS` reply before sending the next, so sustained throughput is - /// bounded by `chunk_size / RTT`. This helper hides the per-request RTT by - /// keeping multiple in-flight, mirroring how OpenSSH's `sftp` client behaves - /// (~64 outstanding requests by default). + /// The high-level [`AsyncWrite`] impl can pipeline writes via the file's + /// `write_acks` ring, but that path requires the caller to feed bytes via + /// repeated `poll_write` calls. This helper hides the per-request RTT by + /// driving the reader and dispatching WRITEs in lockstep, mirroring how + /// OpenSSH's `sftp` client behaves (~64 outstanding requests by default). /// - /// On success returns the number of bytes streamed. Updates `self.pos` to - /// the new write offset. Reading from `reader` and dispatching writes are + /// On success returns the number of bytes streamed. Updates `self.pos` to + /// the new write offset. Reading from `reader` and dispatching writes are /// interleaved, so memory usage is bounded by `max_inflight * chunk_size`. pub async fn write_all_pipelined( &mut self, @@ -127,10 +123,14 @@ impl File { )); } - let chunk_size = bounded_chunk_size( - self.extensions.limits.as_ref().and_then(|l| l.write_len), - MAX_WRITE_LENGTH, - ); + let chunk_size = self + .features + .limits + .and_then(|l| l.write_len) + .unwrap_or_else(|| { + let overhead = WRITE_OVERHEAD_LENGTH + self.handle.len() as u32; + self.features.max_packet_len.saturating_sub(overhead) as u64 + }) as usize; let mut total: u64 = 0; let mut offset = self.pos; @@ -138,7 +138,6 @@ impl File { let mut eof = false; loop { - // Top up the pipeline with new chunks until we hit the cap or EOF. while !eof && in_flight.len() < max_inflight { let mut buf = vec![0u8; chunk_size]; let n = reader.read(&mut buf).await?; @@ -161,13 +160,10 @@ impl File { total += n as u64; } - // Drain at least one in-flight write before reading more, otherwise - // we busy-loop the read path while writes never get a chance to make - // progress. match in_flight.next().await { Some(Ok(_)) => {} Some(Err(e)) => return Err(e), - None => break, // pipeline drained and no more data -> done + None => break, } } @@ -176,16 +172,16 @@ impl File { } /// Streams the remote file from the current position to `writer` using up to - /// `max_inflight` concurrent SFTP `READ` requests. Each request asks for up - /// to the negotiated `read_len`, capped at [`MAX_READ_LENGTH`]. + /// `max_inflight` concurrent SFTP `READ` requests. Each request asks for up + /// to the negotiated `read_len`, capped at the packet ceiling. /// - /// Like [`Self::write_all_pipelined`], this hides per-request RTT. Chunks + /// Like [`Self::write_all_pipelined`], this hides per-request RTT. Chunks /// are reassembled in offset order before being written to `writer`, so the - /// output is identical to a sequential read. For regular files, the current + /// output is identical to a sequential read. For regular files, the current /// file size is used to avoid speculative reads beyond EOF; if the size is /// unavailable, the transfer stops on EOF or the first short read. /// - /// Returns the number of bytes streamed. Updates `self.pos`. + /// Returns the number of bytes streamed. Updates `self.pos`. pub async fn read_to_writer_pipelined( &mut self, writer: &mut W, @@ -204,10 +200,15 @@ impl File { )); } - let chunk_size = bounded_chunk_size( - self.extensions.limits.as_ref().and_then(|l| l.read_len), - MAX_READ_LENGTH, - ); + let chunk_size = self + .features + .limits + .and_then(|l| l.read_len) + .unwrap_or_else(|| { + self.features + .max_packet_len + .saturating_sub(READ_OVERHEAD_LENGTH) as u64 + }) as usize; let file_end = self .metadata() .await @@ -223,9 +224,6 @@ impl File { let mut eof = false; loop { - // Keep the total reorder buffer bounded. A slow early read can make - // later replies arrive first; counting both pending and in-flight - // chunks prevents unbounded memory growth in that case. while !eof && in_flight.len() + pending.len() < max_inflight && file_end.is_none_or(|end| next_offset < end) @@ -282,7 +280,6 @@ impl File { None => break, } - // Flush in-order chunks to writer as they become available. while let Some(chunk) = pending.remove(&next_to_write) { let n = chunk.len() as u64; writer.write_all(&chunk).await?; @@ -296,202 +293,43 @@ impl File { } } -#[cfg(test)] -mod tests { - use std::{ - future::Future, - sync::{Arc, Mutex}, - }; - - use tokio::io::duplex; - - use super::*; - use crate::{ - client::SftpSession, - protocol::{Attrs, Data, FileAttributes, Handle, OpenFlags, Status, Version}, - server, - server::Handler, - }; - - struct MemoryHandler { - data: Arc>>, +fn check_write_result( + result: Result, oneshot::error::RecvError>, +) -> io::Result<()> { + match result { + Err(_) => Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "write channel closed", + )), + Ok(Ok(Packet::Status(s))) if s.status_code == StatusCode::Ok => Ok(()), + Ok(Ok(Packet::Status(s))) => Err(io::Error::other(s.error_message)), + Ok(Ok(_)) => Err(io::Error::other("unexpected response packet")), + Ok(Err(e)) => Err(io::Error::other(e.to_string())), } +} - impl MemoryHandler { - fn ok_status(id: u32) -> Status { - Status { - id, - status_code: StatusCode::Ok, - error_message: String::new(), - language_tag: String::new(), - } - } - } - - impl Handler for MemoryHandler { - type Error = StatusCode; - - fn unimplemented(&self) -> Self::Error { - StatusCode::OpUnsupported - } - - fn init( - &mut self, - _version: u32, - _extensions: std::collections::HashMap, - ) -> impl Future> + Send { - async { Ok(Version::new()) } - } - - fn open( - &mut self, - id: u32, - _filename: String, - _pflags: OpenFlags, - _attrs: FileAttributes, - ) -> impl Future> + Send { - async move { - Ok(Handle { - id, - handle: "memory".to_owned(), - }) - } - } - - fn close( - &mut self, - id: u32, - _handle: String, - ) -> impl Future> + Send { - async move { Ok(Self::ok_status(id)) } - } - - fn fstat( - &mut self, - id: u32, - _handle: String, - ) -> impl Future> + Send { - let data = self.data.clone(); - - async move { - let mut attrs = FileAttributes::empty(); - attrs.size = Some(data.lock().expect("memory file lock poisoned").len() as u64); - Ok(Attrs { id, attrs }) - } - } - - fn read( - &mut self, - id: u32, - _handle: String, - offset: u64, - len: u32, - ) -> impl Future> + Send { - let data = self.data.clone(); - - async move { - let data = data.lock().expect("memory file lock poisoned"); - let offset = usize::try_from(offset).map_err(|_| StatusCode::Failure)?; - if offset >= data.len() { - return Err(StatusCode::Eof); - } - let end = offset.saturating_add(len as usize).min(data.len()); - - Ok(Data { - id, - data: data[offset..end].to_vec(), - }) - } - } - - fn write( - &mut self, - id: u32, - _handle: String, - offset: u64, - bytes: Vec, - ) -> impl Future> + Send { - let data = self.data.clone(); - - async move { - let mut data = data.lock().expect("memory file lock poisoned"); - let offset = usize::try_from(offset).map_err(|_| StatusCode::Failure)?; - let end = offset.checked_add(bytes.len()).ok_or(StatusCode::Failure)?; - if data.len() < end { - data.resize(end, 0); - } - data[offset..end].copy_from_slice(&bytes); - - Ok(Self::ok_status(id)) - } +fn poll_oldest_write( + pending: &mut VecDeque>>, + cx: &mut Context<'_>, +) -> Option>> { + let rx = pending.front_mut()?; + Some(match Pin::new(rx).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => { + pending.pop_front(); + Poll::Ready(check_write_result(r)) } - } - - async fn memory_session(data: Arc>>) -> SftpSession { - let (client, server_stream) = duplex(64 * 1024); - server::run(server_stream, MemoryHandler { data }).await; - SftpSession::new(client).await.expect("memory SFTP init") - } - - #[test] - fn advertised_chunk_sizes_are_capped() { - assert_eq!( - bounded_chunk_size(None, MAX_READ_LENGTH), - MAX_READ_LENGTH as usize - ); - assert_eq!(bounded_chunk_size(Some(1024), MAX_READ_LENGTH), 1024); - assert_eq!( - bounded_chunk_size(Some(MAX_READ_LENGTH * 4), MAX_READ_LENGTH), - MAX_READ_LENGTH as usize - ); - } - - #[tokio::test] - async fn write_all_pipelined_streams_all_bytes() { - let remote_data = Arc::new(Mutex::new(Vec::new())); - let sftp = memory_session(remote_data.clone()).await; - let input: Vec = (0..(MAX_WRITE_LENGTH as usize * 2 + 123)) - .map(|n| (n % 251) as u8) - .collect(); - let mut reader = &input[..]; - let mut file = sftp - .open_with_flags( - "ignored", - OpenFlags::CREATE | OpenFlags::TRUNCATE | OpenFlags::WRITE, - ) - .await - .expect("open memory file"); - - let written = file - .write_all_pipelined(&mut reader, 4) - .await - .expect("pipelined write"); - - assert_eq!(written as usize, input.len()); - assert_eq!( - *remote_data.lock().expect("memory file lock poisoned"), - input - ); - } - - #[tokio::test] - async fn read_to_writer_pipelined_streams_all_bytes() { - let input: Vec = (0..(MAX_READ_LENGTH as usize * 2 + 123)) - .map(|n| (n % 251) as u8) - .collect(); - let remote_data = Arc::new(Mutex::new(input.clone())); - let sftp = memory_session(remote_data).await; - let mut file = sftp.open("ignored").await.expect("open memory file"); - let mut output = Vec::new(); - - let read = file - .read_to_writer_pipelined(&mut output, 4) - .await - .expect("pipelined read"); + }) +} - assert_eq!(read as usize, input.len()); - assert_eq!(output, input); +fn poll_drain_writes( + pending: &mut VecDeque>>, + cx: &mut Context<'_>, +) -> Poll> { + while let Some(poll) = poll_oldest_write(pending, cx) { + ready!(poll)?; } + Poll::Ready(Ok(())) } impl Drop for File { @@ -522,30 +360,28 @@ impl AsyncRead for File { None => { let session = self.session.clone(); let max_read_len = self - .extensions + .features .limits - .as_ref() .and_then(|l| l.read_len) - .unwrap_or(MAX_READ_LENGTH) as usize; + .unwrap_or_else(|| { + self.features + .max_packet_len + .saturating_sub(READ_OVERHEAD_LENGTH) as u64 + }) as usize; let file_handle = self.handle.clone(); let offset = self.pos; - let len = if buf.remaining() > max_read_len { - max_read_len - } else { - buf.remaining() - }; + let len = usize::min(buf.remaining(), max_read_len); self.state.f_read.get_or_insert(Box::pin(async move { let result = session.read(file_handle, offset, len as u32).await; - match result { Ok(data) => Ok(Some(data.data)), Err(Error::Status(status)) if status.status_code == StatusCode::Eof => { Ok(None) } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + Err(e) => Err(io::Error::other(e.to_string())), } })) } @@ -571,51 +407,49 @@ impl AsyncRead for File { impl AsyncSeek for File { fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { - match self.state.f_seek { - Some(_) => Err(io::Error::new( - io::ErrorKind::Other, + if self.state.f_seek.is_some() { + return Err(io::Error::other( "other file operation is pending, call poll_complete before start_seek", - )), - None => { + )); + } + + self.state.f_seek = Some(match position { + SeekFrom::Start(pos) => Box::pin(future::ready(Ok(pos))), + SeekFrom::Current(pos) => { + let new_pos = self.pos as i64 + pos; + if new_pos < 0 { + return Err(io::Error::other( + "cannot move file pointer before the beginning", + )); + } + Box::pin(future::ready(Ok(new_pos as u64))) + } + SeekFrom::End(pos) => { let session = self.session.clone(); let file_handle = self.handle.clone(); - let cur_pos = self.pos as i64; - - self.state.f_seek = Some(Box::pin(async move { - let new_pos = match position { - SeekFrom::Start(pos) => pos as i64, - SeekFrom::Current(pos) => cur_pos + pos, - SeekFrom::End(pos) => { - let result = session - .fstat(file_handle) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - match result.attrs.size { - Some(size) => size as i64 + pos, - None => { - return Err(io::Error::new( - io::ErrorKind::Other, - "file size unknown", - )) - } + + Box::pin(async move { + let result = session + .fstat(file_handle) + .await + .map_err(|e| io::Error::other(e.to_string()))?; + match result.attrs.size { + Some(size) => { + let new_pos = size as i64 + pos; + if new_pos < 0 { + return Err(io::Error::other( + "cannot move file pointer before the beginning", + )); } + Ok(new_pos as u64) } - }; - - if new_pos < 0 { - return Err(io::Error::new( - io::ErrorKind::Other, - "cannot move file pointer before the beginning", - )); + None => Err(io::Error::other("file size unknown")), } - - Ok(new_pos as u64) - })); - - Ok(()) + }) } - } + }); + + Ok(()) } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -636,51 +470,40 @@ impl AsyncWrite for File { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let poll = Pin::new(match self.state.f_write.as_mut() { - Some(f) => f, - None => { - let session = self.session.clone(); - let max_write_len = self - .extensions - .limits - .as_ref() - .and_then(|l| l.write_len) - .unwrap_or(MAX_WRITE_LENGTH) as usize; - - let file_handle = self.handle.clone(); - let data = buf.to_vec(); - - let offset = self.pos; - let len = if data.len() > max_write_len { - max_write_len - } else { - data.len() - }; - - self.state.f_write.get_or_insert(Box::pin(async move { - session - .write(file_handle, offset, data[..len].to_vec()) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - Ok(len) - })) + if self.state.write_acks.len() >= self.features.max_concurrent_writes { + if let Some(poll) = poll_oldest_write(&mut self.state.write_acks, cx) { + ready!(poll)?; } - }) - .poll(cx); - - if poll.is_ready() { - self.state.f_write = None; } - if let Poll::Ready(Ok(len)) = poll { - self.pos += len as u64; + let max_write_len = self + .features + .limits + .and_then(|l| l.write_len) + .unwrap_or_else(|| { + let overhead = WRITE_OVERHEAD_LENGTH + self.handle.len() as u32; + self.features.max_packet_len.saturating_sub(overhead) as u64 + }) as usize; + + let len = usize::min(buf.len(), max_write_len); + let data = buf[..len].to_vec(); + let handle = self.handle.clone(); + let offset = self.pos; + + match self.session.write_nowait(handle, offset, data) { + Ok(rx) => { + self.pos += len as u64; + self.state.write_acks.push_back(rx); + Poll::Ready(Ok(len)) + } + Err(e) => Poll::Ready(Err(io::Error::other(e.to_string()))), } - - poll } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if !self.extensions.fsync { + ready!(poll_drain_writes(&mut self.state.write_acks, cx))?; + + if !self.features.fsync { return Poll::Ready(Ok(())); } @@ -695,7 +518,7 @@ impl AsyncWrite for File { .fsync(file_handle) .await .map(|_| ()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) + .map_err(|e| io::Error::other(e.to_string())) })) } }) @@ -712,6 +535,8 @@ impl AsyncWrite for File { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + ready!(poll_drain_writes(&mut self.state.write_acks, cx))?; + let poll = Pin::new(match self.state.f_shutdown.as_mut() { Some(f) => f, None => { @@ -722,7 +547,7 @@ impl AsyncWrite for File { session .close(file_handle) .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + .map_err(|e| io::Error::other(e.to_string()))?; Ok(()) })) } diff --git a/crates/bssh-russh-sftp/src/client/mod.rs b/crates/bssh-russh-sftp/src/client/mod.rs index 6b4421a3..7c4ae52e 100644 --- a/crates/bssh-russh-sftp/src/client/mod.rs +++ b/crates/bssh-russh-sftp/src/client/mod.rs @@ -27,6 +27,26 @@ macro_rules! into_wrap { }; } +#[derive(Clone, Debug)] +pub struct Config { + /// Maximum size of a single packet in bytes. Default: 256 KiB. + pub max_packet_len: u32, + /// Maximum number of concurrent in-flight write requests. Default: 8. + pub max_concurrent_writes: usize, + /// Timeout in seconds for each request. Default: 10. + pub request_timeout_secs: u64, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_packet_len: 262144, + max_concurrent_writes: 8, + request_timeout_secs: 10, + } + } +} + async fn execute_handler(bytes: &mut Bytes, handler: &mut H) -> Result<(), error::Error> where H: Handler + Send, @@ -50,7 +70,7 @@ where S: AsyncRead + Unpin, H: Handler + Send, { - let mut bytes = read_packet(stream).await?; + let mut bytes = read_packet(stream, u32::MAX).await?; Ok(execute_handler(&mut bytes, handler).await?) } diff --git a/crates/bssh-russh-sftp/src/client/rawsession.rs b/crates/bssh-russh-sftp/src/client/rawsession.rs index 7457f76a..6e8174d2 100644 --- a/crates/bssh-russh-sftp/src/client/rawsession.rs +++ b/crates/bssh-russh-sftp/src/client/rawsession.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use flurry::HashMap; +use dashmap::DashMap as HashMap; use std::{ sync::{ atomic::{AtomicU32, AtomicU64, Ordering}, @@ -9,12 +9,13 @@ use std::{ }; use tokio::{ io::{AsyncRead, AsyncWrite}, - sync::{mpsc, RwLock}, + sync::{mpsc, oneshot}, time, }; -use super::{error::Error, run, Handler}; +use super::{error::Error, Handler}; use crate::{ + client::{run, Config}, de, extensions::{ self, FsyncExtension, HardlinkExtension, LimitsExtension, Statvfs, StatvfsExtension, @@ -27,7 +28,7 @@ use crate::{ }; pub type SftpResult = Result; -type SharedRequests = HashMap, mpsc::Sender>>; +type SharedRequests = HashMap, oneshot::Sender>>; pub(crate) struct SessionInner { version: Option, @@ -35,8 +36,8 @@ pub(crate) struct SessionInner { } impl SessionInner { - pub async fn reply(&mut self, id: Option, packet: Packet) -> SftpResult<()> { - if let Some(sender) = self.requests.pin().remove(&id) { + pub fn reply(&mut self, id: Option, packet: Packet) -> SftpResult<()> { + if let Some((_, sender)) = self.requests.remove(&id) { let validate = if id.is_some() && self.version.is_none() { Err(Error::UnexpectedPacket) } else if id.is_none() && self.version.is_some() { @@ -45,9 +46,8 @@ impl SessionInner { Ok(()) }; - sender - .try_send(validate.clone().map(|_| packet)) - .map_err(|e| Error::UnexpectedBehavior(e.to_string()))?; + // Ignore send error: receiver was dropped (request timed out). + let _ = sender.send(validate.clone().map(|_| packet)); return validate; } @@ -59,46 +59,44 @@ impl SessionInner { } } -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl Handler for SessionInner { type Error = Error; async fn version(&mut self, packet: Version) -> Result<(), Self::Error> { let version = packet.version; - self.reply(None, packet.into()).await?; + self.reply(None, packet.into())?; self.version = Some(version); Ok(()) } async fn name(&mut self, name: Name) -> Result<(), Self::Error> { - self.reply(Some(name.id), name.into()).await + self.reply(Some(name.id), name.into()) } async fn status(&mut self, status: Status) -> Result<(), Self::Error> { - self.reply(Some(status.id), status.into()).await + self.reply(Some(status.id), status.into()) } async fn handle(&mut self, handle: Handle) -> Result<(), Self::Error> { - self.reply(Some(handle.id), handle.into()).await + self.reply(Some(handle.id), handle.into()) } async fn data(&mut self, data: Data) -> Result<(), Self::Error> { - self.reply(Some(data.id), data.into()).await + self.reply(Some(data.id), data.into()) } async fn attrs(&mut self, attrs: Attrs) -> Result<(), Self::Error> { - self.reply(Some(attrs.id), attrs.into()).await + self.reply(Some(attrs.id), attrs.into()) } async fn extended_reply(&mut self, reply: ExtendedReply) -> Result<(), Self::Error> { - self.reply(Some(reply.id), reply.into()).await + self.reply(Some(reply.id), reply.into()) } } #[derive(Debug, Clone, Copy, Default)] pub struct Limits { - // todo: implement - //pub packet_len: Option, + pub packet_len: Option, pub read_len: Option, pub write_len: Option, pub open_handles: Option, @@ -107,30 +105,14 @@ pub struct Limits { impl From for Limits { fn from(limits: LimitsExtension) -> Self { Self { - read_len: if limits.max_read_len > 0 { - Some(limits.max_read_len) - } else { - None - }, - write_len: if limits.max_write_len > 0 { - Some(limits.max_write_len) - } else { - None - }, - open_handles: if limits.max_open_handles > 0 { - Some(limits.max_open_handles) - } else { - None - }, + packet_len: (limits.max_packet_len > 0).then_some(limits.max_packet_len), + read_len: (limits.max_read_len > 0).then_some(limits.max_read_len), + write_len: (limits.max_write_len > 0).then_some(limits.max_write_len), + open_handles: (limits.max_open_handles > 0).then_some(limits.max_open_handles), } } } -pub(crate) struct Options { - timeout: RwLock, - limits: Arc, -} - /// Implements raw work with the protocol in request-response format. /// If the server returns a `Status` packet and it has the code Ok /// then the packet is returned as Ok in other error cases @@ -140,7 +122,8 @@ pub struct RawSftpSession { requests: Arc, next_req_id: AtomicU32, handles: AtomicU64, - options: Options, + timeout: AtomicU64, + limits: Limits, } macro_rules! into_with_status { @@ -165,6 +148,13 @@ macro_rules! into_status { impl RawSftpSession { pub fn new(stream: S) -> Self + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + Self::new_with_config(stream, Config::default()) + } + + pub fn new_with_config(stream: S, cfg: Config) -> Self where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -179,51 +169,62 @@ impl RawSftpSession { requests: req_map, next_req_id: AtomicU32::new(1), handles: AtomicU64::new(0), - options: Options { - timeout: RwLock::new(10), - limits: Arc::new(Limits::default()), - }, + timeout: AtomicU64::new(cfg.request_timeout_secs), + limits: Limits::default(), } } /// Set the maximum response time in seconds. /// Default: 10 seconds - pub async fn set_timeout(&self, secs: u64) { - *self.options.timeout.write().await = secs; + pub fn set_timeout(&self, secs: u64) { + self.timeout.store(secs, Ordering::Relaxed); } /// Setting limits. For the `limits@openssh.com` extension - pub fn set_limits(&mut self, limits: Arc) { - self.options.limits = limits; + pub fn set_limits(&mut self, limits: Limits) { + self.limits = limits; } - async fn send(&self, id: Option, packet: Packet) -> SftpResult { + fn send( + &self, + id: Option, + packet: Packet, + ) -> SftpResult>> { if self.tx.is_closed() { return Err(Error::UnexpectedBehavior("session closed".into())); } - let (tx, mut rx) = mpsc::channel(1); + let bytes = Bytes::try_from(packet)?; - self.requests.pin().insert(id, tx); - self.tx.send(Bytes::try_from(packet)?)?; + if let Some(max_len) = self.limits.packet_len { + if bytes.len() as u64 > max_len { + return Err(Error::Limited("packet exceeds server limit".to_owned())); + } + } - let timeout = *self.options.timeout.read().await; + let (tx, rx) = oneshot::channel(); + self.requests.insert(id, tx); + self.tx.send(bytes)?; - match time::timeout(Duration::from_secs(timeout), rx.recv()).await { - Ok(Some(result)) => result, - Ok(None) => { - self.requests.pin().remove(&id); - Err(Error::UnexpectedBehavior("recv none message".into())) - } + Ok(rx) + } + + async fn request(&self, id: Option, packet: Packet) -> SftpResult { + let rx = self.send(id, packet)?; + let timeout = self.timeout.load(Ordering::Relaxed); + + match time::timeout(Duration::from_secs(timeout), rx).await { + Ok(Ok(result)) => result, + Ok(Err(_)) => Err(Error::UnexpectedBehavior("sender dropped".into())), Err(error) => { - self.requests.pin().remove(&id); + self.requests.remove(&id); Err(error.into()) } } } fn use_next_id(&self) -> u32 { - self.next_req_id.fetch_add(1, Ordering::SeqCst) + self.next_req_id.fetch_add(1, Ordering::Relaxed) } /// Closes the inner channel stream. Called by [`Drop`] @@ -236,7 +237,7 @@ impl RawSftpSession { } pub async fn init(&self) -> SftpResult { - let result = self.send(None, Init::default().into()).await?; + let result = self.request(None, Init::default().into()).await?; if let Packet::Version(version) = result { Ok(version) } else { @@ -251,7 +252,6 @@ impl RawSftpSession { attrs: FileAttributes, ) -> SftpResult { if self - .options .limits .open_handles .is_some_and(|h| self.handles.load(Ordering::SeqCst) >= h) @@ -261,7 +261,7 @@ impl RawSftpSession { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Open { id, @@ -283,7 +283,7 @@ impl RawSftpSession { pub async fn close>(&self, handle: H) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Close { id, @@ -319,13 +319,13 @@ impl RawSftpSession { offset: u64, len: u32, ) -> SftpResult { - if self.options.limits.read_len.is_some_and(|r| len as u64 > r) { + if self.limits.read_len.is_some_and(|r| len as u64 > r) { return Err(Error::Limited("read limit reached".to_owned())); } let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Read { id, @@ -346,18 +346,13 @@ impl RawSftpSession { offset: u64, data: Vec, ) -> SftpResult { - if self - .options - .limits - .write_len - .is_some_and(|w| data.len() as u64 > w) - { + if self.limits.write_len.is_some_and(|w| data.len() as u64 > w) { return Err(Error::Limited("write limit reached".to_owned())); } let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Write { id, @@ -372,10 +367,34 @@ impl RawSftpSession { into_status!(result) } + /// Sends a write packet without awaiting the server's acknowledgement. + pub(crate) fn write_nowait( + &self, + handle: String, + offset: u64, + data: Vec, + ) -> SftpResult>> { + if self.limits.write_len.is_some_and(|w| data.len() as u64 > w) { + return Err(Error::Limited("write limit reached".to_owned())); + } + + let id = self.use_next_id(); + self.send( + Some(id), + Write { + id, + handle, + offset, + data, + } + .into(), + ) + } + pub async fn lstat>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Lstat { id, @@ -391,7 +410,7 @@ impl RawSftpSession { pub async fn fstat>(&self, handle: H) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Fstat { id, @@ -411,7 +430,7 @@ impl RawSftpSession { ) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), SetStat { id, @@ -432,7 +451,7 @@ impl RawSftpSession { ) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), FSetStat { id, @@ -448,7 +467,6 @@ impl RawSftpSession { pub async fn opendir>(&self, path: P) -> SftpResult { if self - .options .limits .open_handles .is_some_and(|h| self.handles.load(Ordering::SeqCst) >= h) @@ -458,7 +476,7 @@ impl RawSftpSession { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), OpenDir { id, @@ -478,7 +496,7 @@ impl RawSftpSession { pub async fn readdir>(&self, handle: H) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), ReadDir { id, @@ -494,7 +512,7 @@ impl RawSftpSession { pub async fn remove>(&self, filename: T) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Remove { id, @@ -514,7 +532,7 @@ impl RawSftpSession { ) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), MkDir { id, @@ -531,7 +549,7 @@ impl RawSftpSession { pub async fn rmdir>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), RmDir { id, @@ -547,7 +565,7 @@ impl RawSftpSession { pub async fn realpath>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), RealPath { id, @@ -563,7 +581,7 @@ impl RawSftpSession { pub async fn stat>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Stat { id, @@ -583,7 +601,7 @@ impl RawSftpSession { { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Rename { id, @@ -600,7 +618,7 @@ impl RawSftpSession { pub async fn readlink>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), ReadLink { id, @@ -620,7 +638,7 @@ impl RawSftpSession { { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Symlink { id, @@ -638,7 +656,7 @@ impl RawSftpSession { /// The extension can return any packet, so it's not specific pub async fn extended>(&self, request: R, data: Vec) -> SftpResult { let id = self.use_next_id(); - self.send( + self.request( Some(id), Extended { id, diff --git a/crates/bssh-russh-sftp/src/client/session.rs b/crates/bssh-russh-sftp/src/client/session.rs index 0fe16728..f840168d 100644 --- a/crates/bssh-russh-sftp/src/client/session.rs +++ b/crates/bssh-russh-sftp/src/client/session.rs @@ -8,23 +8,26 @@ use super::{ RawSftpSession, }; use crate::{ + client::Config, extensions::{self, Statvfs}, protocol::{FileAttributes, OpenFlags, StatusCode}, }; -#[derive(Debug, Default)] -pub(crate) struct Extensions { +#[derive(Debug, Clone, Copy)] +pub(crate) struct Features { pub hardlink: bool, pub fsync: bool, pub statvfs: bool, - pub limits: Option>, + pub limits: Option, + pub max_concurrent_writes: usize, + pub max_packet_len: u32, } /// High-level SFTP implementation for easy interaction with a remote file system. /// Contains most methods similar to the native [filesystem](std::fs) pub struct SftpSession { session: Arc, - extensions: Arc, + features: Features, } impl SftpSession { @@ -33,60 +36,62 @@ impl SftpSession { where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - Self::new_opts(stream, None).await + Self::new_with_config(stream, Config::default()).await } /// Creates a new session with timeout opt before the first request + #[deprecated(note = "use SftpSession::new_with_config with Config::req_timeout_secs instead")] pub async fn new_opts(stream: S, timeout: Option) -> SftpResult where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - let mut session = RawSftpSession::new(stream); - - // todo: for new options we need builder - if let Some(timeout) = timeout { - session.set_timeout(timeout).await; + let mut cfg = Config::default(); + if let Some(secs) = timeout { + cfg.request_timeout_secs = secs; } + Self::new_with_config(stream, cfg).await + } + + /// Creates a new session with custom configuration + pub async fn new_with_config(stream: S, cfg: Config) -> SftpResult + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let max_concurrent_writes = cfg.max_concurrent_writes; + let max_packet_len = cfg.max_packet_len; + let mut session = RawSftpSession::new_with_config(stream, cfg); let version = session.init().await?; - let mut extensions = Extensions { - hardlink: version - .extensions - .get(extensions::HARDLINK) - .is_some_and(|e| e == "1"), - fsync: version - .extensions - .get(extensions::FSYNC) - .is_some_and(|e| e == "1"), - statvfs: version - .extensions - .get(extensions::STATVFS) - .is_some_and(|e| e == "2"), + let has_extension = |name, ver| version.extensions.get(name).is_some_and(|v| v == ver); + + let mut features = Features { + hardlink: has_extension(extensions::HARDLINK, "1"), + fsync: has_extension(extensions::FSYNC, "1"), + statvfs: has_extension(extensions::STATVFS, "2"), limits: None, + max_concurrent_writes, + max_packet_len, }; - if version - .extensions - .get(extensions::LIMITS) - .is_some_and(|e| e == "1") - { - let limits = session.limits().await?; - let limits = Arc::new(Limits::from(limits)); - - session.set_limits(limits.clone()); - extensions.limits = Some(limits); + if has_extension(extensions::LIMITS, "1") { + let limits = Limits::from(session.limits().await?); + session.set_limits(limits); + features.limits = Some(limits); + if let Some(plen) = limits.packet_len { + features.max_packet_len = (plen as u32).min(max_packet_len); + } } Ok(Self { session: Arc::new(session), - extensions: Arc::new(extensions), + features, }) } /// Set the maximum response time in seconds. /// Default: 10 seconds - pub async fn set_timeout(&self, secs: u64) { - self.session.set_timeout(secs).await; + pub fn set_timeout(&self, secs: u64) { + self.session.set_timeout(secs); } /// Closes the inner channel stream. @@ -128,11 +133,7 @@ impl SftpSession { attributes: FileAttributes, ) -> SftpResult { let handle = self.session.open(filename, flags, attributes).await?.handle; - Ok(File::new( - self.session.clone(), - handle, - self.extensions.clone(), - )) + Ok(File::new(self.session.clone(), handle, self.features)) } /// Requests the remote party for the absolute from the relative path. @@ -190,7 +191,7 @@ impl SftpSession { .files .into_iter() .map(|f| (f.filename, f.attrs)) - .chain(files) + .chain(files.into_iter()) .collect(); } Err(Error::Status(status)) if status.status_code == StatusCode::Eof => break, @@ -265,7 +266,7 @@ impl SftpSession { O: Into, N: Into, { - if !self.extensions.hardlink { + if !self.features.hardlink { return Ok(false); } @@ -275,7 +276,7 @@ impl SftpSession { /// Performs a statvfs on the remote file system path. /// Returns [`Ok(None)`] if the remote SFTP server does not support `statvfs@openssh.com` extension v2. pub async fn fs_info>(&self, path: P) -> SftpResult> { - if !self.extensions.statvfs { + if !self.features.statvfs { return Ok(None); } diff --git a/crates/bssh-russh-sftp/src/de.rs b/crates/bssh-russh-sftp/src/de.rs index 8e63a48c..75b505f7 100644 --- a/crates/bssh-russh-sftp/src/de.rs +++ b/crates/bssh-russh-sftp/src/de.rs @@ -170,7 +170,7 @@ impl<'de> serde::Deserializer<'de> for &mut Deserializer<'de> { where V: serde::de::Visitor<'de>, { - visitor.visit_byte_buf(self.input.try_get_bytes()?) + self.deserialize_bytes(visitor) } fn deserialize_option(self, _visitor: V) -> Result diff --git a/crates/bssh-russh-sftp/src/lib.rs b/crates/bssh-russh-sftp/src/lib.rs index 240bb300..80a1d489 100644 --- a/crates/bssh-russh-sftp/src/lib.rs +++ b/crates/bssh-russh-sftp/src/lib.rs @@ -1,6 +1,3 @@ -// Lints tripped by vendored upstream source that we do not want to diverge from. -#![allow(clippy::io_other_error)] - //! SFTP subsystem with client and server support for Russh and more! //! //! Crate can provide compatibility with anything that can provide the raw data diff --git a/crates/bssh-russh-sftp/src/protocol/mod.rs b/crates/bssh-russh-sftp/src/protocol/mod.rs index f1eb5210..50d58ab5 100644 --- a/crates/bssh-russh-sftp/src/protocol/mod.rs +++ b/crates/bssh-russh-sftp/src/protocol/mod.rs @@ -270,11 +270,7 @@ impl TryFrom for Bytes { Packet::ExtendedReply(reply) => (SSH_FXP_EXTENDED_REPLY, ser::to_bytes(&reply)?), }; - let length = payload - .len() - .checked_add(1) - .and_then(|len| u32::try_from(len).ok()) - .ok_or_else(|| Error::BadMessage("packet length exceeds u32".to_owned()))?; + let length = payload.len() as u32 + 1; let mut bytes = BytesMut::new(); bytes.put_u32(length); bytes.put_u8(r#type); @@ -282,100 +278,3 @@ impl TryFrom for Bytes { Ok(bytes.freeze()) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn write_packet_uses_length_prefixed_bulk_data() { - let packet = Packet::Write(Write { - id: 7, - handle: "h".to_owned(), - offset: 9, - data: vec![0, 1, 2, 3], - }); - - let encoded = Bytes::try_from(packet).expect("serialize write packet"); - assert_eq!( - encoded.as_ref(), - &[ - 0, - 0, - 0, - 26, // packet length - SSH_FXP_WRITE, - 0, - 0, - 0, - 7, // request id - 0, - 0, - 0, - 1, - b'h', // handle - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 9, // offset - 0, - 0, - 0, - 4, - 0, - 1, - 2, - 3, // data - ] - ); - - let mut payload = encoded.slice(4..); - let decoded = Packet::try_from(&mut payload).expect("deserialize write packet"); - match decoded { - Packet::Write(write) => assert_eq!(write.data, [0, 1, 2, 3]), - _ => panic!("expected write packet"), - } - } - - #[test] - fn data_packet_uses_length_prefixed_bulk_data() { - let packet = Packet::Data(Data { - id: 8, - data: vec![4, 5, 6], - }); - - let encoded = Bytes::try_from(packet).expect("serialize data packet"); - assert_eq!( - encoded.as_ref(), - &[ - 0, - 0, - 0, - 12, // packet length - SSH_FXP_DATA, - 0, - 0, - 0, - 8, // request id - 0, - 0, - 0, - 3, - 4, - 5, - 6, // data - ] - ); - - let mut payload = encoded.slice(4..); - let decoded = Packet::try_from(&mut payload).expect("deserialize data packet"); - match decoded { - Packet::Data(data) => assert_eq!(data.data, [4, 5, 6]), - _ => panic!("expected data packet"), - } - } -} diff --git a/crates/bssh-russh-sftp/src/ser.rs b/crates/bssh-russh-sftp/src/ser.rs index ee7f6853..8ffba16f 100644 --- a/crates/bssh-russh-sftp/src/ser.rs +++ b/crates/bssh-russh-sftp/src/ser.rs @@ -104,9 +104,7 @@ impl<'a> serde::Serializer for &'a mut Serializer { } fn serialize_bytes(self, v: &[u8]) -> Result { - let len = u32::try_from(v.len()) - .map_err(|_| Error::BadMessage("bytes length exceeds u32".to_owned()))?; - self.output.put_u32(len); + self.output.put_u32(v.len() as u32); self.output.put_slice(v); Ok(()) } diff --git a/crates/bssh-russh-sftp/src/server/mod.rs b/crates/bssh-russh-sftp/src/server/mod.rs index 6228ed44..27a626cb 100644 --- a/crates/bssh-russh-sftp/src/server/mod.rs +++ b/crates/bssh-russh-sftp/src/server/mod.rs @@ -20,6 +20,21 @@ macro_rules! into_wrap { }; } +/// Configuration for the SFTP server. +#[derive(Clone, Debug)] +pub struct Config { + /// Maximum allowed size of SFTP packets sent by clients. Default: 256 KiB. + pub max_client_packet_len: u32, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_client_packet_len: 262144, + } + } +} + async fn process_request(packet: Packet, handler: &mut H) -> Packet where H: Handler + Send, @@ -51,12 +66,12 @@ where } } -async fn process_handler(stream: &mut S, handler: &mut H) -> Result<(), Error> +async fn process_handler(stream: &mut S, handler: &mut H, cfg: &Config) -> Result<(), Error> where H: Handler + Send, S: AsyncRead + AsyncWrite + Unpin, { - let mut bytes = read_packet(stream).await?; + let mut bytes = read_packet(stream, cfg.max_client_packet_len).await?; let response = match Packet::try_from(&mut bytes) { Ok(request) => process_request(request, handler).await, @@ -71,14 +86,23 @@ where } /// Run processing stream as SFTP -pub async fn run(mut stream: S, mut handler: H) +pub async fn run(stream: S, handler: H) +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + H: Handler + Send + 'static, +{ + run_with_config(stream, handler, Config::default()).await +} + +/// Run processing stream as SFTP with custom configuration +pub async fn run_with_config(mut stream: S, mut handler: H, cfg: Config) where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, H: Handler + Send + 'static, { tokio::spawn(async move { loop { - match process_handler(&mut stream, &mut handler).await { + match process_handler(&mut stream, &mut handler, &cfg).await { Err(Error::UnexpectedEof) => break, Err(err) => warn!("{}", err), Ok(_) => (), diff --git a/crates/bssh-russh-sftp/src/utils.rs b/crates/bssh-russh-sftp/src/utils.rs index 783cdf14..b841dbb0 100644 --- a/crates/bssh-russh-sftp/src/utils.rs +++ b/crates/bssh-russh-sftp/src/utils.rs @@ -5,12 +5,18 @@ use tokio::io::{AsyncRead, AsyncReadExt}; use crate::error::Error; -pub fn unix(time: SystemTime) -> u32 { +pub(crate) fn unix(time: SystemTime) -> u32 { DateTime::::from(time).timestamp() as u32 } -pub async fn read_packet(stream: &mut S) -> Result { +pub(crate) async fn read_packet( + stream: &mut S, + max_length: u32, +) -> Result { let length = stream.read_u32().await?; + if length > max_length { + return Err(Error::BadMessage("packet length limit exceeded".to_owned())); + } let mut buf = vec![0; length as usize]; stream.read_exact(&mut buf).await?; diff --git a/crates/bssh-russh/Cargo.toml b/crates/bssh-russh/Cargo.toml index 35106428..cf5c9bda 100644 --- a/crates/bssh-russh/Cargo.toml +++ b/crates/bssh-russh/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bssh-russh" -version = "0.60.1" +version = "0.60.3" authors = ["Jeongkyu Shin "] description = "Temporary fork of russh with high-frequency PTY output fix (Handle::data from spawned tasks)" documentation = "https://docs.rs/bssh-russh" @@ -26,7 +26,7 @@ serde = ["ssh-key/serde"] [dependencies] aes = "0.8" async-trait = { version = "0.1.50", optional = true } -aws-lc-rs = { version = "1.16.3", optional = true } +aws-lc-rs = { version = "1.17.0", optional = true } bitflags = "2.0" block-padding = { version = "0.3", features = ["std"] } byteorder = "1.4" @@ -88,7 +88,7 @@ yasna = { version = "0.5.0", features = ["bit-vec", "num-bigint"], optional = tr zeroize = "1.7" # Public russh crates (no modifications needed) -russh-cryptovec = { version = "0.59.0", features = ["ssh-encoding"] } +russh-cryptovec = { version = "0.60.3", features = ["ssh-encoding"] } russh-util = "0.52.0" # Use the forked ssh-key from russh diff --git a/crates/bssh-russh/patches/agent-frame-length-cap.patch b/crates/bssh-russh/patches/agent-frame-length-cap.patch new file mode 100644 index 00000000..8ebd2716 --- /dev/null +++ b/crates/bssh-russh/patches/agent-frame-length-cap.patch @@ -0,0 +1,96 @@ +--- a/src/keys/agent/client.rs ++++ b/src/keys/agent/client.rs +@@ -16,6 +16,8 @@ + + impl AgentStream for S {} + ++const MAX_AGENT_FRAME_LEN: usize = 256 * 1024; ++ + /// SSH agent client. + pub struct AgentClient { + stream: S, +@@ -112,25 +114,29 @@ + } + + impl AgentClient { +- async fn read_response(&mut self) -> Result<(), Error> { +- // Writing the message +- self.stream.write_all(&self.buf).await?; +- self.stream.flush().await?; +- +- // Reading the length ++ async fn read_frame(&mut self) -> Result<(), Error> { + self.buf.clear(); + self.buf.resize(4, 0); + self.stream.read_exact(&mut self.buf).await?; + +- // Reading the rest of the buffer + let len = BigEndian::read_u32(&self.buf) as usize; ++ if len > MAX_AGENT_FRAME_LEN { ++ return Err(Error::AgentProtocolError); ++ } ++ + self.buf.clear(); + self.buf.resize(len, 0); + self.stream.read_exact(&mut self.buf).await?; +- + Ok(()) + } + ++ async fn read_response(&mut self) -> Result<(), Error> { ++ // Writing the message ++ self.stream.write_all(&self.buf).await?; ++ self.stream.flush().await?; ++ self.read_frame().await ++ } ++ + async fn read_success(&mut self) -> Result<(), Error> { + self.read_response().await?; + if self.buf.first() == Some(&msg::SUCCESS) { +--- a/src/keys/agent/server.rs ++++ b/src/keys/agent/server.rs +@@ -19,6 +19,8 @@ + use crate::keys::Error; + use crate::CryptoVec; + ++const MAX_AGENT_FRAME_LEN: usize = 256 * 1024; ++ + #[derive(Clone)] + #[allow(clippy::type_complexity)] + struct KeyStore(Arc, (Arc, SystemTime, Vec)>>>); +@@ -97,18 +99,26 @@ + impl + Connection + { ++ async fn read_frame(&mut self) -> Result<(), Error> { ++ self.buf.clear(); ++ self.buf.resize(4, 0); ++ self.s.read_exact(&mut self.buf).await?; ++ ++ let len = BigEndian::read_u32(&self.buf) as usize; ++ if len > MAX_AGENT_FRAME_LEN { ++ return Err(Error::AgentProtocolError); ++ } ++ ++ self.buf.clear(); ++ self.buf.resize(len, 0); ++ self.s.read_exact(&mut self.buf).await?; ++ Ok(()) ++ } ++ + async fn run(mut self) -> Result<(), Error> { + let mut writebuf = Vec::new(); + loop { +- // Reading the length +- self.buf.clear(); +- self.buf.resize(4, 0); +- self.s.read_exact(&mut self.buf).await?; +- // Reading the rest of the buffer +- let len = BigEndian::read_u32(&self.buf) as usize; +- self.buf.clear(); +- self.buf.resize(len, 0); +- self.s.read_exact(&mut self.buf).await?; ++ self.read_frame().await?; + // respond + writebuf.clear(); + self.respond(&mut writebuf).await?; diff --git a/crates/bssh-russh/src/keys/agent/client.rs b/crates/bssh-russh/src/keys/agent/client.rs index 53c39cf2..ec2e7c17 100644 --- a/crates/bssh-russh/src/keys/agent/client.rs +++ b/crates/bssh-russh/src/keys/agent/client.rs @@ -17,6 +17,8 @@ pub trait AgentStream: AsyncRead + AsyncWrite {} impl AgentStream for S {} +const MAX_AGENT_FRAME_LEN: usize = 256 * 1024; + /// SSH agent client. pub struct AgentClient { stream: S, @@ -112,25 +114,29 @@ impl AgentClient { } impl AgentClient { - async fn read_response(&mut self) -> Result<(), Error> { - // Writing the message - self.stream.write_all(&self.buf).await?; - self.stream.flush().await?; - - // Reading the length + async fn read_frame(&mut self) -> Result<(), Error> { self.buf.clear(); self.buf.resize(4, 0); self.stream.read_exact(&mut self.buf).await?; - // Reading the rest of the buffer let len = BigEndian::read_u32(&self.buf) as usize; + if len > MAX_AGENT_FRAME_LEN { + return Err(Error::AgentProtocolError); + } + self.buf.clear(); self.buf.resize(len, 0); self.stream.read_exact(&mut self.buf).await?; - Ok(()) } + async fn read_response(&mut self) -> Result<(), Error> { + // Writing the message + self.stream.write_all(&self.buf).await?; + self.stream.flush().await?; + self.read_frame().await + } + async fn read_success(&mut self) -> Result<(), Error> { self.read_response().await?; if self.buf.first() == Some(&msg::SUCCESS) { diff --git a/crates/bssh-russh/src/keys/agent/server.rs b/crates/bssh-russh/src/keys/agent/server.rs index 58c32b4e..9832826a 100644 --- a/crates/bssh-russh/src/keys/agent/server.rs +++ b/crates/bssh-russh/src/keys/agent/server.rs @@ -19,6 +19,8 @@ use crate::keys::key::PrivateKeyWithHashAlg; use crate::keys::Error; use crate::CryptoVec; +const MAX_AGENT_FRAME_LEN: usize = 256 * 1024; + #[derive(Clone)] #[allow(clippy::type_complexity)] struct KeyStore(Arc, (Arc, SystemTime, Vec)>>>); @@ -97,18 +99,26 @@ struct Connection { impl Connection { + async fn read_frame(&mut self) -> Result<(), Error> { + self.buf.clear(); + self.buf.resize(4, 0); + self.s.read_exact(&mut self.buf).await?; + + let len = BigEndian::read_u32(&self.buf) as usize; + if len > MAX_AGENT_FRAME_LEN { + return Err(Error::AgentProtocolError); + } + + self.buf.clear(); + self.buf.resize(len, 0); + self.s.read_exact(&mut self.buf).await?; + Ok(()) + } + async fn run(mut self) -> Result<(), Error> { let mut writebuf = Vec::new(); loop { - // Reading the length - self.buf.clear(); - self.buf.resize(4, 0); - self.s.read_exact(&mut self.buf).await?; - // Reading the rest of the buffer - let len = BigEndian::read_u32(&self.buf) as usize; - self.buf.clear(); - self.buf.resize(len, 0); - self.s.read_exact(&mut self.buf).await?; + self.read_frame().await?; // respond writebuf.clear(); self.respond(&mut writebuf).await?;