Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(java): refactor error handling #2325

Merged
merged 13 commits into from
May 21, 2024
369 changes: 310 additions & 59 deletions java/core/lance-jni/src/blocking_dataset.rs

Large diffs are not rendered by default.

222 changes: 128 additions & 94 deletions java/core/lance-jni/src/blocking_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use std::sync::Arc;

use crate::error::{JavaErrorExt, JavaResult};
use crate::ffi::JNIEnvExt;
use crate::JavaError;
use arrow::{ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream};
use arrow_schema::SchemaRef;
use jni::{objects::JObject, sys::jlong, JNIEnv};
Expand All @@ -24,7 +26,7 @@ use lance_io::ffi::to_ffi_arrow_array_stream;
use crate::{
blocking_dataset::{BlockingDataset, NATIVE_DATASET},
traits::IntoJava,
Error, Result, RT,
RT,
};

pub const NATIVE_SCANNER: &str = "nativeScannerHandle";
Expand All @@ -41,57 +43,22 @@ impl BlockingScanner {
}
}

/// Opens a record batch stream for this scanner by driving `self.inner.try_into_stream()`
/// to completion with `RT.block_on`.
///
/// NOTE(review): this listing is a rendered diff with the +/- markers stripped. The
/// `Result` signature with the `Ok(..?)` body looks like the removed side; the `JavaResult`
/// signature with `.infer_error()` looks like its replacement.
pub fn open_stream(&self) -> Result<DatasetRecordBatchStream> {
Ok(RT.block_on(self.inner.try_into_stream())?)
pub fn open_stream(&self) -> JavaResult<DatasetRecordBatchStream> {
RT.block_on(self.inner.try_into_stream()).infer_error()
}

/// Returns the scanner's output schema by driving `self.inner.schema()` to completion with
/// `RT.block_on`.
///
/// NOTE(review): rendered diff with the +/- markers stripped. The `Result` signature looks
/// like the removed side; the `JavaResult` signature with `.infer_error()` looks like its
/// replacement.
pub fn schema(&self) -> Result<SchemaRef> {
Ok(RT.block_on(self.inner.schema())?)
pub fn schema(&self) -> JavaResult<SchemaRef> {
RT.block_on(self.inner.schema()).infer_error()
}

/// Counts the rows this scanner would produce by driving `self.inner.count_rows()` to
/// completion with `RT.block_on`.
///
/// NOTE(review): rendered diff with the +/- markers stripped. The `Result` signature looks
/// like the removed side; the `JavaResult` signature with `.infer_error()` looks like its
/// replacement.
pub fn count_rows(&self) -> Result<u64> {
Ok(RT.block_on(self.inner.count_rows())?)
pub fn count_rows(&self) -> JavaResult<u64> {
RT.block_on(self.inner.count_rows()).infer_error()
}
}

/// Converts a `BlockingScanner` into a Java object by delegating to `attach_native_scanner`.
///
/// NOTE(review): this variant returns a bare `JObject`; it appears to be the removed side of
/// the diff, since a `JavaResult`-returning impl of the same trait appears later in the listing.
impl IntoJava for BlockingScanner {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> JObject<'a> {
attach_native_scanner(env, self)
}
}

/// Creates a Java scanner object and stores `scanner` in its `NATIVE_SCANNER` field.
///
/// Returns the Java object on success. If storing the handle fails, a
/// `java/lang/RuntimeException` is thrown on `env` and a null `JObject` is returned.
///
/// # Panics
/// Panics (via `expect`) if the exception itself cannot be thrown.
///
/// NOTE(review): appears to be the removed side of the diff; a `JavaResult`-returning
/// function with the same name appears later in the listing.
fn attach_native_scanner<'local>(
env: &mut JNIEnv<'local>,
scanner: BlockingScanner,
) -> JObject<'local> {
let j_scanner = create_java_scanner_object(env);
// This block sets a native Rust object (scanner) as a field in the Java object (j_scanner).
// Caution: This creates a potential for memory leaks. The Rust object (scanner) is not
// automatically garbage-collected by Java, and its memory will not be freed unless
// explicitly handled.
//
// To prevent memory leaks, ensure the following:
// 1. The Java object (`j_scanner`) should implement the `java.io.Closeable` interface.
// 2. Users of this Java object should be instructed to always use it within a try-with-resources
// statement (or manually call the `close()` method) to ensure that `self.close()` is invoked.
match unsafe { env.set_rust_field(&j_scanner, NATIVE_SCANNER, scanner) } {
Ok(_) => j_scanner,
Err(err) => {
// Surface the failure to Java as an exception and hand back a null reference.
env.throw_new(
"java/lang/RuntimeException",
format!("Failed to set native handle for scanner: {}", err),
)
.expect("Error throwing exception");
JObject::null()
}
}
}

