Skip to content

Commit

Permalink
rm nippyjar from reth-interfaces (#7081)
Browse files Browse the repository at this point in the history
  • Loading branch information
justcode740 committed Mar 12, 2024
1 parent 9707cb2 commit 024bb26
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 38 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ workspace = true

[dependencies]
reth-primitives.workspace = true
reth-nippy-jar.workspace = true
reth-rpc-types.workspace = true
reth-network-api.workspace = true
# TODO(onbjerg): We only need this for [BlockBody]
Expand Down
6 changes: 0 additions & 6 deletions crates/interfaces/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ pub enum ProviderError {
BlockNumberOverflow(U256),
}

impl From<reth_nippy_jar::NippyJarError> for ProviderError {
fn from(err: reth_nippy_jar::NippyJarError) -> Self {
ProviderError::NippyJar(err.to_string())
}
}

impl From<reth_primitives::fs::FsPathError> for ProviderError {
fn from(err: reth_primitives::fs::FsPathError) -> Self {
ProviderError::FsPathError(err.to_string())
Expand Down
8 changes: 6 additions & 2 deletions crates/static-file/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use reth_primitives::{
},
BlockNumber, StaticFileSegment,
};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO, TransactionsProviderExt};
use reth_provider::{
providers::StaticFileProvider, DatabaseProviderRO, ProviderError, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path};

pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
Expand Down Expand Up @@ -82,7 +84,9 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let dataset = prepare_compression()?;

nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar.prepare_compression(dataset.to_vec())?;
nippy_jar
.prepare_compression(dataset.to_vec())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
Expand Down
10 changes: 7 additions & 3 deletions crates/storage/db/src/static_file/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo};
use crate::table::Decompress;
use derive_more::{Deref, DerefMut};
use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::{DataReader, NippyJar, NippyJarCursor};
use reth_primitives::{static_file::SegmentHeader, B256};
use std::sync::Arc;
Expand All @@ -13,7 +13,10 @@ pub struct StaticFileCursor<'a>(NippyJarCursor<'a, SegmentHeader>);
impl<'a> StaticFileCursor<'a> {
/// Returns a new [`StaticFileCursor`].
pub fn new(jar: &'a NippyJar<SegmentHeader>, reader: Arc<DataReader>) -> ProviderResult<Self> {
Ok(Self(NippyJarCursor::with_reader(jar, reader)?))
Ok(Self(
NippyJarCursor::with_reader(jar, reader)
.map_err(|err| ProviderError::NippyJar(err.to_string()))?,
))
}

/// Returns the current `BlockNumber` or `TxNumber` of the cursor depending on the kind of
Expand Down Expand Up @@ -43,7 +46,8 @@ impl<'a> StaticFileCursor<'a> {
}
None => Ok(None),
},
}?;
}
.map_or(None, |v| v);

Ok(row)
}
Expand Down
25 changes: 19 additions & 6 deletions crates/storage/db/src/static_file/generation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
RawKey, RawTable,
};

