diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 2897865af6912..bf1c6a6fc77cc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -51,6 +51,7 @@ import org.apache.spark.mllib.tree.loss.Losses
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, RandomForestModel}
 import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomForest}
+import org.apache.spark.mllib.util.LinearDataGenerator
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.storage.StorageLevel
@@ -979,6 +980,27 @@ private[python] class PythonMLLibAPI extends Serializable {
     List[AnyRef](model.clusterCenters, Vectors.dense(model.clusterWeights)).asJava
   }
 
+  /**
+   * Wrapper around the generateLinearInput method of LinearDataGenerator.
+   */
+  def generateLinearInputWrapper(
+      intercept: Double,
+      weights: JArrayList[Double], xMean: JArrayList[Double],
+      xVariance: JArrayList[Double], nPoints: Int, seed: Int, eps: Double): Array[LabeledPoint] = {
+    LinearDataGenerator.generateLinearInput(
+      intercept, weights.asScala.toArray, xMean.asScala.toArray,
+      xVariance.asScala.toArray, nPoints, seed, eps).toArray
+  }
+
+  /**
+   * Wrapper around the generateLinearRDD method of LinearDataGenerator.
+   */
+  def generateLinearRDDWrapper(
+      sc: JavaSparkContext, nexamples: Int, nfeatures: Int,
+      eps: Double, nparts: Int, intercept: Double): JavaRDD[LabeledPoint] = {
+    LinearDataGenerator.generateLinearRDD(
+      sc, nexamples, nfeatures, eps, nparts, intercept)
+  }
 }
 
 /**
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 744dc112d9209..0ed436d808c56 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -48,8 +48,8 @@
 from pyspark.mllib.stat import Statistics
 from pyspark.mllib.feature import Word2Vec
 from pyspark.mllib.feature import IDF
-from pyspark.mllib.feature import StandardScaler
-from pyspark.mllib.feature import ElementwiseProduct
+from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
+from pyspark.mllib.util import LinearDataGenerator
 from pyspark.serializers import PickleSerializer
 from pyspark.streaming import StreamingContext
 from pyspark.sql import SQLContext
@@ -1011,6 +1011,22 @@ def collect(rdd):
         self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
 
 
+class LinearDataGeneratorTests(MLlibTestCase):
+    def test_dim(self):
+        points = LinearDataGenerator.generateLinearInput(
+            0.0, [0.0, 0.0, 0.0], [0.0, 0.0, 0.0],
+            [0.33, 0.33, 0.33], 4, 0, 0.1)
+        self.assertEqual(len(points), 4)
+        for point in points:
+            self.assertEqual(len(point.features), 3)
+
+        points = LinearDataGenerator.generateLinearRDD(
+            self.sc, 6, 2, 0.1, 2, 0.0).collect()
+        self.assertEqual(len(points), 6)
+        for point in points:
+            self.assertEqual(len(point.features), 2)
+
+
 if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 16a90db146ef0..0dfad2e078b84 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -257,6 +257,44 @@ def load(cls, sc, path):
         return cls(java_model)
 
 
+class LinearDataGenerator(object):
+    """Utilities for generating linear data."""
+
+    @staticmethod
+    def generateLinearInput(intercept, weights, xMean, xVariance,
+                            nPoints, seed, eps):
+        """
+        :param intercept: bias factor, the term c in X'w + c
+        :param weights: feature vector, the term w in X'w + c
+        :param xMean: point around which the data X is centered
+        :param xVariance: variance of the given data
+        :param nPoints: number of points to be generated
+        :param seed: random seed
+        :param eps: scale of the added noise; larger values of eps
+            add more Gaussian noise
+        :return: a list of nPoints LabeledPoints
+        """
+        seed, nPoints = int(seed), int(nPoints)
+        intercept, eps = float(intercept), float(eps)
+        weights = [float(weight) for weight in weights]
+        xMean = [float(mean) for mean in xMean]
+        xVariance = [float(var) for var in xVariance]
+        return list(callMLlibFunc(
+            "generateLinearInputWrapper", intercept, weights, xMean, xVariance,
+            nPoints, seed, eps))
+
+    @staticmethod
+    def generateLinearRDD(sc, nexamples, nfeatures, eps,
+                          nParts=2, intercept=0.0):
+        """
+        Generate an RDD of LabeledPoints.
+        """
+        nexamples, nfeatures, nParts = int(nexamples), int(nfeatures), int(nParts)
+        intercept, eps = float(intercept), float(eps)
+        return callMLlibFunc("generateLinearRDDWrapper", sc, nexamples, nfeatures,
+                             eps, nParts, intercept)
+
+
 def _test():
     import doctest
     from pyspark.context import SparkContext
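
For reference, a minimal usage sketch of the Python API this patch adds. It assumes a local SparkContext; the master string, app name, weights, counts, and seed below are illustrative values, not part of the patch.

from pyspark import SparkContext
from pyspark.mllib.util import LinearDataGenerator

sc = SparkContext("local[2]", "linear-data-example")  # illustrative setup

# Draw 4 points with 3 features each: X is sampled around xMean with the
# given per-feature variance, and labels follow X'w + c plus Gaussian
# noise scaled by eps.
points = LinearDataGenerator.generateLinearInput(
    intercept=0.5, weights=[1.0, -2.0, 3.0], xMean=[0.0, 0.0, 0.0],
    xVariance=[0.33, 0.33, 0.33], nPoints=4, seed=42, eps=0.1)
for lp in points:
    print(lp.label, lp.features)

# Distributed variant: an RDD of 6 LabeledPoints with 2 features,
# split across 2 partitions (weights are chosen internally).
rdd = LinearDataGenerator.generateLinearRDD(
    sc, nexamples=6, nfeatures=2, eps=0.1, nParts=2, intercept=0.5)
print(rdd.take(3))

sc.stop()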