Skip to content

Commit

Permalink
Inherit from StreamingLinearAlgorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
MechCoder committed Jun 25, 2015
1 parent 1b4ddd6 commit d47cc24
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 53 deletions.
2 changes: 1 addition & 1 deletion docs/mllib-linear-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ model.setInitialWeights([0.0, 0.0, 0.0])

Now we register the streams for training and testing and start the job.

{% highlight scala %}
{% highlight python %}
model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))

Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/mllib/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,10 +630,10 @@ def predictOnValues(self, dstream):
@inherit_doc
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
"""
Run LogisticRegression with SGD on a stream of data.
Run LogisticRegression with SGD on a batch of data.
The weights obtained at the end of training a stream are used as initial
weights for the next stream.
weights for the next batch.
:param stepSize: Step size for each iteration of gradient descent.
:param numIterations: Number of iterations run for each batch of data.
Expand Down
56 changes: 9 additions & 47 deletions python/pyspark/mllib/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from pyspark import RDD
from pyspark.streaming.dstream import DStream
from pyspark.mllib.classification import StreamingLinearAlgorithm
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader
Expand Down Expand Up @@ -572,17 +573,16 @@ def train(cls, data, isotonic=True):


@inherit_doc
class StreamingLinearRegressionWithSGD(LinearRegressionModel):
class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
"""
Run LinearRegression with SGD on a stream of data.
Run LinearRegression with SGD on a batch of data.
The problem minimized is (1 / n_samples) * (y - weights'X)**2.
After training on a stream of data, the weights obtained at the end of
training are used as initial weights for the next stream of data.
After training on a batch of data, the weights obtained at the end of
training are used as initial weights for the next batch.
:param: stepSize Step size for each iteration of gradient
descent.
:param: numIterations Total number of iterations run.
:param: stepSize Step size for each iteration of gradient descent.
:param: numIterations Total number of iterations run.
:param: miniBatchFraction Fraction of data on which SGD is run for each
iteration.
"""
Expand All @@ -591,29 +591,8 @@ def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
self.numIterations = numIterations
self.miniBatchFraction = miniBatchFraction
self._model = None

def _validate_dstream(self, dstream):
    """
    Check that ``dstream`` is usable and that a model is available.

    :param dstream: Object that is expected to be a DStream.
    :raises TypeError: If ``dstream`` is not a DStream instance.
    :raises ValueError: If ``self._model`` has not been set yet.
    """
    if not isinstance(dstream, DStream):
        raise TypeError(
            "dstream should be a DStream object, got %s" % type(dstream))
    # Compare against None explicitly (as __repr__ does) rather than relying
    # on truthiness, so a model object that evaluates falsy is not rejected.
    if self._model is None:
        raise ValueError(
            "Model must be initialized using setInitialWeights")

@property
def latestModel(self):
    """
    The model currently held by this object.

    Hands back ``self._model`` unchanged; its parameters are available
    through the `weights` and `intercept` attributes.
    """
    current = self._model
    return current

def __repr__(self):
if self._model is None:
return '(weights=None, intercept=None)'
else:
return str(self._model)
super(StreamingLinearRegressionWithSGD, self).__init__(
model=self._model)

def setInitialWeights(self, initialWeights):
"""
Expand All @@ -639,23 +618,6 @@ def update(rdd):

dstream.foreachRDD(update)

def predictOn(self, dstream):
    """
    Apply the current model to every element of a dstream of Vectors.

    The dstream is validated first; the model is looked up on ``self``
    each time an element is processed, not once up front.

    :return: Transformed dstream object.
    """
    self._validate_dstream(dstream)

    def _predict(features):
        # Resolve self._model at call time, exactly like the lambda did.
        return self._model.predict(features)

    predictions = dstream.map(_predict)
    return predictions

def predictOnValues(self, dstream):
    """
    Apply the current model to the values of a keyed dstream.

    Keys are left to ``mapValues``; each value (a Vector) is passed to the
    model's ``predict``. The dstream is validated before it is transformed.

    :return: Transformed dstream object.
    """
    self._validate_dstream(dstream)

    def _predict(features):
        # Resolve self._model at call time, exactly like the lambda did.
        return self._model.predict(features)

    predictions = dstream.mapValues(_predict)
    return predictions


def _test():
import doctest
Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/mllib/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,8 +1201,8 @@ def test_parameter_accuracy(self):
self.ssc.start()
self._ssc_wait(t, 10, 0.01)
self.assertArrayAlmostEqual(
slr.latestModel.weights.array, [10., 10.], 1)
self.assertAlmostEqual(slr.latestModel.intercept, 0.0, 1)
slr.latestModel().weights.array, [10., 10.], 1)
self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)

def test_parameter_convergence(self):
"""Test that the model parameters improve with streaming data."""
Expand All @@ -1219,7 +1219,7 @@ def test_parameter_convergence(self):
model_weights = []
input_stream = self.ssc.queueStream(batches)
input_stream.foreachRDD(
lambda x: model_weights.append(slr.latestModel.weights[0]))
lambda x: model_weights.append(slr.latestModel().weights[0]))
t = time()
slr.trainOn(input_stream)
self.ssc.start()
Expand Down

0 comments on commit d47cc24

Please sign in to comment.