Skip to content

Commit

Permalink
Java jni code skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
beinan committed Feb 13, 2024
1 parent 69bf18b commit 378ed98
Show file tree
Hide file tree
Showing 9 changed files with 585 additions and 0 deletions.
1 change: 1 addition & 0 deletions java/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.iml
19 changes: 19 additions & 0 deletions java/lance-jni/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "lance-jni"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
lance = {version = "=0.9.1", path="../../rust/lance"}
arrow = { version = "49.0.0", optional = false, features = ["ffi"] }
tokio = { version = "1.23", features = [
"rt-multi-thread",
"macros",
"fs",
"sync",
] }
jni = "0.21.1"
snafu = "0.7.4"
50 changes: 50 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::ffi_stream::ArrowArrayStreamReader;
use tokio::runtime::Runtime;

use lance::Dataset;
use lance::Result;

pub struct BlockingDataset {
inner: Dataset,
rt: Runtime,
}

impl BlockingDataset {
pub fn write(reader: ArrowArrayStreamReader, uri: &str) -> Result<Self> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

let inner = rt.block_on(
Dataset::write(reader, uri, Option::None)
)?;
Ok(BlockingDataset { inner, rt })
}
pub fn open(uri: &str) -> Result<Self> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

let inner = rt.block_on(Dataset::open(uri))?;
Ok(BlockingDataset { inner, rt })
}

pub fn count_rows(&self) -> Result<usize> {
self.rt.block_on(self.inner.count_rows())
}
pub fn close(&self) {}
}
161 changes: 161 additions & 0 deletions java/lance-jni/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use crate::blocking_dataset::BlockingDataset;
use jni::objects::{JObject, JString};
use jni::JNIEnv;
use jni::sys::{jint, jlong};

use lance::Error;

mod blocking_dataset;

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_writeWithFFIStream<'local>(
mut env: JNIEnv<'local>,
_obj: JObject,
arrow_array_stream_addr: jlong,
path: JString,
) -> JObject<'local> {
let path_str = match extract_path_str(&mut env, &path) {
Ok(value) => value,
Err(_) => return JObject::null(),
};
unsafe {
match ArrowArrayStreamReader::from_raw(arrow_array_stream_addr as *mut FFI_ArrowArrayStream) {
Ok(reader) => {
match BlockingDataset::write(reader, &path_str) {
Ok(dataset) => attach_native_dataset(&mut env, dataset),
Err(err) => {
throw_java_exception(&mut env, format!("Failed to write from arrow array stream: {}", err).as_str());
JObject::null()
}
}
},
Err(err) => {
throw_java_exception(&mut env, &format!("Failed to extract arrow array stream: {}", err));
JObject::null()
}
}
}
}

fn attach_native_dataset<'local>(mut env: &mut JNIEnv<'local>, dataset: BlockingDataset) -> JObject<'local> {
let j_dataset = create_java_dataset_object(&mut env);
// This block sets a native Rust object (dataset) as a field in the Java object (j_dataset).
// Caution: This creates a potential for memory leaks. The Rust object (dataset) is not
// automatically garbage-collected by Java, and its memory will not be freed unless
// explicitly handled.
//
// To prevent memory leaks, ensure the following:
// 1. The Java object (`j_dataset`) should implement the `java.io.Closeable` interface.
// 2. Users of this Java object should be instructed to always use it within a try-with-resources
// statement (or manually call the `close()` method) to ensure that `self.close()` is invoked.
unsafe {
match env.set_rust_field(&j_dataset, "nativeDatasetHandle", dataset) {
Ok(_) => j_dataset,
Err(err) => {
env.throw_new(
"java/lang/RuntimeException",
format!("Failed to set native handle: {}", err),
)
.expect("Error throwing exception");
JObject::null()
}
}
}
}

fn throw_java_exception(env: &mut JNIEnv, err_msg: &str) {
env.throw_new(
"java/lang/RuntimeException",
err_msg
).expect("Error throwing exception");
}

