Commit 4a8f886

Merge branch 'master' into SPARK-6939

zsxwing committed Apr 30, 2015
2 parents 18607a1 + 5553198
Showing 38 changed files with 1,318 additions and 361 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet

import scala.collection.mutable.HashMap

private[jobs] object UIData {
private[spark] object UIData {

class ExecutorSummary {
var taskTime : Long = 0

@@ -28,7 +28,6 @@
import org.apache.spark.ml.classification.ClassificationModel;
import org.apache.spark.ml.param.IntParam;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.param.Params;
import org.apache.spark.ml.param.Params$;
import org.apache.spark.mllib.linalg.BLAS;
import org.apache.spark.mllib.linalg.Vector;
@@ -100,11 +99,12 @@ public static void main(String[] args) throws Exception {
/**
* Example of defining a type of {@link Classifier}.
*
* NOTE: This is private since it is an example. In practice, you may not want it to be private.
* Note: Some IDEs (e.g., IntelliJ) will complain that this will not compile due to
* {@link org.apache.spark.ml.param.Params#set} using incompatible return types.
* However, this should still compile and run successfully.
*/
class MyJavaLogisticRegression
extends Classifier<Vector, MyJavaLogisticRegression, MyJavaLogisticRegressionModel>
implements Params {
extends Classifier<Vector, MyJavaLogisticRegression, MyJavaLogisticRegressionModel> {

/**
* Param for max number of iterations
@@ -145,10 +145,12 @@ public MyJavaLogisticRegressionModel train(DataFrame dataset, ParamMap paramMap)
/**
* Example of defining a type of {@link ClassificationModel}.
*
* NOTE: This is private since it is an example. In practice, you may not want it to be private.
* Note: Some IDEs (e.g., IntelliJ) will complain that this will not compile due to
* {@link org.apache.spark.ml.param.Params#set} using incompatible return types.
* However, this should still compile and run successfully.
*/
class MyJavaLogisticRegressionModel
extends ClassificationModel<Vector, MyJavaLogisticRegressionModel> implements Params {
extends ClassificationModel<Vector, MyJavaLogisticRegressionModel> {

private MyJavaLogisticRegression parent_;
public MyJavaLogisticRegression parent() { return parent_; }
19 changes: 16 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ListBuffer

import org.apache.spark.Logging
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.param.{Params, Param, ParamMap}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

@@ -86,6 +86,14 @@ class Pipeline extends Estimator[PipelineModel] {
def setStages(value: Array[PipelineStage]): this.type = { set(stages, value); this }
def getStages: Array[PipelineStage] = getOrDefault(stages)

override def validate(paramMap: ParamMap): Unit = {
val map = extractParamMap(paramMap)
getStages.foreach {
case pStage: Params => pStage.validate(map)
case _ =>
}
}

/**
* Fits the pipeline to the input dataset with additional parameters. If a stage is an
* [[Estimator]], its [[Estimator#fit]] method will be called on the input dataset to fit a model.
@@ -140,7 +148,7 @@ class Pipeline extends Estimator[PipelineModel] {
override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
val map = extractParamMap(paramMap)
val theStages = map(stages)
require(theStages.toSet.size == theStages.size,
require(theStages.toSet.size == theStages.length,
"Cannot have duplicate components in a pipeline.")
theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur, paramMap))
}
@@ -157,6 +165,11 @@ class PipelineModel private[ml] (
private[ml] val stages: Array[Transformer])
extends Model[PipelineModel] with Logging {

override def validate(paramMap: ParamMap): Unit = {
val map = fittingParamMap ++ extractParamMap(paramMap)
stages.foreach(_.validate(map))
}

/**
* Gets the model produced by the input estimator. Throws an NoSuchElementException is the input
* estimator does not exist in the pipeline.
@@ -168,7 +181,7 @@ }
}
if (matched.isEmpty) {
throw new NoSuchElementException(s"Cannot find stage $stage from the pipeline.")
} else if (matched.size > 1) {
} else if (matched.length > 1) {
throw new IllegalStateException(s"Cannot have duplicate estimators in the sample pipeline.")
} else {
matched.head.asInstanceOf[M]
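To illustrate the new validate override in Pipeline above, a minimal usage sketch (not part of this commit): the stage objects and column names are made up, and it assumes the paramMap-style API used throughout this diff.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.param.ParamMap

// Illustrative stages; any Transformer/Estimator stages would do.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// Cascades to every stage that is a Params instance, so invalid parameter
// values surface before fit() is ever called.
pipeline.validate(ParamMap.empty)
```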

@@ -103,21 +103,16 @@ final class GBTClassifier
*/
val lossType: Param[String] = new Param[String](this, "lossType", "Loss function which GBT" +
" tries to minimize (case-insensitive). Supported options:" +
s" ${GBTClassifier.supportedLossTypes.mkString(", ")}")
s" ${GBTClassifier.supportedLossTypes.mkString(", ")}",
(value: String) => GBTClassifier.supportedLossTypes.contains(value.toLowerCase))

setDefault(lossType -> "logistic")

/** @group setParam */
def setLossType(value: String): this.type = {
val lossStr = value.toLowerCase
require(GBTClassifier.supportedLossTypes.contains(lossStr), "GBTClassifier was given bad loss" +
s" type: $value. Supported options: ${GBTClassifier.supportedLossTypes.mkString(", ")}")
set(lossType, lossStr)
this
}
def setLossType(value: String): this.type = set(lossType, value)

/** @group getParam */
def getLossType: String = getOrDefault(lossType)
def getLossType: String = getOrDefault(lossType).toLowerCase

/** (private[ml]) Convert new loss to old loss. */
override private[ml] def getOldLossType: OldLoss = {
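A hedged sketch of what moving the check into the GBTClassifier Param means for callers (not part of the diff): an invalid value should now be rejected when it is set, rather than by the hand-written require in the old setter; the exact failure point depends on where this version routes Param validation.

```scala
import org.apache.spark.ml.classification.GBTClassifier

val gbt = new GBTClassifier()
gbt.setLossType("Logistic")    // accepted: the validator lower-cases before checking
gbt.getLossType                // "logistic" -- the getter also normalizes case
// gbt.setLossType("hinge")    // not in GBTClassifier.supportedLossTypes, so it is rejected
```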
12 changes: 7 additions & 5 deletions mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.feature

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.param.{ParamValidators, IntParam, ParamMap}
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.sql.types.DataType
@@ -32,19 +32,21 @@ import org.apache.spark.sql.types.DataType
class HashingTF extends UnaryTransformer[Iterable[_], Vector, HashingTF] {

/**
* number of features
* Number of features. Should be > 0.
* (default = 2^18^)
* @group param
*/
val numFeatures = new IntParam(this, "numFeatures", "number of features")
val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)",
ParamValidators.gt(0))

setDefault(numFeatures -> (1 << 18))

/** @group getParam */
def getNumFeatures: Int = getOrDefault(numFeatures)

/** @group setParam */
def setNumFeatures(value: Int): this.type = set(numFeatures, value)

setDefault(numFeatures -> (1 << 18))

override protected def createTransformFunc(paramMap: ParamMap): Iterable[_] => Vector = {
val hashingTF = new feature.HashingTF(paramMap(numFeatures))
hashingTF.transform
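For illustration, a short sketch (not from this commit) of the HashingTF bound introduced above; the column names are placeholders.

```scala
import org.apache.spark.ml.feature.HashingTF

val tf = new HashingTF()
  .setInputCol("words")
  .setOutputCol("features")
  .setNumFeatures(1 << 20)   // 1,048,576 > 0, so ParamValidators.gt(0) accepts it
// .setNumFeatures(0)        // would now be rejected up front by the gt(0) check
```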

@@ -19,7 +19,7 @@ package org.apache.spark.ml.feature

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{DoubleParam, ParamMap}
import org.apache.spark.ml.param.{ParamValidators, DoubleParam, ParamMap}
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.sql.types.DataType
@@ -32,19 +32,20 @@ import org.apache.spark.sql.types.DataType
class Normalizer extends UnaryTransformer[Vector, Vector, Normalizer] {

/**
* Normalization in L^p^ space, p = 2 by default.
* Normalization in L^p^ space. Must be >= 1.
* (default: p = 2)
* @group param
*/
val p = new DoubleParam(this, "p", "the p norm value")
val p = new DoubleParam(this, "p", "the p norm value", ParamValidators.gtEq(1))

setDefault(p -> 2.0)

/** @group getParam */
def getP: Double = getOrDefault(p)

/** @group setParam */
def setP(value: Double): this.type = set(p, value)

setDefault(p -> 2.0)

override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = {
val normalizer = new feature.Normalizer(paramMap(p))
normalizer.transform
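A usage sketch for the Normalizer constraint above (illustrative, not from the commit); `df` is assumed to be a DataFrame with a Vector column named "features".

```scala
import org.apache.spark.ml.feature.Normalizer

val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("l1Features")
  .setP(1.0)                 // any p >= 1 satisfies ParamValidators.gtEq(1)
val l1Normalized = normalizer.transform(df)
// setP(0.5) would be rejected, matching the documented requirement p >= 1.
```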

@@ -21,7 +21,7 @@ import scala.collection.mutable

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.param.{ParamValidators, IntParam, ParamMap}
import org.apache.spark.mllib.linalg._
import org.apache.spark.sql.types.DataType

@@ -37,10 +37,13 @@ import org.apache.spark.sql.types.DataType
class PolynomialExpansion extends UnaryTransformer[Vector, Vector, PolynomialExpansion] {

/**
* The polynomial degree to expand, which should be larger than 1.
* The polynomial degree to expand, which should be >= 1. A value of 1 means no expansion.
* Default: 2
* @group param
*/
val degree = new IntParam(this, "degree", "the polynomial degree to expand")
val degree = new IntParam(this, "degree", "the polynomial degree to expand (>= 1)",
ParamValidators.gt(1))

setDefault(degree -> 2)

/** @group getParam */

@@ -31,17 +31,19 @@ import org.apache.spark.sql.types.{StructField, StructType}
* Params for [[StandardScaler]] and [[StandardScalerModel]].
*/
private[feature] trait StandardScalerParams extends Params with HasInputCol with HasOutputCol {

/**
* False by default. Centers the data with mean before scaling.
* Centers the data with mean before scaling.
* It will build a dense output, so this does not work on sparse input
* and will raise an exception.
* Default: false
* @group param
*/
val withMean: BooleanParam = new BooleanParam(this, "withMean", "Center data with mean")

/**
* True by default. Scales the data to unit standard deviation.
* Scales the data to unit standard deviation.
* Default: true
* @group param
*/
val withStd: BooleanParam = new BooleanParam(this, "withStd", "Scale to unit standard deviation")
@@ -56,7 +58,7 @@ private[feature] trait StandardScalerParams extends Params with HasInputCol with
class StandardScaler extends Estimator[StandardScalerModel] with StandardScalerParams {

setDefault(withMean -> false, withStd -> true)

/** @group setParam */
def setInputCol(value: String): this.type = set(inputCol, value)

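A brief, hedged sketch of how the StandardScaler defaults documented above play out (not part of the diff); `data` is assumed to be a DataFrame with a Vector column "features", and setOutputCol is assumed to exist alongside the setInputCol shown here.

```scala
import org.apache.spark.ml.feature.StandardScaler

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
// Defaults apply: withStd = true, withMean = false. Keeping withMean = false is
// what makes sparse input safe, since centering would densify the vectors.
val scalerModel = scaler.fit(data)
val scaled = scalerModel.transform(data)
```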
20 changes: 11 additions & 9 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.feature

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{ParamMap, IntParam, BooleanParam, Param}
import org.apache.spark.ml.param._
import org.apache.spark.sql.types.{DataType, StringType, ArrayType}

/**
@@ -43,20 +43,20 @@ class Tokenizer extends UnaryTransformer[String, Seq[String], Tokenizer] {
/**
* :: AlphaComponent ::
* A regex based tokenizer that extracts tokens either by repeatedly matching the regex(default)
* or using it to split the text (set matching to false). Optional parameters also allow to fold
* the text to lowercase prior to it being tokenized and to filer tokens using a minimal length.
* or using it to split the text (set matching to false). Optional parameters also allow filtering
* tokens using a minimal length.
* It returns an array of strings that can be empty.
* The default parameters are regex = "\\p{L}+|[^\\p{L}\\s]+", matching = true,
* lowercase = false, minTokenLength = 1
*/
@AlphaComponent
class RegexTokenizer extends UnaryTransformer[String, Seq[String], RegexTokenizer] {

/**
* param for minimum token length, default is one to avoid returning empty strings
* Minimum token length, >= 0.
* Default: 1, to avoid returning empty strings
* @group param
*/
val minTokenLength: IntParam = new IntParam(this, "minLength", "minimum token length")
val minTokenLength: IntParam = new IntParam(this, "minLength", "minimum token length (>= 0)",
ParamValidators.gtEq(0))

/** @group setParam */
def setMinTokenLength(value: Int): this.type = set(minTokenLength, value)
@@ -65,7 +65,8 @@ class RegexTokenizer extends UnaryTransformer[String, Seq[String], RegexTokenize
def getMinTokenLength: Int = getOrDefault(minTokenLength)

/**
* param sets regex as splitting on gaps (true) or matching tokens (false)
* Indicates whether regex splits on gaps (true) or matching tokens (false).
* Default: false
* @group param
*/
val gaps: BooleanParam = new BooleanParam(this, "gaps", "Set regex to match gaps or tokens")
@@ -77,7 +78,8 @@ class RegexTokenizer extends UnaryTransformer[String, Seq[String], RegexTokenize
def getGaps: Boolean = getOrDefault(gaps)

/**
* param sets regex pattern used by tokenizer
* Regex pattern used by tokenizer.
* Default: `"\\p{L}+|[^\\p{L}\\s]+"`
* @group param
*/
val pattern: Param[String] = new Param(this, "pattern", "regex pattern used for tokenizing")
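To tie the RegexTokenizer params together, a configuration sketch (illustrative, not from the commit); `df` is assumed to be a DataFrame with a String column "text", and the setGaps/setPattern setters are assumed to accompany the getters shown above.

```scala
import org.apache.spark.ml.feature.RegexTokenizer

val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("tokens")
  .setPattern("\\s+")       // with gaps = true, the pattern describes the separators
  .setGaps(true)
  .setMinTokenLength(2)     // negative values are now rejected by ParamValidators.gtEq(0)
val tokenized = tokenizer.transform(df)
```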

@@ -22,7 +22,7 @@ import org.apache.spark.ml.util.SchemaUtils
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.attribute.{BinaryAttribute, NumericAttribute, NominalAttribute,
Attribute, AttributeGroup}
import org.apache.spark.ml.param.{IntParam, ParamMap, Params}
import org.apache.spark.ml.param.{ParamValidators, IntParam, ParamMap, Params}
import org.apache.spark.ml.param.shared._
import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, VectorUDT}
import org.apache.spark.sql.{Row, DataFrame}
@@ -37,17 +37,19 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu
/**
* Threshold for the number of values a categorical feature can take.
* If a feature is found to have > maxCategories values, then it is declared continuous.
* Must be >= 2.
*
* (default = 20)
*/
val maxCategories = new IntParam(this, "maxCategories",
"Threshold for the number of values a categorical feature can take." +
" If a feature is found to have > maxCategories values, then it is declared continuous.")
"Threshold for the number of values a categorical feature can take (>= 2)." +
" If a feature is found to have > maxCategories values, then it is declared continuous.",
ParamValidators.gtEq(2))

setDefault(maxCategories -> 20)

/** @group getParam */
def getMaxCategories: Int = getOrDefault(maxCategories)

setDefault(maxCategories -> 20)
}

/**
@@ -90,11 +92,7 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu
class VectorIndexer extends Estimator[VectorIndexerModel] with VectorIndexerParams {

/** @group setParam */
def setMaxCategories(value: Int): this.type = {
require(value > 1,
s"DatasetIndexer given maxCategories = value, but requires maxCategories > 1.")
set(maxCategories, value)
}
def setMaxCategories(value: Int): this.type = set(maxCategories, value)

/** @group setParam */
def setInputCol(value: String): this.type = set(inputCol, value)
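Finally, a short VectorIndexer sketch (not from the commit); `data` is assumed to be a DataFrame with a Vector column "features".

```scala
import org.apache.spark.ml.feature.VectorIndexer

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(10)      // accepted: 10 >= 2
val indexerModel = indexer.fit(data)
// setMaxCategories(1) would now be rejected by ParamValidators.gtEq(2),
// the same constraint the removed require() used to enforce.
```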