/// Instantiates `com/lancedb/lance/ipc/LanceScanner` through its no-argument constructor
/// (`()V`).
///
/// # Panics
/// Panics (via `expect`) if the object cannot be created.
///
/// NOTE(review): appears to be the removed side of the diff; a `JavaResult`-returning
/// function with the same name appears later in the listing.
fn create_java_scanner_object<'a>(env: &mut JNIEnv<'a>) -> JObject<'a> {
env.new_object("com/lancedb/lance/ipc/LanceScanner", "()V", &[])
.expect("Failed to create Java Lance Scanner instance")
}

///////////////////
// Write Methods //
///////////////////
#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_ipc_LanceScanner_createScanner<'local>(
mut env: JNIEnv<'local>,
Expand All @@ -103,78 +70,134 @@ pub extern "system" fn Java_com_lancedb_lance_ipc_LanceScanner_createScanner<'lo
filter_obj: JObject, // Optional<String>
batch_size_obj: JObject, // Optional<Long>
) -> JObject<'local> {
let dataset = {
let dataset =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }
.expect("Dataset handle not set");
dataset.clone()
};
let mut scanner = dataset.inner.scan();
let fragment_ids_opt = ok_or_throw!(env, env.get_ints_opt(&fragment_ids_obj));
ok_or_throw!(
env,
inner_create_scanner(
&mut env,
jdataset,
fragment_ids_obj,
columns_obj,
substrait_filter_obj,
filter_obj,
batch_size_obj
)
)
}

