GH-37910: [Java][Integration] Implement C Data Interface integration testing
pitrou committed Oct 12, 2023
1 parent a2561e3 commit db393ef
Showing 14 changed files with 281 additions and 31 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/integration_arrow.sh
@@ -23,8 +23,8 @@ arrow_dir=${1}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration

pip install -e $arrow_dir/dev/archery[integration]
# For C# C Data Interface testing
pip install pythonnet
# For C Data Interface testing
pip install jpype1 pythonnet

# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1
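
The new jpype1 dependency backs the Java C Data tester the same way pythonnet backs the C# one. As a quick sanity check that both runtime bridges can boot, a sketch only and not part of the commit, assuming a JDK and a .NET runtime are installed:

import jpype  # from the jpype1 package
import clr    # from the pythonnet package; importing it loads the default .NET runtime

jpype.startJVM(jpype.getDefaultJVMPath())
print(jpype.JClass("java.lang.System").getProperty("java.version"))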
1 change: 0 additions & 1 deletion dev/archery/archery/integration/datagen.py
@@ -1722,7 +1722,6 @@ def generate_dictionary_unsigned_case():

# TODO: JavaScript does not support uint64 dictionary indices, so disabled
# for now

# dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3')
fields = [
DictionaryField('f0', get_field('', 'uint8'), dict0),
9 changes: 9 additions & 0 deletions dev/archery/archery/integration/tester_csharp.py
@@ -18,6 +18,7 @@
from contextlib import contextmanager
import gc
import os
import weakref

from . import cdata
from .tester import Tester, CDataExporter, CDataImporter
@@ -72,6 +73,14 @@ def __init__(self, debug, args):
self.ffi = cdata.ffi()
_load_clr()

def _finalize():
# Collect GC handles so as to call release functions from other
# exporters before it gets too late.
# TODO make this a run_gc() function?
from Apache.Arrow.IntegrationTest import CDataInterface
CDataInterface.RunGC()
weakref.finalize(self, _finalize)

def _pointer_to_int(self, c_ptr):
return int(self.ffi.cast('uintptr_t', c_ptr))

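
The weakref.finalize hook added above gives the C# bridge one last chance to run the .NET GC, and thereby fire any pending C Data release callbacks, before the Python wrapper goes away. A minimal sketch of the pattern, with a stand-in cleanup function:

import weakref

class Exporter:
    def __init__(self):
        def _cleanup():
            # Runs when the Exporter is collected or at interpreter exit;
            # it must not capture `self`, or the object would never be freed.
            print("running final GC pass")
        weakref.finalize(self, _cleanup)

exporter = Exporter()
del exporter  # prints "running final GC pass"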
177 changes: 169 additions & 8 deletions dev/archery/archery/integration/tester_java.py
@@ -16,10 +16,12 @@
# under the License.

import contextlib
import functools
import os
import subprocess

from .tester import Tester
from . import cdata
from .tester import Tester, CDataExporter, CDataImporter
from .util import run_cmd, log
from ..utils.source import ARROW_ROOT_DEFAULT

@@ -42,18 +44,25 @@ def load_version_from_pom():
"ARROW_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
"java/tools/target/arrow-tools-{}-"
"jar-with-dependencies.jar".format(_arrow_version),
),
"java/tools/target",
f"arrow-tools-{_arrow_version}-jar-with-dependencies.jar"
)
)
_ARROW_C_DATA_JAR = os.environ.get(
"ARROW_C_DATA_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
"java/c/target",
f"arrow-c-data-{_arrow_version}.jar"
)
)
_ARROW_FLIGHT_JAR = os.environ.get(
"ARROW_FLIGHT_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
"java/flight/flight-integration-tests/target/"
"flight-integration-tests-{}-jar-with-dependencies.jar".format(
_arrow_version),
),
"java/flight/flight-integration-tests/target",
f"flight-integration-tests-{_arrow_version}-jar-with-dependencies.jar"
)
)
_ARROW_FLIGHT_SERVER = (
"org.apache.arrow.flight.integration.tests.IntegrationTestServer"
@@ -63,11 +72,157 @@
)


@functools.lru_cache
def setup_jpype():
import jpype
jar_path = f"{_ARROW_TOOLS_JAR}:{_ARROW_C_DATA_JAR}"
# XXX Didn't manage to tone down the logging level here (DEBUG -> INFO)
jpype.startJVM(jpype.getDefaultJVMPath(),
"-Djava.class.path=" + jar_path, *_JAVA_OPTS)


class _CDataBase:

def __init__(self, debug, args):
import jpype
self.debug = debug
self.args = args
self.ffi = cdata.ffi()
setup_jpype()
# JPype pointers to java.io, org.apache.arrow...
self.java_io = jpype.JPackage("java").io
self.java_arrow = jpype.JPackage("org").apache.arrow
self.java_allocator = self._make_java_allocator()

def _pointer_to_int(self, c_ptr):
return int(self.ffi.cast('uintptr_t', c_ptr))

def _wrap_c_schema_ptr(self, c_schema_ptr):
return self.java_arrow.c.ArrowSchema.wrap(
self._pointer_to_int(c_schema_ptr))

def _wrap_c_array_ptr(self, c_array_ptr):
return self.java_arrow.c.ArrowArray.wrap(
self._pointer_to_int(c_array_ptr))

def _make_java_allocator(self):
# Return a new allocator
return self.java_arrow.memory.RootAllocator()

def _assert_schemas_equal(self, expected, actual):
# XXX This is fragile for dictionaries, as Schema.equals compares
# dictionary ids.
self.java_arrow.vector.util.Validator.compareSchemas(
expected, actual)

def _assert_batches_equal(self, expected, actual):
self.java_arrow.vector.util.Validator.compareVectorSchemaRoot(
expected, actual)

def _assert_dict_providers_equal(self, expected, actual):
self.java_arrow.vector.util.Validator.compareDictionaryProviders(
expected, actual)


class JavaCDataExporter(CDataExporter, _CDataBase):

def export_schema_from_json(self, json_path, c_schema_ptr):
json_file = self.java_io.File(json_path)
with self.java_arrow.vector.ipc.JsonFileReader(
json_file, self.java_allocator) as json_reader:
schema = json_reader.start()
dict_provider = json_reader
self.java_arrow.c.Data.exportSchema(
self.java_allocator, schema, dict_provider,
self._wrap_c_schema_ptr(c_schema_ptr)
)

def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
json_file = self.java_io.File(json_path)
with self.java_arrow.vector.ipc.JsonFileReader(
json_file, self.java_allocator) as json_reader:
json_reader.start()
if num_batch > 0:
actually_skipped = json_reader.skip(num_batch)
assert actually_skipped == num_batch
with json_reader.read() as batch:
dict_provider = json_reader
self.java_arrow.c.Data.exportVectorSchemaRoot(
self.java_allocator, batch, dict_provider,
self._wrap_c_array_ptr(c_array_ptr))

@property
def supports_releasing_memory(self):
return True

def record_allocation_state(self):
return self.java_allocator.getAllocatedMemory()

def compare_allocation_state(self, recorded, gc_until):
def pred():
return self.java_allocator.getAllocatedMemory() == recorded

return gc_until(pred)


class JavaCDataImporter(CDataImporter, _CDataBase):

def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
json_file = self.java_io.File(json_path)
with self.java_arrow.vector.ipc.JsonFileReader(
json_file, self.java_allocator) as json_reader:
json_schema = json_reader.start()
with self.java_arrow.c.CDataDictionaryProvider() as dict_provider:
imported_schema = self.java_arrow.c.Data.importSchema(
self.java_allocator,
self._wrap_c_schema_ptr(c_schema_ptr),
dict_provider)
self._assert_schemas_equal(json_schema, imported_schema)

def import_batch_and_compare_to_json(self, json_path, num_batch,
c_array_ptr):
json_file = self.java_io.File(json_path)
with self.java_arrow.vector.ipc.JsonFileReader(
json_file, self.java_allocator) as json_reader:
schema = json_reader.start()
if num_batch > 0:
actually_skipped = json_reader.skip(num_batch)
assert actually_skipped == num_batch
with (json_reader.read() as batch,
self.java_arrow.vector.VectorSchemaRoot.create(
schema, self.java_allocator) as imported_batch):
# We need to pass a dict provider primed with dictionary ids
# matching those in the schema, hence an empty
# CDataDictionaryProvider would not work here!
dict_provider = (self.java_arrow.vector.dictionary
.DictionaryProvider.MapDictionaryProvider())
dict_provider.copyStructureFrom(json_reader, self.java_allocator)
with dict_provider:
self.java_arrow.c.Data.importIntoVectorSchemaRoot(
self.java_allocator,
self._wrap_c_array_ptr(c_array_ptr),
imported_batch, dict_provider)
self._assert_batches_equal(batch, imported_batch)
self._assert_dict_providers_equal(json_reader, dict_provider)

@property
def supports_releasing_memory(self):
return True

def gc_until(self, predicate):
# No need to call the Java GC thanks to AutoCloseable (?)
return predicate()


class JavaTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
C_DATA_SCHEMA_EXPORTER = True
C_DATA_SCHEMA_IMPORTER = True
C_DATA_ARRAY_EXPORTER = True
C_DATA_ARRAY_IMPORTER = True

name = 'Java'

@@ -186,3 +341,9 @@ def flight_server(self, scenario_name=None):
finally:
server.kill()
server.wait(5)

def make_c_data_exporter(self):
return JavaCDataExporter(self.debug, self.args)

def make_c_data_importer(self):
return JavaCDataImporter(self.debug, self.args)
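
Putting the pieces together, the sketch below drives a schema round trip through the new exporter and importer. The JSON file name is hypothetical, the jars are assumed to be built as configured above, and gc_until is a stand-in for the helper the harness passes to compare_allocation_state:

import gc

from archery.integration import cdata
from archery.integration.tester_java import JavaCDataExporter, JavaCDataImporter

ffi = cdata.ffi()
c_schema = ffi.new("struct ArrowSchema*")  # zero-initialized by cffi

exporter = JavaCDataExporter(debug=False, args=None)
exporter.export_schema_from_json("generated_primitive.json", c_schema)

importer = JavaCDataImporter(debug=False, args=None)
importer.import_schema_and_compare_to_json("generated_primitive.json", c_schema)

def gc_until(predicate, attempts=3):
    # Stand-in for the harness helper: retry the predicate, nudging the
    # Python collector between attempts.
    for _ in range(attempts):
        if predicate():
            return True
        gc.collect()
    return predicate()

baseline = exporter.record_allocation_state()
# ... export and release more data ...
assert exporter.compare_allocation_state(baseline, gc_until)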
11 changes: 8 additions & 3 deletions docker-compose.yml
@@ -1730,16 +1730,21 @@ services:
volumes: *conda-volumes
environment:
<<: [*common, *ccache]
# tell archery where the arrow binaries are located
ARCHERY_INTEGRATION_WITH_RUST: 0
# Tell Archery where the arrow C++ binaries are located
ARROW_CPP_EXE_PATH: /build/cpp/debug
ARROW_GO_INTEGRATION: 1
ARCHERY_INTEGRATION_WITH_RUST: 0
ARROW_JAVA_CDATA: "ON"
JAVA_JNI_CMAKE_ARGS: >-
-DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF
-DARROW_JAVA_JNI_ENABLE_C=ON
command:
["/arrow/ci/scripts/rust_build.sh /arrow /build &&
/arrow/ci/scripts/cpp_build.sh /arrow /build &&
/arrow/ci/scripts/csharp_build.sh /arrow /build &&
/arrow/ci/scripts/go_build.sh /arrow &&
/arrow/ci/scripts/java_build.sh /arrow /build &&
/arrow/ci/scripts/java_jni_build.sh /arrow $${ARROW_HOME} /build /tmp/dist/java/$$(arch) &&
/arrow/ci/scripts/java_build.sh /arrow /build /tmp/dist/java &&
/arrow/ci/scripts/js_build.sh /arrow /build &&
/arrow/ci/scripts/integration_arrow.sh /arrow /build"]

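
Assuming Archery's Docker tooling and that this block defines the usual conda-integration service (the service name is not visible in the hunk and is an assumption here), the updated pipeline can be exercised locally with something like:

import subprocess

# Builds and runs the integration service configured above.
subprocess.run(["archery", "docker", "run", "conda-integration"], check=True)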
java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
@@ -165,9 +165,9 @@ public List<ArrowBuf> visit(ArrowType.Union type) {
return Collections.singletonList(importFixedBytes(type, 0, UnionVector.TYPE_WIDTH));
case Dense:
return Arrays.asList(importFixedBytes(type, 0, DenseUnionVector.TYPE_WIDTH),
importFixedBytes(type, 0, DenseUnionVector.OFFSET_WIDTH));
importFixedBytes(type, 1, DenseUnionVector.OFFSET_WIDTH));
default:
throw new UnsupportedOperationException("Importing buffers for type: " + type);
throw new UnsupportedOperationException("Importing buffers for union type: " + type);
}
}

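
The one-character fix above is easy to miss: a dense union carries two fixed-width buffers, type ids at buffer 0 and 32-bit offsets at buffer 1, and the old code imported buffer 0 twice. A pyarrow illustration of that layout:

import pyarrow as pa

# Dense union: buffer 0 holds int8 type ids, buffer 1 holds int32 offsets
# into the child array selected by the type id.
type_ids = pa.array([0, 1, 0], type=pa.int8())
offsets = pa.array([0, 0, 1], type=pa.int32())
children = [pa.array([1.5, 2.5]), pa.array(["a"])]
arr = pa.UnionArray.from_dense(type_ids, offsets, children)
assert arr.type.mode == "dense"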
4 changes: 4 additions & 0 deletions java/c/src/main/java/org/apache/arrow/c/Format.java
Original file line number Diff line number Diff line change
@@ -138,6 +138,8 @@ static String asString(ArrowType arrowType) {
return "tiD";
case YEAR_MONTH:
return "tiM";
case MONTH_DAY_NANO:
return "tin";
default:
throw new UnsupportedOperationException(
String.format("Interval type with unit %s is unsupported", type.getUnit()));
@@ -277,6 +279,8 @@ static ArrowType asType(String format, long flags)
return new ArrowType.Interval(IntervalUnit.YEAR_MONTH);
case "tiD":
return new ArrowType.Interval(IntervalUnit.DAY_TIME);
case "tin":
return new ArrowType.Interval(IntervalUnit.MONTH_DAY_NANO);
case "+l":
return new ArrowType.List();
case "+L":
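
These two hunks complete the interval mapping defined by the Arrow C data interface format strings. For reference, the full unit-to-format table, of which this commit adds the last entry:

# Arrow C data interface format strings for interval units.
INTERVAL_FORMATS = {
    "YEAR_MONTH": "tiM",
    "DAY_TIME": "tiD",
    "MONTH_DAY_NANO": "tin",  # added by this commit
}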
java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
@@ -44,7 +44,7 @@ final class SchemaImporter {
private static final Logger logger = LoggerFactory.getLogger(SchemaImporter.class);

private static final int MAX_IMPORT_RECURSION_LEVEL = 64;
private long nextDictionaryID = 1L;
private long nextDictionaryID = 0L;

private final BufferAllocator allocator;

java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
@@ -192,6 +192,7 @@ public List<FieldVector> getChildrenFromFields() {
@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
Preconditions.checkArgument(ownBuffers.isEmpty(), "Null vector has no buffers");
valueCount = fieldNode.getLength();
}

@Override
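
The added assignment is needed because a null vector owns no buffers at all, so its length cannot be recovered from ownBuffers and must come from the field node. The pyarrow analogue of the layout makes the point:

import pyarrow as pa

arr = pa.nulls(5)  # a null array has a length but no data buffers
assert len(arr) == 5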
java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
@@ -121,9 +121,11 @@ public boolean rangeEquals(Range range) {
"rightStart %s must be non negative.", range.getRightStart());

Preconditions.checkArgument(range.getRightStart() + range.getLength() <= right.getValueCount(),
"(rightStart + length) %s out of range[0, %s].", 0, right.getValueCount());
"(rightStart + length) %s out of range[0, %s].",
range.getRightStart() + range.getLength(), right.getValueCount());
Preconditions.checkArgument(range.getLeftStart() + range.getLength() <= left.getValueCount(),
"(leftStart + length) %s out of range[0, %s].", 0, left.getValueCount());
"(leftStart + length) %s out of range[0, %s].",
range.getLeftStart() + range.getLength(), left.getValueCount());

return left.accept(this, range);
}
java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
@@ -21,6 +21,8 @@
import java.util.Map;
import java.util.Set;

import org.apache.arrow.memory.BufferAllocator;

/**
* A manager for association of dictionary IDs to their corresponding {@link Dictionary}.
*/
@@ -35,7 +37,7 @@ public interface DictionaryProvider {
/**
* Implementation of {@link DictionaryProvider} that is backed by a hash-map.
*/
class MapDictionaryProvider implements DictionaryProvider {
class MapDictionaryProvider implements AutoCloseable, DictionaryProvider {

private final Map<Long, Dictionary> map;

@@ -49,6 +51,21 @@ public MapDictionaryProvider(Dictionary... dictionaries) {
}
}

/**
* Initialize the map structure from another provider, but with empty vectors.
*
* @param other the {@link DictionaryProvider} to copy the ids and fields from
* @param allocator allocator to create the empty vectors
*/
public void copyStructureFrom(DictionaryProvider other, BufferAllocator allocator) {
for (Long id : other.getDictionaryIds()) {
Dictionary otherDict = other.lookup(id);
Dictionary newDict = new Dictionary(otherDict.getVector().getField().createVector(allocator),
otherDict.getEncoding());
put(newDict);
}
}

public void put(Dictionary dictionary) {
map.put(dictionary.getEncoding().getId(), dictionary);
}
@@ -62,5 +79,12 @@ public final Set<Long> getDictionaryIds() {
public Dictionary lookup(long id) {
return map.get(id);
}

@Override
public void close() {
for (Dictionary dictionary : map.values()) {
dictionary.getVector().close();
}
}
}
}
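
copyStructureFrom exists for the importer path shown in tester_java.py above: Data.importIntoVectorSchemaRoot needs a provider already primed with the JSON file's dictionary ids, but holding fresh empty vectors to fill. A JPype-side sketch of the intended use, assuming a JVM started as in setup_jpype and an open JsonFileReader named json_reader:

import jpype

arrow = jpype.JPackage("org").apache.arrow
allocator = arrow.memory.RootAllocator()

# Mirror the reader's dictionary ids and fields, but with empty vectors.
provider = arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider()
provider.copyStructureFrom(json_reader, allocator)

with provider:  # MapDictionaryProvider is now AutoCloseable
    ...  # import into it; the copied dictionary vectors are released on exit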
