Skip to content

Commit

Permalink
specify errors for immutable storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr-Leshiy committed May 8, 2024
1 parent c992da0 commit 8c68863
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 99 deletions.
53 changes: 27 additions & 26 deletions pallas-hardano/src/storage/immutable/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,52 +15,59 @@ pub type SecondaryEntry = super::secondary::Entry;
pub struct Reader {
inner: BufReader<File>,
index: SecondaryIndex,
current: Option<Result<SecondaryEntry, std::io::Error>>,
next: Option<Result<SecondaryEntry, std::io::Error>>,
current: Option<Result<SecondaryEntry, secondary::Error>>,
next: Option<Result<SecondaryEntry, secondary::Error>>,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Cannot open chunk file, error: {0}")]
CannotOpenChunkFile(std::io::Error),
#[error("Cannot read block, error: {0}")]
CannotReadBlock(std::io::Error),
#[error(transparent)]
SecondaryIndexError(secondary::Error),
}

impl Reader {
fn open(mut index: SecondaryIndex, chunks: File) -> Result<Self, std::io::Error> {
fn open(mut index: SecondaryIndex, chunks: File) -> Self {
let inner = BufReader::new(chunks);

let current = index.next();
let next = index.next();

Ok(Self {
Self {
inner,
index,
current,
next,
})
}
}

fn read_middle_block(
file: &mut BufReader<File>,
next_offset: u64,
) -> Result<Vec<u8>, std::io::Error> {
let start = file.stream_position()?;
fn read_middle_block(file: &mut BufReader<File>, next_offset: u64) -> Result<Vec<u8>, Error> {
let start = file.stream_position().map_err(Error::CannotReadBlock)?;
let delta = next_offset - start;
trace!(start, delta, "reading chunk middle block");

let mut buf = vec![0u8; delta as usize];
file.read_exact(&mut buf)?;
file.read_exact(&mut buf).map_err(Error::CannotReadBlock)?;

Ok(buf)
}

fn read_last_block(file: &mut BufReader<File>) -> Result<Vec<u8>, std::io::Error> {
let start = file.stream_position()?;
fn read_last_block(file: &mut BufReader<File>) -> Result<Vec<u8>, Error> {
let start = file.stream_position().map_err(Error::CannotReadBlock)?;
trace!(start, "reading chunk last block");

let mut buf = vec![];
file.read_to_end(&mut buf)?;
file.read_to_end(&mut buf).map_err(Error::CannotReadBlock)?;

Ok(buf)
}
}

impl Iterator for Reader {
type Item = Result<Vec<u8>, std::io::Error>;
type Item = Result<Vec<u8>, Error>;

fn next(&mut self) -> Option<Self::Item> {
match (self.current.take(), self.next.take()) {
Expand All @@ -69,7 +76,7 @@ impl Iterator for Reader {
self.current = None;
self.next = None;

Some(Err(next))
Some(Err(Error::SecondaryIndexError(next)))
}
(Some(_), Some(Ok(next))) => {
let block = Self::read_middle_block(&mut self.inner, next.block_offset);
Expand All @@ -91,18 +98,12 @@ impl Iterator for Reader {
}
}

pub fn read_blocks(dir: &Path, name: &str) -> Result<Reader, std::io::Error> {
let primary = dir.join(name).with_extension("primary");
let primary = std::fs::File::open(primary)?;
let primary = immutable::primary::Reader::open(primary)?;

let secondary = dir.join(name).with_extension("secondary");
let secondary = std::fs::File::open(secondary)?;
let secondary = secondary::Reader::open(primary, secondary)?;
pub fn read_blocks(dir: &Path, name: &str) -> Result<Reader, Error> {
let secondary = secondary::read_entries(dir, name).map_err(Error::SecondaryIndexError)?;

let chunk = dir.join(name).with_extension("chunk");
let chunk = std::fs::File::open(chunk)?;
Reader::open(secondary, chunk)
let chunk = std::fs::File::open(chunk).map_err(Error::CannotOpenChunkFile)?;
Ok(Reader::open(secondary, chunk))
}

#[cfg(test)]
Expand Down
84 changes: 46 additions & 38 deletions pallas-hardano/src/storage/immutable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{

use pallas_traverse::MultiEraBlock;
use tap::Tap;
use thiserror::Error;
use tracing::debug;

pub mod chunk;
Expand All @@ -17,13 +16,18 @@ pub mod secondary;
// `network`.
pub type Point = pallas_network::miniprotocols::Point;

#[derive(Debug, Error, PartialEq, Eq)]
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Cannot find block by the provided point: {0:?}")]
CannotFindBlock(Point),

#[error("Origin block is missing, provided truncated chain data")]
OriginMissing,
#[error("Cannot read directory, error: {0}")]
CannotReadDir(std::io::Error),
#[error("Cannot decode block, error: {0}")]
CannotDecodeBlock(pallas_traverse::Error),
#[error(transparent)]
ChunkReadError(chunk::Error),
}

/// Performs a binary search of the given sorted chunks in descending order
Expand All @@ -39,8 +43,8 @@ pub enum Error {
fn chunk_binary_search<ChunkT, PointT>(
chunks: &[ChunkT],
point: &PointT,
cmp: impl Fn(&ChunkT, &PointT) -> Result<Ordering, Box<dyn std::error::Error>>,
) -> Result<Option<usize>, Box<dyn std::error::Error>> {
cmp: impl Fn(&ChunkT, &PointT) -> Result<Ordering, Error>,
) -> Result<Option<usize>, Error> {
let mut size = chunks.len();
let mut left = 0;
let mut right: usize = size;
Expand Down Expand Up @@ -75,20 +79,21 @@ fn iterate_till_point(
iter: impl Iterator<Item = FallibleBlock>,
slot: u64,
block_hash: &[u8],
) -> Result<impl Iterator<Item = FallibleBlock>, Box<dyn std::error::Error>> {
) -> Result<impl Iterator<Item = FallibleBlock>, Error> {
let mut iter = iter.peekable();
match iter.peek() {
Some(Ok(block_data)) => {
let mut block_data = block_data.clone();
let mut block = MultiEraBlock::decode(&block_data)?;
let mut block = MultiEraBlock::decode(&block_data).map_err(Error::CannotDecodeBlock)?;

while block.slot() < slot {
iter.next();

match iter.peek() {
Some(Ok(data)) => {
block_data = data.clone();
block = MultiEraBlock::decode(&block_data)?;
block =
MultiEraBlock::decode(&block_data).map_err(Error::CannotDecodeBlock)?;
}
Some(Err(_)) | None => return Ok(iter),
}
Expand All @@ -104,8 +109,9 @@ fn iterate_till_point(
}
}

fn build_stack_of_chunk_names(dir: &Path) -> Result<ChunkNameSack, std::io::Error> {
let mut chunks = std::fs::read_dir(dir)?
fn build_stack_of_chunk_names(dir: &Path) -> Result<ChunkNameSack, Error> {
let mut chunks = std::fs::read_dir(dir)
.map_err(Error::CannotReadDir)?
.map_while(|e| e.ok())
.filter(|e| {
e.path()
Expand All @@ -131,7 +137,7 @@ pub type ChunkNameSack = Vec<ChunkName>;
pub struct ChunkReaders(PathBuf, ChunkNameSack);

impl Iterator for ChunkReaders {
type Item = Result<chunk::Reader, std::io::Error>;
type Item = Result<chunk::Reader, chunk::Error>;

fn next(&mut self) -> Option<Self::Item> {
self.1
Expand All @@ -141,9 +147,9 @@ impl Iterator for ChunkReaders {
}
}

pub type FallibleBlock = Result<Block, std::io::Error>;
pub type FallibleBlock = Result<Block, chunk::Error>;

pub fn read_blocks(dir: &Path) -> Result<impl Iterator<Item = FallibleBlock>, std::io::Error> {
pub fn read_blocks(dir: &Path) -> Result<impl Iterator<Item = FallibleBlock>, Error> {
let names = build_stack_of_chunk_names(dir)?;

let iter = ChunkReaders(dir.to_owned(), names)
Expand All @@ -162,8 +168,9 @@ pub fn read_blocks(dir: &Path) -> Result<impl Iterator<Item = FallibleBlock>, st
/// genesis block.
/// * `Error::CannotFindBlock` - If the specific block indicated by the `Point`
/// value is not found.
/// * `std::io::Error` - If an I/O error occurs.
/// * `pallas_traverse::Error` - If the block cannot be decoded.
/// * `Error::CannotReadDir` - If the directory cannot be read.
/// * `Error::ChunkReadError` - Chunk read error.
/// * `Error::CannotDecodeBlock` - If the block cannot be decoded.
///
/// # Example
///
Expand Down Expand Up @@ -200,7 +207,7 @@ pub fn read_blocks(dir: &Path) -> Result<impl Iterator<Item = FallibleBlock>, st
pub fn read_blocks_from_point(
dir: &Path,
point: Point,
) -> Result<Box<dyn Iterator<Item = FallibleBlock> + Send + Sync>, Box<dyn std::error::Error>> {
) -> Result<Box<dyn Iterator<Item = FallibleBlock> + Send + Sync>, Error> {
let names = build_stack_of_chunk_names(dir)?;

match point {
Expand All @@ -214,7 +221,8 @@ pub fn read_blocks_from_point(
// check the first block
match iter.peek() {
Some(Ok(block_data)) => {
let block = MultiEraBlock::decode(block_data)?;
let block =
MultiEraBlock::decode(block_data).map_err(Error::CannotDecodeBlock)?;
// check that the first block is genesis
if block.slot() == 0 && block.number() == 0 {
Ok(Box::new(iter))
Expand All @@ -232,13 +240,15 @@ pub fn read_blocks_from_point(
// and compares block's slot with provided slot number
let cmp = {
|chunk_name: &String, point: &u64| {
let mut blocks = chunk::read_blocks(dir, chunk_name)?;
let mut blocks =
chunk::read_blocks(dir, chunk_name).map_err(Error::ChunkReadError)?;

// Try to read the first block from the chunk
if let Some(block_data) = blocks.next() {
let block_data = block_data?;
let block_data = block_data.map_err(Error::ChunkReadError)?;

let block = MultiEraBlock::decode(&block_data)?;
let block =
MultiEraBlock::decode(&block_data).map_err(Error::CannotDecodeBlock)?;
Ok(block.slot().cmp(point))
} else {
Ok(Ordering::Greater)
Expand All @@ -250,9 +260,10 @@ pub fn read_blocks_from_point(
// index.
let names = chunk_binary_search(&names, &slot, cmp)?
.map(|chunk_index| names[..chunk_index + 1].to_vec())
.ok_or::<Box<dyn std::error::Error>>(
Error::CannotFindBlock(Point::Specific(slot, block_hash.clone())).into(),
)?;
.ok_or(Error::CannotFindBlock(Point::Specific(
slot,
block_hash.clone(),
)))?;

let iter = ChunkReaders(dir.to_owned(), names.clone())
.map_while(Result::ok)
Expand All @@ -273,8 +284,9 @@ pub fn read_blocks_from_point(
///
/// # Errors
///
/// * `std::io::Error` - If an I/O error occurs.
/// * `pallas_traverse::Error` - If the block cannot be decoded.
/// * `Error::CannotReadDir` - If the directory cannot be read.
/// * `Error::ChunkReadError` - Chunk read error.
/// * `Error::CannotDecodeBlock` - If the block cannot be decoded.
///
/// # Example
///
Expand All @@ -299,19 +311,19 @@ pub fn read_blocks_from_point(
/// Ok(())
/// }
/// ```
pub fn get_tip(dir: &Path) -> Result<Option<Point>, Box<dyn std::error::Error>> {
pub fn get_tip(dir: &Path) -> Result<Option<Point>, Error> {
match build_stack_of_chunk_names(dir)?.into_iter().next() {
Some(name) => {
let tip_point = ChunkReaders(dir.to_owned(), vec![name])
.map_while(Result::ok)
.flatten()
.last()
.transpose()?
.transpose().map_err(Error::ChunkReadError)?
.map(|tip_data| {
MultiEraBlock::decode(&tip_data)
.map(|block| Point::Specific(block.slot(), block.hash().to_vec()))
})
.transpose()?;
.transpose().map_err(Error::CannotDecodeBlock)?;
Ok(tip_point)
}
None => Ok(None),
Expand Down Expand Up @@ -507,25 +519,21 @@ mod tests {
assert_eq!(Point::Specific(block.slot(), block.hash().to_vec()), point);

// Try to read an origin block
assert_eq!(
assert!(matches!(
read_blocks_from_point(Path::new("../test_data"), Point::Origin)
.err()
.unwrap()
.downcast::<super::Error>()
.unwrap(),
Box::new(super::Error::OriginMissing)
);
super::Error::OriginMissing
));

// Try to read from a non existing point
let point = Point::Specific(0, vec![]);
assert_eq!(
assert!(matches!(
read_blocks_from_point(Path::new("../test_data"), point.clone())
.err()
.unwrap()
.downcast::<super::Error>()
.unwrap(),
Box::new(super::Error::CannotFindBlock(point))
);
super::Error::CannotFindBlock(_)
));
}

#[test]
Expand Down

0 comments on commit 8c68863

Please sign in to comment.