/// Builds a scanner over the dataset stored on `jdataset`, applies the optional settings,
/// and wraps the result in a Java object via `into_java`.
///
/// Each `*_obj` argument is decoded with a `get_*_opt` helper on `env` and only applied when
/// a value is present:
/// - `fragment_ids_obj`: restricts the scan to the listed fragments (`with_fragments`).
/// - `columns_obj`: sets the projection (`project`).
/// - `substrait_filter_obj`: sets a Substrait filter, awaited on `RT`.
/// - `filter_obj`: sets a string filter (`filter`).
/// - `batch_size_obj`: sets the batch size (`batch_size`).
///
/// # Errors
/// Returns a `JavaError` if an argument cannot be decoded, the dataset handle cannot be
/// read, a fragment id has no matching fragment (`JavaError::input_error`), or any scanner
/// call fails.
///
/// NOTE(review): this listing is a rendered diff with the +/- markers stripped. Lines that
/// use `dataset`, `ok_or_throw!`, `env.throw_new` or `into_java(&mut env)` look like the
/// removed side; the `dataset_guard` / `?` / `.infer_error()` lines look like the replacement.
/// NOTE(review): `fragment_id as usize` and `batch_size as usize` are unchecked casts —
/// confirm negative values are rejected on the Java side.
fn inner_create_scanner<'local>(
env: &mut JNIEnv<'local>,
jdataset: JObject,
fragment_ids_obj: JObject,
columns_obj: JObject,
substrait_filter_obj: JObject,
filter_obj: JObject,
batch_size_obj: JObject,
) -> JavaResult<JObject<'local>> {
let fragment_ids_opt = env.get_ints_opt(&fragment_ids_obj)?;
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }
.infer_error()?;
let mut scanner = dataset_guard.inner.scan();
if let Some(fragment_ids) = fragment_ids_opt {
let mut fragments = Vec::with_capacity(fragment_ids.len());
for fragment_id in fragment_ids {
let Some(fragment) = dataset.inner.get_fragment(fragment_id as usize) else {
env.throw_new(
"java/lang/RuntimeException",
format!("fragment id {fragment_id} not found"),
)
.expect("failed to throw java exception");
return JObject::null();
let Some(fragment) = dataset_guard.inner.get_fragment(fragment_id as usize) else {
return Err(JavaError::input_error(format!(
"Fragment {fragment_id} not found"
)));
};
fragments.push(fragment.metadata().clone());
}
scanner.with_fragments(fragments);
}
let columns_opt = ok_or_throw!(env, env.get_strings_opt(&columns_obj));
// The dataset guard is not used past this point; only `scanner` is configured below.
drop(dataset_guard);
let columns_opt = env.get_strings_opt(&columns_obj)?;
if let Some(columns) = columns_opt {
ok_or_throw!(env, scanner.project(&columns));
scanner.project(&columns).infer_error()?;
};
let substrait_opt = ok_or_throw!(env, env.get_bytes_opt(&substrait_filter_obj));
let substrait_opt = env.get_bytes_opt(&substrait_filter_obj)?;
if let Some(substrait) = substrait_opt {
ok_or_throw!(
env,
RT.block_on(async { scanner.filter_substrait(substrait).await })
);
RT.block_on(async { scanner.filter_substrait(substrait).await })
.infer_error()?;
}
let filter_opt = ok_or_throw!(env, env.get_string_opt(&filter_obj));
let filter_opt = env.get_string_opt(&filter_obj)?;
if let Some(filter) = filter_opt {
ok_or_throw!(env, scanner.filter(filter.as_str()));
scanner.filter(filter.as_str()).infer_error()?;
}
let batch_size_opt = ok_or_throw!(env, env.get_long_opt(&batch_size_obj));
let batch_size_opt = env.get_long_opt(&batch_size_obj)?;
if let Some(batch_size) = batch_size_opt {
scanner.batch_size(batch_size as usize);
}
let scanner = BlockingScanner::create(scanner);
scanner.into_java(&mut env)
scanner.into_java(env)
}

/// JNI entry point for `com.lancedb.lance.ipc.LanceScanner.releaseNativeScanner`.
///
/// Takes the `BlockingScanner` out of the `NATIVE_SCANNER` field of `j_scanner` so that it
/// is dropped on the Rust side.
///
/// NOTE(review): rendered diff with the +/- markers stripped. The inline
/// `take_rust_field(..).expect(..)` statement looks like the removed side; the
/// `ok_or_throw_without_return!` call to `inner_release_native_scanner` looks like the
/// replacement, and presumably raises a Java exception instead of panicking — confirm
/// against the macro definition.
#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_ipc_LanceScanner_releaseNativeScanner(
mut env: JNIEnv,
j_scanner: JObject,
) {
let _: BlockingScanner = unsafe {
env.take_rust_field(j_scanner, NATIVE_SCANNER)
.expect("Failed to take native scanner handle")
};
ok_or_throw_without_return!(env, inner_release_native_scanner(&mut env, j_scanner));
}

/// Removes the `BlockingScanner` stored in the `NATIVE_SCANNER` field of `j_scanner` and
/// drops it.
///
/// # Errors
/// Returns a `JavaError` if the field cannot be taken from the Java object.
fn inner_release_native_scanner(env: &mut JNIEnv, j_scanner: JObject) -> JavaResult<()> {
    // NOTE(review): relies on the field holding a `BlockingScanner`, which is the type
    // `attach_native_scanner` stores under the same field name.
    let released: BlockingScanner =
        unsafe { env.take_rust_field(j_scanner, NATIVE_SCANNER) }.infer_error()?;
    // Dropping the scanner is the whole point of this call.
    drop(released);
    Ok(())
}

