Getting ingest optimizations into mb-spark-sql
kraftp committed Mar 13, 2018
1 parent db04ab6 commit 4bffa02
Showing 3 changed files with 63 additions and 49 deletions.
@@ -71,7 +71,8 @@ private MacroBaseSQLRepl(final boolean userWantsPaging) throws IOException {
                     .builder()
                     .appName("macrobase-sql-spark")
                     .getOrCreate();
-            queryEngineDistributed = new QueryEngineDistributed(spark);
+            // TODO: add configuration parameter for numPartitions
+            queryEngineDistributed = new QueryEngineDistributed(spark, 4);
         } else {
             queryEngineDistributed = null;
         }
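The TODO above suggests the obvious follow-up: read the partition count from Spark configuration instead of hard-coding 4. A minimal sketch, assuming a hypothetical configuration key macrobase.sql.numPartitions (the key name is not part of this commit):

    // Hypothetical follow-up to the TODO: pull numPartitions from Spark's
    // runtime config; the key "macrobase.sql.numPartitions" is an assumption.
    int numPartitions = Integer.parseInt(
            spark.conf().get("macrobase.sql.numPartitions", "4"));
    queryEngineDistributed = new QueryEngineDistributed(spark, numPartitions);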
@@ -32,9 +32,9 @@ class QueryEngineDistributed {
     private final int numPartitions;
     private final SparkSession spark;
 
-    QueryEngineDistributed(SparkSession spark) {
+    QueryEngineDistributed(SparkSession spark, int numPartitions) {
         this.spark = spark;
-        numPartitions = 4; // TODO: add configuration parameter for numPartitions
+        this.numPartitions = numPartitions;
     }
 
     /**
@@ -44,44 +44,36 @@ class QueryEngineDistributed {
      * @throws MacrobaseSQLException if there's an error parsing the CSV file
      */
     Dataset<Row> importTableFromCsv(ImportCsv importStatement) throws MacrobaseSQLException {
+        // Increase the number of partitions to ensure enough memory for parsing overhead
+        int increasedPartitions = numPartitions * 10;
         final String fileName = importStatement.getFilename();
         final String tableName = importStatement.getTableName().toString();
         final Map<String, ColType> schema = importStatement.getSchema();
         try {
-            // Distribute and parse
-            JavaRDD<String[]> datasetRDD = spark.sparkContext().textFile(fileName, numPartitions).toJavaRDD().mapPartitions(
-                    (Iterator<String> iter) -> {
-                        CsvParserSettings settings = new CsvParserSettings();
-                        settings.getFormat().setLineSeparator("\n");
-                        settings.setMaxCharsPerColumn(16384);
-                        CsvParser csvParser = new CsvParser(settings);
-                        List<String[]> parsedRows = new ArrayList<>();
-                        while(iter.hasNext()) {
-                            String row = iter.next();
-                            String[] parsedRow = csvParser.parseLine(row);
-                            parsedRows.add(parsedRow);
-                        }
-                        return parsedRows.iterator();
-                    }, true
-            );
-            // Extract the header
-            String[] header = datasetRDD.first();
+            JavaRDD<String> fileRDD = spark.sparkContext().textFile(fileName, increasedPartitions).toJavaRDD();
+            // Extract the header
+            CsvParserSettings headerSettings = new CsvParserSettings();
+            headerSettings.getFormat().setLineSeparator("\n");
+            headerSettings.setMaxCharsPerColumn(16384);
+            CsvParser headerParser = new CsvParser(headerSettings);
+            String[] header = headerParser.parseLine(fileRDD.first());
+            // Remove the header
+            fileRDD = fileRDD.mapPartitionsWithIndex(
+                    (Integer index, Iterator<String> iter) -> {
+                        if (index == 0) {
+                            iter.next();
+                        }
+                        return iter;
+                    }, true
+            );
+            JavaRDD<String> repartitionedRDD = fileRDD.repartition(increasedPartitions);
             Map<Integer, ColType> indexToColTypeMap = new HashMap<>();
             for (int i = 0; i < header.length; i++) {
                 if (schema.containsKey(header[i])) {
                     indexToColTypeMap.put(i, schema.get(header[i]));
                 }
             }
-            // Remove the header
-            datasetRDD = datasetRDD.mapPartitionsWithIndex(
-                    (Integer index, Iterator<String[]> iter) -> {
-                        if (index == 0) {
-                            iter.next();
-                            iter.remove();
-                        }
-                        return iter;
-                    }, true
-            );
 
             // Create a schema from the header for the eventual Spark Dataframe.
             List<StructField> fields = new ArrayList<>();
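The shape of the optimization is clear in this hunk: the file stays a raw JavaRDD of strings, the header is parsed once on the driver with its own CsvParser, the header line is dropped from partition 0, and the RDD is repartitioned (with a 10x headroom factor for parsing overhead) before any per-row work. A minimal self-contained sketch of the same pattern; the class name IngestSketch and the data.csv path are illustrative, not from this commit:

    import com.univocity.parsers.csv.CsvParser;
    import com.univocity.parsers.csv.CsvParserSettings;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.SparkSession;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class IngestSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[4]").appName("ingest-sketch").getOrCreate();
            int numPartitions = 4;
            // Extra headroom for parsing overhead, mirroring the commit's heuristic.
            int increasedPartitions = numPartitions * 10;

            JavaRDD<String> fileRDD = spark.sparkContext()
                    .textFile("data.csv", increasedPartitions) // placeholder path
                    .toJavaRDD();

            // Parse the header once, on the driver.
            CsvParserSettings headerSettings = new CsvParserSettings();
            headerSettings.getFormat().setLineSeparator("\n");
            CsvParser headerParser = new CsvParser(headerSettings);
            String[] header = headerParser.parseLine(fileRDD.first());

            // The header is always the first line of partition 0; skip it there.
            fileRDD = fileRDD.mapPartitionsWithIndex(
                    (Integer index, Iterator<String> iter) -> {
                        if (index == 0) {
                            iter.next();
                        }
                        return iter;
                    }, true);

            // Rebalance, then parse each partition with one CsvParser instance.
            JavaRDD<String[]> records = fileRDD.repartition(increasedPartitions)
                    .mapPartitions((Iterator<String> iter) -> {
                        CsvParserSettings settings = new CsvParserSettings();
                        settings.getFormat().setLineSeparator("\n");
                        CsvParser parser = new CsvParser(settings);
                        List<String[]> rows = new ArrayList<>();
                        while (iter.hasNext()) {
                            rows.add(parser.parseLine(iter.next()));
                        }
                        return rows.iterator();
                    });

            System.out.println(header.length + " columns, " + records.count() + " rows");
            spark.stop();
        }
    }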
@@ -100,27 +92,37 @@ Dataset<Row> importTableFromCsv(ImportCsv importStatement) throws MacrobaseSQLEx
                 }
             }
 
-            // Convert the RDD of arrays into an RDD of Spark Rows.
-            JavaRDD<Row> datasetRowRDD = datasetRDD.map((String[] record) ->
-            {
-                List<Object> rowList = new ArrayList<>();
-                for (int i = 0; i < record.length; i++) {
-                    if (indexToColTypeMap.containsKey(i)) {
-                        if (indexToColTypeMap.get(i) == ColType.STRING) {
-                            rowList.add(record[i]);
-                        } else if (indexToColTypeMap.get(i) == ColType.DOUBLE) {
-                            try {
-                                rowList.add(Double.parseDouble(record[i]));
-                            } catch (NumberFormatException e) {
-                                rowList.add(Double.NaN);
-                            }
-                        } else {
-                            throw new MacrobaseSQLException("Only strings and doubles supported in schema");
-                        }
-                    }
-                }
-                return RowFactory.create(rowList.toArray());
-            });
+            JavaRDD<Row> datasetRowRDD = repartitionedRDD.mapPartitions(
+                    (Iterator<String> iter) -> {
+                        CsvParserSettings settings = new CsvParserSettings();
+                        settings.getFormat().setLineSeparator("\n");
+                        settings.setMaxCharsPerColumn(16384);
+                        CsvParser csvParser = new CsvParser(settings);
+                        List<Row> parsedRows = new ArrayList<>();
+                        while(iter.hasNext()) {
+                            String row = iter.next();
+                            String[] record = csvParser.parseLine(row);
+                            List<Object> rowList = new ArrayList<>();
+                            for (int i = 0; i < record.length; i++) {
+                                if (indexToColTypeMap.containsKey(i)) {
+                                    if (indexToColTypeMap.get(i) == ColType.STRING) {
+                                        rowList.add(record[i]);
+                                    } else if (indexToColTypeMap.get(i) == ColType.DOUBLE) {
+                                        try {
+                                            rowList.add(Double.parseDouble(record[i]));
+                                        } catch (NumberFormatException e) {
+                                            rowList.add(Double.NaN);
+                                        }
+                                    } else {
+                                        throw new MacrobaseSQLException("Only strings and doubles supported in schema");
+                                    }
+                                }
+                            }
+                            parsedRows.add(RowFactory.create(rowList.toArray()));
+                        }
+                        return parsedRows.iterator();
+                    }, true
+            );
             // Create the Spark Dataset from the RDD of rows and the schema.
             Dataset<Row> df = spark.createDataFrame(datasetRowRDD, DataTypes.createStructType(fields));
             // Register the Dataset by name so Spark SQL commands recognize it.
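The final comment points at the registration step; a hedged usage sketch of what import enables (the table name sample is illustrative, and the exact registration call MacroBase uses is an assumption here):

    // Hypothetical usage once the DataFrame exists: register it by name,
    // then query it through Spark SQL. "sample" is a placeholder table name.
    df.createOrReplaceTempView("sample");
    spark.sql("SELECT COUNT(*) FROM sample").show();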
@@ -27,7 +27,7 @@ public void testIngestFromCSV() throws Exception {
                 .master("local[4]")
                 .appName("macrobase-sql-spark")
                 .getOrCreate();
-        QueryEngineDistributed queryEngineDistributed = new QueryEngineDistributed(spark);
+        QueryEngineDistributed queryEngineDistributed = new QueryEngineDistributed(spark, 1);
         final Statement stmt;
         try {
             stmt = parser.createStatement(queryStr.replace(";", ""));
@@ -43,6 +43,17 @@ public void testIngestFromCSV() throws Exception {

         List<Row> collectedResult = result.collectAsList();
 
+        collectedResult.sort((Row recordOne, Row recordTwo) -> {
+            Double doubleOne = Double.parseDouble(recordOne.getString(0));
+            Double doubleTwo = Double.parseDouble(recordTwo.getString(0));
+
+            if (doubleOne > doubleTwo)
+                return 1;
+            else if (doubleOne.equals(doubleTwo))
+                return 0;
+            else
+                return -1;});
+
         assertEquals(3, collectedResult.size());
 
         assertEquals("2.0", collectedResult.get(0).getString(0));
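Now that rows arrive in nondeterministic partition order, the test sorts before asserting. The hand-rolled comparator above compares boxed Doubles with > and spells out all three cases; a drop-in equivalent, not part of this commit (requires java.util.Comparator):

    // Equivalent, more idiomatic sort by the numeric value of column 0.
    collectedResult.sort(Comparator.comparingDouble(
            (Row r) -> Double.parseDouble(r.getString(0))));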
@@ -67,7 +78,7 @@ public void testDiffBasic() throws Exception {
                 .master("local[4]")
                 .appName("macrobase-sql-spark")
                 .getOrCreate();
-        QueryEngineDistributed queryEngineDistributed = new QueryEngineDistributed(spark);
+        QueryEngineDistributed queryEngineDistributed = new QueryEngineDistributed(spark, 1);
         final Statement importStmt;
         try {
             importStmt = parser.createStatement(importStr.replace(";", ""));
@@ -114,7 +125,7 @@ public void testDiffSplit() throws Exception {
                 .master("local[4]")
                 .appName("macrobase-sql-spark")
                 .getOrCreate();
-        QueryEngineDistributed queryEngineDistributed = new QueryEngineDistributed(spark);
+        QueryEngineDistributed queryEngineDistributed = new QueryEngineDistributed(spark, 1);
         final Statement importStmt;
         try {
             importStmt = parser.createStatement(importStr.replace(";", ""));