Skip to content
34 changes: 32 additions & 2 deletions crates/trident/src/engine/context/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::{BTreeMap, HashMap},
path::{Path, PathBuf},
time::Duration,
};

use anyhow::{bail, Context, Error};
Expand All @@ -9,14 +10,23 @@ use log::{debug, trace};

use trident_api::{
config::{HostConfiguration, Partition, VerityDevice},
constants::ROOT_MOUNT_POINT_PATH,
constants::{
internal_params::{
STREAM_SLOW_SPEED_REPORTING_INTERVAL_SECONDS,
STREAM_SLOW_SPEED_REPORTING_THRESHOLD_MBPS,
},
ROOT_MOUNT_POINT_PATH,
},
error::{InternalError, ReportError, TridentError},
status::{AbVolumeSelection, ServicingType},
storage_graph::graph::StorageGraph,
BlockDeviceId,
};

use crate::osimage::OsImage;
use crate::{
osimage::OsImage, STREAM_SLOW_SPEED_REPORTING_INTERVAL_SECONDS_DEFAULT,
STREAM_SLOW_SPEED_REPORTING_THRESHOLD_MBPS_DEFAULT,
};

#[allow(dead_code)]
pub mod filesystem;
Expand Down Expand Up @@ -313,6 +323,26 @@ impl EngineContext {
.zstd_decompression_parameters()
.and_then(|p| p.max_window_log)
}

/// Returns the threshold and interval for reporting slow streaming speed.
pub(crate) fn read_monitor_params(&self) -> Result<(f64, Duration), TridentError> {
Ok((
self.spec
.internal_params
.get::<f64>(STREAM_SLOW_SPEED_REPORTING_THRESHOLD_MBPS)
.transpose()
.map_err(TridentError::new)?
.unwrap_or(STREAM_SLOW_SPEED_REPORTING_THRESHOLD_MBPS_DEFAULT),
Duration::from_secs(
self.spec
.internal_params
.get_u64(STREAM_SLOW_SPEED_REPORTING_INTERVAL_SECONDS)
.transpose()
.map_err(TridentError::new)?
.unwrap_or(STREAM_SLOW_SPEED_REPORTING_INTERVAL_SECONDS_DEFAULT),
),
))
Comment thread
frhuelsz marked this conversation as resolved.
}
}

#[cfg(test)]
Expand Down
21 changes: 17 additions & 4 deletions crates/trident/src/engine/storage/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use trident_api::{

use crate::{
engine::{context::filesystem::FileSystemDataImage, EngineContext},
io_utils::{hashing_reader::HashingReader384, image_streamer},
io_utils::{hashing_reader::HashingReader384, image_streamer, read_monitor::ReadMonitor},
osimage::{OsImageFile, OsImagePartition},
};

Expand Down Expand Up @@ -150,6 +150,11 @@ pub(super) fn deploy_images(ctx: &EngineContext) -> Result<(), TridentError> {
);
}

// Get the threshold and interval for reporting slow streaming speed from
// the context, to be used in the ReadMonitor while streaming images to the
// block devices.
let (threshold_reporting, reporting_interval) = ctx.read_monitor_params()?;

