Skip to content

Commit

Permalink
Fix and rename CSVFileScanFragment to CSVFragmentTupleSource
Browse files Browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Jun 7, 2017
1 parent 625b0e7 commit 406be71
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 31 deletions.
21 changes: 16 additions & 5 deletions src/edu/washington/escience/myria/CsvTupleReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;

import javax.annotation.Nullable;
Expand All @@ -27,7 +23,6 @@

import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.util.DateTimeUtils;

/**
Expand Down Expand Up @@ -192,6 +187,22 @@ public Schema getSchema() {
return schema;
}

/**
 * Returns the field delimiter character this reader uses when parsing CSV input.
 *
 * @return the configured delimiter character
 */
public char getDelimiter() {
  return this.delimiter;
}

/**
 * Returns the quote character this reader uses when parsing CSV input.
 *
 * @return the configured quote character
 */
public char getQuote() {
  return this.quote;
}

/**
 * Returns the escape character this reader uses when parsing CSV input.
 *
 * @return the configured escape character
 */
public char getEscape() {
  return this.escape;
}

/**
 * Returns the number of leading input lines to skip before parsing records.
 *
 * @return the configured skip count; presumably null when not configured — TODO confirm
 */
public Integer getSkip() {
  return this.numberOfSkippedLines;
}

