Skip to content

Commit

Permalink
Add create dataset, fragment, getSchema for lance jni
Browse files Browse the repository at this point in the history
  • Loading branch information
LuQQiu committed Apr 10, 2024
1 parent d189ca2 commit 988d1c6
Show file tree
Hide file tree
Showing 12 changed files with 473 additions and 107 deletions.
22 changes: 21 additions & 1 deletion .github/workflows/java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,24 @@ jobs:
- name: Running tests with Java 11
run: mvn test
- name: Running tests with Java 17
run: JAVA_HOME=$JAVA_17 JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn test
run: |
export JAVA_TOOL_OPTIONS="$JAVA_TOOL_OPTIONS \
-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
-Djdk.reflect.useDirectMethodHandle=false \
-Dio.netty.tryReflectionSetAccessible=true"
JAVA_HOME=$JAVA_17 mvn test
38 changes: 35 additions & 3 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{traits::IntoJava, Result, RT};
use arrow::array::RecordBatchReader;

use arrow::ffi::FFI_ArrowSchema;
use arrow_schema::Schema;
use jni::sys::jlong;
use jni::{objects::JObject, JNIEnv};
use lance::dataset::{Dataset, WriteParams};

use crate::{traits::IntoJava, Result, RT};

pub const NATIVE_DATASET: &str = "nativeDatasetHandle";

#[derive(Clone)]
Expand Down Expand Up @@ -118,3 +119,34 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_getFragmentsIds<'a>(
.expect("Failed to set int array region");
array_list.into()
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_importFfiSchema(
mut env: JNIEnv,
jdataset: JObject,
arrow_schema_addr: jlong,
) {
let schema = {
let dataset =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }
.expect("Failed to get native dataset handle");
Schema::from(dataset.inner.schema())
};
let out_c_schema = arrow_schema_addr as *mut FFI_ArrowSchema;
let c_schema = match FFI_ArrowSchema::try_from(&schema) {
Ok(schema) => schema,
Err(err) => {
env.throw_new(
"java/lang/RuntimeException",
format!("Failed to convert Arrow schema: {}", err),
)
.expect("Error throwing exception");
return;
}
};

unsafe {
std::ptr::copy(std::ptr::addr_of!(c_schema), out_c_schema, 1);
std::mem::forget(c_schema);
};
}
20 changes: 17 additions & 3 deletions java/core/lance-jni/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub enum Error {
Arrow { message: String, location: Location },
#[snafu(display("Index error: {}, location", message))]
Index { message: String, location: Location },
#[snafu(display("Dataset not found error: {}", path))]
DatasetNotFound { path: String },
#[snafu(display("Dataset already exists error: {}", uri))]
DatasetAlreadyExists { uri: String },
#[snafu(display("Unknown error"))]
Other { message: String },
}
Expand All @@ -59,9 +63,11 @@ impl Error {
self.throw_as(env, JavaException::IllegalArgumentException)
}
Self::IO { .. } | Self::Index { .. } => self.throw_as(env, JavaException::IOException),
Self::Arrow { .. } | Self::Other { .. } | Self::Jni { .. } => {
self.throw_as(env, JavaException::RuntimeException)
}
Self::Arrow { .. }
| Self::DatasetNotFound { .. }
| Self::DatasetAlreadyExists { .. }
| Self::Other { .. }
| Self::Jni { .. } => self.throw_as(env, JavaException::RuntimeException),
}
}

