
Commit

Merge branch 'master' of github.com:apache/spark into closure-cleaner
Andrew Or committed Apr 25, 2015
2 parents e45e904 + a11c868 commit 4aab379
Showing 45 changed files with 2,763 additions and 693 deletions.


18 changes: 9 additions & 9 deletions docs/sql-programming-guide.md
@@ -681,8 +681,8 @@ In the simplest form, the default data source (`parquet` unless otherwise config
<div data-lang="scala" markdown="1">

{% highlight scala %}
val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")
val df = sqlContext.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").save("namesAndFavColors.parquet")
{% endhighlight %}

</div>
@@ -691,8 +691,8 @@ df.select("name", "age").save("namesAndAges.parquet")

{% highlight java %}

DataFrame df = sqlContext.load("people.parquet");
df.select("name", "age").save("namesAndAges.parquet");
DataFrame df = sqlContext.load("examples/src/main/resources/users.parquet");
df.select("name", "favorite_color").save("namesAndFavColors.parquet");

{% endhighlight %}

@@ -702,8 +702,8 @@ df.select("name", "age").save("namesAndAges.parquet");

{% highlight python %}

df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")
df = sqlContext.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").save("namesAndFavColors.parquet")

{% endhighlight %}
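The snippets above rely on the default data source. As a side note, a minimal sketch of overriding that default (assuming the 1.3-era `SQLContext` API shown here; `spark.sql.sources.default` is the governing configuration key):

{% highlight scala %}
// Hypothetical: switch the default data source from parquet to json.
sqlContext.setConf("spark.sql.sources.default", "json")
// load/save now default to JSON when no format is specified.
val people = sqlContext.load("examples/src/main/resources/people.json")
people.select("name").save("names.json")
{% endhighlight %}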

Expand All @@ -722,7 +722,7 @@ using this syntax.
<div data-lang="scala" markdown="1">

{% highlight scala %}
val df = sqlContext.load("people.json", "json")
val df = sqlContext.load("examples/src/main/resources/people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")
{% endhighlight %}

@@ -732,7 +732,7 @@ df.select("name", "age").save("namesAndAges.parquet", "parquet")

{% highlight java %}

DataFrame df = sqlContext.load("people.json", "json");
DataFrame df = sqlContext.load("examples/src/main/resources/people.json", "json");
df.select("name", "age").save("namesAndAges.parquet", "parquet");

{% endhighlight %}
@@ -743,7 +743,7 @@ df.select("name", "age").save("namesAndAges.parquet", "parquet");

{% highlight python %}

df = sqlContext.load("people.json", "json")
df = sqlContext.load("examples/src/main/resources/people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")

{% endhighlight %}
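A hedged sketch of the same JSON-to-Parquet conversion with an explicit `SaveMode`, assuming the `save(path, source, mode)` overload of this API generation:

{% highlight scala %}
import org.apache.spark.sql.SaveMode

// Hypothetical: overwrite any existing output instead of failing.
val df = sqlContext.load("examples/src/main/resources/people.json", "json")
df.save("namesAndAges.parquet", "parquet", SaveMode.Overwrite)
{% endhighlight %}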
2 changes: 1 addition & 1 deletion examples/pom.xml
@@ -245,7 +245,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_${scala.binary.version}</artifactId>
-<version>0.8.1</version>
+<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
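For readers building against Algebird outside Maven, the equivalent sbt coordinate would be (a sketch, assuming a standard sbt build):

{% highlight scala %}
// %% appends the Scala binary version, mirroring algebird-core_${scala.binary.version}.
libraryDependencies += "com.twitter" %% "algebird-core" % "0.9.0"
{% endhighlight %}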
7 changes: 6 additions & 1 deletion examples/src/main/python/sql.py
@@ -18,6 +18,7 @@
from __future__ import print_function

import os
+import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
@@ -50,7 +51,11 @@

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
-path = os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
+if len(sys.argv) < 2:
+    path = "file://" + \
+        os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
+else:
+    path = sys.argv[1]
# Create a DataFrame from the file(s) pointed to by path
people = sqlContext.jsonFile(path)
# root
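The change makes the input path an optional CLI argument, defaulting to the bundled `people.json`; the `file://` prefix keeps the default from being resolved against the cluster's default filesystem (e.g. HDFS). A rough Scala analogue, purely for illustration (assuming a `main(args: Array[String])` and an existing `sqlContext`):

{% highlight scala %}
// Hypothetical sketch mirroring the Python logic above.
val path = if (args.isEmpty) {
  "file://" + sys.env("SPARK_HOME") + "/examples/src/main/resources/people.json"
} else {
  args(0)
}
val people = sqlContext.jsonFile(path) // 1.3-era API; later superseded by read.json
{% endhighlight %}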
examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
@@ -22,10 +22,9 @@ import scala.language.reflectiveCalls

import scopt.OptionParser

-import org.apache.spark.ml.tree.DecisionTreeModel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.examples.mllib.AbstractParams
-import org.apache.spark.ml.{Pipeline, PipelineStage}
+import org.apache.spark.ml.{Pipeline, PipelineStage, Transformer}
import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier}
import org.apache.spark.ml.feature.{VectorIndexer, StringIndexer}
import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}
@@ -64,8 +63,6 @@ object DecisionTreeExample {
maxBins: Int = 32,
minInstancesPerNode: Int = 1,
minInfoGain: Double = 0.0,
-numTrees: Int = 1,
-featureSubsetStrategy: String = "auto",
fracTest: Double = 0.2,
cacheNodeIds: Boolean = false,
checkpointDir: Option[String] = None,
@@ -123,8 +120,8 @@ object DecisionTreeExample {
.required()
.action((x, c) => c.copy(input = x))
checkConfig { params =>
-if (params.fracTest < 0 || params.fracTest > 1) {
-failure(s"fracTest ${params.fracTest} value incorrect; should be in [0,1].")
+if (params.fracTest < 0 || params.fracTest >= 1) {
+failure(s"fracTest ${params.fracTest} value incorrect; should be in [0,1).")
} else {
success
}
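The tightened bound reflects how `fracTest` is consumed: the input is split into `1 - fracTest` training and `fracTest` test, so `fracTest = 1` would leave an empty training set. A sketch of that kind of split (hypothetical `data` RDD; the example's actual split code may differ):

{% highlight scala %}
// Hold out fracTest of the examples for evaluation; train on the rest.
val fracTest = 0.2
val Array(training, test) = data.randomSplit(Array(1.0 - fracTest, fracTest), seed = 12345)
{% endhighlight %}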
@@ -200,9 +197,18 @@ object DecisionTreeExample {
throw new IllegalArgumentException("Algo ${params.algo} not supported.")
}
}
-val dataframes = splits.map(_.toDF()).map(labelsToStrings).map(_.cache())
+val dataframes = splits.map(_.toDF()).map(labelsToStrings)
+val training = dataframes(0).cache()
+val test = dataframes(1).cache()

-(dataframes(0), dataframes(1))
+val numTraining = training.count()
+val numTest = test.count()
+val numFeatures = training.select("features").first().getAs[Vector](0).size
+println("Loaded data:")
+println(s" numTraining = $numTraining, numTest = $numTest")
+println(s" numFeatures = $numFeatures")
+
+(training, test)
}

def run(params: Params) {
@@ -217,13 +223,6 @@
val (training: DataFrame, test: DataFrame) =
loadDatasets(sc, params.input, params.dataFormat, params.testInput, algo, params.fracTest)

-val numTraining = training.count()
-val numTest = test.count()
-val numFeatures = training.select("features").first().getAs[Vector](0).size
-println("Loaded data:")
-println(s" numTraining = $numTraining, numTest = $numTest")
-println(s" numFeatures = $numFeatures")
-
// Set up Pipeline
val stages = new mutable.ArrayBuffer[PipelineStage]()
// (1) For classification, re-index classes.
@@ -241,7 +240,7 @@ object DecisionTreeExample {
.setOutputCol("indexedFeatures")
.setMaxCategories(10)
stages += featuresIndexer
-// (3) Learn DecisionTree
+// (3) Learn Decision Tree
val dt = algo match {
case "classification" =>
new DecisionTreeClassifier()
@@ -275,62 +274,86 @@ object DecisionTreeExample {
println(s"Training time: $elapsedTime seconds")

// Get the trained Decision Tree from the fitted PipelineModel
-val treeModel: DecisionTreeModel = algo match {
+algo match {
case "classification" =>
-pipelineModel.getModel[DecisionTreeClassificationModel](
+val treeModel = pipelineModel.getModel[DecisionTreeClassificationModel](
dt.asInstanceOf[DecisionTreeClassifier])
+if (treeModel.numNodes < 20) {
+println(treeModel.toDebugString) // Print full model.
+} else {
+println(treeModel) // Print model summary.
+}
case "regression" =>
-pipelineModel.getModel[DecisionTreeRegressionModel](dt.asInstanceOf[DecisionTreeRegressor])
-case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
-}
-if (treeModel.numNodes < 20) {
-println(treeModel.toDebugString) // Print full model.
-} else {
-println(treeModel) // Print model summary.
-}
-
-// Predict on training
-val trainingFullPredictions = pipelineModel.transform(training).cache()
-val trainingPredictions = trainingFullPredictions.select("prediction")
-.map(_.getDouble(0))
-val trainingLabels = trainingFullPredictions.select(labelColName).map(_.getDouble(0))
-// Predict on test data
-val testFullPredictions = pipelineModel.transform(test).cache()
-val testPredictions = testFullPredictions.select("prediction")
-.map(_.getDouble(0))
-val testLabels = testFullPredictions.select(labelColName).map(_.getDouble(0))
-
-// For classification, print number of classes for reference.
-if (algo == "classification") {
-val numClasses =
-MetadataUtils.getNumClasses(trainingFullPredictions.schema(labelColName)) match {
-case Some(n) => n
-case None => throw new RuntimeException(
-"DecisionTreeExample had unknown failure when indexing labels for classification.")
+val treeModel = pipelineModel.getModel[DecisionTreeRegressionModel](
+dt.asInstanceOf[DecisionTreeRegressor])
+if (treeModel.numNodes < 20) {
+println(treeModel.toDebugString) // Print full model.
+} else {
+println(treeModel) // Print model summary.
+}
}
println(s"numClasses = $numClasses.")
case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
}

// Evaluate model on training, test data
algo match {
case "classification" =>
-val trainingAccuracy =
-new MulticlassMetrics(trainingPredictions.zip(trainingLabels)).precision
-println(s"Train accuracy = $trainingAccuracy")
-val testAccuracy =
-new MulticlassMetrics(testPredictions.zip(testLabels)).precision
-println(s"Test accuracy = $testAccuracy")
+println("Training data results:")
+evaluateClassificationModel(pipelineModel, training, labelColName)
+println("Test data results:")
+evaluateClassificationModel(pipelineModel, test, labelColName)
case "regression" =>
-val trainingRMSE =
-new RegressionMetrics(trainingPredictions.zip(trainingLabels)).rootMeanSquaredError
-println(s"Training root mean squared error (RMSE) = $trainingRMSE")
-val testRMSE =
-new RegressionMetrics(testPredictions.zip(testLabels)).rootMeanSquaredError
-println(s"Test root mean squared error (RMSE) = $testRMSE")
+println("Training data results:")
+evaluateRegressionModel(pipelineModel, training, labelColName)
+println("Test data results:")
+evaluateRegressionModel(pipelineModel, test, labelColName)
case _ =>
throw new IllegalArgumentException("Algo ${params.algo} not supported.")
}

sc.stop()
}

+/**
+* Evaluate the given ClassificationModel on data. Print the results.
+* @param model Must fit ClassificationModel abstraction
+* @param data DataFrame with "prediction" and labelColName columns
+* @param labelColName Name of the labelCol parameter for the model
+*
+* TODO: Change model type to ClassificationModel once that API is public. SPARK-5995
+*/
+private[ml] def evaluateClassificationModel(
+model: Transformer,
+data: DataFrame,
+labelColName: String): Unit = {
+val fullPredictions = model.transform(data).cache()
+val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
+val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
+// Print number of classes for reference
+val numClasses = MetadataUtils.getNumClasses(fullPredictions.schema(labelColName)) match {
+case Some(n) => n
+case None => throw new RuntimeException(
+"Unknown failure when indexing labels for classification.")
+}
+val accuracy = new MulticlassMetrics(predictions.zip(labels)).precision
+println(s" Accuracy ($numClasses classes): $accuracy")
+}
+
+/**
+* Evaluate the given RegressionModel on data. Print the results.
+* @param model Must fit RegressionModel abstraction
+* @param data DataFrame with "prediction" and labelColName columns
+* @param labelColName Name of the labelCol parameter for the model
+*
+* TODO: Change model type to RegressionModel once that API is public. SPARK-5995
+*/
+private[ml] def evaluateRegressionModel(
+model: Transformer,
+data: DataFrame,
+labelColName: String): Unit = {
+val fullPredictions = model.transform(data).cache()
+val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
+val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
+val RMSE = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError
+println(s" Root mean squared error (RMSE): $RMSE")
+}
}
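For reference, the evaluation classes used by the new helpers come from `spark.mllib`; a standalone sketch (note that `MulticlassMetrics.precision` with no label argument is the overall accuracy):

{% highlight scala %}
import org.apache.spark.mllib.evaluation.{MulticlassMetrics, RegressionMetrics}
import org.apache.spark.rdd.RDD

// predictionsAndLabels: (prediction, label) pairs, e.g. built with zip as above.
def accuracy(predictionsAndLabels: RDD[(Double, Double)]): Double =
  new MulticlassMetrics(predictionsAndLabels).precision

def rmse(predictionsAndLabels: RDD[(Double, Double)]): Double =
  new RegressionMetrics(predictionsAndLabels).rootMeanSquaredError
{% endhighlight %}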