@Override
public void close() throws IOException {
parser = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@
import edu.washington.escience.myria.CsvTupleReader;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.io.AmazonS3Source;
import edu.washington.escience.myria.operator.CSVFileScanFragment;
import edu.washington.escience.myria.operator.CSVFragmentTupleSource;

public class CSVFileScanFragmentEncoding extends LeafOperatorEncoding<CSVFileScanFragment> {
public class CSVFragmentTupleSourceEncoding extends LeafOperatorEncoding<CSVFragmentTupleSource> {

@Required public CsvTupleReader reader;
@Required public AmazonS3Source source;

public Character delimiter;
public Character quote;
public Character escape;
public Integer skip;

public Set<Integer> workers;

@Override
public CSVFileScanFragment construct(ConstructArgs args) {
public CSVFragmentTupleSource construct(ConstructArgs args) {
/* Attempt to use all the workers if not specified */
if (workers == null) {
workers = args.getServer().getAliveWorkers();
Expand All @@ -30,7 +25,13 @@ public CSVFileScanFragment construct(ConstructArgs args) {
int[] workersArray =
args.getServer().parallelIngestComputeNumWorkers(source.getFileSize(), workers);

return new CSVFileScanFragment(
source, reader.getSchema(), workersArray, delimiter, quote, escape, skip);
return new CSVFragmentTupleSource(
source,
reader.getSchema(),
workersArray,
reader.getDelimiter(),
reader.getQuote(),
reader.getEscape(),
reader.getSkip());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import edu.washington.escience.myria.api.MyriaApiException;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.operator.CSVFragmentTupleSource;
import edu.washington.escience.myria.operator.Operator;

/**
Expand All @@ -29,7 +30,7 @@
@Type(name = "Consumer", value = ConsumerEncoding.class),
@Type(name = "Counter", value = CounterEncoding.class),
@Type(name = "CrossWithSingleton", value = CrossWithSingletonEncoding.class),
@Type(name = "CSVFileScanFragment", value = CSVFileScanFragmentEncoding.class),
// Map the "CSVFileScanFragment" JSON tag to its *encoding* class. The renamed
// operator class (CSVFragmentTupleSource) is not a LeafOperatorEncoding, so
// registering it here would break Jackson deserialization of this operator type.
@Type(name = "CSVFileScanFragment", value = CSVFragmentTupleSourceEncoding.class),
@Type(name = "DbInsert", value = DbInsertEncoding.class),
@Type(name = "DbQueryScan", value = QueryScanEncoding.class),
@Type(name = "DbCreateIndex", value = CreateIndexEncoding.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.Iterator;

import javax.annotation.Nullable;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.BooleanUtils;

import com.google.common.base.MoreObjects;
Expand All @@ -31,13 +29,12 @@
import edu.washington.escience.myria.io.FileSource;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.util.DateTimeUtils;

/**
*
*/
public class CSVFileScanFragment extends LeafOperator {
public class CSVFragmentTupleSource extends LeafOperator {

/** The Schema of the relation stored in this file. */
private final Schema schema;
Expand Down Expand Up @@ -85,9 +82,9 @@ public class CSVFileScanFragment extends LeafOperator {
* The logger for debug, trace, etc. messages in this class.
*/
private static final org.slf4j.Logger LOGGER =
org.slf4j.LoggerFactory.getLogger(CSVFileScanFragment.class);
org.slf4j.LoggerFactory.getLogger(CSVFragmentTupleSource.class);

public CSVFileScanFragment(
public CSVFragmentTupleSource(
final String filename,
final Schema schema,
final long startByteRange,
Expand All @@ -96,7 +93,7 @@ public CSVFileScanFragment(
this(filename, schema, startByteRange, endByteRange, isLastWorker, null, null, null, null);
}

public CSVFileScanFragment(
public CSVFragmentTupleSource(
final DataSource source,
final Schema schema,
final long startByteRange,
Expand All @@ -105,7 +102,7 @@ public CSVFileScanFragment(
this(source, schema, startByteRange, endByteRange, isLastWorker, null, null, null, null);
}

public CSVFileScanFragment(
public CSVFragmentTupleSource(
final String filename,
final Schema schema,
final long startByteRange,
Expand All @@ -124,7 +121,7 @@ public CSVFileScanFragment(
null);
}

public CSVFileScanFragment(
public CSVFragmentTupleSource(
final DataSource source,
final Schema schema,
final long startByteRange,
Expand All @@ -134,7 +131,7 @@ public CSVFileScanFragment(
this(source, schema, startByteRange, endByteRange, isLastWorker, delimiter, null, null, null);
}

public CSVFileScanFragment(
public CSVFragmentTupleSource(
final String filename,
final Schema schema,
final long startByteRange,
Expand All @@ -156,7 +153,7 @@ public CSVFileScanFragment(
numberOfSkippedLines);
}

public CSVFileScanFragment(
public CSVFragmentTupleSource(
final DataSource source,
final Schema schema,
final long partitionStartByteRange,
Expand Down Expand Up @@ -185,7 +182,7 @@ public CSVFileScanFragment(
flagAsRangeSelected = true;
}

public CSVFileScanFragment(
public CSVFragmentTupleSource(
final AmazonS3Source source,
final Schema schema,
final int[] workerIds,
Expand Down
6 changes: 3 additions & 3 deletions src/edu/washington/escience/myria/parallel/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
import edu.washington.escience.myria.io.DataSink;
import edu.washington.escience.myria.io.UriSink;
import edu.washington.escience.myria.operator.Apply;
import edu.washington.escience.myria.operator.CSVFileScanFragment;
import edu.washington.escience.myria.operator.CSVFragmentTupleSource;
import edu.washington.escience.myria.operator.DbCreateFunction;
import edu.washington.escience.myria.operator.DbCreateIndex;
import edu.washington.escience.myria.operator.DbCreateView;
Expand Down Expand Up @@ -899,8 +899,8 @@ public DatasetStatus parallelIngestDataset(

Map<Integer, SubQueryPlan> workerPlans = new HashMap<>();
for (int workerID = 1; workerID <= workersArray.length; workerID++) {
CSVFileScanFragment scanFragment =
new CSVFileScanFragment(
CSVFragmentTupleSource scanFragment =
new CSVFragmentTupleSource(
s3Source, schema, workersArray, delimiter, quote, escape, numberOfSkippedLines);
workerPlans.put(
workersArray[workerID - 1],
Expand Down

0 comments on commit 406be71

Please sign in to comment.