Skip to content

Commit

Permalink
Enable support for loading Booleans and Strings from a binary ingest
Browse files Browse the repository at this point in the history
New types are tested
  • Loading branch information
dhutchis committed Aug 9, 2016
1 parent f8e2e5e commit 4633a08
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 75 deletions.
31 changes: 17 additions & 14 deletions src/edu/washington/escience/myria/operator/BinaryFileScan.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package edu.washington.escience.myria.operator;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.LittleEndianDataInputStream;
import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.io.DataSource;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
Expand All @@ -9,15 +17,6 @@
import java.io.InputStream;
import java.util.Objects;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.LittleEndianDataInputStream;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.io.DataSource;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;

/**
* Reads data from binary file. This class is written base on the code from FileScan.java
*
Expand Down Expand Up @@ -72,23 +71,27 @@ protected final TupleBatch fetchNextReady() throws DbException {
while (buffer.numTuples() < TupleBatch.BATCH_SIZE) {
for (int count = 0; count < schema.numColumns(); ++count) {
switch (schema.getColumnType(count)) {
case BOOLEAN_TYPE:
buffer.putBoolean(count, dataInput.readBoolean());
break;
case DOUBLE_TYPE:
buffer.putDouble(count, dataInput.readDouble());
break;
case FLOAT_TYPE:
float readFloat = dataInput.readFloat();
buffer.putFloat(count, readFloat);
buffer.putFloat(count, dataInput.readFloat());
break;
case INT_TYPE:
buffer.putInt(count, dataInput.readInt());
break;
case LONG_TYPE:
long readLong = dataInput.readLong();
buffer.putLong(count, readLong);
buffer.putLong(count, dataInput.readLong());
break;
case STRING_TYPE:
buffer.putString(count, dataInput.readUTF());
break;
default:
throw new UnsupportedOperationException(
"BinaryFileScan only support reading fixed width type from the binary file.");
"BinaryFileScan does not support the type "+schema.getColumnType(count));
}
building = true;
}
Expand Down
31 changes: 14 additions & 17 deletions src/edu/washington/escience/myria/operator/FileScan.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
package edu.washington.escience.myria.operator;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;

import javax.annotation.Nullable;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang.BooleanUtils;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Floats;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.io.DataSource;
import edu.washington.escience.myria.io.FileSource;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.util.DateTimeUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang.BooleanUtils;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;

/**
* Reads data from a file. For CSV files, the default parser follows the RFC 4180 (http://tools.ietf.org/html/rfc4180).
Expand Down Expand Up @@ -208,11 +205,11 @@ protected TupleBatch fetchNextReady() throws DbException, IOException {
try {
switch (schema.getColumnType(column)) {
case BOOLEAN_TYPE:
if (Floats.tryParse(cell) != null) {
buffer.putBoolean(column, Floats.tryParse(cell) != 0);
} else if (BooleanUtils.toBoolean(cell)) {
Float f = Floats.tryParse(cell);
if (f != null)
buffer.putBoolean(column, f != 0);
else if (BooleanUtils.toBoolean(cell))
buffer.putBoolean(column, Boolean.parseBoolean(cell));
}
break;
case DOUBLE_TYPE:
buffer.putDouble(column, Double.parseDouble(cell));
Expand Down
114 changes: 70 additions & 44 deletions test/edu/washington/escience/myria/operator/BinaryFileScanTest.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package edu.washington.escience.myria.operator;

import static org.junit.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.io.ByteArraySource;
import edu.washington.escience.myria.io.FileSource;
import edu.washington.escience.myria.storage.TupleBatch;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

import org.junit.Test;

import com.google.common.collect.ImmutableList;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.io.FileSource;
import edu.washington.escience.myria.storage.TupleBatch;
import static org.junit.Assert.assertEquals;

/**
* To test BinaryFileScan, and it is based on the code from FileScanTest
Expand Down Expand Up @@ -108,46 +110,70 @@ public void testNumRowsFromCosmo24Star() throws DbException {
assertEquals(1291, getRowCount(bfs));
}

@Test
public void testGenerateReadBinary() throws Exception {
Schema schema = new Schema(ImmutableList.of( // one of each
Type.BOOLEAN_TYPE, Type.DOUBLE_TYPE, Type.FLOAT_TYPE, Type.INT_TYPE,
Type.LONG_TYPE, Type.STRING_TYPE
));
byte[] buf;
{
ByteArrayOutputStream bos = new ByteArrayOutputStream(1000);
DataOutputStream stream = new DataOutputStream(bos);
generateBinaryData(stream, schema.getColumnTypes().toArray(new Type[0]), 10);
stream.close();
buf = bos.toByteArray();
}

BinaryFileScan bfs = new BinaryFileScan(schema, new ByteArraySource(buf));
assertEquals(10, getRowCount(bfs));
}

/**
* Generates a binary file with the given file name, type array and the number of row.
* Generates a binary file with the given file name, type array and the number of rows.
*/
@SuppressWarnings("unused")
private void generateBinaryFile(String filename, Type[] typeAr, int numrows) {
try (DataOutputStream raf = new DataOutputStream(new FileOutputStream(filename))) {
generateBinaryData(raf, typeAr, numrows);
} catch (IOException e) {
throw new RuntimeException("", e);
}
}

/**
* Write binary data to a stream, for given types and number of rows.
*
* @param filename The filename to create.
* @param typeAr The type array.
* @param row The number of row.
* @param stream The data stream to write data to. Does not close the stream.
*/
@SuppressWarnings("unused")
private void generateBinaryFile(String filename, Type[] typeAr, int row) {
try {
RandomAccessFile raf = new RandomAccessFile(filename, "rw");
for (int i = 0; i < row; i++) {
for (Type element : typeAr) {
switch (element) {
case BOOLEAN_TYPE:
raf.writeBoolean(true);
break;
case DOUBLE_TYPE:
raf.writeDouble(i);
break;
case FLOAT_TYPE:
raf.writeFloat(i);
break;
case INT_TYPE:
raf.writeInt(i);
break;
case LONG_TYPE:
raf.writeLong(i);
break;
default:
throw new UnsupportedOperationException(
"can only write fix length field to bin file");
}
private void generateBinaryData(DataOutputStream stream, Type[] typeAr, int numrows) throws IOException {
for (int i = 0; i < numrows; i++) {
for (Type element : typeAr) {
switch (element) {
case BOOLEAN_TYPE:
stream.writeBoolean(true);
break;
case DOUBLE_TYPE:
stream.writeDouble(i);
break;
case FLOAT_TYPE:
stream.writeFloat(i);
break;
case INT_TYPE:
stream.writeInt(i);
break;
case LONG_TYPE:
stream.writeLong(i);
break;
case STRING_TYPE:
stream.writeUTF("string"+i);
break;
default:
throw new UnsupportedOperationException(
"can only write fix length field to bin file");
}
}
raf.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

Expand Down

0 comments on commit 4633a08

Please sign in to comment.