os_img.read_images(|path, reader| -> ControlFlow<Result<(), TridentError>> {
let Some((id, image_file, resize)) = combined_images.remove(path) else {
debug!(
Expand All @@ -163,7 +168,15 @@ pub(super) fn deploy_images(ctx: &EngineContext) -> Result<(), TridentError> {
"Initializing '{id}': writing image for filesystem from '{}'",
os_img.source()
);
if let Err(e) = deploy_os_image_file(ctx, &id, image_file, resize, reader) {

let monitored_reader = ReadMonitor::new(
reader,
image_file.compressed_size,
threshold_reporting,
reporting_interval,
);

if let Err(e) = deploy_os_image_file(ctx, &id, image_file, resize, monitored_reader) {
return ControlFlow::Break(Err(e).structured(ServicingError::DeployImages));
}

Expand Down Expand Up @@ -290,12 +303,12 @@ enum FileSystemResize {
}

/// Deploys an individual OS image file from an OS image.
fn deploy_os_image_file(
fn deploy_os_image_file<R: Read>(
ctx: &EngineContext,
id: &BlockDeviceId,
image_file: &OsImageFile,
fs_resize: FileSystemResize,
reader: Box<dyn Read>,
reader: R,
) -> Result<(), Error> {
let block_device_path = ctx
.get_block_device_path(id)
Expand Down
87 changes: 79 additions & 8 deletions crates/trident/src/io_utils/http/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::{ensure, Context, Error};
use log::{debug, trace, warn};
use oci_client::{secrets::RegistryAuth, Client as OciClient, Reference, RegistryOperation};
use reqwest::{
blocking::Client,
blocking::{Client, ClientBuilder},
header::{ACCEPT_RANGES, AUTHORIZATION},
};
use tokio::runtime::Runtime;
Expand All @@ -19,7 +19,14 @@ use url::Url;
#[cfg(feature = "dangerous-options")]
use docker_credential::{self, DockerCredential};

use crate::io_utils::http::subfile::HttpSubFile;
use super::subfile::HttpSubFile;

/// The maximum timeout, in seconds, for a single HTTP request to establish a
/// connection. This is not a timeout for the entire file read operation, but
/// rather a timeout for each individual HTTP request; the effective connect
/// timeout is the smaller of this value and the overall timeout passed when
/// creating the `HttpFile`. The `HttpFile` implementation will retry requests
/// that fail due to transient errors, up to the overall timeout specified when
/// creating the `HttpFile`.
const MAX_PER_REQUEST_TIMEOUT_SECONDS: u64 = 10;

#[cfg(feature = "dangerous-options")]
const DOCKER_CONFIG_FILE_PATH: &str = ".docker/config.json";
Expand Down Expand Up @@ -81,8 +88,21 @@ impl HttpFile {
) -> IoResult<Self> {
debug!("Opening HTTP file '{}'", url);

// Create a new client for this file.
let client = Client::new();
// Create a new client for this file with a per-request connect timeout
// that is clamped to at most `MAX_PER_REQUEST_TIMEOUT_SECONDS` and the
// overall `timeout` passed in. We intentionally do not set a total
// request timeout here because body reads for large range requests can
// take much longer than the connection timeout, and reqwest's
// `.timeout()` applies to the entire transfer including body streaming.
//
// The clamped connect timeout is per request. We always do requests in
// a retry loop that respects the overall timeout given to us.
let connect_timeout = Duration::from_secs(MAX_PER_REQUEST_TIMEOUT_SECONDS).min(timeout);
let client = ClientBuilder::new()
.connect_timeout(connect_timeout)
.build()
.map_err(|e| IoError::other(format!("Failed to create HTTP client: {e}")))?;

let request_sender = || {
let mut request = client.head(url.as_str());
if let Some(token) = &token {
Expand Down Expand Up @@ -241,8 +261,16 @@ impl HttpFile {
})
}

/// Returns an HTTPSubFile object covering a specific section of the file.
pub(crate) fn section_reader(&self, section_offset: u64, size: u64) -> HttpSubFile {
fn section_reader_inner(&self, section_offset: u64, size: u64) -> HttpSubFile {
if size == 0 {
// When size is 0, create an empty subfile reader. This avoids
// making an HTTP request with an invalid range header (e.g. "Range:
// bytes=100-99") and also allows us to return an empty reader even
// if the server does not support range requests, as long as we
// never actually try to read from it.
return HttpSubFile::new_empty_with_client(self.url.clone(), self.client.clone());
}

let end = section_offset + size - 1;
Comment thread
frhuelsz marked this conversation as resolved.
trace!(
"Reading HTTP file '{}' from {} to {} (inclusive) [{} bytes]",
Expand All @@ -267,11 +295,17 @@ impl HttpFile {
subfile
}

/// Returns an HTTPSubFile object covering a specific section of the file.
///
/// `section_offset` is the byte offset from the start of the file and
/// `size` is the number of bytes the returned reader covers. A `size` of 0
/// yields an empty reader that reports EOF without ever issuing an HTTP
/// request.
pub(crate) fn section_reader(&self, section_offset: u64, size: u64) -> HttpSubFile {
    self.section_reader_inner(section_offset, size)
}

/// Returns an HTTPSubFile object covering the complete file.
///
/// The reader is flagged via `with_end_is_parent_eof()` so the section's
/// end is treated as the parent file's EOF, letting the underlying request
/// be optimized for reading the whole resource.
pub(crate) fn complete_reader(&self) -> HttpSubFile {
    trace!("Reading complete HTTP file '{}'", self.url);
    // Create a section reader optimized to read the complete file.
    self.section_reader_inner(0, self.size)
        .with_end_is_parent_eof()
}
}

Expand Down Expand Up @@ -327,21 +361,58 @@ impl Seek for HttpFile {
}

impl Read for HttpFile {
/// Implementation of `read()` from the `Read` trait for the HTTP file
/// reader provided for broad compatibility. Where possible, it is
/// recommended to use specialized methods, such as `section_reader()` or
/// `complete_reader()`, as they will make more efficient use of HTTP range
/// requests and avoid unnecessary requests. Each call to `read` will result
/// in a new HTTP request for the requested range of bytes, so using this
/// method for large reads may be inefficient.
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let mut subfile = self.section_reader(self.position, buf.len() as u64);
if self.position >= self.size || buf.is_empty() {
return Ok(0);
}

let size_to_read = std::cmp::min(buf.len() as u64, self.size - self.position) as usize;
let mut subfile = self.section_reader(self.position, size_to_read as u64);
Comment thread
frhuelsz marked this conversation as resolved.
let res = subfile.read(buf)?;
self.position += res as u64;
Ok(res)
}

/// Implementation of `read_exact()` from the `Read` trait for the HTTP file
/// reader. Each call to `read_exact` will result in a new HTTP request for
/// the requested range of bytes, so using this method for large reads may
/// be inefficient. This method will return an error if there are not enough
/// bytes remaining in the file to fill the buffer, even if the end of the
/// file has not been reached yet.
fn read_exact(&mut self, buf: &mut [u8]) -> IoResult<()> {
if buf.is_empty() {
return Ok(());
}

if buf.len() as u64 > self.size - self.position {
return Err(IoError::new(
IoErrorKind::UnexpectedEof,
"Not enough bytes remaining in the file to fill the buffer",
));
}

let mut subfile = self.section_reader(self.position, buf.len() as u64);
subfile.read_exact(buf)?;
Comment thread
frhuelsz marked this conversation as resolved.
self.position += buf.len() as u64;
Ok(())
}

/// Implementation of `read_to_end()` from the `Read` trait for the HTTP
/// file reader. This method will read until the end of the file is reached.
/// In best case scenarios, only one HTTP request will be made. Internal
/// retries may result in additional requests.
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> IoResult<usize> {
if self.position >= self.size {
return Ok(0);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be error?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question for read()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, per read's docs, Ok(0) is the expected response for EOF.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is position being passed the size really the same as EOF?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe?

    if self.position == self.size {
        return Ok(0);
    if self.position > self.size {
        return Err(..);

Copy link
Copy Markdown
Contributor

@fintelia fintelia Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't self.position > self.size already be caught in seek? That seems like the more natural place to return an error

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't know ... was just reacting to the code here :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, seek does cover all out-of-bounds requests. The check here >= and not just == for safety. But I can also see the argument for making this an error.

Copy link
Copy Markdown
Contributor Author

@frhuelsz frhuelsz Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From stdlib, the general convention does not appear to be to return errors in this case. Cursor has the same behavior as this where the position is clamped (via max(position, size) ), and Ok(0) is returned for any position >= size.

Same for BufReader. Here they add a debug_assert to check for the condition in debug builds.

}

let mut subfile = self.section_reader(self.position, self.size - self.position);
let res = subfile.read_to_end(buf)?;
self.position += res as u64;
Expand Down
66 changes: 60 additions & 6 deletions crates/trident/src/io_utils/http/subfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ use super::HttpRangeRequest;
/// occurs, without the caller even needing to be aware of it. Note that the
/// `read()` call never fails in this example, even though multiple requests
/// were needed and a network error occurred during the process.
///
/// When `start > end`, the subfile is considered empty and all reads will
/// return EOF. This allows creating empty subfiles without needing to make an
/// HTTP request, which can be useful for certain edge cases and optimizations.
pub struct HttpSubFile {
/// The URL of the HTTP resource.
url: Url,
Expand Down Expand Up @@ -108,8 +112,8 @@ pub struct HttpSubFile {
}

impl HttpSubFile {
/// Creates a new HttpSubFile that reads the byte range [start, end] from the
/// given URL. The range is inclusive like the HTTP Range header, and is
/// Creates a new HttpSubFile that reads the byte range [start, end] from
/// the given URL. The range is inclusive like the HTTP Range header, and is
/// expected to have been validated beforehand.
#[allow(dead_code)] // Used in tests
pub fn new(url: Url, start: u64, end: u64) -> Self {
Expand All @@ -131,6 +135,22 @@ impl HttpSubFile {
}
}

/// Creates a new empty HttpSubFile with the given URL and HTTP client. The
/// returned HttpSubFile will always return `Ok(0)` when read.
pub(super) fn new_empty_with_client(url: Url, client: Client) -> Self {
    HttpSubFile {
        url,
        // start > end is the sentinel for an empty subfile: `size()`
        // returns 0 and all reads report EOF without any HTTP request.
        start: 1,
        end: 0,
        client,
        position: 0,
        // No range request is ever issued, so no reader is opened.
        reader: None,
        authorization: None,
        // NOTE(review): fixed 30s timeout is never exercised by an empty
        // reader; consider threading the caller's timeout through for
        // consistency — TODO confirm.
        timeout: Duration::from_secs(30),
        end_is_parent_eof: false,
    }
}

/// Sets the authorization header value to use for requests.
pub(super) fn with_authorization(mut self, authorization: impl Into<String>) -> Self {
self.authorization = Some(authorization.into());
Expand All @@ -153,8 +173,14 @@ impl HttpSubFile {

/// Returns the length of the subfile in bytes.
pub fn size(&self) -> u64 {
// Add 1 because the range is inclusive.
self.end - self.start + 1
if self.start > self.end {
// Invalid range, this means the subfile is empty and all reads
// should return EOF.
0
Comment thread
frhuelsz marked this conversation as resolved.
} else {
// Add 1 because the range is inclusive.
self.end - self.start + 1
}
}
Comment thread
frhuelsz marked this conversation as resolved.

/// Returns whether we have reached the end of the subfile.
Expand Down Expand Up @@ -236,7 +262,7 @@ impl HttpSubFile {

let response = super::retriable_request_sender(
|| {
let mut req = self.client.get(self.url.clone()).timeout(self.timeout);
let mut req = self.client.get(self.url.clone());

if let Some(range) = range.to_header_value_option() {
Comment thread
frhuelsz marked this conversation as resolved.
req = req.header(RANGE, range);
Expand Down Expand Up @@ -287,7 +313,7 @@ impl Read for HttpSubFile {
Ok(n) => n,
Err(e) => {
warn!(
"Error reading from HTTP subfile at position {}: {e}",
"Error reading from HTTP subfile at position {}: {e:?}",
self.position,
);

Expand Down Expand Up @@ -779,6 +805,34 @@ mod tests {
mock.assert();
}

#[test]
fn test_empty_subfile_when_start_greater_than_end() {
    // Nothing listens on port 1; if the subfile ever issued a request the
    // reads below would fail instead of returning EOF.
    let url = Url::parse("http://localhost:1/should-not-be-contacted").unwrap();
    let mut subfile = HttpSubFile::new(url, 10, 5);

    // start (10) > end (5) marks the subfile as empty.
    assert_eq!(subfile.size(), 0);

    let mut buf = [0_u8; 16];
    // Every read, including repeated ones, must report EOF.
    for _ in 0..2 {
        assert_eq!(subfile.read(&mut buf).unwrap(), 0);
    }
}

#[test]
fn test_new_empty_with_client_returns_eof() {
    // Port 1 is unreachable; the URL must never actually be contacted.
    let url = Url::parse("http://localhost:1/should-not-be-contacted").unwrap();
    let mut subfile = HttpSubFile::new_empty_with_client(url, Client::new());

    assert_eq!(subfile.size(), 0, "empty subfile must report zero length");

    let mut buf = [0_u8; 16];
    let bytes_read = subfile.read(&mut buf).expect("read of empty subfile");
    assert_eq!(bytes_read, 0);
}

#[test]
fn test_timeout_is_respected() {
let mut subfile = HttpSubFile::new(
Expand Down
Loading
Loading