From 757ae6d1f842a9b0c4fd4e10e65d2248b8582646 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Mon, 25 Nov 2024 15:29:02 +0100 Subject: [PATCH 1/3] controllers/krate/publish: Validate JSON metadata before reading tarball Previously, the `BytesRequest` allocated memory for the full tarball (and the JSON metadata blob) before even validating that the `name` and `vers` fields of the JSON metadata correspond to a crate and version that the user has publish access to. This commit changes the code to first read the JSON metadata from the request body stream, validate it, and then read the tarball bytes afterwards. --- Cargo.lock | 1 + Cargo.toml | 1 + src/controllers/krate/publish.rs | 112 +++++++++++++++++------------ src/tests/krate/publish/tarball.rs | 26 +++++-- 4 files changed, 87 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a4dcd993f4..746641a3aa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1114,6 +1114,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "tokio-postgres", + "tokio-util", "toml", "tower", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index fbc6baca651..79de19769c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,7 @@ tempfile = "=3.14.0" thiserror = "=2.0.3" tokio = { version = "=1.41.1", features = ["net", "signal", "io-std", "io-util", "rt-multi-thread", "macros", "process"]} tokio-postgres = "=0.7.12" +tokio-util = "=0.7.12" toml = "=0.8.19" tower = "=0.5.1" tower-http = { version = "=0.6.2", features = ["add-extension", "fs", "catch-panic", "timeout", "compression-full"] } diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 819771cbc94..dda8070b8c3 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -5,7 +5,7 @@ use crate::auth::AuthCheck; use crate::worker::jobs::{ self, CheckTyposquat, SendPublishNotificationsJob, UpdateDefaultVersion, }; -use axum::body::Bytes; +use axum::body::{Body, Bytes}; use axum::Json; use 
cargo_manifest::{Dependency, DepsSet, TargetDepsSet}; use chrono::{DateTime, SecondsFormat, Utc}; @@ -17,10 +17,12 @@ use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; use futures_util::TryStreamExt; use hex::ToHex; +use http::request::Parts; use http::StatusCode; -use hyper::body::Buf; use sha2::{Digest, Sha256}; use std::collections::HashMap; +use tokio::io::AsyncReadExt; +use tokio_util::io::StreamReader; use url::Url; use crate::models::{ @@ -35,7 +37,7 @@ use crate::rate_limiter::LimitedAction; use crate::schema::*; use crate::sql::canon_crate_name; use crate::util::errors::{bad_request, custom, internal, AppResult, BoxedAppError}; -use crate::util::{BytesRequest, Maximums}; +use crate::util::Maximums; use crate::views::{ EncodableCrate, EncodableCrateDependency, GoodCrate, PublishMetadata, PublishWarnings, }; @@ -54,13 +56,50 @@ const MAX_DESCRIPTION_LENGTH: usize = 1000; /// Currently blocks the HTTP thread, perhaps some function calls can spawn new /// threads and return completion or error through other methods a `cargo publish /// --status` command, via crates.io's front end, or email. 
-pub async fn publish(app: AppState, req: BytesRequest) -> AppResult> { - let (req, bytes) = req.0.into_parts(); - let (json_bytes, tarball_bytes) = split_body(bytes)?; +pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult> { + let stream = body.into_data_stream(); + let stream = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)); + let mut reader = StreamReader::new(stream); + + // The format of the req.body() of a publish request is as follows: + // + // metadata length + // metadata in JSON about the crate being published + // .crate tarball length + // .crate tarball file + + const MAX_JSON_LENGTH: u32 = 1024 * 1024; // 1 MB + + let json_len = reader.read_u32_le().await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + bad_request("invalid metadata length") + } else { + e.into() + } + })?; + + if json_len > MAX_JSON_LENGTH { + return Err(custom( + StatusCode::PAYLOAD_TOO_LARGE, + "JSON metadata blob too large", + )); + } + + let mut json_bytes = vec![0; json_len as usize]; + reader.read_exact(&mut json_bytes).await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + let message = format!("invalid metadata length for remaining payload: {json_len}"); + bad_request(message) + } else { + e.into() + } + })?; let metadata: PublishMetadata = serde_json::from_slice(&json_bytes) .map_err(|e| bad_request(format_args!("invalid upload request: {e}")))?; + drop(json_bytes); + Crate::validate_crate_name("crate", &metadata.name).map_err(bad_request)?; let semver = match semver::Version::parse(&metadata.vers) { @@ -136,7 +175,14 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult AppResult AppResult<(Bytes, Bytes)> { - // The format of the req.body() of a publish request is as follows: - // - // metadata length - // metadata in JSON about the crate being published - // .crate tarball length - // .crate tarball file - - if bytes.len() < 4 { - // Avoid panic in `get_u32_le()` if 
there is not enough remaining data - return Err(bad_request("invalid metadata length")); - } - - let json_len = bytes.get_u32_le() as usize; - if json_len > bytes.len() { - return Err(bad_request(format!( - "invalid metadata length for remaining payload: {json_len}" - ))); - } - - let json_bytes = bytes.split_to(json_len); - - if bytes.len() < 4 { - // Avoid panic in `get_u32_le()` if there is not enough remaining data - return Err(bad_request("invalid tarball length")); - } - - let tarball_len = bytes.get_u32_le() as usize; - if tarball_len > bytes.len() { - return Err(bad_request(format!( - "invalid tarball length for remaining payload: {tarball_len}" - ))); - } - - let tarball_bytes = bytes.split_to(tarball_len); - - Ok((json_bytes, tarball_bytes)) -} - async fn is_reserved_name(name: &str, conn: &mut AsyncPgConnection) -> QueryResult { select(exists(reserved_crate_names::table.filter( canon_crate_name(reserved_crate_names::name).eq(canon_crate_name(name)), diff --git a/src/tests/krate/publish/tarball.rs b/src/tests/krate/publish/tarball.rs index 95cf9a57f41..43ce7010b81 100644 --- a/src/tests/krate/publish/tarball.rs +++ b/src/tests/krate/publish/tarball.rs @@ -1,5 +1,6 @@ use crate::tests::builders::PublishBuilder; use crate::tests::util::{RequestHelper, TestApp}; +use bytes::{BufMut, BytesMut}; use crates_io_tarball::TarballBuilder; use googletest::prelude::*; use http::StatusCode; @@ -80,9 +81,16 @@ async fn json_bytes_truncated() { async fn tarball_len_truncated() { let (app, _, _, token) = TestApp::full().with_token().await; - let response = token - .publish_crate(&[2, 0, 0, 0, b'{', b'}', 0, 0] as &[u8]) - .await; + let json = br#"{ "name": "foo", "vers": "1.0.0" }"#; + + let mut bytes = BytesMut::new(); + bytes.put_u32_le(json.len() as u32); + bytes.put_slice(json); + bytes.put_u8(0); + bytes.put_u8(0); + + let response = token.publish_crate(bytes.freeze()).await; + assert_eq!(response.status(), StatusCode::BAD_REQUEST); assert_snapshot!(response.text(), 
@r#"{"errors":[{"detail":"invalid tarball length"}]}"#); assert_that!(app.stored_files().await, empty()); @@ -92,9 +100,15 @@ async fn tarball_len_truncated() { async fn tarball_bytes_truncated() { let (app, _, _, token) = TestApp::full().with_token().await; - let response = token - .publish_crate(&[2, 0, 0, 0, b'{', b'}', 100, 0, 0, 0, 0] as &[u8]) - .await; + let json = br#"{ "name": "foo", "vers": "1.0.0" }"#; + + let mut bytes = BytesMut::new(); + bytes.put_u32_le(json.len() as u32); + bytes.put_slice(json); + bytes.put_u32_le(100); + bytes.put_u8(0); + + let response = token.publish_crate(bytes.freeze()).await; assert_eq!(response.status(), StatusCode::BAD_REQUEST); assert_snapshot!(response.text(), @r#"{"errors":[{"detail":"invalid tarball length for remaining payload: 100"}]}"#); assert_that!(app.stored_files().await, empty()); From 973360942f3fda0d69740b1222212d92d5eae435 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Mon, 25 Nov 2024 15:45:44 +0100 Subject: [PATCH 2/3] controllers/krate/publish: Extract `read_json_metadata()` fn --- src/controllers/krate/publish.rs | 64 ++++++++++++++++---------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index dda8070b8c3..38bced9441a 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -21,7 +21,7 @@ use http::request::Parts; use http::StatusCode; use sha2::{Digest, Sha256}; use std::collections::HashMap; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncRead, AsyncReadExt}; use tokio_util::io::StreamReader; use url::Url; @@ -69,36 +69,7 @@ pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult MAX_JSON_LENGTH { - return Err(custom( - StatusCode::PAYLOAD_TOO_LARGE, - "JSON metadata blob too large", - )); - } - - let mut json_bytes = vec![0; json_len as usize]; - reader.read_exact(&mut json_bytes).await.map_err(|e| { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - 
let message = format!("invalid metadata length for remaining payload: {json_len}"); - bad_request(message) - } else { - e.into() - } - })?; - - let metadata: PublishMetadata = serde_json::from_slice(&json_bytes) - .map_err(|e| bad_request(format_args!("invalid upload request: {e}")))?; - - drop(json_bytes); + let metadata = read_json_metadata(&mut reader, MAX_JSON_LENGTH).await?; Crate::validate_crate_name("crate", &metadata.name).map_err(bad_request)?; @@ -631,6 +602,37 @@ async fn count_versions_published_today( .await } +async fn read_json_metadata( + reader: &mut R, + max_length: u32, +) -> Result { + let json_len = reader.read_u32_le().await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + bad_request("invalid metadata length") + } else { + e.into() + } + })?; + + if json_len > max_length { + let message = "JSON metadata blob too large"; + return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message)); + } + + let mut json_bytes = vec![0; json_len as usize]; + reader.read_exact(&mut json_bytes).await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + let message = format!("invalid metadata length for remaining payload: {json_len}"); + bad_request(message) + } else { + e.into() + } + })?; + + serde_json::from_slice(&json_bytes) + .map_err(|e| bad_request(format_args!("invalid upload request: {e}"))) +} + async fn is_reserved_name(name: &str, conn: &mut AsyncPgConnection) -> QueryResult { select(exists(reserved_crate_names::table.filter( canon_crate_name(reserved_crate_names::name).eq(canon_crate_name(name)), From cf2a4feae4e418e5653fa0420aca0cdf7d3ea98c Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Mon, 25 Nov 2024 15:50:17 +0100 Subject: [PATCH 3/3] controllers/krate/publish: Extract `read_tarball_bytes()` fn --- src/controllers/krate/publish.rs | 59 +++++++++++++++++--------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 
38bced9441a..57e0320f765 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -146,39 +146,14 @@ pub async fn publish(app: AppState, req: Parts, body: Body) -> AppResult maximums.max_upload_size { - return Err(custom( - StatusCode::PAYLOAD_TOO_LARGE, - format!("max upload size is: {}", maximums.max_upload_size), - )); - } - - let mut tarball_bytes = vec![0; tarball_len as usize]; - reader.read_exact(&mut tarball_bytes).await.map_err(|e| { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - let message = format!("invalid tarball length for remaining payload: {tarball_len}"); - bad_request(message) - } else { - e.into() - } - })?; - - let tarball_bytes = Bytes::from(tarball_bytes); + let tarball_bytes = read_tarball_bytes(&mut reader, maximums.max_upload_size as u32).await?; + let content_length = tarball_bytes.len() as u64; let pkg_name = format!("{}-{}", &*metadata.name, &version_string); let tarball_info = @@ -633,6 +608,36 @@ async fn read_json_metadata( .map_err(|e| bad_request(format_args!("invalid upload request: {e}"))) } +async fn read_tarball_bytes( + reader: &mut R, + max_length: u32, +) -> Result { + let tarball_len = reader.read_u32_le().await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + bad_request("invalid tarball length") + } else { + e.into() + } + })?; + + if tarball_len > max_length { + let message = format!("max upload size is: {}", max_length); + return Err(custom(StatusCode::PAYLOAD_TOO_LARGE, message)); + } + + let mut tarball_bytes = vec![0; tarball_len as usize]; + reader.read_exact(&mut tarball_bytes).await.map_err(|e| { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + let message = format!("invalid tarball length for remaining payload: {tarball_len}"); + bad_request(message) + } else { + e.into() + } + })?; + + Ok(Bytes::from(tarball_bytes)) +} + async fn is_reserved_name(name: &str, conn: &mut AsyncPgConnection) -> QueryResult { 
select(exists(reserved_crate_names::table.filter( canon_crate_name(reserved_crate_names::name).eq(canon_crate_name(name)),