use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarHeader, PHFKey};
use reth_tracing::tracing::*;
use std::{error::Error as StdError, ops::RangeInclusive};
Expand Down Expand Up @@ -55,15 +55,28 @@ macro_rules! generate_static_file_func {
// Create PHF and Filter if required
if let Some(keys) = keys {
debug!(target: "reth::static_file", "Calculating Filter, PHF and offset index list");
nippy_jar.prepare_index(keys, row_count)?;
debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated.");
match nippy_jar.prepare_index(keys, row_count) {
Ok(_) => {
debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}
}

// Create compression dictionaries if required
if let Some(data_sets) = dict_compression_set {
debug!(target: "reth::static_file", "Creating compression dictionaries.");
nippy_jar.prepare_compression(data_sets)?;
debug!(target: "reth::static_file", "Compression dictionaries created.");
match nippy_jar.prepare_compression(data_sets){
Ok(_) => {
debug!(target: "reth::static_file", "Compression dictionaries created.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}

}

// Creates the cursors for the columns
Expand All @@ -88,7 +101,7 @@ macro_rules! generate_static_file_func {

debug!(target: "reth::static_file", jar=?nippy_jar, "Generating static file.");

let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64)?;
let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64).map_err(|e| ProviderError::NippyJar(e.to_string()));

debug!(target: "reth::static_file", jar=?nippy_jar, "Static file generated.");

Expand Down
22 changes: 14 additions & 8 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ impl StaticFileProvider {
pub fn report_metrics(&self) -> ProviderResult<()> {
let Some(metrics) = &self.metrics else { return Ok(()) };

let static_files = iter_static_files(&self.path)?;
let static_files =
iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
for (segment, ranges) in static_files {
let mut entries = 0;
let mut size = 0;
Expand Down Expand Up @@ -243,14 +244,15 @@ impl StaticFileProvider {
} else {
let mut jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_block_range)),
)?;
)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if self.load_filters {
jar.load_filters()?;
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
}
jar
};

jar.delete()?;
jar.delete().map_err(|e| ProviderError::NippyJar(e.to_string()))?;

let mut segment_max_block = None;
if fixed_block_range.start() > 0 {
Expand All @@ -276,9 +278,10 @@ impl StaticFileProvider {
jar.into()
} else {
let path = self.path.join(segment.filename(fixed_block_range));
let mut jar = NippyJar::load(&path)?;
let mut jar =
NippyJar::load(&path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if self.load_filters {
jar.load_filters()?;
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
}

self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
Expand Down Expand Up @@ -353,7 +356,8 @@ impl StaticFileProvider {

let jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_range)),
)?;
)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;

// Updates the tx index by first removing all entries which have a higher
// block_start than our current static file.
Expand Down Expand Up @@ -416,7 +420,9 @@ impl StaticFileProvider {

tx_index.clear();

for (segment, ranges) in iter_static_files(&self.path)? {
for (segment, ranges) in
iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?
{
// Update last block for each segment
if let Some((block_range, _)) = ranges.last() {
max_block.insert(segment, block_range.end());
Expand Down
11 changes: 8 additions & 3 deletions crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};

mod metrics;

use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::NippyJar;
use reth_primitives::{static_file::SegmentHeader, StaticFileSegment};
use std::{ops::Deref, sync::Arc};
Expand All @@ -28,8 +28,13 @@ pub struct LoadedJar {

impl LoadedJar {
fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
let mmap_handle = Arc::new(jar.open_data_reader()?);
Ok(Self { jar, mmap_handle })
match jar.open_data_reader() {
Ok(data_reader) => {
let mmap_handle = Arc::new(data_reader);
Ok(Self { jar, mmap_handle })
}
Err(e) => Err(ProviderError::NippyJar(e.to_string())),
}
}

/// Returns a clone of the mmap handle that can be used to instantiate a cursor.
Expand Down
31 changes: 23 additions & 8 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ impl StaticFileProviderRW {
block_range.start(),
None,
) {
Ok(provider) => (NippyJar::load(provider.data_path())?, provider.data_path().into()),
Ok(provider) => (
NippyJar::load(provider.data_path())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?,
provider.data_path().into(),
),
Err(ProviderError::MissingStaticFileBlock(_, _)) => {
let path = static_file_provider.directory().join(segment.filename(&block_range));
(create_jar(segment, &path, block_range), path)
Expand All @@ -78,7 +82,7 @@ impl StaticFileProviderRW {
// This static file has been frozen, so we should
Err(ProviderError::FinalizedStaticFile(segment, block))
}
Err(e) => Err(e.into()),
Err(e) => Err(ProviderError::NippyJar(e.to_string())),
}?;

if let Some(metrics) = &metrics {
Expand All @@ -97,7 +101,7 @@ impl StaticFileProviderRW {
let start = Instant::now();

// Commits offsets and new user_header to disk
self.writer.commit()?;
self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;

if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
Expand Down Expand Up @@ -128,7 +132,9 @@ impl StaticFileProviderRW {
let start = Instant::now();

// Commits offsets and new user_header to disk
self.writer.commit_without_sync_all()?;
self.writer
.commit_without_sync_all()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;

if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
Expand Down Expand Up @@ -249,11 +255,16 @@ impl StaticFileProviderRW {
self.writer = writer;
self.data_path = data_path;

NippyJar::<SegmentHeader>::load(&previous_snap)?.delete()?;
NippyJar::<SegmentHeader>::load(&previous_snap)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?
.delete()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
} else {
// Update `SegmentHeader`
self.writer.user_header_mut().prune(len);
self.writer.prune_rows(len as usize)?;
self.writer
.prune_rows(len as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
break
}

Expand All @@ -263,7 +274,9 @@ impl StaticFileProviderRW {
self.writer.user_header_mut().prune(num_rows);

// Truncate data
self.writer.prune_rows(num_rows as usize)?;
self.writer
.prune_rows(num_rows as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
num_rows = 0;
}
}
Expand All @@ -285,7 +298,9 @@ impl StaticFileProviderRW {
self.buf.clear();
column.to_compact(&mut self.buf);

self.writer.append_column(Some(Ok(&self.buf)))?;
self.writer
.append_column(Some(Ok(&self.buf)))
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
Ok(())
}

Expand Down

0 comments on commit 024bb26

Please sign in to comment.