impl IntoJava for BlockingScanner {
    /// Consumes the scanner and returns a Java object that carries it as a native handle.
    ///
    /// # Errors
    /// Propagates any `JavaError` reported by `attach_native_scanner`.
    fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> JavaResult<JObject<'local>> {
        let scanner = self;
        attach_native_scanner(env, scanner)
    }
}

/// Creates a Java scanner object and stores `scanner` in its `NATIVE_SCANNER` field.
///
/// # Errors
/// Returns a `JavaError` if the Java object cannot be created or the native handle cannot
/// be stored on it.
fn attach_native_scanner<'local>(
    env: &mut JNIEnv<'local>,
    scanner: BlockingScanner,
) -> JavaResult<JObject<'local>> {
    let java_scanner = create_java_scanner_object(env)?;
    // Ownership of the Rust scanner moves into a field of the Java object here. The Java
    // garbage collector does not free that Rust value, so it leaks unless it is released
    // explicitly.
    //
    // To avoid the leak:
    // 1. The Java class should implement `java.io.Closeable`.
    // 2. Callers should use the object in a try-with-resources statement (or call `close()`
    //    themselves) so the native handle is released.
    let stored = unsafe { env.set_rust_field(&java_scanner, NATIVE_SCANNER, scanner) };
    stored.infer_error()?;
    Ok(java_scanner)
}

/// Instantiates the Java `LanceScanner` class through its no-argument constructor.
///
/// # Errors
/// Returns a `JavaError` if the JNI call to construct the object fails.
fn create_java_scanner_object<'a>(env: &mut JNIEnv<'a>) -> JavaResult<JObject<'a>> {
    const SCANNER_CLASS: &str = "com/lancedb/lance/ipc/LanceScanner";
    const NO_ARG_CTOR_SIG: &str = "()V";
    let created = env.new_object(SCANNER_CLASS, NO_ARG_CTOR_SIG, &[]);
    created.infer_error()
}

//////////////////
// Read Methods //
//////////////////
/// JNI entry point for `com.lancedb.lance.ipc.LanceScanner.openStream`.
///
/// Delegates to `inner_open_stream`, passing along the Java scanner object and the raw
/// `stream_addr` value supplied by the caller.
///
/// NOTE(review): rendered diff with the +/- markers stripped. The dangling `let scanner = {`
/// line looks like a leftover from the removed side. `ok_or_throw_without_return!`
/// presumably raises a Java exception when the inner call returns `Err` — confirm against
/// the macro definition.
#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_ipc_LanceScanner_openStream(
mut env: JNIEnv,
j_scanner: JObject,
stream_addr: jlong,
) {
let scanner = {
ok_or_throw_without_return!(env, inner_open_stream(&mut env, j_scanner, stream_addr));
}

/// Opens a record batch stream from the `BlockingScanner` stored on `j_scanner`, converts
/// it with `to_ffi_arrow_array_stream`, and writes the resulting `FFI_ArrowArrayStream` to
/// the address given by `stream_addr`.
///
/// # Errors
/// Returns a `JavaError` if the scanner handle cannot be read, the stream cannot be opened,
/// or the FFI conversion fails.
///
/// NOTE(review): rendered diff with the +/- markers stripped. The `.expect(..)`,
/// `scanner_guard.clone()`, `ok_or_throw_without_return!` and `.unwrap()` lines look like
/// the removed side; the `.infer_error()?` lines look like the replacement.
fn inner_open_stream(env: &mut JNIEnv, j_scanner: JObject, stream_addr: jlong) -> JavaResult<()> {
let record_batch_stream = {
let scanner_guard =
unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }
.expect("Failed to get native scanner handle");
scanner_guard.clone()
.infer_error()?;
scanner_guard.open_stream()?
};
let record_batch_stream = ok_or_throw_without_return!(env, scanner.open_stream());
let ffi_stream = to_ffi_arrow_array_stream(record_batch_stream, RT.handle().clone()).unwrap();
let ffi_stream =
to_ffi_arrow_array_stream(record_batch_stream, RT.handle().clone()).infer_error()?;
// NOTE(review): `stream_addr` is cast to a pointer and written without any check here;
// assumes the caller passes a valid, writable address for an `FFI_ArrowArrayStream`.
unsafe { std::ptr::write_unaligned(stream_addr as *mut FFI_ArrowArrayStream, ffi_stream) }
Ok(())
}

