Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ jobs:
  - nightly
  defaults:
  run:
- working-directory: ./rust/lance
+ working-directory: ./rust
  steps:
  - uses: actions/checkout@v4
  - uses: Swatinem/rust-cache@v2
Expand All @@ -200,7 +200,7 @@ jobs:
  runs-on: windows-latest
  defaults:
  run:
- working-directory: rust/lance
+ working-directory: rust
  steps:
  - uses: actions/checkout@v4
  - uses: Swatinem/rust-cache@v2
Expand Down
3 changes: 1 addition & 2 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ impl BlockingDataset {
read_version: Option<u64>,
storage_options: HashMap<String, String>,
) -> Result<Self> {
- let object_store_registry = Arc::new(ObjectStoreRegistry::default());
let inner = RT.block_on(Dataset::commit(
uri,
operation,
Expand All @@ -126,7 +125,7 @@ impl BlockingDataset {
..Default::default()
}),
None,
- object_store_registry,
+ Default::default(),
false, // TODO: support enable_v2_manifest_paths
))?;
Ok(Self { inner })
Expand Down
1 change: 0 additions & 1 deletion java/core/lance-jni/src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result<JOb

let reader = RT.block_on(async move {
let (obj_store, path) = ObjectStore::from_uri(&file_uri_str).await?;
- let obj_store = Arc::new(obj_store);
let config = SchedulerConfig::max_bandwidth(&obj_store);
let scan_scheduler = ScanScheduler::new(obj_store, config);

Expand Down
6 changes: 1 addition & 5 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2196,11 +2196,7 @@ impl PyFullTextQuery {

#[staticmethod]
#[pyo3(signature = (positive, negative,negative_boost=None))]
- fn boost_query(
- positive: PyFullTextQuery,
- negative: PyFullTextQuery,
- negative_boost: Option<f32>,
- ) -> PyResult<Self> {
+ fn boost_query(positive: Self, negative: Self, negative_boost: Option<f32>) -> PyResult<Self> {
Ok(Self {
inner: BoostQuery::new(positive.inner, negative.inner, negative_boost).into(),
})
Expand Down
8 changes: 4 additions & 4 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ fn path_to_parent(path: &Path) -> PyResult<(Path, String)> {

pub async fn object_store_from_uri_or_path_no_options(
uri_or_path: impl AsRef<str>,
- ) -> PyResult<(ObjectStore, Path)> {
+ ) -> PyResult<(Arc<ObjectStore>, Path)> {
object_store_from_uri_or_path(uri_or_path, None).await
}

Expand All @@ -344,7 +344,7 @@ pub async fn object_store_from_uri_or_path_no_options(
pub async fn object_store_from_uri_or_path(
uri_or_path: impl AsRef<str>,
storage_options: Option<HashMap<String, String>>,
- ) -> PyResult<(ObjectStore, Path)> {
+ ) -> PyResult<(Arc<ObjectStore>, Path)> {
if let Ok(mut url) = Url::parse(uri_or_path.as_ref()) {
if url.scheme().len() > 1 {
let path = object_store::path::Path::parse(url.path()).map_err(|e| {
Expand Down Expand Up @@ -376,7 +376,7 @@ pub async fn object_store_from_uri_or_path(
let path = Path::parse(uri_or_path.as_ref()).map_err(|e| {
PyIOError::new_err(format!("Invalid path `{}`: {}", uri_or_path.as_ref(), e))
})?;
- let object_store = ObjectStore::local();
+ let object_store = Arc::new(ObjectStore::local());
Ok((object_store, path))
}

Expand All @@ -393,7 +393,7 @@ impl LanceFileReader {
let (object_store, path) =
object_store_from_uri_or_path(uri_or_path, storage_options).await?;
let scheduler = ScanScheduler::new(
- Arc::new(object_store),
+ object_store,
SchedulerConfig {
io_buffer_size_bytes: 2 * 1024 * 1024 * 1024,
},
Expand Down
1 change: 1 addition & 0 deletions python/src/indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub fn transform_vectors(
)?
}

+ #[allow(deprecated)]
async fn do_shuffle_transformed_vectors(
unsorted_filenames: Vec<String>,
dir_path: &str,
Expand Down
23 changes: 14 additions & 9 deletions rust/lance-file/benches/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ fn bench_reader(c: &mut Criterion) {

let tempdir = tempfile::tempdir().unwrap();
let test_path = tempdir.path();
- let (object_store, base_path) =
- ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
+ let (object_store, base_path) = rt
+ .block_on(ObjectStore::from_uri(
+ test_path.as_os_str().to_str().unwrap(),
+ ))
+ .unwrap();
+
let file_path = base_path.child("foo.lance");
let object_writer = rt.block_on(object_store.create(&file_path)).unwrap();

Expand All @@ -59,7 +63,7 @@ fn bench_reader(c: &mut Criterion) {
let data = &data;
rt.block_on(async move {
let store_scheduler = ScanScheduler::new(
- Arc::new(object_store.clone()),
+ object_store.clone(),
SchedulerConfig::default_for_testing(),
);
let scheduler = store_scheduler.open_file(file_path).await.unwrap();
Expand Down Expand Up @@ -125,8 +129,11 @@ fn bench_random_access(c: &mut Criterion) {

let tempdir = tempfile::tempdir().unwrap();
let test_path = tempdir.path();
- let (object_store, base_path) =
- ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
+ let (object_store, base_path) = rt
+ .block_on(ObjectStore::from_uri(
+ test_path.as_os_str().to_str().unwrap(),
+ ))
+ .unwrap();
let file_path = base_path.child("foo.lance");
let object_writer = rt.block_on(object_store.create(&file_path)).unwrap();

Expand All @@ -150,10 +157,8 @@ fn bench_random_access(c: &mut Criterion) {
let object_store = &object_store;
let file_path = &file_path;
let reader = rt.block_on(async move {
- let store_scheduler = ScanScheduler::new(
- Arc::new(object_store.clone()),
- SchedulerConfig::default_for_testing(),
- );
+ let store_scheduler =
+ ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
let scheduler = store_scheduler.open_file(file_path).await.unwrap();
Arc::new(
FileReader::try_open(
Expand Down
14 changes: 12 additions & 2 deletions rust/lance-file/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ mod tests {
};
use arrow_array::{BooleanArray, Int32Array};
use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
+ use lance_io::object_store::ObjectStoreParams;

#[tokio::test]
async fn test_take() {
Expand Down Expand Up @@ -1364,8 +1365,17 @@ mod tests {

#[tokio::test]
async fn test_take_boolean_beyond_chunk() {
- let mut store = ObjectStore::memory();
- store.set_block_size(256);
+ let store = ObjectStore::from_uri_and_params(
+ Arc::new(Default::default()),
+ "memory://",
+ &ObjectStoreParams {
+ block_size: Some(256),
+ ..Default::default()
+ },
+ )
+ .await
+ .unwrap()
+ .0;
let path = Path::from("/take_bools");

let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/benches/inverted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn bench_inverted(c: &mut Criterion) {
let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap();
let store = rt.block_on(async {
Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
index_dir,
FileMetadataCache::no_cache(),
))
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/benches/ngram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn bench_ngram(c: &mut Criterion) {
let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap();
let store = rt.block_on(async {
Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
index_dir,
FileMetadataCache::no_cache(),
))
Expand Down
6 changes: 3 additions & 3 deletions rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ mod tests {
async fn test_null_ids() {
let tmpdir = Arc::new(tempdir().unwrap());
let test_store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(tmpdir.path()).unwrap(),
FileMetadataCache::no_cache(),
));
Expand Down Expand Up @@ -1453,7 +1453,7 @@ mod tests {

let remap_dir = Arc::new(tempdir().unwrap());
let remap_store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(remap_dir.path()).unwrap(),
FileMetadataCache::no_cache(),
));
Expand Down Expand Up @@ -1489,7 +1489,7 @@ mod tests {
async fn test_nan_ordering() {
let tmpdir = Arc::new(tempdir().unwrap());
let test_store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(tmpdir.path()).unwrap(),
FileMetadataCache::no_cache(),
));
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/inverted/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl IndexWorker {
async fn new(existing_tokens: HashMap<String, u32>, with_position: bool) -> Result<Self> {
let tmpdir = tempdir()?;
let store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(tmpdir.path())?,
FileMetadataCache::no_cache(),
));
Expand Down
9 changes: 6 additions & 3 deletions rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ impl DeepSizeOf for LanceIndexStore {
impl LanceIndexStore {
/// Create a new index store at the given directory
pub fn new(
- object_store: ObjectStore,
+ object_store: Arc<ObjectStore>,
index_dir: Path,
metadata_cache: FileMetadataCache,
) -> Self {
- let object_store = Arc::new(object_store);
let scheduler = ScanScheduler::new(
object_store.clone(),
SchedulerConfig::max_bandwidth(&object_store),
Expand Down Expand Up @@ -319,14 +318,18 @@ pub mod tests {
use arrow_select::take::TakeOptions;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::ScalarValue;
+ use futures::FutureExt;
use lance_core::{cache::CapacityMode, utils::mask::RowIdTreeMap};
use lance_datagen::{array, gen, ArrayGeneratorExt, BatchCount, ByteCount, RowCount};
use tempfile::{tempdir, TempDir};

fn test_store(tempdir: &TempDir) -> Arc<dyn IndexStore> {
let test_path: &Path = tempdir.path();
let (object_store, test_path) =
- ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
+ ObjectStore::from_uri(test_path.as_os_str().to_str().unwrap())
+ .now_or_never()
+ .unwrap()
+ .unwrap();
let cache = FileMetadataCache::with_capacity(128 * 1024 * 1024, CapacityMode::Bytes);
Arc::new(LanceIndexStore::new(object_store, test_path, cache))
}
Expand Down
10 changes: 5 additions & 5 deletions rust/lance-index/src/scalar/ngram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ impl NGramIndexBuilder {

let tmpdir = Arc::new(tempdir()?);
let spill_store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(tmpdir.path())?,
FileMetadataCache::no_cache(),
));
Expand Down Expand Up @@ -1249,7 +1249,7 @@ mod tests {

let tmpdir = Arc::new(tempdir().unwrap());
let test_store = LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(tmpdir.path()).unwrap(),
FileMetadataCache::no_cache(),
);
Expand Down Expand Up @@ -1453,7 +1453,7 @@ mod tests {

let new_tmpdir = Arc::new(tempdir().unwrap());
let test_store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(new_tmpdir.path()).unwrap(),
FileMetadataCache::no_cache(),
));
Expand Down Expand Up @@ -1488,7 +1488,7 @@ mod tests {

let new_tmpdir = Arc::new(tempdir().unwrap());
let test_store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(new_tmpdir.path()).unwrap(),
FileMetadataCache::no_cache(),
));
Expand Down Expand Up @@ -1528,7 +1528,7 @@ mod tests {

let new_tmpdir = Arc::new(tempdir().unwrap());
let test_store = Arc::new(LanceIndexStore::new(
- ObjectStore::local(),
+ Arc::new(ObjectStore::local()),
Path::from_filesystem_path(new_tmpdir.path()).unwrap(),
FileMetadataCache::no_cache(),
));
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-io/benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn create_data(num_bytes: u64) -> (Arc<ObjectStore>, Path) {
rand::thread_rng().fill_bytes(&mut some_data);
obj_store.put(&tmp_file, &some_data).await.unwrap();

- (Arc::new(obj_store), tmp_file)
+ (obj_store, tmp_file)
}

const DATA_SIZE: u64 = 128 * 1024 * 1024;
Expand Down
Loading
Loading