Skip to content

Commit

Permalink
clean up code paths
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed May 26, 2024
1 parent 35d4f85 commit 5c564bf
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 192 deletions.
17 changes: 14 additions & 3 deletions rust/lance-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,20 @@ impl Reader for LocalObjectReader {
}

/// Returns the file size.
async fn size(&self) -> Result<usize> {
Ok(self.file.metadata()?.len() as usize)
async fn size(&self) -> object_store::Result<usize> {
let metadata = self
.file
.metadata()
.map_err(|err| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})?;
Ok(metadata.len() as usize)
}

/// Reads a range of data.
#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
let file = self.file.clone();
tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
Expand All @@ -132,6 +139,10 @@ impl Reader for LocalObjectReader {
Ok(buf.freeze())
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
}
}

Expand Down
8 changes: 4 additions & 4 deletions rust/lance-io/src/object_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ impl CloudObjectReader {
async fn do_with_retry<'a, O>(
&self,
f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> Result<O> {
) -> object_store::Result<O> {
let mut retries = 3;
loop {
match f().await {
Ok(val) => return Ok(val),
Err(err) => {
if retries == 0 {
return Err(err.into());
return Err(err);
}
retries -= 1;
}
Expand All @@ -69,15 +69,15 @@ impl Reader for CloudObjectReader {
}

/// Object/File Size.
async fn size(&self) -> Result<usize> {
async fn size(&self) -> object_store::Result<usize> {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
}

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone()))
.await
}
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ impl IoTask {
let bytes = self
.reader
.get_range(self.to_read.start as usize..self.to_read.end as usize)
.await;
.await
.map_err(Error::from);
(self.when_done)(bytes);
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/lance-io/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync {
fn block_size(&self) -> usize;

/// Object/File Size.
async fn size(&self) -> Result<usize>;
async fn size(&self) -> object_store::Result<usize>;

/// Read a range of bytes from the object.
///
/// TODO: change to read_at()?
async fn get_range(&self, range: Range<usize>) -> Result<Bytes>;
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;
}
11 changes: 9 additions & 2 deletions rust/lance-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,15 @@ pub async fn read_struct<
T::try_from(msg)
}

pub async fn read_last_block(reader: &dyn Reader) -> Result<Bytes> {
let file_size = reader.size().await?;
pub async fn read_last_block(
reader: &dyn Reader,
size: Option<u64>,
) -> object_store::Result<Bytes> {
let file_size = if let Some(size) = size {
size as usize
} else {
reader.size().await?
};
let block_size = reader.block_size();
let begin = if file_size < block_size {
0
Expand Down
Loading

0 comments on commit 5c564bf

Please sign in to comment.