improve classification
Davies Liu committed Nov 17, 2014
1 parent 40eb8b6 · commit c1e5573

Showing 1 changed file with 103 additions and 30 deletions.

python/pyspark/mllib/classification.py
@@ -20,6 +20,7 @@
 import numpy
 from numpy import array
 
+from pyspark import RDD
 from pyspark.mllib.common import callMLlibFunc
 from pyspark.mllib.linalg import SparseVector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
@@ -29,47 +30,96 @@
            'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes']
 
 
-class LogisticRegressionModel(LinearModel):
+class ClassificationModel(LinearModel):
+    """
+    :: Experimental ::
+
+    Represents a classification model that predicts to which of a set of categories an example
+    belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc.
+    """
+    def __init__(self, weights, intercept):
+        super(ClassificationModel, self).__init__(weights, intercept)
+        self._threshold = 0.5
+
+    def setThreshold(self, value):
+        """
+        :: Experimental ::
+
+        Sets the threshold that separates positive predictions from negative predictions. An
+        example with a prediction score greater than or equal to this threshold is identified
+        as positive, and negative otherwise. The default value is 0.5.
+        """
+        self._threshold = value
+
+    def clearThreshold(self):
+        """
+        :: Experimental ::
+
+        Clears the threshold so that `predict` will output raw prediction scores.
+        """
+        self._threshold = None
+
+    def predict(self, test):
+        """
+        Predict values for a single data point or an RDD of points using the model trained.
+        """
+        raise NotImplementedError
+
+
+class LogisticRegressionModel(ClassificationModel):
 
     """A linear binary classification model derived from logistic regression.
 
     >>> data = [
-    ...     LabeledPoint(0.0, [0.0]),
-    ...     LabeledPoint(1.0, [1.0]),
-    ...     LabeledPoint(1.0, [2.0]),
-    ...     LabeledPoint(1.0, [3.0])
+    ...     LabeledPoint(0.0, [0.0, 1.0]),
+    ...     LabeledPoint(1.0, [1.0, 0.0]),
     ... ]
     >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
-    >>> lrm.predict(array([1.0])) > 0
-    True
-    >>> lrm.predict(array([0.0])) <= 0
-    True
+    >>> lrm.predict([1.0, 0.0])
+    1
+    >>> lrm.predict([0.0, 1.0])
+    0
+    >>> lrm.predict(sc.parallelize([[1.0, 0.0], [0.0, 1.0]])).collect()
+    [1, 0]
+    >>> lrm.clearThreshold()
+    >>> lrm.predict([0.0, 1.0])
+    0.123...
     >>> sparse_data = [
     ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
     ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
-    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+    ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
     ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
     ... ]
     >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
-    >>> lrm.predict(array([0.0, 1.0])) > 0
-    True
-    >>> lrm.predict(array([0.0, 0.0])) <= 0
-    True
-    >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
-    True
-    >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
-    True
+    >>> lrm.predict(array([0.0, 1.0]))
+    1
+    >>> lrm.predict(array([1.0, 0.0]))
+    0
+    >>> lrm.predict(SparseVector(2, {1: 1.0}))
+    1
+    >>> lrm.predict(SparseVector(2, {0: 1.0}))
+    0
     """
 
     def predict(self, x):
+        """
+        Predict values for a single data point or an RDD of points using the model trained.
+        """
+        if isinstance(x, RDD):
+            return x.map(lambda v: self.predict(v))
+
+        x = _convert_to_vector(x)
         margin = self.weights.dot(x) + self._intercept
         if margin > 0:
-            prob = 1 / (1 + exp(-margin))
+            prob = 1 / (1.0 + exp(-margin))
         else:
             exp_margin = exp(margin)
             prob = exp_margin / (1 + exp_margin)
-        return 1 if prob > 0.5 else 0
+        if self._threshold is None:
+            return prob
+        else:
+            return 1 if prob >= self._threshold else 0
 
 
 class LogisticRegressionWithSGD(object):
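
Note on the predict logic above: the two-branch sigmoid is the standard numerically stable form, calling exp() only on non-positive arguments so a large |margin| cannot overflow. A minimal standalone sketch of the same computation (hypothetical helper names, not part of this commit):

    from math import exp

    def logistic_prob(margin):
        # Stable sigmoid: exp() is only ever evaluated on a value <= 0.
        if margin > 0:
            return 1 / (1.0 + exp(-margin))
        exp_margin = exp(margin)
        return exp_margin / (1 + exp_margin)

    def classify(margin, threshold=0.5):
        # threshold=None mirrors clearThreshold(): return the raw score.
        prob = logistic_prob(margin)
        if threshold is None:
            return prob
        return 1 if prob >= threshold else 0

With the default threshold of 0.5 this reproduces the old `prob > 0.5` behavior (up to the boundary case, since the new code uses >=), while clearThreshold() exposes the raw probability, as the new `0.123...` doctest shows.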
@@ -111,7 +161,7 @@ def train(rdd, i):
         return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
 
 
-class SVMModel(LinearModel):
+class SVMModel(ClassificationModel):
 
     """A support vector machine.
@@ -122,25 +172,43 @@ class SVMModel(LinearModel):
     ...     LabeledPoint(1.0, [3.0])
     ... ]
     >>> svm = SVMWithSGD.train(sc.parallelize(data))
-    >>> svm.predict(array([1.0])) > 0
-    True
+    >>> svm.predict([1.0])
+    1
+    >>> svm.predict(sc.parallelize([[1.0]])).collect()
+    [1]
+    >>> svm.clearThreshold()
+    >>> svm.predict(array([1.0]))
+    1.25...
     >>> sparse_data = [
     ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
     ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
     ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
     ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
     ... ]
     >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
-    >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
-    True
-    >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
-    True
+    >>> svm.predict(SparseVector(2, {1: 1.0}))
+    1
+    >>> svm.predict(SparseVector(2, {0: -1.0}))
+    0
     """
+    def __init__(self, weights, intercept):
+        super(SVMModel, self).__init__(weights, intercept)
+        self._threshold = 0.0
 
     def predict(self, x):
+        """
+        Predict values for a single data point or an RDD of points using the model trained.
+        """
+        if isinstance(x, RDD):
+            return x.map(lambda v: self.predict(v))
+
+        x = _convert_to_vector(x)
         margin = self.weights.dot(x) + self.intercept
-        return 1 if margin >= 0 else 0
+        if self._threshold is None:
+            return margin
+        else:
+            return 1 if margin >= self._threshold else 0
 
 
 class SVMWithSGD(object):
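
The SVMModel.predict path above mirrors the logistic one, except that the raw score is the margin w·x + b and the default threshold is 0.0 rather than 0.5 (set in SVMModel.__init__). A usage sketch built only from APIs shown in this diff (assumes a live SparkContext bound to `sc`, as in the doctests):

    from pyspark.mllib.classification import SVMWithSGD
    from pyspark.mllib.regression import LabeledPoint

    data = [LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0]),
            LabeledPoint(1.0, [2.0]), LabeledPoint(1.0, [3.0])]
    svm = SVMWithSGD.train(sc.parallelize(data))

    svm.predict([1.0])      # 0/1 label: margin compared against threshold 0.0
    svm.setThreshold(2.0)   # require a larger margin before predicting positive
    svm.clearThreshold()
    svm.predict([1.0])      # raw margin (a float), e.g. 1.25...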
@@ -201,6 +269,8 @@ class NaiveBayesModel(object):
     0.0
     >>> model.predict(array([1.0, 0.0]))
     1.0
+    >>> model.predict(sc.parallelize([[1.0, 0.0]])).collect()
+    [1.0]
     >>> sparse_data = [
     ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
     ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
@@ -219,7 +289,9 @@ def __init__(self, labels, pi, theta):
         self.theta = theta
 
     def predict(self, x):
-        """Return the most likely class for a data vector x"""
+        """Return the most likely class for a data vector or an RDD of vectors"""
+        if isinstance(x, RDD):
+            return x.map(lambda v: self.predict(v))
         x = _convert_to_vector(x)
         return self.labels[numpy.argmax(self.pi + x.dot(self.theta.transpose()))]
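
NaiveBayesModel.predict works entirely in log space: `pi` holds the log class priors and `theta` the log conditional feature probabilities, so `pi + x.dot(theta.T)` is each class's log posterior up to a shared constant, and argmax picks the likeliest label. An equivalent numpy-only sketch with made-up fitted parameters (illustrative values, not from this commit):

    import numpy

    labels = numpy.array([0.0, 1.0])
    pi = numpy.log(numpy.array([0.5, 0.5]))        # log P(class)
    theta = numpy.log(numpy.array([[0.9, 0.1],     # log P(feature | class 0)
                                   [0.1, 0.9]]))   # log P(feature | class 1)

    def nb_predict(x):
        # Log prior plus log likelihood per class, then take the best class.
        return labels[numpy.argmax(pi + numpy.dot(x, theta.transpose()))]

    nb_predict(numpy.array([1.0, 0.0]))  # -> 0.0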

@@ -250,7 +322,8 @@ def train(cls, data, lambda_=1.0):
 def _test():
     import doctest
     from pyspark import SparkContext
-    globs = globals().copy()
+    import pyspark.mllib.classification
+    globs = pyspark.mllib.classification.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
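
The globs change matters because doctest.testmod(globs=...) evaluates every doctest in exactly that namespace: copying the module's own __dict__ ties the doctest environment to pyspark.mllib.classification itself, rather than to whatever namespace _test() happens to run in. A minimal sketch of the pattern (fixture injection mirrors the code above; run from the module's own __main__, as here):

    import doctest
    import pyspark.mllib.classification

    globs = pyspark.mllib.classification.__dict__.copy()
    # Inject fixtures the doctests expect, e.g. a SparkContext bound to 'sc'.
    doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)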