From 8295c69eb42907bce15cda19f50031d920d85c9b Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Thu, 2 Apr 2026 22:33:05 +0800
Subject: [PATCH 1/4] fix: replace panic with Result in LANCE_INITIAL_UPLOAD_SIZE validation

Setting LANCE_INITIAL_UPLOAD_SIZE to an out-of-range value (<5MB or >5GB)
previously crashed the process with panic!(). Now returns an error from
ObjectWriter::new(), letting callers handle it gracefully.

- Change initial_upload_size() return type from usize to Result
- Cache Result in OnceLock (lance_core::Error is not Clone)
- Store validated upload_size in ObjectWriter struct to avoid repeated
  OnceLock access and eliminate .expect() in next_part_buffer()
- Preserve silent fallback to default for non-numeric values, consistent
  with sibling env vars (LANCE_UPLOAD_CONCURRENCY, LANCE_CONN_RESET_RETRIES)
- Extract MAX_UPLOAD_PART_SIZE constant for the 5GB upper bound
---
 rust/lance-io/src/object_writer.rs | 63 ++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 21 deletions(-)

diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs
index 24b0e589d4e..eb554640460 100644
--- a/rust/lance-io/src/object_writer.rs
+++ b/rust/lance-io/src/object_writer.rs
@@ -47,23 +47,34 @@ fn max_conn_reset_retries() -> u16 {
     })
 }
 
