Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli): read exact chunk len #7777

Merged
merged 3 commits into from Apr 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
94 changes: 66 additions & 28 deletions crates/net/downloaders/src/file_client.rs
Expand Up @@ -16,9 +16,11 @@ use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{trace, warn};
use tracing::{debug, trace, warn};

/// Byte length of chunk to read from chain file.
/// Default byte length of chunk to read from chain file.
///
/// Default is 1 GB.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;

/// Front-end API for fetching chain data from a file.
Expand Down Expand Up @@ -70,7 +72,7 @@ impl FileClient {
let file_len = metadata.len();

let mut reader = vec![];
file.read_to_end(&mut reader).await.unwrap();
file.read_to_end(&mut reader).await?;

Ok(Self::from_reader(&reader[..], file_len).await?.0)
}
Expand All @@ -87,9 +89,15 @@ impl FileClient {
let mut hash_to_number = HashMap::new();
let mut bodies = HashMap::new();

// use with_capacity to make sure the internal buffer contains the entire file
// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);

trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
capacity=stream.read_buffer().capacity(),
"init decode stream"
);

let mut remaining_bytes = vec![];

let mut log_interval = 0;
Expand All @@ -98,7 +106,12 @@ impl FileClient {
while let Some(block_res) = stream.next().await {
let block = match block_res {
Ok(block) => block,
Err(FileClientError::Rlp(_err, bytes)) => {
Err(FileClientError::Rlp(err, bytes)) => {
trace!(target: "downloaders::file",
%err,
bytes_len=bytes.len(),
"partial block returned from decoding chunk"
);
remaining_bytes = bytes;
break
}
Expand Down Expand Up @@ -135,7 +148,7 @@ impl FileClient {
log_interval += 1;
}

trace!(blocks = headers.len(), "Initialized file client");
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
}
Expand Down Expand Up @@ -204,6 +217,16 @@ impl FileClient {
}
self
}

/// Returns the current number of headers in the client.
pub fn headers_len(&self) -> usize {
self.headers.len()
}

/// Returns the current number of bodies in the client.
pub fn bodies_len(&self) -> usize {
self.bodies.len()
}
}

impl HeadersClient for FileClient {
Expand Down Expand Up @@ -297,8 +320,8 @@ impl DownloadClient for FileClient {
pub struct ChunkedFileReader {
/// File to read from.
file: File,
/// Current file length.
file_len: u64,
/// Current file byte length.
file_byte_len: u64,
/// Bytes that have been read.
chunk: Vec<u8>,
/// Max bytes per chunk.
Expand All @@ -322,58 +345,73 @@ impl ChunkedFileReader {
pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
// get file len from metadata before reading
let metadata = file.metadata().await?;
let file_len = metadata.len();
let file_byte_len = metadata.len();

Ok(Self { file, file_len, chunk: vec![], chunk_byte_len })
Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len })
}

/// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
/// length and the remaining file length.
fn chunk_len(&self) -> u64 {
let Self { chunk_byte_len, file_len, .. } = *self;
let file_len = file_len + self.chunk.len() as u64;
let Self { chunk_byte_len, file_byte_len, .. } = *self;
let file_byte_len = file_byte_len + self.chunk.len() as u64;

if chunk_byte_len > file_len {
if chunk_byte_len > file_byte_len {
// last chunk
file_len
file_byte_len
} else {
chunk_byte_len
}
}

/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk(&mut self) -> Result<Option<FileClient>, FileClientError> {
if self.file_len == 0 && self.chunk.is_empty() {
if self.file_byte_len == 0 && self.chunk.is_empty() {
// eof
return Ok(None)
}

let chunk_len = self.chunk_len();
let chunk_target_len = self.chunk_len();
let old_bytes_len = self.chunk.len() as u64;

// calculate reserved space in chunk
let new_bytes_len = chunk_len - old_bytes_len;
let new_read_bytes_target_len = chunk_target_len - old_bytes_len;

// read new bytes from file
let mut reader = BytesMut::with_capacity(new_bytes_len as usize);
self.file.read_buf(&mut reader).await?;
let mut reader = BytesMut::zeroed(new_read_bytes_target_len as usize);
// actual bytes that have been read
let new_read_bytes_len = self.file.read_exact(&mut reader).await? as u64;

// update remaining file length
self.file_len -= new_bytes_len;
self.file_byte_len -= new_read_bytes_len;

trace!(target: "downloaders::file",
max_chunk_byte_len=self.chunk_byte_len,
prev_read_bytes_len=self.chunk.len(),
new_bytes_len,
remaining_file_byte_len=self.file_len,
"new bytes were read from file"
);
let prev_read_bytes_len = self.chunk.len();

// read new bytes from file into chunk
self.chunk.extend_from_slice(&reader[..]);
let next_chunk_byte_len = self.chunk.len();

debug!(target: "downloaders::file",
max_chunk_byte_len=self.chunk_byte_len,
prev_read_bytes_len,
new_read_bytes_target_len,
new_read_bytes_len,
reader_capacity=reader.capacity(),
next_chunk_byte_len,
remaining_file_byte_len=self.file_byte_len,
"new bytes were read from file"
);

// make new file client from chunk
let (file_client, bytes) = FileClient::from_reader(&self.chunk[..], chunk_len).await?;
let (file_client, bytes) =
FileClient::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?;

debug!(target: "downloaders::file",
headers_len=file_client.headers.len(),
bodies_len=file_client.bodies.len(),
remaining_bytes_len=bytes.len(),
"parsed blocks that were read from file"
);

// save left over bytes
self.chunk = bytes;
Expand Down