fn create_java_dataset_object<'a>(env: &mut JNIEnv<'a>) -> JObject<'a> {
env
.new_object("com/lancedb/lance/Dataset", "()V", &[])
.expect("Failed to create Java Dataset instance")
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_open<'local>(
mut env: JNIEnv<'local>,
_obj: JObject,
path: JString,
) -> JObject<'local> {
let path_str = match extract_path_str(&mut env, &path) {
Ok(value) => value,
Err(_) => return JObject::null(),
};

match BlockingDataset::open(&path_str) {
Ok(dataset) => attach_native_dataset(&mut env, dataset),
Err(err) => {
env.throw_new(
"java/lang/RuntimeException",
format!("Failed to open dataset: {}", err),
)
.expect("Error throwing exception");
JObject::null()
}
}
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_countRows(
mut env: JNIEnv,
java_dataset: JObject,
) -> jint {
let dataset_guard = unsafe {
env.get_rust_field::<_, _, BlockingDataset>(java_dataset, "nativeDatasetHandle")
};
match dataset_guard {
Ok(dataset) =>
dataset.count_rows()
.expect("Faild to get the row count from dataset's metadata.") as jint,
Err(_) => {
-1
}
}
}

fn extract_path_str(env: &mut JNIEnv, path: &JString) -> Result<String, Error> {
match env.get_string(&path) {
Ok(path) => Ok(path.into()),
Err(err) => {
env.throw_new(
"java/lang/IllegalArgumentException",
format!("Invalid path string: {}", err),
)
.expect("Error throwing exception");
Err(Error::InvalidTableLocation { message: format!("Invalid path string: {}", err)})
}
}
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_releaseNativeDataset(
mut env: JNIEnv,
obj: JObject,
) {
unsafe {
let dataset: BlockingDataset = env
.take_rust_field(obj, "nativeDatasetHandle")
.expect("Failed to take native dataset handle");
dataset.close()
}
}
80 changes: 80 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.lancedb</groupId>
<artifactId>lance</artifactId>
<version>0.1-SNAPSHOT</version>

<name>lance</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<arrow.version>14.0.2</arrow.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<compilerArgs>
<arg>-h</arg>
<arg>target/headers</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
77 changes: 77 additions & 0 deletions java/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lance;

import org.apache.arrow.c.ArrowArrayStream;

import java.io.Closeable;

/**
* Class representing a Lance dataset, interfacing with the native lance library.
* This class provides functionality to open and manage datasets with native code.
*
* The native library is loaded statically and utilized through native methods.
* It implements the {@link java.io.Closeable} interface to ensure proper resource management.
*/
public class Dataset implements Closeable {
private long nativeDatasetHandle;

static {
LanceNativeManager.loadLanceNative();
}

private Dataset() {
}

public static Dataset write(ArrowArrayStream stream, String path) {
return writeWithFFIStream(stream.memoryAddress(), path);
}

private static native Dataset writeWithFFIStream(long ArrowStreamMemoryAddress, String path);

public native static Dataset write(NativeArrowArrayStream stream, String path);

/**
* Opens a dataset from the specified path using the native library.
*
* @param path The file path of the dataset to open.
* @return A new instance of {@link Dataset} linked to the opened dataset.
*/
public native static Dataset open(String path);

/**
* Count the number of rows in the dataset.
* @return num of rows
*/
public native int countRows();

/**
* Closes this dataset and releases any system resources associated with it.
* If the dataset is already closed, then invoking this method has no effect.
*/
@Override
public void close() {
if (nativeDatasetHandle != 0) {
releaseNativeDataset(nativeDatasetHandle);
nativeDatasetHandle = 0;
}
}

/**
* Native method to release the Lance dataset resources associated with the given handle.
*
* @param handle The native handle to the dataset resource.
*/
private native void releaseNativeDataset(long handle);
}

0 comments on commit 378ed98

Please sign in to comment.