wip: start upgrade
wjones127 committed Dec 15, 2023
1 parent e41a24c commit 6f082c3
Showing 26 changed files with 258 additions and 275 deletions.
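At a glance, this WIP commit bumps the arrow crates from 47 to 49 (with parquet 47 → 49 alongside), object_store from 0.7.1 to 0.8.0, and pyo3 from 0.19 to 0.20, and moves DataFusion from 32.0.0 on crates.io to a git pin at tag 34.0.0-rc3. The Rust source changes below are the mechanical fallout of two of those bumps: pyo3 0.20 makes PyDict::get_item fallible, and object_store 0.8 reworks the ObjectStore trait (put returns PutResult, put_opts is new, append is gone, and list is no longer async).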
4 changes: 2 additions & 2 deletions integration/duckdb_lance/Cargo.toml
@@ -8,8 +8,8 @@ lance = { path = "../../rust/lance" }
duckdb-ext = { path = "./duckdb-ext" }
lazy_static = "1.4.0"
tokio = { version = "1.23", features = ["rt-multi-thread"] }
arrow-schema = "47.0.0"
arrow-array = "47.0.0"
arrow-schema = "49.0.0"
arrow-array = "49.0.0"
futures = "0.3"
num-traits = "0.2"
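
Note in passing: the arrow-* crates are published in lockstep, so every manifest in the repo has to move to 49 together, as this commit does file by file. A hypothetical alternative (not what this repo does here) is a single workspace-level pin that member crates inherit:

    # Sketch only — hypothetical [workspace.dependencies] pin; member crates
    # would then declare e.g. `arrow-schema.workspace = true`.
    [workspace.dependencies]
    arrow-schema = "49.0"
    arrow-array = "49.0"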

12 changes: 6 additions & 6 deletions python/Cargo.toml
@@ -11,11 +11,11 @@ name = "lance"
crate-type = ["cdylib"]

[dependencies]
arrow = { version = "47.0.0", features = ["pyarrow"] }
arrow-array = "47.0"
arrow-data = "47.0"
arrow-schema = "47.0"
object_store = "0.7.1"
arrow = { version = "49.0.0", features = ["pyarrow"] }
arrow-array = "49.0"
arrow-data = "49.0"
arrow-schema = "49.0"
object_store = "0.8.0"
async-trait = "0.1"
chrono = "0.4.31"
env_logger = "0.10"
@@ -30,7 +30,7 @@ lance-linalg = { path = "../rust/lance-linalg" }
lazy_static = "1"
log = "0.4"
prost = "0.12"
-pyo3 = { version = "0.19", features = ["extension-module", "abi3-py38"] }
+pyo3 = { version = "0.20", features = ["extension-module", "abi3-py38"] }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
uuid = "1.3.0"
serde_json = "1"
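Most of the churn in python/src/dataset.rs below follows from the pyo3 0.19 → 0.20 bump: PyDict::get_item changed from returning Option<&PyAny> to the fallible PyResult<Option<&PyAny>>, so every lookup gains a trailing ?. A minimal sketch of the new call-site pattern (read_k and its default of 10 are illustrative, mirroring how this diff handles k):

    use pyo3::prelude::*;
    use pyo3::types::PyDict;

    // pyo3 0.20: get_item returns PyResult<Option<&PyAny>>, so first propagate
    // a Python error with `?`, then handle "key absent or None" separately.
    fn read_k(kwargs: &PyDict) -> PyResult<usize> {
        match kwargs.get_item("k")? {
            Some(v) if !v.is_none() => v.extract::<usize>(),
            _ => Ok(10), // fall back to a default when `k` is missing or None
        }
    }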
71 changes: 36 additions & 35 deletions python/src/dataset.rs
@@ -343,17 +343,17 @@ impl Dataset {

if let Some(nearest) = nearest {
let column = nearest
.get_item("column")
.get_item("column")?
.ok_or_else(|| PyKeyError::new_err("Need column for nearest"))?
.to_string();

let qval = nearest
.get_item("q")
.get_item("q")?
.ok_or_else(|| PyKeyError::new_err("Need q for nearest"))?;
let data = ArrayData::from_pyarrow(qval)?;
let q = Float32Array::from(data);

-            let k: usize = if let Some(k) = nearest.get_item("k") {
+            let k: usize = if let Some(k) = nearest.get_item("k")? {
if k.is_none() {
// Use limit if k is not specified, default to 10.
limit.unwrap_or(10) as usize
@@ -364,7 +364,7 @@ impl Dataset {
10
};

-            let nprobes: usize = if let Some(nprobes) = nearest.get_item("nprobes") {
+            let nprobes: usize = if let Some(nprobes) = nearest.get_item("nprobes")? {
if nprobes.is_none() {
DEFAULT_NPROBS
} else {
@@ -374,23 +374,24 @@ impl Dataset {
DEFAULT_NPROBS
};

-            let metric_type: Option<MetricType> = if let Some(metric) = nearest.get_item("metric") {
-                if metric.is_none() {
-                    None
-                } else {
-                    Some(
-                        MetricType::try_from(metric.to_string().to_lowercase().as_str())
-                            .map_err(|err| PyValueError::new_err(err.to_string()))?,
-                    )
-                }
-            } else {
-                None
-            };
+            let metric_type: Option<MetricType> =
+                if let Some(metric) = nearest.get_item("metric")? {
+                    if metric.is_none() {
+                        None
+                    } else {
+                        Some(
+                            MetricType::try_from(metric.to_string().to_lowercase().as_str())
+                                .map_err(|err| PyValueError::new_err(err.to_string()))?,
+                        )
+                    }
+                } else {
+                    None
+                };

// When refine factor is specified, a final Refine stage will be added to the I/O plan,
// and use Flat index over the raw vectors to refine the results.
// By default, `refine_factor` is None to not involve extra I/O exec node and random access.
-            let refine_factor: Option<u32> = if let Some(rf) = nearest.get_item("refine_factor") {
+            let refine_factor: Option<u32> = if let Some(rf) = nearest.get_item("refine_factor")? {
if rf.is_none() {
None
} else {
@@ -400,7 +401,7 @@ impl Dataset {
None
};

-            let use_index: bool = if let Some(idx) = nearest.get_item("use_index") {
+            let use_index: bool = if let Some(idx) = nearest.get_item("use_index")? {
PyAny::downcast::<PyBool>(idx)?.extract()?
} else {
true
@@ -674,24 +675,24 @@ impl Dataset {
let mut pq_params = PQBuildParams::default();
let mut m_type = MetricType::L2;
if let Some(kwargs) = kwargs {
-            if let Some(mt) = kwargs.get_item("metric_type") {
+            if let Some(mt) = kwargs.get_item("metric_type")? {
m_type = MetricType::try_from(mt.to_string().to_lowercase().as_str())
.map_err(|err| PyValueError::new_err(err.to_string()))?;
}

-            if let Some(n) = kwargs.get_item("num_partitions") {
+            if let Some(n) = kwargs.get_item("num_partitions")? {
ivf_params.num_partitions = PyAny::downcast::<PyInt>(n)?.extract()?
};

-            if let Some(n) = kwargs.get_item("num_bits") {
+            if let Some(n) = kwargs.get_item("num_bits")? {
pq_params.num_bits = PyAny::downcast::<PyInt>(n)?.extract()?
};

-            if let Some(n) = kwargs.get_item("num_sub_vectors") {
+            if let Some(n) = kwargs.get_item("num_sub_vectors")? {
pq_params.num_sub_vectors = PyAny::downcast::<PyInt>(n)?.extract()?
};

-            if let Some(o) = kwargs.get_item("use_opq") {
+            if let Some(o) = kwargs.get_item("use_opq")? {
#[cfg(not(feature = "opq"))]
if PyAny::downcast::<PyBool>(o)?.extract()? {
return Err(PyValueError::new_err(
@@ -701,11 +702,11 @@
pq_params.use_opq = PyAny::downcast::<PyBool>(o)?.extract()?
};

-            if let Some(o) = kwargs.get_item("max_opq_iterations") {
+            if let Some(o) = kwargs.get_item("max_opq_iterations")? {
pq_params.max_opq_iters = PyAny::downcast::<PyInt>(o)?.extract()?
};

-            if let Some(c) = kwargs.get_item("ivf_centroids") {
+            if let Some(c) = kwargs.get_item("ivf_centroids")? {
let batch = RecordBatch::from_pyarrow(c)?;
if "_ivf_centroids" != batch.schema().field(0).name() {
return Err(PyValueError::new_err(
@@ -716,7 +717,7 @@
ivf_params.centroids = Some(Arc::new(centroids.clone()))
};

-            if let Some(f) = kwargs.get_item("precomputed_partitions_file") {
+            if let Some(f) = kwargs.get_item("precomputed_partitions_file")? {
ivf_params.precomputed_partitons_file = Some(f.to_string());
};
}
@@ -728,20 +729,20 @@
let mut params = DiskANNParams::default();
let mut m_type = MetricType::L2;
if let Some(kwargs) = kwargs {
-            if let Some(mt) = kwargs.get_item("metric_type") {
+            if let Some(mt) = kwargs.get_item("metric_type")? {
m_type = MetricType::try_from(mt.to_string().to_lowercase().as_str())
.map_err(|err| PyValueError::new_err(err.to_string()))?;
}

-            if let Some(n) = kwargs.get_item("r") {
+            if let Some(n) = kwargs.get_item("r")? {
params.r = PyAny::downcast::<PyInt>(n)?.extract()?
};

-            if let Some(n) = kwargs.get_item("alpha") {
+            if let Some(n) = kwargs.get_item("alpha")? {
params.alpha = PyAny::downcast::<PyFloat>(n)?.extract()?
};

-            if let Some(n) = kwargs.get_item("l") {
+            if let Some(n) = kwargs.get_item("l")? {
params.l = PyAny::downcast::<PyInt>(n)?.extract()?
};
}
@@ -909,7 +910,7 @@ fn parse_write_mode(mode: &str) -> PyResult<WriteMode> {
pub fn get_object_store_params(options: &PyDict) -> Option<ObjectStoreParams> {
if options.is_none() {
None
-    } else if let Some(commit_handler) = options.get_item("commit_handler") {
+    } else if let Ok(Some(commit_handler)) = options.get_item("commit_handler") {
let py_commit_lock = PyCommitLock::new(commit_handler.to_object(options.py()));
let mut object_store_params = ObjectStoreParams::default();
object_store_params.set_commit_lock(Arc::new(py_commit_lock));
@@ -924,19 +925,19 @@ pub fn get_write_params(options: &PyDict) -> PyResult<Option<WriteParams>> {
None
} else {
let mut p = WriteParams::default();
-        if let Some(mode) = options.get_item("mode") {
+        if let Some(mode) = options.get_item("mode")? {
p.mode = parse_write_mode(mode.extract::<String>()?.as_str())?;
};
-        if let Some(maybe_nrows) = options.get_item("max_rows_per_file") {
+        if let Some(maybe_nrows) = options.get_item("max_rows_per_file")? {
p.max_rows_per_file = usize::extract(maybe_nrows)?;
}
-        if let Some(maybe_nrows) = options.get_item("max_rows_per_group") {
+        if let Some(maybe_nrows) = options.get_item("max_rows_per_group")? {
p.max_rows_per_group = usize::extract(maybe_nrows)?;
}
-        if let Some(maybe_nbytes) = options.get_item("max_bytes_per_file") {
+        if let Some(maybe_nbytes) = options.get_item("max_bytes_per_file")? {
p.max_bytes_per_file = usize::extract(maybe_nbytes)?;
}
-        if let Some(progress) = options.get_item("progress") {
+        if let Some(progress) = options.get_item("progress")? {
if !progress.is_none() {
p.progress = Arc::new(PyWriteProgress::new(progress.to_object(options.py())));
}
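One call site above cannot simply add a ?: get_object_store_params returns an Option rather than a PyResult, so the diff switches to matching on Ok(Some(..)), which folds a failed lookup into the "not supplied" branch. A sketch of that pattern (has_commit_handler is an illustrative stand-in):

    use pyo3::types::PyDict;

    // In a function that doesn't return PyResult, `?` is unavailable;
    // `Ok(Some(..))` treats both Err(_) and Ok(None) as "key not supplied".
    fn has_commit_handler(options: &PyDict) -> bool {
        matches!(options.get_item("commit_handler"), Ok(Some(v)) if !v.is_none())
    }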
42 changes: 24 additions & 18 deletions rust/Cargo.toml
@@ -47,17 +47,17 @@ lance-test-macros = { version = "=0.8.21", path = "./lance-test-macros" }
lance-testing = { version = "=0.8.21", path = "./lance-testing" }
approx = "0.5.1"
# Note that this one does not include pyarrow
arrow = { version = "47.0.0", optional = false }
arrow-arith = "47.0"
arrow-array = "47.0"
arrow-buffer = "47.0"
arrow-cast = "47.0"
arrow-data = "47.0"
arrow-ipc = { version = "47.0", features = ["zstd"] }
arrow-ord = "47.0"
arrow-row = "47.0"
arrow-schema = "47.0"
arrow-select = "47.0"
arrow = { version = "49.0.0", optional = false, features = ["prettyprint"] }
arrow-arith = "49.0"
arrow-array = "49.0"
arrow-buffer = "49.0"
arrow-cast = "49.0"
arrow-data = "49.0"
arrow-ipc = { version = "49.0", features = ["zstd"] }
arrow-ord = "49.0"
arrow-row = "49.0"
arrow-schema = "49.0"
arrow-select = "49.0"
async-recursion = "1.0"
async-trait = "0.1"
aws-config = "0.56"
@@ -71,13 +71,19 @@ bytes = "1.4"
byteorder = "1.5"
chrono = "0.4.23"
criterion = { version = "0.5", features = ["async", "async_tokio"] }
datafusion = { version = "32.0.0", default-features = false, features = [
# datafusion = { version = "34.0.0", default-features = false, features = [
# "regex_expressions",
# ] }
# datafusion-common = "34.0"
# datafusion-sql = "34.0"
# datafusion-expr = "34.0"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", tag = "34.0.0-rc3", default-features = false, features = [
"regex_expressions",
] }
datafusion-common = "32.0"
datafusion-sql = "32.0"
datafusion-expr = "32.0"
datafusion-physical-expr = { version = "32.0.0", default-features = false }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "34.0.0-rc3" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", tag = "34.0.0-rc3" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "34.0.0-rc3" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "34.0.0-rc3", default-features = false }
either = "1.0"
futures = "0.3"
http = "0.2.9"
@@ -87,8 +93,8 @@ mock_instant = { version = "0.3.1", features = ["sync"] }
moka = "0.11"
num_cpus = "1.0"
num-traits = "0.2"
object_store = { version = "0.7.1", features = ["aws", "gcp", "azure"] }
parquet = "47.0"
object_store = { version = "0.8.0", features = ["aws", "gcp", "azure"] }
parquet = "49.0"
pin-project = "1.0"
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
prost = "0.12"
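The DataFusion dependencies move from 32.0.0 on crates.io to a git pin at tag 34.0.0-rc3, presumably because the arrow-49-compatible DataFusion 34 had not yet been released when this WIP commit was made. The commented-out lines in the diff already sketch the intended end state once 34.0.0 ships; roughly:

    # Hypothetical final form after DataFusion 34.0.0 reaches crates.io,
    # mirroring the commented-out lines kept in the diff above:
    datafusion = { version = "34.0.0", default-features = false, features = [
        "regex_expressions",
    ] }
    datafusion-common = "34.0"
    datafusion-sql = "34.0"
    datafusion-expr = "34.0"
    datafusion-physical-expr = { version = "34.0.0", default-features = false }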
2 changes: 1 addition & 1 deletion rust/lance-core/Cargo.toml
@@ -55,7 +55,7 @@ url.workspace = true
uuid.workspace = true

[dev-dependencies]
arrow = "47.0"
arrow = "49.0"
rand.workspace = true
tempfile.workspace = true
lance-testing.workspace = true
3 changes: 1 addition & 2 deletions rust/lance-core/src/io/object_store.rs
@@ -79,7 +79,7 @@ impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
dir_path: impl Into<&Path> + Send,
unmodified_since: Option<DateTime<Utc>>,
) -> Result<BoxStream<Result<ObjectMeta>>> {
-        let mut output = self.list(Some(dir_path.into())).await?;
+        let mut output = self.list(Some(dir_path.into()));
if let Some(unmodified_since_val) = unmodified_since {
output = output
.try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
@@ -571,7 +571,6 @@ impl ObjectStore {
let sub_entries = self
.inner
.list(Some(&path))
-            .await?
.map(|m| m.map(|meta| meta.location))
.boxed();
self.inner
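The object_store.rs changes above track the object_store 0.8 API, where ObjectStore::list stopped being async: it now returns the result stream directly, so call sites drop the .await? and work with the stream itself, with errors surfacing per item as it is polled. A minimal sketch of the new call shape (count_entries is illustrative):

    use futures::TryStreamExt;
    use object_store::{path::Path, ObjectStore};

    // object_store 0.8: list() returns BoxStream<'_, Result<ObjectMeta>>
    // immediately; there is no longer a future to await before streaming.
    async fn count_entries(
        store: &dyn ObjectStore,
        prefix: &Path,
    ) -> object_store::Result<usize> {
        let stream = store.list(Some(prefix)); // note: no .await here anymore
        Ok(stream.try_collect::<Vec<_>>().await?.len())
    }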
24 changes: 15 additions & 9 deletions rust/lance-core/src/io/object_store/tracing.rs
@@ -21,7 +21,8 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, Result as OSResult,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, PutOptions, PutResult,
+    Result as OSResult,
};
use pin_project::pin_project;
use tokio::io::AsyncWrite;
@@ -83,10 +84,20 @@ impl std::fmt::Display for TracedObjectStore {
#[async_trait::async_trait]
impl object_store::ObjectStore for TracedObjectStore {
#[instrument(level = "debug", skip(self, bytes))]
-    async fn put(&self, location: &Path, bytes: Bytes) -> OSResult<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> OSResult<PutResult> {
self.target.put(location, bytes).await
}

+    #[instrument(level = "debug", skip(self, bytes))]
+    async fn put_opts(
+        &self,
+        location: &Path,
+        bytes: Bytes,
+        opts: PutOptions,
+    ) -> OSResult<PutResult> {
+        self.target.put_opts(location, bytes, opts).await
+    }

async fn put_multipart(
&self,
location: &Path,
@@ -107,11 +118,6 @@ impl object_store::ObjectStore for TracedObjectStore {
self.target.abort_multipart(location, multipart_id).await
}

-    #[instrument(level = "debug", skip(self))]
-    async fn append(&self, location: &Path) -> OSResult<Box<dyn AsyncWrite + Unpin + Send>> {
-        self.target.append(location).await
-    }

#[instrument(level = "debug", skip(self, options))]
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
self.target.get_opts(location, options).await
@@ -138,8 +144,8 @@
}

#[instrument(level = "debug", skip(self))]
-    async fn list(&self, prefix: Option<&Path>) -> OSResult<BoxStream<'_, OSResult<ObjectMeta>>> {
-        self.target.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult<ObjectMeta>> {
+        self.target.list(prefix)
}

#[instrument(level = "debug", skip(self))]
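The tracing wrapper mirrors the remaining object_store 0.8 trait changes: put now reports a PutResult instead of (), the options-carrying put_opts variant joins the trait, and append is removed. A usage sketch under those assumptions (write_blob is illustrative; e_tag is one of the optional fields PutResult exposes):

    use bytes::Bytes;
    use object_store::{path::Path, ObjectStore, PutOptions, Result};

    // object_store 0.8: put returns metadata about the stored object,
    // and put_opts is the variant that takes explicit PutOptions.
    async fn write_blob(store: &dyn ObjectStore, path: &Path, data: Bytes) -> Result<()> {
        let result = store.put(path, data.clone()).await?;
        println!("wrote {path}, e_tag: {:?}", result.e_tag);
        store.put_opts(path, data, PutOptions::default()).await?;
        Ok(())
    }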
(Diffs for the remaining 19 of the 26 changed files were not loaded in this view.)
