perf: optimize IO path for reading manifest #2396

Merged (8 commits) on May 29, 2024
5 changes: 1 addition & 4 deletions docs/format.rst
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ A `Lance Dataset` is organized in a directory.

/path/to/dataset:
data/*.lance -- Data directory
latest.manifest -- The manifest file for the latest version.
_versions/*.manifest -- Manifest file for each dataset version.
_indices/{UUID-*}/index.idx -- Secondary index, each index per directory.
_deletions/*.{arrow,bin} -- Deletion files, which contain ids of rows
@@ -249,8 +248,7 @@ Committing Datasets
-------------------

A new version of a dataset is committed by writing a new manifest file to the
``_versions`` directory. Only after successfully committing this file should
the ``_latest.manifest`` file be updated.
``_versions`` directory.

To prevent concurrent writers from overwriting each other, the commit process
must be atomic and consistent for all writers. If two writers try to commit
@@ -287,7 +285,6 @@ The commit process is as follows:
conflicts, abort the commit. Otherwise, continue.
4. Build a manifest and attempt to commit it to the next version. If the commit
fails because another writer has already committed, go back to step 3.
5. If the commit succeeds, update the ``_latest.manifest`` file.

When checking whether two transactions conflict, be conservative. If the
transaction file is missing, assume it conflicts. If the transaction file
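The optimistic-commit loop in steps 3 and 4 of the updated docs can be sketched against a local filesystem, where `create_new` (fail if the file already exists) stands in for an object store's atomic put-if-not-exists. All names here are illustrative, not Lance's actual API, and conflict checking is elided:

```rust
use std::{fs, io, io::Write, path::Path};

// Hypothetical sketch: commit a manifest by atomically creating
// `{version}.manifest` in the versions directory. If another writer
// already claimed that version number, bump it and retry.
fn commit(versions_dir: &Path, mut version: u64, manifest: &[u8]) -> io::Result<u64> {
    loop {
        let path = versions_dir.join(format!("{version}.manifest"));
        match fs::OpenOptions::new().write(true).create_new(true).open(&path) {
            Ok(mut file) => {
                file.write_all(manifest)?;
                return Ok(version); // commit succeeded at this version
            }
            // Lost the race for this version: re-check conflicts (elided)
            // and retry against the next version number.
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => version += 1,
            Err(e) => return Err(e),
        }
    }
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("lance_commit_sketch");
    let _ = fs::remove_dir_all(&dir);
    fs::create_dir_all(&dir)?;
    let v1 = commit(&dir, 1, b"manifest v1")?;
    let v2 = commit(&dir, 1, b"manifest v2")?; // conflicts with v1, lands on the next version
    assert_eq!((v1, v2), (1, 2));
    Ok(())
}
```

Note there is no longer a step 5: with no `_latest.manifest` pointer to update, the atomic create of the manifest itself is the whole commit.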
4 changes: 2 additions & 2 deletions rust/lance-file/src/page_table.rs
@@ -215,7 +215,7 @@ mod tests {
.unwrap();
writer.shutdown().await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let actual = PageTable::load(
@@ -284,7 +284,7 @@ mod tests {
let mut writer = tokio::fs::File::create(&path).await.unwrap();
let res = page_table.write(&mut writer, 0).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();

14 changes: 7 additions & 7 deletions rust/lance-io/src/encodings/binary.rs
@@ -137,7 +137,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
/// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader};
///
/// async {
/// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048).await.unwrap();
/// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap();
/// let string_decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), 100, 1024, true);
/// };
/// ```
@@ -494,7 +494,7 @@ mod tests {

let pos = write_test_data(&path, arrs).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let read_len = arrs.iter().map(|a| a.len()).sum();
@@ -562,7 +562,7 @@ mod tests {
let pos = encoder.encode(&[&data]).await.unwrap();
object_writer.shutdown().await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -605,7 +605,7 @@ mod tests {
let path = temp_dir.path().join("foo");

let pos = write_test_data(&path, &[&data]).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -627,7 +627,7 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().join("foo");
let pos = write_test_data(&path, &[&data]).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -658,7 +658,7 @@ mod tests {
let path = temp_dir.path().join("foo");
let pos = write_test_data(&path, &[&data]).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
Expand Down Expand Up @@ -738,7 +738,7 @@ mod tests {
pos
};

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
2 changes: 1 addition & 1 deletion rust/lance-io/src/encodings/dictionary.rs
@@ -246,7 +246,7 @@ mod tests {
object_writer.shutdown().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
let decoder = DictionaryDecoder::new(
6 changes: 3 additions & 3 deletions rust/lance-io/src/encodings/plain.rs
@@ -581,7 +581,7 @@ mod tests {
writer.flush().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
@@ -705,7 +705,7 @@ mod tests {
writer.flush().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
@@ -753,7 +753,7 @@ mod tests {
writer.shutdown().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
47 changes: 34 additions & 13 deletions rust/lance-io/src/local.rs
@@ -20,6 +20,7 @@ use lance_core::{Error, Result};
use object_store::path::Path;
use snafu::{location, Location};
use tokio::io::AsyncSeekExt;
use tokio::sync::OnceCell;
use tracing::instrument;

use crate::traits::{Reader, Writer};
@@ -55,6 +56,10 @@ pub struct LocalObjectReader {
/// File path.
path: Path,

/// Known size of the file. This is either passed in on construction or
/// cached on the first metadata call.
size: OnceCell<usize>,

/// Block size, in bytes.
block_size: usize,
}
@@ -63,23 +68,20 @@ impl LocalObjectReader {
pub async fn open_local_path(
path: impl AsRef<std::path::Path>,
block_size: usize,
known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
let path = path.as_ref().to_owned();
let object_store_path = Path::from_filesystem_path(&path)?;
tokio::task::spawn_blocking(move || {
let local_file = File::open(&path)?;
Ok(Box::new(Self {
file: Arc::new(local_file),
path: object_store_path,
block_size,
}) as Box<dyn Reader>)
})
.await?
Self::open(&object_store_path, block_size, known_size).await
}

/// Open a local object reader, with default prefetch size.
#[instrument(level = "debug")]
pub async fn open(path: &Path, block_size: usize) -> Result<Box<dyn Reader>> {
pub async fn open(
path: &Path,
block_size: usize,
known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
let path = path.clone();
let local_path = to_local_path(&path);
tokio::task::spawn_blocking(move || {
@@ -90,9 +92,11 @@ impl LocalObjectReader {
},
_ => e.into(),
})?;
let size = OnceCell::new_with(known_size);
Ok(Box::new(Self {
file: Arc::new(file),
block_size,
size,
path: path.clone(),
}) as Box<dyn Reader>)
})
@@ -111,13 +115,26 @@ impl Reader for LocalObjectReader {
}

/// Returns the file size.
async fn size(&self) -> Result<usize> {
Ok(self.file.metadata()?.len() as usize)
async fn size(&self) -> object_store::Result<usize> {
Review comment (Contributor): thank you! for object_store::Result

Review comment (Contributor, author): I still hate the error handling overall. Will refactor it later.

let file = self.file.clone();
self.size
.get_or_try_init(|| async move {
let metadata = tokio::task::spawn_blocking(move || {
file.metadata().map_err(|err| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
})
.await??;
Ok(metadata.len() as usize)
})
.await
.cloned()
}

/// Reads a range of data.
#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
let file = self.file.clone();
tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
@@ -132,6 +149,10 @@
Ok(buf.freeze())
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
}
}

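The core of the change above is caching the file size in a `tokio::sync::OnceCell`: either pre-seeded with a known size at construction or filled once by the first `size()` call, so repeated callers never re-stat the file. A blocking sketch of the same idea using the standard library's `OnceLock` (the type name, the counter, and the fixed 4096-byte size are all illustrative, not Lance's code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

// Toy reader that caches its "file size" after the first lookup,
// mirroring the OnceCell-based caching added to LocalObjectReader.
struct CachingReader {
    size: OnceLock<usize>,
    metadata_calls: AtomicUsize, // counts how often the expensive lookup ran
}

impl CachingReader {
    fn new(known_size: Option<usize>) -> Self {
        let size = OnceLock::new();
        if let Some(s) = known_size {
            let _ = size.set(s); // pre-seed, like OnceCell::new_with(known_size)
        }
        Self { size, metadata_calls: AtomicUsize::new(0) }
    }

    fn size(&self) -> usize {
        *self.size.get_or_init(|| {
            // Stand-in for the fs::metadata (or cloud HEAD) request.
            self.metadata_calls.fetch_add(1, Ordering::SeqCst);
            4096
        })
    }
}

fn main() {
    // Unknown size: the first call does the lookup, later calls hit the cache.
    let r = CachingReader::new(None);
    assert_eq!(r.size(), 4096);
    assert_eq!(r.size(), 4096);
    assert_eq!(r.metadata_calls.load(Ordering::SeqCst), 1);

    // Known size: the lookup is skipped entirely.
    let r = CachingReader::new(Some(123));
    assert_eq!(r.size(), 123);
    assert_eq!(r.metadata_calls.load(Ordering::SeqCst), 0);
}
```

The same pattern appears in `CloudObjectReader` below, where the avoided call is a HEAD request rather than a local `metadata()`.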
32 changes: 23 additions & 9 deletions rust/lance-io/src/object_reader.rs
@@ -9,6 +9,7 @@ use bytes::Bytes;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, ObjectStore};
use tokio::sync::OnceCell;
use tracing::instrument;

use crate::traits::Reader;
@@ -22,16 +23,24 @@ pub struct CloudObjectReader {
pub object_store: Arc<dyn ObjectStore>,
// File path
pub path: Path,
// File size, if known.
size: OnceCell<usize>,

block_size: usize,
}

impl CloudObjectReader {
/// Create an ObjectReader from URI
pub fn new(object_store: Arc<dyn ObjectStore>, path: Path, block_size: usize) -> Result<Self> {
pub fn new(
object_store: Arc<dyn ObjectStore>,
path: Path,
block_size: usize,
known_size: Option<usize>,
) -> Result<Self> {
Ok(Self {
object_store,
path,
size: OnceCell::new_with(known_size),
block_size,
})
}
@@ -42,14 +51,14 @@ impl CloudObjectReader {
async fn do_with_retry<'a, O>(
&self,
f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> Result<O> {
) -> object_store::Result<O> {
let mut retries = 3;
loop {
match f().await {
Ok(val) => return Ok(val),
Err(err) => {
if retries == 0 {
return Err(err.into());
return Err(err);
}
retries -= 1;
}
@@ -69,15 +78,20 @@ impl Reader for CloudObjectReader {
}

/// Object/File Size.
async fn size(&self) -> Result<usize> {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
async fn size(&self) -> object_store::Result<usize> {
self.size
.get_or_try_init(|| async move {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
})
.await
.cloned()
}

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone()))
.await
}
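`do_with_retry` runs an operation, allows three additional attempts after a failure, and now returns the raw `object_store::Error` instead of eagerly converting it into Lance's error type. A synchronous sketch of the same retry shape (the helper name and error strings are illustrative):

```rust
use std::cell::Cell;

// Sync analogue of CloudObjectReader::do_with_retry: run `f` until it
// succeeds, allowing `retries` extra attempts after the first failure,
// and return the final error unconverted.
fn with_retry<O, E>(mut retries: u32, mut f: impl FnMut() -> Result<O, E>) -> Result<O, E> {
    loop {
        match f() {
            Ok(val) => return Ok(val),
            Err(err) => {
                if retries == 0 {
                    return Err(err);
                }
                retries -= 1;
            }
        }
    }
}

fn main() {
    // Fails twice, then succeeds: the caller never sees the transient errors.
    let attempts = Cell::new(0);
    let result = with_retry(3, || {
        attempts.set(attempts.get() + 1);
        if attempts.get() < 3 { Err("transient") } else { Ok(42) }
    });
    assert_eq!(result, Ok(42));
    assert_eq!(attempts.get(), 3);

    // Always fails: the last error surfaces after 1 + 3 attempts.
    let attempts = Cell::new(0);
    let result: Result<i32, &str> = with_retry(3, || {
        attempts.set(attempts.get() + 1);
        Err("permanent")
    });
    assert_eq!(result, Err("permanent"));
    assert_eq!(attempts.get(), 4);
}
```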
20 changes: 19 additions & 1 deletion rust/lance-io/src/object_store.rs
@@ -440,11 +440,29 @@ impl ObjectStore {
/// - ``path``: Absolute path to the file.
pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
match self.scheme.as_str() {
"file" => LocalObjectReader::open(path, self.block_size).await,
"file" => LocalObjectReader::open(path, self.block_size, None).await,
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
None,
)?)),
}
}

/// Open a reader for a file with known size.
///
/// This size may either have been retrieved from a list operation or
/// cached metadata. By passing in the known size, we can skip a HEAD / metadata
/// call.
pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
match self.scheme.as_str() {
"file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
Some(known_size),
)?)),
}
}
3 changes: 2 additions & 1 deletion rust/lance-io/src/scheduler.rs
@@ -91,7 +91,8 @@ impl IoTask {
let bytes = self
.reader
.get_range(self.to_read.start as usize..self.to_read.end as usize)
.await;
.await
.map_err(Error::from);
(self.when_done)(bytes);
}
}
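Because `get_range` now returns `object_store::Result`, the scheduler converts back to Lance's error type at the boundary with `map_err(Error::from)`. A minimal sketch of that boundary-conversion pattern, using stand-in error types rather than the real `object_store::Error` and `lance_core::Error`:

```rust
// Stand-ins for the two error types on either side of the boundary.
#[derive(Debug, PartialEq)]
struct StoreError(String);

#[derive(Debug, PartialEq)]
enum LanceError {
    Io(String),
}

// A `From` impl is what makes `map_err(Error::from)` (and `?`) work.
impl From<StoreError> for LanceError {
    fn from(err: StoreError) -> Self {
        LanceError::Io(format!("object store: {}", err.0))
    }
}

// Low-level read that reports failures in the store's own error type.
fn get_range(fail: bool) -> Result<Vec<u8>, StoreError> {
    if fail {
        Err(StoreError("timeout".to_string()))
    } else {
        Ok(vec![1, 2, 3])
    }
}

fn main() {
    // Convert at the boundary, like the scheduler's map_err(Error::from) call.
    let ok: Result<Vec<u8>, LanceError> = get_range(false).map_err(LanceError::from);
    assert_eq!(ok, Ok(vec![1, 2, 3]));

    let err: Result<Vec<u8>, LanceError> = get_range(true).map_err(LanceError::from);
    assert_eq!(err, Err(LanceError::Io("object store: timeout".to_string())));
}
```

Keeping the IO layer in `object_store::Result` and converting only at the call site avoids a blanket `From` conversion on every low-level read.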
4 changes: 2 additions & 2 deletions rust/lance-io/src/traits.rs
@@ -86,10 +86,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync {
fn block_size(&self) -> usize;

/// Object/File Size.
async fn size(&self) -> Result<usize>;
async fn size(&self) -> object_store::Result<usize>;

/// Read a range of bytes from the object.
///
/// TODO: change to read_at()?
async fn get_range(&self, range: Range<usize>) -> Result<Bytes>;
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;
}