From 554ad19863fa56b18ef9d1483e43eb07cdf3f827 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 7 Apr 2021 10:07:33 +0800
Subject: [PATCH] Add csv support (#9)

---
Reviewer notes (commentary placed after the '---' separator; not part of the
commit message):

 * This copy was re-flowed from a whitespace-collapsed version. Indentation of
   context and added lines follows the usual 2-space convention and is a
   reconstruction; if the hunks do not apply cleanly, try
   `git apply --ignore-whitespace` or regenerate from the original commit.
 * NOTE(review): text that would sit inside angle brackets looks like it was
   lost before this copy was made: `std::shared_ptr GetFileFormat`,
   `std::make_shared()` (both cases), `List scanTasks`, the "Schema" literal
   compared with `vsr.getSchema().toString()`, and the From: address. These
   are reproduced as found; confirm against the original commit.
 * NOTE(review): in testCsvRead, `vsr` is reassigned on every loop iteration
   and only closed once after the loop. Whether earlier roots need closing
   depends on what `itr.next()` hands back -- TODO confirm.
 * NOTE(review): the newly added imports AutoCloseables, ArrowRecordBatch and
   Types are not referenced by any line this patch adds.

 cpp/src/jni/dataset/jni_wrapper.cpp           |  2 +
 .../apache/arrow/dataset/file/FileFormat.java |  4 +-
 .../arrow/dataset/jni/NativeDatasetTest.java  | 47 ++++++++++++++++++-
 .../src/test/resources/data/people.csv        |  3 ++
 4 files changed, 54 insertions(+), 2 deletions(-)
 create mode 100644 java/dataset/src/test/resources/data/people.csv

diff --git a/cpp/src/jni/dataset/jni_wrapper.cpp b/cpp/src/jni/dataset/jni_wrapper.cpp
index 97f69373ee13d..063419cb1f15b 100644
--- a/cpp/src/jni/dataset/jni_wrapper.cpp
+++ b/cpp/src/jni/dataset/jni_wrapper.cpp
@@ -196,6 +196,8 @@ std::shared_ptr GetFileFormat(JNIEnv *env, jint id)
   switch (id) {
     case 0:
       return std::make_shared();
+    case 1:
+      return std::make_shared();
     default:
       std::string error_message = "illegal file format id: " + std::to_string(id);
       env->ThrowNew(illegal_argument_exception_class, error_message.c_str());
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
index ed5b3d4b184e4..4dbbc4d2f2123 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
@@ -21,7 +21,9 @@
  * File format definitions.
  */
 public enum FileFormat {
-  PARQUET(0);
+  PARQUET(0),
+  CSV(1),
+  NONE(-1);
 
   private int id;
 
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/NativeDatasetTest.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/NativeDatasetTest.java
index 66904ec4e0df3..eacbc01d4a323 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/NativeDatasetTest.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/NativeDatasetTest.java
@@ -39,17 +39,26 @@
 import org.apache.arrow.dataset.source.DatasetFactory;
 import org.apache.arrow.memory.ReservationListener;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class NativeDatasetTest {
 
   private String sampleParquetLocal() {
-    return "file://" + NativeDatasetTest.class.getResource(File.separator + "userdata1.parquet").getPath();
+    return "file://" + resourcePath("userdata1.parquet");
+  }
+
+  private String resourcePath(String resource) {
+    return NativeDatasetTest.class.getResource(File.separator + resource).getPath();
   }
 
   private void testDatasetFactoryEndToEnd(DatasetFactory factory, int taskCount, int vectorCount, int rowCount) {
@@ -319,6 +328,42 @@ public void testScannerWithEmptyProjector() {
     allocator.close();
   }
 
+  @Test
+  public void testCsvRead() throws Exception {
+    RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    SingleFileDatasetFactory factory = new SingleFileDatasetFactory(allocator,
+        NativeMemoryPool.getDefault(), FileFormat.CSV, "file://" + resourcePath("data/people.csv"));
+    ScanOptions options = new ScanOptions(new String[]{}, Filter.EMPTY, 100);
+    Schema schema = factory.inspect();
+    NativeDataset dataset = factory.finish(schema);
+    NativeScanner nativeScanner = dataset.newScan(options);
+    List scanTasks = collect(nativeScanner.scan());
+    Assert.assertEquals(1, scanTasks.size());
+    ScanTask scanTask = scanTasks.get(0);
+    ScanTask.Itr itr = scanTask.scan();
+
+    VectorSchemaRoot vsr = null;
+    int rowCount = 0;
+    while (itr.hasNext()) {
+      // FIXME VectorSchemaRoot is not actually something ITERABLE. Using a reader convention instead.
+      vsr = itr.next().valueVectors;
+      rowCount += vsr.getRowCount();
+
+      // check if projector is applied
+      Assert.assertEquals("Schema",
+          vsr.getSchema().toString());
+    }
+    Assert.assertEquals(2, rowCount);
+    assertEquals(3, schema.getFields().size());
+    assertEquals("name", schema.getFields().get(0).getName());
+    assertEquals("age", schema.getFields().get(1).getName());
+    assertEquals("job", schema.getFields().get(2).getName());
+    if (vsr != null) {
+      vsr.close();
+    }
+    allocator.close();
+  }
+
   @Ignore
   public void testFilter() {
     // todo
diff --git a/java/dataset/src/test/resources/data/people.csv b/java/dataset/src/test/resources/data/people.csv
new file mode 100644
index 0000000000000..4d9b27bf9ac8e
--- /dev/null
+++ b/java/dataset/src/test/resources/data/people.csv
@@ -0,0 +1,3 @@
+name,age,job
+Jorge,30,Developer
+Bob,32,Developer