Skip to content

Commit

Permalink
Add csv support (apache#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Apr 7, 2021
1 parent 67466a2 commit 554ad19
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cpp/src/jni/dataset/jni_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ std::shared_ptr<arrow::dataset::FileFormat> GetFileFormat(JNIEnv *env, jint id)
switch (id) {
case 0:
return std::make_shared<arrow::dataset::ParquetFileFormat>();
case 1:
return std::make_shared<arrow::dataset::CsvFileFormat>();
default:
std::string error_message = "illegal file format id: " + std::to_string(id);
env->ThrowNew(illegal_argument_exception_class, error_message.c_str());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
* File format definitions.
*/
public enum FileFormat {
PARQUET(0);
PARQUET(0),
CSV(1),
NONE(-1);

private int id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,26 @@
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.ReservationListener;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class NativeDatasetTest {

private String sampleParquetLocal() {
return "file://" + NativeDatasetTest.class.getResource(File.separator + "userdata1.parquet").getPath();
return "file://" + resourcePath("userdata1.parquet");
}

private String resourcePath(String resource) {
return NativeDatasetTest.class.getResource(File.separator + resource).getPath();
}

private void testDatasetFactoryEndToEnd(DatasetFactory factory, int taskCount, int vectorCount, int rowCount) {
Expand Down Expand Up @@ -319,6 +328,42 @@ public void testScannerWithEmptyProjector() {
allocator.close();
}

@Test
public void testCsvRead() throws Exception {
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
SingleFileDatasetFactory factory = new SingleFileDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.CSV, "file://" + resourcePath("data/people.csv"));
ScanOptions options = new ScanOptions(new String[]{}, Filter.EMPTY, 100);
Schema schema = factory.inspect();
NativeDataset dataset = factory.finish(schema);
NativeScanner nativeScanner = dataset.newScan(options);
List<? extends ScanTask> scanTasks = collect(nativeScanner.scan());
Assert.assertEquals(1, scanTasks.size());
ScanTask scanTask = scanTasks.get(0);
ScanTask.Itr itr = scanTask.scan();

VectorSchemaRoot vsr = null;
int rowCount = 0;
while (itr.hasNext()) {
// FIXME VectorSchemaRoot is not actually something ITERABLE. Using a reader convention instead.
vsr = itr.next().valueVectors;
rowCount += vsr.getRowCount();

// check if projector is applied
Assert.assertEquals("Schema<name: Utf8, age: Int(64, true), job: Utf8>",
vsr.getSchema().toString());
}
Assert.assertEquals(2, rowCount);
assertEquals(3, schema.getFields().size());
assertEquals("name", schema.getFields().get(0).getName());
assertEquals("age", schema.getFields().get(1).getName());
assertEquals("job", schema.getFields().get(2).getName());
if (vsr != null) {
vsr.close();
}
allocator.close();
}

@Ignore
public void testFilter() {
// todo
Expand Down
3 changes: 3 additions & 0 deletions java/dataset/src/test/resources/data/people.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name,age,job
Jorge,30,Developer
Bob,32,Developer

0 comments on commit 554ad19

Please sign in to comment.