Expand Down Expand Up @@ -93,6 +99,14 @@ impl From<Utf8Error> for Error {
impl From<lance::Error> for Error {
fn from(source: lance::Error) -> Self {
match source {
lance::Error::DatasetNotFound {
path,
source: _,
location: _,
} => Self::DatasetNotFound { path },
lance::Error::DatasetAlreadyExists { uri, location: _ } => {
Self::DatasetAlreadyExists { uri }
}
lance::Error::IO { message, location } => Self::IO { message, location },
lance::Error::Arrow { message, location } => Self::Arrow { message, location },
lance::Error::Index { message, location } => Self::Index { message, location },
Expand Down
85 changes: 82 additions & 3 deletions java/core/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::ffi::FFI_ArrowSchema;
use arrow::array::{RecordBatch, StructArray};
use arrow::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema};
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_schema::Schema;
use arrow_schema::{DataType, Schema};
use jni::{
objects::{JObject, JString},
sys::{jint, jlong},
Expand All @@ -25,6 +26,7 @@ use snafu::{location, Location};
use lance::dataset::{fragment::FileFragment, scanner::Scanner};
use lance_io::ffi::to_ffi_arrow_array_stream;

use crate::traits::SingleRecordBatchReader;
use crate::{
blocking_dataset::{BlockingDataset, NATIVE_DATASET},
error::{Error, Result},
Expand Down Expand Up @@ -76,7 +78,84 @@ impl FragmentScanner {
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Fragment_createNative(
pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiArray(
mut env: JNIEnv,
_obj: JObject,
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
fragment_id: JObject, // Optional<Integer>
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
) -> jint {
let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray;
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;

let c_array = unsafe { FFI_ArrowArray::from_raw(c_array_ptr) };
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
let data_type = ok_or_throw_with_return!(
env,
DataType::try_from(&c_schema).map_err(|e| {
Error::Arrow {
message: e.to_string(),
location: location!(),
}
}),
-1
);

let array_data = ok_or_throw_with_return!(
env,
unsafe {
from_ffi_and_data_type(c_array, data_type).map_err(|e| Error::Arrow {
message: e.to_string(),
location: location!(),
})
},
-1
);

let record_batch = RecordBatch::from(StructArray::from(array_data));
let batch_schema = record_batch.schema().clone();
let reader = SingleRecordBatchReader::new(Some(record_batch), batch_schema);

let path_str: String = ok_or_throw_with_return!(env, dataset_uri.extract(&mut env), -1);
let fragment_id_opts = ok_or_throw_with_return!(env, env.get_int_opt(&fragment_id), -1);

let write_params = ok_or_throw_with_return!(
env,
extract_write_params(
&mut env,
&max_rows_per_file,
&max_rows_per_group,
&max_bytes_per_file,
&mode
),
-1
);

match RT.block_on(FileFragment::create(
&path_str,
fragment_id_opts.unwrap_or(0) as usize,
reader,
Some(write_params),
)) {
Ok(fragment) => fragment.id as jint,
Err(e) => {
Error::IO {
message: e.to_string(),
location: location!(),
}
.throw(&mut env);
-1
}
}
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Fragment_createWithFfiStream(
mut env: JNIEnv,
_obj: JObject,
dataset_uri: JString,
Expand Down
47 changes: 47 additions & 0 deletions java/core/lance-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow::ffi::FFI_ArrowSchema;
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_schema::Schema;
use jni::objects::{JObject, JString};
use jni::sys::{jint, jlong};
use jni::JNIEnv;
use lazy_static::lazy_static;
use snafu::{location, Location};
use traits::IntoJava;

use crate::traits::SingleRecordBatchReader;
use crate::utils::extract_write_params;

#[macro_export]
Expand Down Expand Up @@ -66,6 +71,48 @@ lazy_static! {
.expect("Failed to create tokio runtime");
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_createWithFfiSchema<'local>(
mut env: JNIEnv<'local>,
_obj: JObject,
arrow_schema_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
) -> JObject<'local> {
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
let schema = ok_or_throw!(
env,
Schema::try_from(&c_schema).map_err(|e| Error::Arrow {
message: e.to_string(),
location: location!(),
})
);
let reader = SingleRecordBatchReader::new(None, Arc::new(schema));

let path_str: String = ok_or_throw!(env, path.extract(&mut env));

let write_params = ok_or_throw!(
env,
extract_write_params(
&mut env,
&max_rows_per_file,
&max_rows_per_group,
&max_bytes_per_file,
&mode
)
);

let dataset = ok_or_throw!(
env,
BlockingDataset::write(reader, &path_str, Some(write_params))
);
dataset.into_java(&mut env)
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_writeWithFfiStream<'local>(
mut env: JNIEnv<'local>,
Expand Down
33 changes: 33 additions & 0 deletions java/core/lance-jni/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow::array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use jni::objects::{JMap, JObject, JString, JValue};
use jni::JNIEnv;

Expand Down Expand Up @@ -120,3 +124,32 @@ impl JMapExt for JMap<'_, '_, '_> {
get_map_value(env, self, key)
}
}

pub struct SingleRecordBatchReader {
record_batch: Option<RecordBatch>,
schema: Arc<Schema>,
}

impl SingleRecordBatchReader {
// Public constructor method
pub fn new(record_batch: Option<RecordBatch>, schema: Arc<Schema>) -> Self {
Self {
record_batch,
schema,
}
}
}

impl Iterator for SingleRecordBatchReader {
type Item = std::result::Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
self.record_batch.take().map(Ok)
}
}

impl RecordBatchReader for SingleRecordBatchReader {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
Loading

0 comments on commit 988d1c6

Please sign in to comment.