-fn initial_upload_size() -> usize {
-    static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
-    *LANCE_INITIAL_UPLOAD_SIZE.get_or_init(|| {
-        std::env::var("LANCE_INITIAL_UPLOAD_SIZE")
-            .ok()
-            .and_then(|s| s.parse::<usize>().ok())
-            .inspect(|size| {
-                if *size < INITIAL_UPLOAD_STEP {
-                    // Minimum part size in GCS and S3
-                    panic!("LANCE_INITIAL_UPLOAD_SIZE must be at least 5MB");
-                } else if *size > 1024 * 1024 * 1024 * 5 {
-                    // Maximum part size in GCS and S3
-                    panic!("LANCE_INITIAL_UPLOAD_SIZE must be at most 5GB");
-                }
-            })
-            .unwrap_or(INITIAL_UPLOAD_STEP)
-    })
+/// Maximum part size in GCS and S3: 5GB.
+const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
+
+fn initial_upload_size() -> Result<usize> {
+    static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<std::result::Result<usize, String>> =
+        OnceLock::new();
+    LANCE_INITIAL_UPLOAD_SIZE
+        .get_or_init(|| {
+            let size = std::env::var("LANCE_INITIAL_UPLOAD_SIZE")
+                .ok()
+                .and_then(|s| s.parse::<usize>().ok())
+                .unwrap_or(INITIAL_UPLOAD_STEP);
+            if size < INITIAL_UPLOAD_STEP {
+                Err(format!(
+                    "LANCE_INITIAL_UPLOAD_SIZE must be at least 5MB, got {} bytes",
+                    size
+                ))
+            } else if size > MAX_UPLOAD_PART_SIZE {
+                Err(format!(
+                    "LANCE_INITIAL_UPLOAD_SIZE must be at most 5GB, got {} bytes",
+                    size
+                ))
+            } else {
+                Ok(size)
+            }
+        })
+        .clone()
+        .map_err(Error::invalid_input)
 }
 
 /// Writer to an object in an object store.
@@ -79,6 +90,7 @@ pub struct ObjectWriter {
     cursor: usize,
     connection_resets: u16,
     buffer: Vec<u8>,
+    upload_size: usize,
     // TODO: use constant size to support R2
     use_constant_size_upload_parts: bool,
 }
@@ -157,25 +169,32 @@ impl UploadState {
 
 impl ObjectWriter {
     pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result<Self> {
+        let upload_size = initial_upload_size()?;
         Ok(Self {
            state: UploadState::Started(object_store.inner.clone()),
             cursor: 0,
             path: Arc::new(path.clone()),
             connection_resets: 0,
-            buffer: Vec::with_capacity(initial_upload_size()),
+            buffer: Vec::with_capacity(upload_size),
+            upload_size,
             use_constant_size_upload_parts: object_store.use_constant_size_upload_parts,
         })
     }
 
     /// Returns the contents of `buffer` as a `Bytes` object and resets `buffer`.
     /// The new capacity of `buffer` is determined by the current part index.
-    fn next_part_buffer(buffer: &mut Vec<u8>, part_idx: u16, constant_upload_size: bool) -> Bytes {
+    fn next_part_buffer(
+        buffer: &mut Vec<u8>,
+        part_idx: u16,
+        constant_upload_size: bool,
+        upload_size: usize,
+    ) -> Bytes {
         let new_capacity = if constant_upload_size {
             // The store does not support variable part sizes, so use the initial size.
-            initial_upload_size()
+            upload_size
         } else {
             // Increase the upload size every 100 parts. This gives maximum part size of 2.5TB.
-            initial_upload_size().max(((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_STEP)
+            upload_size.max(((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_STEP)
         };
         let new_buffer = Vec::with_capacity(new_capacity);
         let part = std::mem::replace(buffer, new_buffer);
@@ -222,6 +241,7 @@ impl ObjectWriter {
                     &mut mut_self.buffer,
                     0,
                     mut_self.use_constant_size_upload_parts,
+                    mut_self.upload_size,
                 );
                 futures.spawn(Self::put_part(upload.as_mut(), data, 0, None));
 
@@ -386,6 +406,7 @@ impl AsyncWrite for ObjectWriter {
                         &mut mut_self.buffer,
                         *part_idx,
                         mut_self.use_constant_size_upload_parts,
+                        mut_self.upload_size,
                     );
                     futures.spawn(
                         Self::put_part(upload.as_mut(), data, *part_idx, None)

From 4017f868f5e46b6e9389bac05a57c933e6923706 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Thu, 9 Apr 2026 11:26:09 +0800
Subject: [PATCH 2/4] fix: warn and clamp LANCE_INITIAL_UPLOAD_SIZE instead of returning error

Per review feedback on #6389: an out-of-range perf-tuning env var shouldn't
fail writes. Clamp the value to the valid [5MB, 5GB] range and emit a
tracing::warn! once (the existing OnceLock cache guarantees the warning
fires at most once per process). This also matches the silent-fallback
behavior of the sibling env vars LANCE_UPLOAD_CONCURRENCY and
LANCE_CONN_RESET_RETRIES.
---
 rust/lance-io/src/object_writer.rs | 47 +++++++++++++-----------------
 1 file changed, 21 insertions(+), 26 deletions(-)

diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs
index eb554640460..0dcc2ddec20 100644
--- a/rust/lance-io/src/object_writer.rs
+++ b/rust/lance-io/src/object_writer.rs
@@ -50,31 +50,26 @@ fn max_conn_reset_retries() -> u16 {
 /// Maximum part size in GCS and S3: 5GB.
 const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
 
-fn initial_upload_size() -> Result<usize> {
-    static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<std::result::Result<usize, String>> =
-        OnceLock::new();
-    LANCE_INITIAL_UPLOAD_SIZE
-        .get_or_init(|| {
-            let size = std::env::var("LANCE_INITIAL_UPLOAD_SIZE")
-                .ok()
-                .and_then(|s| s.parse::<usize>().ok())
-                .unwrap_or(INITIAL_UPLOAD_STEP);
-            if size < INITIAL_UPLOAD_STEP {
-                Err(format!(
-                    "LANCE_INITIAL_UPLOAD_SIZE must be at least 5MB, got {} bytes",
-                    size
-                ))
-            } else if size > MAX_UPLOAD_PART_SIZE {
-                Err(format!(
-                    "LANCE_INITIAL_UPLOAD_SIZE must be at most 5GB, got {} bytes",
-                    size
-                ))
-            } else {
-                Ok(size)
-            }
-        })
-        .clone()
-        .map_err(Error::invalid_input)
+fn initial_upload_size() -> usize {
+    static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
+    *LANCE_INITIAL_UPLOAD_SIZE.get_or_init(|| {
+        let Some(raw) = std::env::var("LANCE_INITIAL_UPLOAD_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+        else {
+            return INITIAL_UPLOAD_STEP;
+        };
+        let clamped = raw.clamp(INITIAL_UPLOAD_STEP, MAX_UPLOAD_PART_SIZE);
+        if clamped != raw {
+            // OnceLock caches the result, so this warning fires at most once per process.
+            tracing::warn!(
+                requested = raw,
+                clamped,
+                "LANCE_INITIAL_UPLOAD_SIZE must be between 5MB and 5GB; clamping to valid range"
+            );
+        }
+        clamped
+    })
 }
 
 /// Writer to an object in an object store.
@@ -169,7 +164,7 @@ impl UploadState {
 
 impl ObjectWriter {
     pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result<Self> {
-        let upload_size = initial_upload_size()?;
+        let upload_size = initial_upload_size();
         Ok(Self {
             state: UploadState::Started(object_store.inner.clone()),
             cursor: 0,

From a47f8ece64255dd661886655c55d13e49fb97eec Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Thu, 9 Apr 2026 11:48:55 +0800
Subject: [PATCH 3/4] test: add boundary tests for clamp_initial_upload_size

Extract the clamp logic into a pure helper so the boundary policy
([5MB, 5GB]) and was_clamped signal can be unit-tested without fighting
the OnceLock cache in initial_upload_size().
---
 rust/lance-io/src/object_writer.rs | 46 ++++++++++++++++++++++++++++--
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs
index 0dcc2ddec20..642357fa58f 100644
--- a/rust/lance-io/src/object_writer.rs
+++ b/rust/lance-io/src/object_writer.rs
@@ -50,6 +50,13 @@ fn max_conn_reset_retries() -> u16 {
 /// Maximum part size in GCS and S3: 5GB.
 const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
 
+/// Clamps a requested upload part size to the valid [5MB, 5GB] range.
+/// Returns the clamped value and whether clamping was necessary.
+fn clamp_initial_upload_size(raw: usize) -> (usize, bool) {
+    let clamped = raw.clamp(INITIAL_UPLOAD_STEP, MAX_UPLOAD_PART_SIZE);
+    (clamped, clamped != raw)
+}
+
 fn initial_upload_size() -> usize {
     static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
     *LANCE_INITIAL_UPLOAD_SIZE.get_or_init(|| {
@@ -59,8 +66,8 @@ fn initial_upload_size() -> usize {
         else {
             return INITIAL_UPLOAD_STEP;
         };
-        let clamped = raw.clamp(INITIAL_UPLOAD_STEP, MAX_UPLOAD_PART_SIZE);
-        if clamped != raw {
+        let (clamped, was_clamped) = clamp_initial_upload_size(raw);
+        if was_clamped {
             // OnceLock caches the result, so this warning fires at most once per process.
             tracing::warn!(
                 requested = raw,
@@ -836,4 +843,39 @@ mod tests {
         assert!(!temp_file_path.exists());
         assert!(!file_path.exists());
     }
+
+    #[test]
+    fn clamp_initial_upload_size_below_min_is_clamped_up() {
+        assert_eq!(clamp_initial_upload_size(0), (INITIAL_UPLOAD_STEP, true));
+        assert_eq!(
+            clamp_initial_upload_size(INITIAL_UPLOAD_STEP - 1),
+            (INITIAL_UPLOAD_STEP, true)
+        );
+    }
+
+    #[test]
+    fn clamp_initial_upload_size_within_range_is_unchanged() {
+        assert_eq!(
+            clamp_initial_upload_size(INITIAL_UPLOAD_STEP),
+            (INITIAL_UPLOAD_STEP, false)
+        );
+        assert_eq!(
+            clamp_initial_upload_size(MAX_UPLOAD_PART_SIZE),
+            (MAX_UPLOAD_PART_SIZE, false)
+        );
+        let mid = INITIAL_UPLOAD_STEP * 8; // 40MB, in range
+        assert_eq!(clamp_initial_upload_size(mid), (mid, false));
+    }
+
+    #[test]
+    fn clamp_initial_upload_size_above_max_is_clamped_down() {
+        assert_eq!(
+            clamp_initial_upload_size(MAX_UPLOAD_PART_SIZE + 1),
+            (MAX_UPLOAD_PART_SIZE, true)
+        );
+        assert_eq!(
+            clamp_initial_upload_size(usize::MAX),
+            (MAX_UPLOAD_PART_SIZE, true)
+        );
+    }
 }

From 105338ed1a5ceb850740d28bbf15f0b3fd685555 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 10 Apr 2026 00:23:13 +0800
Subject: [PATCH 4/4] refactor: remove redundant upload_size field from ObjectWriter

Per review feedback, the cached upload_size field is unnecessary since
initial_upload_size() is backed by OnceLock and cheap to call directly.
---
 rust/lance-io/src/object_writer.rs | 18 ++++--------------
 1 file changed, 4 insertions(+), 14 deletions(-)

diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs
index 642357fa58f..878687be91e 100644
--- a/rust/lance-io/src/object_writer.rs
+++ b/rust/lance-io/src/object_writer.rs
@@ -92,7 +92,6 @@ pub struct ObjectWriter {
     cursor: usize,
     connection_resets: u16,
     buffer: Vec<u8>,
-    upload_size: usize,
     // TODO: use constant size to support R2
     use_constant_size_upload_parts: bool,
 }
@@ -171,32 +170,25 @@ impl UploadState {
 
 impl ObjectWriter {
     pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result<Self> {
-        let upload_size = initial_upload_size();
         Ok(Self {
             state: UploadState::Started(object_store.inner.clone()),
             cursor: 0,
             path: Arc::new(path.clone()),
             connection_resets: 0,
-            buffer: Vec::with_capacity(upload_size),
-            upload_size,
+            buffer: Vec::with_capacity(initial_upload_size()),
             use_constant_size_upload_parts: object_store.use_constant_size_upload_parts,
         })
     }
 
     /// Returns the contents of `buffer` as a `Bytes` object and resets `buffer`.
     /// The new capacity of `buffer` is determined by the current part index.
-    fn next_part_buffer(
-        buffer: &mut Vec<u8>,
-        part_idx: u16,
-        constant_upload_size: bool,
-        upload_size: usize,
-    ) -> Bytes {
+    fn next_part_buffer(buffer: &mut Vec<u8>, part_idx: u16, constant_upload_size: bool) -> Bytes {
         let new_capacity = if constant_upload_size {
             // The store does not support variable part sizes, so use the initial size.
-            upload_size
+            initial_upload_size()
         } else {
             // Increase the upload size every 100 parts. This gives maximum part size of 2.5TB.
-            upload_size.max(((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_STEP)
+            initial_upload_size().max(((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_STEP)
         };
         let new_buffer = Vec::with_capacity(new_capacity);
         let part = std::mem::replace(buffer, new_buffer);
@@ -243,7 +235,6 @@ impl ObjectWriter {
                     &mut mut_self.buffer,
                     0,
                     mut_self.use_constant_size_upload_parts,
-                    mut_self.upload_size,
                 );
                 futures.spawn(Self::put_part(upload.as_mut(), data, 0, None));
 
@@ -408,7 +399,6 @@ impl AsyncWrite for ObjectWriter {
                         &mut mut_self.buffer,
                         *part_idx,
                         mut_self.use_constant_size_upload_parts,
-                        mut_self.upload_size,
                     );
                     futures.spawn(
                         Self::put_part(upload.as_mut(), data, *part_idx, None)