Skip to content

Commit

Permalink
Finishing rebase from master
Browse files Browse the repository at this point in the history
  • Loading branch information
kraftp committed Mar 21, 2018
1 parent 0554765 commit a3e2205
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import edu.stanford.futuredata.macrobase.analysis.summary.Explanation;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLOutlierSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.BatchSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.ratios.ExplanationMetric;
import edu.stanford.futuredata.macrobase.analysis.summary.ratios.GlobalRatioMetric;
import edu.stanford.futuredata.macrobase.analysis.summary.ratios.RiskRatioMetric;
import edu.stanford.futuredata.macrobase.distributed.analysis.classify.DistributedClassifier;
import edu.stanford.futuredata.macrobase.distributed.analysis.classify.PredicateClassifierDistributed;
import edu.stanford.futuredata.macrobase.distributed.analysis.summary.DistributedBatchSummarizer;
Expand Down Expand Up @@ -108,10 +111,7 @@ public Classifier getClassifier() throws MacroBaseException {
}
}

<<<<<<< d92c91618cbb2c4186681691bb997ae31e271752
public BatchSummarizer getSummarizer(String outlierColumnName) throws MacroBaseException {
=======
private DistributedClassifier getDistributedClassifier() throws MacrobaseException {
private DistributedClassifier getDistributedClassifier() throws MacroBaseException {
switch (classifierType.toLowerCase()) {
case "predicate": {
if (isStrPredicate){
Expand All @@ -122,12 +122,12 @@ private DistributedClassifier getDistributedClassifier() throws MacrobaseExcepti
return classifier;
}
default : {
throw new MacrobaseException("Bad Classifier Type");
throw new MacroBaseException("Bad Classifier Type");
}
}
}

public ExplanationMetric getRatioMetric() throws MacrobaseException {
public ExplanationMetric getRatioMetric() throws MacroBaseException {
switch (ratioMetric.toLowerCase()) {
case "globalratio": {
return new GlobalRatioMetric();
Expand All @@ -136,13 +136,12 @@ public ExplanationMetric getRatioMetric() throws MacrobaseException {
return new RiskRatioMetric();
}
default: {
throw new MacrobaseException("Bad Ratio Metric");
throw new MacroBaseException("Bad Ratio Metric");
}
}
}

private BatchSummarizer getSummarizer(String outlierColumnName) throws MacrobaseException {
>>>>>>> Fully working distributed pipeline
private BatchSummarizer getSummarizer(String outlierColumnName) throws MacroBaseException {
switch (summarizerType.toLowerCase()) {
case "fpgrowth": {
FPGrowthSummarizer summarizer = new FPGrowthSummarizer();
Expand All @@ -164,12 +163,12 @@ private BatchSummarizer getSummarizer(String outlierColumnName) throws Macrobase
return summarizer;
}
default: {
throw new MacrobaseException("Bad Summarizer Type");
throw new MacroBaseException("Bad Summarizer Type");
}
}
}

private DistributedBatchSummarizer getDistributedSummarizer(String outlierColumnName) throws MacrobaseException {
private DistributedBatchSummarizer getDistributedSummarizer(String outlierColumnName) throws MacroBaseException {
switch (summarizerType.toLowerCase()) {
case "aplineardistributed": {
APLOutlierSummarizerDistributed summarizer = new APLOutlierSummarizerDistributed();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package edu.stanford.futuredata.macrobase.distributed.analysis.classify;

import edu.stanford.futuredata.macrobase.analysis.classify.Classifier;
import edu.stanford.futuredata.macrobase.analysis.classify.stats.MBPredicate;
import edu.stanford.futuredata.macrobase.datamodel.DataFrame;
import edu.stanford.futuredata.macrobase.datamodel.Schema;
import edu.stanford.futuredata.macrobase.distributed.datamodel.DistributedDataFrame;
import edu.stanford.futuredata.macrobase.util.MacrobaseException;
import edu.stanford.futuredata.macrobase.util.MacrobaseInternalError;
import edu.stanford.futuredata.macrobase.util.MacroBaseException;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Serializable;
import scala.Tuple2;

import java.util.function.DoublePredicate;
Expand Down Expand Up @@ -40,10 +36,10 @@ public class PredicateClassifierDistributed extends DistributedClassifier {
 * @param columnName Column on which to classify outliers
* @param predicateStr Predicate used for classification: "==", "!=", "<", ">", "<=", or ">="
* @param sentinel Sentinel value used when evaluating the predicate to determine outlier
* @throws MacrobaseException
* @throws MacroBaseException
*/
public PredicateClassifierDistributed(final String columnName, final String predicateStr, final double sentinel)
throws MacrobaseException {
throws MacroBaseException {
super(columnName);
this.doubleSentinel = sentinel;
this.predicateStr = predicateStr;
Expand All @@ -55,10 +51,10 @@ public PredicateClassifierDistributed(final String columnName, final String pred
 * @param columnName Column on which to classify outliers
* @param predicateStr Predicate used for classification: "==", "!=", "<", ">", "<=", or ">="
* @param sentinel Sentinel value used when evaluating the predicate to determine outlier
* @throws MacrobaseException
* @throws MacroBaseException
*/
public PredicateClassifierDistributed(final String columnName, final String predicateStr, final String sentinel)
throws MacrobaseException {
throws MacroBaseException {
super(columnName);
this.stringSentinel = sentinel;
this.predicateStr = predicateStr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import edu.stanford.futuredata.macrobase.distributed.analysis.summary.DistributedBatchSummarizer;
import edu.stanford.futuredata.macrobase.distributed.analysis.summary.util.AttributeEncoderDistributed;
import edu.stanford.futuredata.macrobase.distributed.datamodel.DistributedDataFrame;
import edu.stanford.futuredata.macrobase.util.MacrobaseSQLException;
import edu.stanford.futuredata.macrobase.util.MacroBaseSQLException;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand Down Expand Up @@ -84,7 +84,7 @@ public static JavaPairRDD<String[], double[]> transformSparkDataFrame(Dataset<Ro
else if (rowObject instanceof java.lang.Double)
newAttributesCol[i] = Double.toString((Double) rowObject);
else
throw new MacrobaseSQLException("Only strings and doubles supported in schema not " + rowObject.getClass().getName());
throw new MacroBaseSQLException("Only strings and doubles supported in schema not " + rowObject.getClass().getName());
}
}
return new Tuple2<>(newAttributesCol, newAggregatesCol);
Expand All @@ -102,7 +102,7 @@ else if (rowObject instanceof java.lang.Double)
else if (rowObject instanceof java.lang.Double)
newAttributesCol[i] = Double.toString((Double) rowObject);
else
throw new MacrobaseSQLException("Only strings and doubles supported in schema not " + rowObject.getClass().getName());
throw new MacroBaseSQLException("Only strings and doubles supported in schema not " + rowObject.getClass().getName());
}
}
return new Tuple2<>(newAttributesCol, newAggregatesCol);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import edu.stanford.futuredata.macrobase.analysis.summary.util.*;
import edu.stanford.futuredata.macrobase.analysis.summary.util.qualitymetrics.QualityMetric;
import edu.stanford.futuredata.macrobase.util.MacrobaseInternalError;
import edu.stanford.futuredata.macrobase.util.MacroBaseInternalError;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLExplanationResult;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
Expand Down Expand Up @@ -208,7 +208,7 @@ public static List<APLExplanationResult> explain(
}
}
} else {
throw new MacrobaseInternalError("High Order not supported");
throw new MacroBaseInternalError("High Order not supported");
}
return thisThreadSetAggregates.asHashMap();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import edu.stanford.futuredata.macrobase.datamodel.Schema.ColType;
import edu.stanford.futuredata.macrobase.distributed.analysis.summary.aplinearDistributed.APLOutlierSummarizerDistributed;
import edu.stanford.futuredata.macrobase.sql.tree.*;
import edu.stanford.futuredata.macrobase.util.MacrobaseException;
import edu.stanford.futuredata.macrobase.util.MacrobaseSQLException;
import edu.stanford.futuredata.macrobase.util.MacroBaseException;
import edu.stanford.futuredata.macrobase.util.MacroBaseSQLException;

import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -41,9 +41,9 @@ class QueryEngineDistributed {
* Top-level method for importing tables from CSV files into MacroBase SQL
*
* @return A DataFrame that contains the data loaded from the CSV file
* @throws MacrobaseSQLException if there's an error parsing the CSV file
* @throws MacroBaseSQLException if there's an error parsing the CSV file
*/
Dataset<Row> importTableFromCsv(ImportCsv importStatement) throws MacrobaseSQLException {
Dataset<Row> importTableFromCsv(ImportCsv importStatement) throws MacroBaseSQLException {
// Increase the number of partitions to ensure enough memory for parsing overhead
int increasedPartitions = numPartitions * 10;
final String fileName = importStatement.getFilename();
Expand Down Expand Up @@ -87,7 +87,7 @@ Dataset<Row> importTableFromCsv(ImportCsv importStatement) throws MacrobaseSQLEx
StructField field = DataTypes.createStructField(fieldName, DataTypes.DoubleType, true);
fields.add(field);
} else {
throw new MacrobaseSQLException("Only strings and doubles supported in schema");
throw new MacroBaseSQLException("Only strings and doubles supported in schema");
}
}
}
Expand All @@ -114,7 +114,7 @@ Dataset<Row> importTableFromCsv(ImportCsv importStatement) throws MacrobaseSQLEx
rowList.add(Double.NaN);
}
} else {
throw new MacrobaseSQLException("Only strings and doubles supported in schema");
throw new MacroBaseSQLException("Only strings and doubles supported in schema");
}
}
}
Expand All @@ -129,18 +129,18 @@ Dataset<Row> importTableFromCsv(ImportCsv importStatement) throws MacrobaseSQLEx
df.createOrReplaceTempView(tableName);
return df;
} catch (Exception e) {
throw new MacrobaseSQLException(e.getMessage());
throw new MacroBaseSQLException(e.getMessage());
}
}

/**
* Top-level method for executing a SQL query in MacroBase SQL
*
* @return A DataFrame corresponding to the results of the query
* @throws MacrobaseException If there's an error -- syntactic or logical -- processing the
* @throws MacroBaseException If there's an error -- syntactic or logical -- processing the
* query, an exception is thrown
*/
Dataset<Row> executeQuery(Query query) throws MacrobaseException {
Dataset<Row> executeQuery(Query query) throws MacroBaseException {
QueryBody queryBody = query.getQueryBody();
if (queryBody instanceof QuerySpecification) {
// If the query is pure SQL (without MBSQL commands) just execute it
Expand All @@ -155,7 +155,7 @@ Dataset<Row> executeQuery(Query query) throws MacrobaseException {
log.debug(diffQuery.toString());
return executeDiffQuerySpec(diffQuery);
}
throw new MacrobaseSQLException(
throw new MacroBaseSQLException(
"query of type " + queryBody.getClass().getSimpleName() + " not yet supported");
}

Expand All @@ -164,11 +164,11 @@ Dataset<Row> executeQuery(Query query) throws MacrobaseException {
* contain DIFF and SPLIT operators).
*
* @return A DataFrame containing the results of the query
* @throws MacrobaseException If there's an error -- syntactic or logical -- processing the
* @throws MacroBaseException If there's an error -- syntactic or logical -- processing the
* query, an exception is thrown
*/
private Dataset<Row> executeDiffQuerySpec(final DiffQuerySpecification diffQuery)
throws MacrobaseException {
throws MacroBaseException {
final String outlierColName = "outlier_col";
final Dataset<Row> outliersDF;
final Dataset<Row> inliersDF;
Expand Down Expand Up @@ -207,13 +207,13 @@ private Dataset<Row> executeDiffQuerySpec(final DiffQuerySpecification diffQuery
.map(Identifier::getValue)
.collect(Collectors.toList());
if ((explainCols.size() == 1) && explainCols.get(0).equals("*")) {
throw new MacrobaseSQLException("No ON * yet");
throw new MacroBaseSQLException("No ON * yet");
}

// Make sure all attribute columns are valid.
for (String explainCol: explainCols) {
if (!Arrays.asList(outliersDF.columns()).contains(explainCol))
throw new MacrobaseSQLException(
throw new MacroBaseSQLException(
"ON " + Joiner.on(", ").join(explainCols) + " not present in table");
}

Expand Down

0 comments on commit a3e2205

Please sign in to comment.