In [1]:
'''
    @Author: King
    @Date: 2019.05.20
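    @Purpose: Getting Started with Spark 2.1.0: Logistic Regression Classifier (Python Edition)
    @Introduction:  Getting Started with Spark 2.1.0: Logistic Regression Classifier (Python Edition)
    @Datasets: 
    @Link : http://dblab.xmu.edu.cn/blog/1773-2/
    @Reference : Getting Started with Spark 2.1.0: Logistic Regression Classifier (Python Edition)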
'''

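![Author](../img/bigdata-roadmap.jpg)
[Copyright notice] The copyright of this blog content belongs to the Database Laboratory of Xiamen University. Do not reproduce it without permission.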

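## 1. Logistic Regression

### 1. Method Overview

Logistic regression is a classic classification method in statistical learning and belongs to the family of log-linear models. Its dependent variable can be binary or multi-class.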

#### 1) The logistic distribution

![](../img/logistic1.png)

#### 2) The binomial logistic regression model
![](../img/logistic2.png)
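
The binomial logistic regression model is commonly written as P(Y=1|x) = exp(w·x + b) / (1 + exp(w·x + b)), with P(Y=0|x) = 1 - P(Y=1|x). The figure's exact notation is not reproduced here, so the names `w`, `b` and `predict_proba` below are assumptions chosen for illustration. A minimal sketch of that formula:

```python
import math

def predict_proba(w, b, x):
    """Return (P(Y=0|x), P(Y=1|x)) for a binomial logistic model."""
    z = sum(wi * xi for wi, xi in zip(w, x)) + b  # linear score w.x + b
    p1 = 1.0 / (1.0 + math.exp(-z))               # equals exp(z) / (1 + exp(z))
    return 1.0 - p1, p1

# Hypothetical weights and input, chosen so that the linear score is exactly 0.
w = [0.5, -0.25]
b = 0.0
p0, p1 = predict_proba(w, b, [2.0, 4.0])
print(p0, p1)
```

With a linear score of 0 both classes are equally likely, which is why a threshold of 0.5 on P(Y=1|x) corresponds to the decision boundary w·x + b = 0.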

#### 3) Parameter estimation
![](../img/logistic3.png)
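
Parameters of a logistic model are usually estimated by maximizing the log-likelihood of the training data. The sketch below uses plain gradient ascent on a tiny made-up dataset to show the idea; it is not the optimizer Spark uses internally, and the function names, learning rate and data are all assumptions for illustration.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def log_likelihood(w, b, xs, ys):
    """Sum of y*log(p) + (1-y)*log(1-p) over all samples."""
    total = 0.0
    for x, y in zip(xs, ys):
        p = sigmoid(sum(wi * xi for wi, xi in zip(w, x)) + b)
        total += y * math.log(p) + (1 - y) * math.log(1.0 - p)
    return total

def gradient_step(w, b, xs, ys, lr=0.1):
    """One gradient-ascent step; the gradient is sum((y - p) * x)."""
    grad_w = [0.0] * len(w)
    grad_b = 0.0
    for x, y in zip(xs, ys):
        err = y - sigmoid(sum(wi * xi for wi, xi in zip(w, x)) + b)
        grad_w = [g + err * xi for g, xi in zip(grad_w, x)]
        grad_b += err
    return [wi + lr * g for wi, g in zip(w, grad_w)], b + lr * grad_b

# Hypothetical one-feature dataset: small values are class 0, large values class 1.
xs = [[0.0], [1.0], [2.0], [3.0]]
ys = [0, 0, 1, 1]
w, b = [0.0], 0.0
before = log_likelihood(w, b, xs, ys)
for _ in range(100):
    w, b = gradient_step(w, b, xs, ys)
after = log_likelihood(w, b, xs, ys)
print(before, after)
```

The log-likelihood rises as the parameters are updated, which is the same quantity (up to sign and regularization) that an iterative solver drives down as a loss.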

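### 2. Example Code

We use the iris dataset (iris: http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt) as the running example. The dataset describes iris flowers: it contains 150 samples in 3 classes of 50 samples each, and every sample has 4 attributes. It is a very common test and training set in data mining and classification. The source tutorial says it mainly uses the last two attributes (petal length and petal width) for ease of understanding; the code below, however, builds the feature vector from all four attributes. spark.ml currently supports both binary and multi-class classification, so we look at three cases in turn: "binary classification with binomial logistic regression", "binary classification with multinomial logistic regression", and "multi-class classification with multinomial logistic regression".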

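#### 1) Binary classification with binomial logistic regression

First we take only the last two classes of the data and run a binary classification with binomial logistic regression.

##### step 1: Import the required packages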

In [4]:
# Import pyspark and create the SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()

In [2]:
from pyspark.sql import Row,functions
from pyspark.ml.linalg import Vector,Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, BinaryLogisticRegressionSummary

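##### step 2: Read the data and take a quick look

We define a helper function that turns one parsed record into a dictionary, then read the text file. The first map splits each line on ","; in our dataset every line has 5 parts, where the first 4 are the flower's features and the last is its class. We store the features in a Vector, build an RDD of Row objects, and convert it to a DataFrame. Finally, we call show() to look at some of the data.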
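
The parsing step described above can be sketched without Spark. The helper name `parse_iris_line` is hypothetical, and the sample line follows the five-field layout described in the text:

```python
def parse_iris_line(line):
    """Split one comma-separated iris record into (features, label)."""
    parts = line.strip().split(",")
    features = [float(v) for v in parts[:4]]  # four numeric measurements
    label = parts[4]                          # class name, kept as a string
    return features, label

features, label = parse_iris_line("5.1,3.5,1.4,0.2,Iris-setosa")
print(features, label)
```

In the Spark version the same logic runs inside the two map calls, with the feature list wrapped in Vectors.dense and the result wrapped in a Row.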


In [7]:
def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
    rel['label'] = str(x[4])
    return rel
 
data = spark.sparkContext.textFile("../resources/iris.txt").map(lambda line: line.split(',')).map(lambda p: Row(**f(p))).toDF()

data.show()

+-----------------+-----------+
|         features|      label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
|[5.4,3.9,1.7,0.4]|Iris-setosa|
|[4.6,3.4,1.4,0.3]|Iris-setosa|
|[5.0,3.4,1.5,0.2]|Iris-setosa|
|[4.4,2.9,1.4,0.2]|Iris-setosa|
|[4.9,3.1,1.5,0.1]|Iris-setosa|
|[5.4,3.7,1.5,0.2]|Iris-setosa|
|[4.8,3.4,1.6,0.2]|Iris-setosa|
|[4.8,3.0,1.4,0.1]|Iris-setosa|
|[4.3,3.0,1.1,0.1]|Iris-setosa|
|[5.8,4.0,1.2,0.2]|Iris-setosa|
|[5.7,4.4,1.5,0.4]|Iris-setosa|
|[5.4,3.9,1.3,0.4]|Iris-setosa|
|[5.1,3.5,1.4,0.3]|Iris-setosa|
|[5.7,3.8,1.7,0.3]|Iris-setosa|
|[5.1,3.8,1.5,0.3]|Iris-setosa|
+-----------------+-----------+
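only showing top 20 rows



Since we are dealing with a binary classification problem, we do not need all three classes and select two of them. We first register the data as a temporary view named iris, which lets us query it with SQL. Here we select every row whose class is not "Iris-setosa". After the selection we print the result to confirm that no "Iris-setosa" rows remain.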

In [9]:
data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris where label != 'Iris-setosa'")
rel = df.rdd.map(lambda t : str(t[1])+":"+str(t[0])).collect()
for item in rel:
    print(item)

Iris-versicolor:[7.0,3.2,4.7,1.4]
Iris-versicolor:[6.4,3.2,4.5,1.5]
Iris-versicolor:[6.9,3.1,4.9,1.5]
Iris-versicolor:[5.5,2.3,4.0,1.3]
Iris-versicolor:[6.5,2.8,4.6,1.5]
Iris-versicolor:[5.7,2.8,4.5,1.3]
Iris-versicolor:[6.3,3.3,4.7,1.6]
Iris-versicolor:[4.9,2.4,3.3,1.0]
Iris-versicolor:[6.6,2.9,4.6,1.3]
Iris-versicolor:[5.2,2.7,3.9,1.4]
Iris-versicolor:[5.0,2.0,3.5,1.0]
Iris-versicolor:[5.9,3.0,4.2,1.5]
Iris-versicolor:[6.0,2.2,4.0,1.0]
Iris-versicolor:[6.1,2.9,4.7,1.4]
Iris-versicolor:[5.6,2.9,3.6,1.3]
Iris-versicolor:[6.7,3.1,4.4,1.4]
Iris-versicolor:[5.6,3.0,4.5,1.5]
Iris-versicolor:[5.8,2.7,4.1,1.0]
Iris-versicolor:[6.2,2.2,4.5,1.5]
Iris-versicolor:[5.6,2.5,3.9,1.1]
Iris-versicolor:[5.9,3.2,4.8,1.8]
Iris-versicolor:[6.1,2.8,4.0,1.3]
Iris-versicolor:[6.3,2.5,4.9,1.5]
Iris-versicolor:[6.1,2.8,4.7,1.2]
Iris-versicolor:[6.4,2.9,4.3,1.3]
Iris-versicolor:[6.6,3.0,4.4,1.4]
Iris-versicolor:[6.8,2.8,4.8,1.4]
Iris-versicolor:[6.7,3.0,5.0,1.7]
Iris-versicolor:[6.0,2.9,4.5,1.5]
Iris-versicolo

##### step 3: Build the ML pipeline

We index the label column and the feature column separately and give the indexed columns new names.

In [10]:
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)

Next, we randomly split the dataset into a training set and a test set, with the training set taking about 70%.

In [11]:
trainingData, testData = df.randomSplit([0.7,0.3])
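
randomSplit assigns rows to splits at random, so the resulting sizes are only approximately 70/30, and the split changes between runs unless a seed is passed (randomSplit accepts an optional seed argument). The following pure-Python sketch illustrates weighted random assignment; it is not Spark's implementation, and the function name `random_split` is an assumption.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by drawing a uniform number per row."""
    total = sum(weights)
    # Turn the weights into cumulative boundaries in [0, 1].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point drift
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(100)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which helps when comparing models such as the binomial and multinomial variants on the same test set.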

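Next we set the parameters of the logistic regression. Here we use setter methods throughout; a ParamMap works as well (see the official Spark MLlib documentation for details). We set the maximum number of iterations to 10, the regularization parameter to 0.3, and so on. explainParams() lists every available parameter together with the values we have already set.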
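
As a rough guide to what regParam and elasticNetParam control: the Spark ML documentation describes the elastic-net penalty as regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), where alpha is elasticNetParam. The sketch below evaluates that expression for a hypothetical coefficient vector; the function name and the sample weights are assumptions.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """regParam * (alpha * L1 norm + (1 - alpha) / 2 * squared L2 norm)."""
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)

weights = [0.5, 0.0, -0.25, 0.0]  # hypothetical coefficients
penalty = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8)
print(penalty)
```

With elasticNetParam = 0.8 most of the penalty is L1, which tends to push some coefficients to exactly zero.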

In [12]:
lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
print("LogisticRegression parameters:\n" + lr.explainParams())

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: indexedFeatures)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: indexedLabel)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on inte

Here we set up a labelConverter whose purpose is to convert the predicted class indices back into string labels.
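
Conceptually, StringIndexer builds an ordered list of labels (by default the most frequent label gets index 0) and IndexToString looks a predicted index up in that list. A Spark-free sketch of the round trip follows; the helper name and the alphabetical tie-breaking are assumptions made for this illustration.

```python
from collections import Counter

def fit_label_index(labels):
    """Order labels by descending frequency; ties are broken alphabetically here."""
    counts = Counter(labels)
    return sorted(counts, key=lambda lab: (-counts[lab], lab))

labels = ["Iris-virginica", "Iris-versicolor", "Iris-versicolor", "Iris-virginica"]
index_to_label = fit_label_index(labels)                   # plays the role of labelIndexer.labels
label_to_index = {lab: float(i) for i, lab in enumerate(index_to_label)}

predicted_index = 1.0                                      # a model predicts an index, not a name
predicted_label = index_to_label[int(predicted_index)]     # what IndexToString recovers
print(index_to_label, predicted_label)
```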

In [22]:
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

print(labelConverter.getLabels())
print(labelConverter.getInputCol())
print(labelConverter.getOutputCol())

['Iris-versicolor', 'Iris-virginica']
prediction
predictedLabel


We build the pipeline, set its stages, and call fit() to train the model.

In [15]:
lrPipeline =  Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
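lrPipelineModel = lrPipeline.fit(trainingData)

A Pipeline is essentially an Estimator: calling fit() on it produces a PipelineModel, which is essentially a Transformer. The PipelineModel can then call transform() to make predictions, producing a new DataFrame; in other words, we apply the trained model to the test set.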

In [16]:
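lrPredictions = lrPipelineModel.transform(testData)

Finally we print the predictions: select picks the columns to output, collect fetches all rows, and a for loop prints each one. The printed values are, in order, the row's true class and feature values, the predicted probability of each class, and the predicted class.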

In [23]:
preRel = lrPredictions.select("predictedLabel", "label", "features", "probability").collect()
for item in preRel:
    print(str(item['label'])+','+str(item['features'])+'-->prob='+str(item['probability'])+',predictedLabel'+str(item['predictedLabel']))

Iris-virginica,[4.9,2.5,4.5,1.7]-->prob=[0.5274710033427651,0.47252899665723486],predictedLabelIris-versicolor
Iris-versicolor,[5.0,2.0,3.5,1.0]-->prob=[0.6494390687143794,0.3505609312856206],predictedLabelIris-versicolor
Iris-versicolor,[5.2,2.7,3.9,1.4]-->prob=[0.5819241306479392,0.4180758693520607],predictedLabelIris-versicolor
Iris-versicolor,[5.7,2.8,4.5,1.3]-->prob=[0.6010403639407159,0.39895963605928414],predictedLabelIris-versicolor
Iris-versicolor,[5.7,2.9,4.2,1.3]-->prob=[0.6010403639407159,0.39895963605928414],predictedLabelIris-versicolor
Iris-versicolor,[5.8,2.7,3.9,1.2]-->prob=[0.6185415975365418,0.38145840246345813],predictedLabelIris-versicolor
Iris-virginica,[5.8,2.7,5.1,1.9]-->prob=[0.4945436926501766,0.5054563073498235],predictedLabelIris-virginica
Iris-virginica,[5.8,2.7,5.1,1.9]-->prob=[0.4945436926501766,0.5054563073498235],predictedLabelIris-virginica
Iris-versicolor,[5.9,3.0,4.2,1.5]-->prob=[0.5666617965788115,0.4333382034211884],predictedLabelIris-versicolor
Ir

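##### step 4: Model evaluation

We create a MulticlassClassificationEvaluator instance and use its setters to specify the prediction column and the true-label column, then compute the evaluation score and the corresponding error. Note that the evaluator's default metric is f1, so the variable named lrAccuracy below holds an F1 score unless setMetricName("accuracy") is called.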
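
Accuracy and the support-weighted F1 score can differ on the same predictions, which matters when reading the "Test Error" printed below. The following sketch computes both on a small made-up example; the labels and function names are assumptions for illustration only.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's support."""
    support = Counter(y_true)
    total = 0.0
    for cls, n in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n
    return total / len(y_true)

y_true = [0, 0, 0, 0, 1, 1]  # hypothetical true labels
y_pred = [0, 0, 0, 0, 0, 1]  # hypothetical predictions
acc = accuracy(y_true, y_pred)
f1 = weighted_f1(y_true, y_pred)
print(acc, f1)
```

Here the accuracy is 5/6 while the weighted F1 is 22/27, so "1 - score" is an error rate only when the metric is accuracy.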

In [24]:
evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("Test Error = " + str(1.0 - lrAccuracy))

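Test Error = 0.16189039718451492


The output above corresponds to a score of about 84% (1 - 0.162) on the test set. Next we retrieve the trained logistic regression model from the fitted pipeline. As mentioned earlier, the fitted pipeline is a PipelineModel, so we can obtain the model through its stages, as follows: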

In [25]:
lrModel = lrPipelineModel.stages[2]
print("Coefficients: " + str(lrModel.coefficients)+"Intercept: "+str(lrModel.intercept)+"numClasses: "+str(lrModel.numClasses)+"numFeatures: "+str(lrModel.numFeatures))

Coefficients: [-0.01390973547275124,0.0,0.0,0.07216981898787578]Intercept: -0.5470258093340817numClasses: 2numFeatures: 4


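##### step 5: Model evaluation with the training summary

spark.ml also provides a summary of the fitted model, which in the Spark version this tutorial targets is available only for binomial logistic regression. The tutorial text mentions an explicit conversion to BinaryLogisticRegressionSummary; the Python code below accesses the binary metrics on the summary object directly. We first obtain the summary of the binomial logistic model and print the objective (loss) value recorded at each iteration. The loss decreases overall as the iterations proceed, and a smaller loss indicates a better fit to the training data. We then read the metrics used to assess model performance. The ROC curve tells us how good the model is: areaUnderROC is about 0.979 in the run shown below, which indicates that the classifier does well on the training data. Finally, we pick the most suitable classification threshold by maximizing fMeasure, a metric that combines precision and recall. Note that lr.setThreshold(bestThreshold) updates the estimator lr, so the new threshold takes effect only when the pipeline is fitted again.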

In [27]:
trainingSummary = lrModel.summary
objectiveHistory = trainingSummary.objectiveHistory
for item in objectiveHistory:
    print(item)
    
print(trainingSummary.areaUnderROC)

fMeasure = trainingSummary.fMeasureByThreshold

maxFMeasure = fMeasure.select(functions.max("F-Measure")).head()[0]

bestThreshold = fMeasure.where(fMeasure["F-Measure"]== maxFMeasure).select("threshold").head()[0]

lr.setThreshold(bestThreshold)

0.6916855479178757
0.6892045259636208
0.686634185737353
0.67860800539271
0.673735712819139
0.6693014645935897
0.6691009840364661
0.6689601483226956
0.6675239026310226
0.6678712776382452
0.6616961853860587
0.9787545787545787


LogisticRegression_3dfca8e44cbd
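
The threshold selection above can be illustrated without Spark: compute F1 at every candidate threshold and keep the threshold with the largest value. The scores and labels below are made up, and the function name mirrors fMeasureByThreshold only loosely; treat it as a sketch under those assumptions.

```python
def f_measure_by_threshold(scores, labels):
    """Return (threshold, F1) pairs, treating score >= threshold as positive."""
    results = []
    for threshold in sorted(set(scores), reverse=True):
        tp = sum(1 for s, y in zip(scores, labels) if s >= threshold and y == 1)
        fp = sum(1 for s, y in zip(scores, labels) if s >= threshold and y == 0)
        fn = sum(1 for s, y in zip(scores, labels) if s < threshold and y == 1)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        results.append((threshold, f1))
    return results

scores = [0.9, 0.8, 0.6, 0.4, 0.3]  # hypothetical predicted probabilities of class 1
labels = [1, 1, 0, 1, 0]            # hypothetical true labels
best_threshold, best_f1 = max(f_measure_by_threshold(scores, labels),
                              key=lambda pair: pair[1])
print(best_threshold, best_f1)
```

Lowering the threshold raises recall and usually lowers precision; the F-measure peaks where the two are best balanced for the data at hand.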

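##### step 6: Binary classification with multinomial logistic regression

For a binary problem we can also use multinomial logistic regression. It is set up like binomial logistic regression, except that the family parameter is set to multinomial. Here we only list the results: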

In [29]:
mlr =  LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setFamily("multinomial")
 
mlrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, mlr, labelConverter])
 
mlrPipelineModel = mlrPipeline.fit(trainingData)
 
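# Use the multinomial model's own predictions. The original cell selected from
# lrPredictions, so the output recorded below is that of the binomial model.
mlrPredictions = mlrPipelineModel.transform(testData)
mlrPreRel = mlrPredictions.select("predictedLabel", "label", "features", "probability").collect()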
for item in mlrPreRel:
    print('('+str(item['label'])+','+str(item['features'])+')-->prob='+str(item['probability'])+',predictLabel='+str(item['predictedLabel']))

(Iris-virginica,[4.9,2.5,4.5,1.7])-->prob=[0.5274710033427651,0.47252899665723486],predictLabel=Iris-versicolor
(Iris-versicolor,[5.0,2.0,3.5,1.0])-->prob=[0.6494390687143794,0.3505609312856206],predictLabel=Iris-versicolor
(Iris-versicolor,[5.2,2.7,3.9,1.4])-->prob=[0.5819241306479392,0.4180758693520607],predictLabel=Iris-versicolor
(Iris-versicolor,[5.7,2.8,4.5,1.3])-->prob=[0.6010403639407159,0.39895963605928414],predictLabel=Iris-versicolor
(Iris-versicolor,[5.7,2.9,4.2,1.3])-->prob=[0.6010403639407159,0.39895963605928414],predictLabel=Iris-versicolor
(Iris-versicolor,[5.8,2.7,3.9,1.2])-->prob=[0.6185415975365418,0.38145840246345813],predictLabel=Iris-versicolor
(Iris-virginica,[5.8,2.7,5.1,1.9])-->prob=[0.4945436926501766,0.5054563073498235],predictLabel=Iris-virginica
(Iris-virginica,[5.8,2.7,5.1,1.9])-->prob=[0.4945436926501766,0.5054563073498235],predictLabel=Iris-virginica
(Iris-versicolor,[5.9,3.0,4.2,1.5])-->prob=[0.5666617965788115,0.4333382034211884],predictLabel=Iris-vers

In [31]:
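# Evaluate the multinomial pipeline itself. The original cell evaluated lrPredictions,
# which is why the Test Error recorded below equals the binomial result.
mlrAccuracy = evaluator.evaluate(mlrPipelineModel.transform(testData))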
 
print("Test Error = " + str(1.0 - mlrAccuracy))

mlrModel = mlrPipelineModel.stages[2]
 
print("Multinomial coefficients: " +str(mlrModel.coefficientMatrix)+"Multinomial intercepts: "+str(mlrModel.interceptVector)+"numClasses: "+str(mlrModel.numClasses)+"numFeatures: "+str(mlrModel.numFeatures))

Test Error = 0.16189039718451492
Multinomial coefficients: DenseMatrix([[ 0.0296415 ,  0.        ,  0.        , -0.04763024],
             [-0.0296415 ,  0.        ,  0.        ,  0.04763024]])Multinomial intercepts: [-0.008250471976582976,0.008250471976582976]numClasses: 2numFeatures: 4
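
In the multinomial output above there is one coefficient row and one intercept per class, and the two rows are negatives of each other. For two classes, a softmax over the two margins gives the same probability as a sigmoid applied to their difference, which is how the multinomial form relates to the binomial one. The sketch below checks this numerically using the printed (rounded) values; the feature vector `x` is hypothetical.

```python
import math

def softmax(zs):
    """Numerically stable softmax over a list of margins."""
    m = max(zs)
    exps = [math.exp(z - m) for z in zs]
    total = sum(exps)
    return [e / total for e in exps]

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

# Coefficient matrix and intercepts as printed above (rounded).
W = [[0.0296415, 0.0, 0.0, -0.04763024],
     [-0.0296415, 0.0, 0.0, 0.04763024]]
b = [-0.008250471976582976, 0.008250471976582976]
x = [1.0, 0.0, 0.0, 2.0]  # hypothetical (indexed) feature vector

margins = [sum(w * xi for w, xi in zip(row, x)) + bk for row, bk in zip(W, b)]
p_softmax = softmax(margins)[1]                # P(class 1) from the multinomial form
p_sigmoid = sigmoid(margins[1] - margins[0])   # same value from a binomial form
print(p_softmax, p_sigmoid)
```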


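##### step 7: Multi-class classification with multinomial logistic regression

For a multi-class problem we need multinomial logistic regression. The intent here is to use the full iris dataset, which has three classes, with a process that is otherwise the same as above. Note, however, that the cells recorded below still read from lrPredictions and lrPipelineModel, so the output they show is that of the two-class binomial model (numClasses: 2). To obtain a three-class result, the indexers and the pipeline have to be fitted on data rather than df.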

In [32]:
mlrPreRel = lrPredictions.select("predictedLabel", "label", "features", "probability").collect()
for item in mlrPreRel:
    print('('+str(item['label'])+','+str(item['features'])+')-->prob='+str(item['probability'])+',predictLabel='+str(item['predictedLabel']))

(Iris-virginica,[4.9,2.5,4.5,1.7])-->prob=[0.5274710033427651,0.47252899665723486],predictLabel=Iris-versicolor
(Iris-versicolor,[5.0,2.0,3.5,1.0])-->prob=[0.6494390687143794,0.3505609312856206],predictLabel=Iris-versicolor
(Iris-versicolor,[5.2,2.7,3.9,1.4])-->prob=[0.5819241306479392,0.4180758693520607],predictLabel=Iris-versicolor
(Iris-versicolor,[5.7,2.8,4.5,1.3])-->prob=[0.6010403639407159,0.39895963605928414],predictLabel=Iris-versicolor
(Iris-versicolor,[5.7,2.9,4.2,1.3])-->prob=[0.6010403639407159,0.39895963605928414],predictLabel=Iris-versicolor
(Iris-versicolor,[5.8,2.7,3.9,1.2])-->prob=[0.6185415975365418,0.38145840246345813],predictLabel=Iris-versicolor
(Iris-virginica,[5.8,2.7,5.1,1.9])-->prob=[0.4945436926501766,0.5054563073498235],predictLabel=Iris-virginica
(Iris-virginica,[5.8,2.7,5.1,1.9])-->prob=[0.4945436926501766,0.5054563073498235],predictLabel=Iris-virginica
(Iris-versicolor,[5.9,3.0,4.2,1.5])-->prob=[0.5666617965788115,0.4333382034211884],predictLabel=Iris-vers

In [38]:
mlrAccuracy = evaluator.evaluate(lrPredictions)
 
print("Test Error = " + str(1.0 - mlrAccuracy))

 
mlrModel = lrPipelineModel.stages[2]
 
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix)+"Multinomial intercepts: "+str(mlrModel.interceptVector)+"numClasses: "+str(mlrModel.numClasses)+"numFeatures: "+str(mlrModel.numFeatures))

Test Error = 0.16189039718451492
Multinomial coefficients: DenseMatrix([[-0.01390974,  0.        ,  0.        ,  0.07216982]])
Multinomial intercepts: [-0.5470258093340817]numClasses: 2numFeatures: 4