#[no_mangle]
Expand All @@ -183,28 +206,39 @@ pub extern "system" fn Java_com_lancedb_lance_ipc_LanceScanner_importFfiSchema(
j_scanner: JObject,
schema_addr: jlong,
) {
let scanner = {
ok_or_throw_without_return!(
env,
inner_import_ffi_schema(&mut env, j_scanner, schema_addr)
);
}

/// Reads the schema of the `BlockingScanner` stored on `j_scanner`, converts it to an
/// `FFI_ArrowSchema`, and writes that value to the address given by `schema_addr`.
///
/// # Errors
/// Returns a `JavaError` if the scanner handle cannot be read, the schema cannot be
/// obtained, or the conversion to `FFI_ArrowSchema` fails.
///
/// NOTE(review): rendered diff with the +/- markers stripped. The `.expect(..)`,
/// `scanner_guard.clone()` and `ok_or_throw_without_return!` lines look like the removed
/// side; the `.infer_error()?` lines look like the replacement.
fn inner_import_ffi_schema(
env: &mut JNIEnv,
j_scanner: JObject,
schema_addr: jlong,
) -> JavaResult<()> {
let schema = {
let scanner_guard =
unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }
.expect("Failed to get native scanner handle");
scanner_guard.clone()
.infer_error()?;
scanner_guard.schema()?
};
let schema = ok_or_throw_without_return!(env, scanner.schema());
let ffi_schema = ok_or_throw_without_return!(env, FFI_ArrowSchema::try_from(&*schema));
let ffi_schema = FFI_ArrowSchema::try_from(&*schema).infer_error()?;
// NOTE(review): `schema_addr` is cast to a pointer and written without any check here;
// assumes the caller passes a valid, writable address for an `FFI_ArrowSchema`.
unsafe { std::ptr::write_unaligned(schema_addr as *mut FFI_ArrowSchema, ffi_schema) }
Ok(())
}

/// JNI entry point that returns the scanner's row count as a `jlong`.
///
/// The macro is given `-1` as the fallback return value; presumably that value is returned
/// after a Java exception is raised when counting fails — confirm against
/// `ok_or_throw_with_return!`.
///
/// NOTE(review): rendered diff with the +/- markers stripped. Two signature lines are
/// present: `..._countRows` looks like the removed name and `..._nativeCountRows` like the
/// replacement, which implies the Java-side native method was renamed to match. The
/// `let scanner = { .. }` / `rows as jlong` body looks like the removed side; the single
/// `inner_count_rows` expression looks like the replacement.
#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_ipc_LanceScanner_countRows(
pub extern "system" fn Java_com_lancedb_lance_ipc_LanceScanner_nativeCountRows(
mut env: JNIEnv,
j_scanner: JObject,
) -> jlong {
let scanner = {
let scanner_guard =
unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }
.expect("Failed to get native scanner handle");
scanner_guard.clone()
};
let rows = ok_or_throw_with_return!(env, scanner.count_rows(), -1);
rows as jlong
ok_or_throw_with_return!(env, inner_count_rows(&mut env, j_scanner), -1) as jlong
}

/// Looks up the `BlockingScanner` stored on `j_scanner` and returns its row count.
///
/// # Errors
/// Returns a `JavaError` if the scanner handle cannot be read from the Java object or if
/// counting rows fails.
fn inner_count_rows(env: &mut JNIEnv, j_scanner: JObject) -> JavaResult<u64> {
    let handle =
        unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) };
    let scanner = handle.infer_error()?;
    let row_count = scanner.count_rows()?;
    Ok(row_count)
}