[SPARK-8265] Add LinearDataGenerator to pyspark.mllib.utils
MechCoder committed Jun 19, 2015
1 parent 54976e5 commit 0f1053c
Showing 3 changed files with 78 additions and 2 deletions.
22 changes: 22 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -51,6 +51,7 @@ import org.apache.spark.mllib.tree.loss.Losses
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, RandomForestModel}
import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomForest}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.LinearDataGenerator
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
@@ -979,6 +980,27 @@ private[python] class PythonMLLibAPI extends Serializable {
    List[AnyRef](model.clusterCenters, Vectors.dense(model.clusterWeights)).asJava
  }

  /**
   * Wrapper around the generateLinearInput method of LinearDataGenerator.
   */
  def generateLinearInputWrapper(
      intercept: Double,
      weights: JArrayList[Double], xMean: JArrayList[Double],
      xVariance: JArrayList[Double], nPoints: Int, seed: Int, eps: Double): Array[LabeledPoint] = {
    return LinearDataGenerator.generateLinearInput(
      intercept, weights.asScala.toArray, xMean.asScala.toArray,
      xVariance.asScala.toArray, nPoints, seed, eps).toArray
  }

  /**
   * Wrapper around the generateLinearRDD method of LinearDataGenerator.
   */
  def generateLinearRDDWrapper(
      sc: JavaSparkContext, nexamples: Int, nfeatures: Int,
      eps: Double, nparts: Int, intercept: Double): JavaRDD[LabeledPoint] = {
    return LinearDataGenerator.generateLinearRDD(
      sc, nexamples, nfeatures, eps, nparts, intercept)
  }
}

/**
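How these wrappers are reached from Python: PySpark's callMLlibFunc helper (the same helper util.py calls below, defined in pyspark.mllib.common) invokes the named method on a PythonMLLibAPI instance in the JVM via Py4J, and Python lists arrive as java.util.ArrayList, which is why the parameters are typed JArrayList[Double]. The following is only a sketch of that raw bridge call, assuming an active SparkContext; ordinary code would use the LinearDataGenerator class added to util.py further down rather than calling the wrapper directly.

from pyspark.mllib.common import callMLlibFunc

# Raw Py4J bridge call that lands in generateLinearInputWrapper above.
# Argument order mirrors the Scala signature:
# intercept, weights, xMean, xVariance, nPoints, seed, eps.
points = callMLlibFunc(
    "generateLinearInputWrapper",
    0.5,              # intercept
    [1.0, -2.0],      # weights
    [0.0, 0.0],       # xMean
    [0.25, 0.25],     # xVariance
    5,                # nPoints
    42,               # seed
    0.1)              # eps
print(list(points))   # five LabeledPoint objects, each with 2 features
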
20 changes: 18 additions & 2 deletions python/pyspark/mllib/tests.py
@@ -48,8 +48,8 @@
from pyspark.mllib.stat import Statistics
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import IDF
-from pyspark.mllib.feature import StandardScaler
-from pyspark.mllib.feature import ElementwiseProduct
+from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
+from pyspark.mllib.util import LinearDataGenerator
from pyspark.serializers import PickleSerializer
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
@@ -1011,6 +1011,22 @@ def collect(rdd):
        self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])


class LinearDataGeneratorTests(MLlibTestCase):
    def test_dim(self):
        points = LinearDataGenerator.generateLinearInput(
            0.0, [0.0, 0.0, 0.0], [0.0, 0.0, 0.0],
            [0.33, 0.33, 0.33], 4, 0, 0.1)
        self.assertEqual(len(points), 4)
        for point in points:
            self.assertEqual(len(point.features), 3)

        rdd = LinearDataGenerator.generateLinearRDD(
            sc, 6, 2, 0.1, 2, 0.0).collect()
        self.assertEqual(len(rdd), 6)
        for point in rdd:
            self.assertEqual(len(point.features), 2)


if __name__ == "__main__":
    if not _have_scipy:
        print("NOTE: Skipping SciPy tests as it does not seem to be installed")
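The new test only asserts dimensions. Per the docstrings added to util.py below, the labels come from a linear model of the form X'w + c with eps-scaled noise, so a follow-up assertion along these lines could also check the label values. This is a sketch that is not part of the commit; it assumes the module-level SparkContext that tests.py creates, and the names intercept and weights are local to the example.

import numpy as np
from pyspark.mllib.util import LinearDataGenerator

# Hypothetical extra check: with eps close to zero, each label should be
# close to features . weights + intercept (the X'w + c model from the docstring).
intercept, weights = 2.0, [1.0, -3.0]
points = LinearDataGenerator.generateLinearInput(
    intercept, weights, [0.0, 0.0], [0.33, 0.33], 100, 42, 1e-6)
for p in points:
    expected = float(np.dot(p.features.toArray(), weights)) + intercept
    assert abs(p.label - expected) < 1e-3
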
38 changes: 38 additions & 0 deletions python/pyspark/mllib/util.py
@@ -257,6 +257,44 @@ def load(cls, sc, path):
        return cls(java_model)


class LinearDataGenerator(object):
    """Utils for generating linear data."""

    @staticmethod
    def generateLinearInput(intercept, weights, xMean, xVariance,
                            nPoints, seed, eps):
        """
        :param intercept: bias factor, the term c in X'w + c
        :param weights: feature vector, the term w in X'w + c
        :param xMean: point around which the data X is centered
        :param xVariance: variance of the given data
        :param nPoints: number of points to be generated
        :param seed: random seed
        :param eps: scale factor for the noise; larger values add
                    more Gaussian noise to the labels
        :return: a list of LabeledPoints of length nPoints
        """
        seed, nPoints = int(seed), int(nPoints)
        intercept, eps = float(intercept), float(eps)
        weights = [float(weight) for weight in weights]
        xMean = [float(mean) for mean in xMean]
        xVariance = [float(var) for var in xVariance]
        return list(callMLlibFunc(
            "generateLinearInputWrapper", intercept, weights, xMean,
            xVariance, nPoints, seed, eps))

    @staticmethod
    def generateLinearRDD(sc, nexamples, nfeatures, eps,
                          nParts=2, intercept=0.0):
        """
        Generate an RDD of LabeledPoints.
        """
        nexamples, nfeatures, nParts = int(nexamples), int(nfeatures), int(nParts)
        intercept, eps = float(intercept), float(eps)
        return callMLlibFunc("generateLinearRDDWrapper", sc, nexamples,
                             nfeatures, eps, nParts, intercept)


def _test():
    import doctest
    from pyspark.context import SparkContext
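Taken together, the new class gives PySpark users a direct way to synthesize regression data. Below is a minimal usage sketch (not part of the commit, assuming a local SparkContext) showing the parameter order that util.py forwards to the JVM wrappers.

from pyspark import SparkContext
from pyspark.mllib.util import LinearDataGenerator

sc = SparkContext("local[2]", "LinearDataGeneratorExample")

# 4 points with 3 features each, centered at xMean with per-feature variance xVariance.
points = LinearDataGenerator.generateLinearInput(
    intercept=0.0, weights=[1.0, 2.0, 3.0], xMean=[0.0, 0.0, 0.0],
    xVariance=[0.33, 0.33, 0.33], nPoints=4, seed=0, eps=0.1)
print(points[0])    # a LabeledPoint with a 3-dimensional feature vector

# An RDD of 100 LabeledPoints with 5 features each, spread over 2 partitions.
rdd = LinearDataGenerator.generateLinearRDD(
    sc, nexamples=100, nfeatures=5, eps=0.1, nParts=2, intercept=0.5)
print(rdd.take(2))

sc.stop()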
