以iris数据集（iris）为例进行分析。iris以鸢尾花的特征作为数据来源，数据集包含150个数据集，分为3类，每类50个数据，每个数据包含4个属性，是在数据挖掘、数据分类中非常常用的测试集、训练集。为了便于理解，我们这里主要用后两个属性（花瓣的长度和宽度）来进行分类。目前 spark.ml 中支持二分类和多分类，我们将分别从“用二项逻辑斯蒂回归来解决二分类问题”、“用多项逻辑斯蒂回归来解决二分类问题”、“用多项逻辑斯蒂回归来解决多分类问题”三个方面进行分析
## 二项逻辑斯蒂回归解决 二分类 问题

In [5]:
!hadoop fs -cat /user/yanbin/data/iris.txt|head -n5

5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa


In [215]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkConf,SparkContext
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.linalg import Vector,Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel
from pyspark.ml.classification import BinaryLogisticRegressionSummary
from pyspark.ml.feature import StringIndexer,IndexToString,HashingTF,Tokenizer,VectorIndexer
from pyspark.sql import Row
conf = SparkConf().setAppName('logist Regession')\
                  .setMaster('local')
spark = SparkSession.builder\
        .config(conf=conf)\
        .enableHiveSupport()\
        .getOrCreate()
sc = spark.sparkContext

path = 'data/iris.txt'

rawdata = sc.textFile(path).map(lambda line:line.split(","))\
          .map(lambda x:Row(Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3])),x[4]))
rawDf = spark.createDataFrame(rawdata,["features","label"])
df=rawDf.filter("label!='Iris-setosa'")

labelInder = StringIndexer(inputCol='label',outputCol='indexedLabel').fit(df)
featureIndex = VectorIndexer(inputCol='features',outputCol='indexedFeatures').fit(df)
(traningDf,testDf) = df.randomSplit([0.7,0.3])
lr = LogisticRegression(featuresCol='indexedFeatures',labelCol='indexedLabel',\
                        maxIter=10,regParam=0.3,elasticNetParam=0.8)
labelConverter = IndexToString(inputCol='prediction',outputCol='predictedLabel',labels=labelInder.labels)
lrPipeline = Pipeline().setStages([labelInder,featureIndex,lr,labelConverter])
lrPipelineModel = lrPipeline.fit(traningDf)
lrPredictions = lrPipelineModel.transform(testDf)

In [219]:
%matplotlib inline
result = lrPredictions.select("predictedLabel", "label", "features", "probability").rdd\
.map(lambda x:("(%s, %s --> prob=%s, predicted Label=%s") %(x.predictedLabel,x.label,x.features,x.probability))\
.take(5)
for i in result:
    print i
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',predictionCol='prediction')
lrAccuracy = evaluator.evaluate(lrPredictions)
print ('Test Error = ',1-lrAccuracy)
lrModel = lrPipelineModel.stages[2]
print "Coefficients: " , lrModel.coefficientMatrix.toArray(),"\n"\
        "Intercept: ",lrModel.interceptVector,"\n"\
        "numClasses: ",lrModel.numClasses,"\n"\
        "numFeatures: ",lrModel.numFeatures
trainingSummary = lrModel.summary
objectiveHistory = trainingSummary.objectiveHistory
print objectiveHistory
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.

import matplotlib.pyplot as plt
pD=trainingSummary.roc.toPandas()
# plt.plot(pD.ix[:,0],pD.ix[:,1])
pD_FPR=pD.set_index('FPR')
pD_FPR.plot()
plt.legend(loc='best')
FDf=trainingSummary.fMeasureByThreshold.sort('threshold')
maxFMeasure =FDf.select(max('F-Measure').alias('maxFMeasure')).head().maxFMeasure
print 'maxFMeasure: ',maxFMeasure
FDf=FDf.withColumnRenamed('F-Measure','F1')
# FDf.select(FDf.F1.cast(FloatType())).show()
# FDf.createTempView('F1Table2')
bestThreshold = FDf.where((-0.00001<(FDf.F1-maxFMeasure)) & ((FDf.F1-maxFMeasure)<0.0001)).head()['threshold']
print 'bestThreshold: ',bestThreshold
lr.setThreshold(bestThreshold)
# pDf=trainingSummary.precisionByThreshold
# rDf=trainingSummary.recallByThreshold
# rDf=rDf.withColumnRenamed('threshold','threshold1')
# f=rDf.join(pDf,pDf.threshold==rDf.threshold1,'left')
# f.select(f.threshold,(2/((1.0/f.recall)+(1.0/f.precision))).alias('F1')).sort('threshold').show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))





 (Iris-virginica, Iris-versicolor --> prob=[5.4,3.0,4.5,1.5], predicted Label=[0.472370451576,0.527629548424]
(Iris-versicolor, Iris-versicolor --> prob=[5.5,2.4,3.7,1.0], predicted Label=[0.56719423018,0.43280576982]
(Iris-versicolor, Iris-versicolor --> prob=[5.5,2.6,4.4,1.2], predicted Label=[0.529932895954,0.470067104046]
(Iris-virginica, Iris-versicolor --> prob=[5.6,3.0,4.5,1.5], predicted Label=[0.474713416283,0.525286583717]
(Iris-virginica, Iris-virginica --> prob=[5.7,2.5,5.0,2.0], predicted Label=[0.383935252367,0.616064747633]
('Test Error = ', 0.2054298642533936)
Coefficients:  [[-0.04699096  0.          0.          0.07526891]] 
Intercept:  [-0.0119624567944] 
numClasses:  2 
numFeatures:  4
[0.6927389617440812, 0.6899568172038798, 0.6884621802365235, 0.6871411658539227, 0.683999190620436, 0.6742600596548816, 0.6729880325825707, 0.6729720010493697, 0.6719400739860806, 0.6717587146812963, 0.6712425059750556]


AttributeError: 'NoneType' object has no attribute 'setCallSite'

## 用多项逻辑斯蒂回归解决 二分类 问题

In [220]:
mlr = LogisticRegression(featuresCol='indexedFeatures',labelCol='indexedLabel',\
                         maxIter=10,regParam=0.3,elasticNetParam=0.8,family='multinomial')
mlrPipeline = Pipeline().setStages([labelInder,featureIndex,mlr,labelConverter])
mlrPipelineModel = mlrPipeline.fit(traningDf)
mlrPredictions = mlrPipelineModel.transform(testDf)
result2 =mlrPredictions.select("predictedLabel", "label", "features", "probability").rdd\
.map(lambda x:("(%s, %s --> prob=%s, predicted Label=%s") %(x.predictedLabel,x.label,x.features,x.probability)).take(5)
for i in result2:
    print i
mEvaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',predictionCol='prediction')
mlrAccuracy = evaluator.evaluate(mlrPredictions)
mlrModel = mlrPipelineModel.stages[2]
print "Coefficients: " , mlrModel.coefficientMatrix.toArray(),"\n"\
        "Intercept: ",mlrModel.interceptVector,"\n"\
        "numClasses: ",mlrModel.numClasses,"\n"\
        "numFeatures: ",mlrModel.numFeatures
mlrAccuracy = mEvaluator.evaluate(mlrPredictions)
print ('Test Error = ',1-mlrAccuracy)


(Iris-virginica, Iris-versicolor --> prob=[5.4,3.0,4.5,1.5], predicted Label=[0.468153771927,0.531846228073]
(Iris-versicolor, Iris-versicolor --> prob=[5.5,2.4,3.7,1.0], predicted Label=[0.578495995776,0.421504004224]
(Iris-versicolor, Iris-versicolor --> prob=[5.5,2.6,4.4,1.2], predicted Label=[0.535266692919,0.464733307081]
(Iris-virginica, Iris-versicolor --> prob=[5.6,3.0,4.5,1.5], predicted Label=[0.471096653482,0.528903346518]
(Iris-virginica, Iris-virginica --> prob=[5.7,2.5,5.0,2.0], predicted Label=[0.36630830832,0.63369169168]
Coefficients:  [[ 0.02953791  0.          0.         -0.04382524]
 [-0.02953791  0.          0.          0.04382524]] 
Intercept:  [-0.00415734261051,0.00415734261051] 
numClasses:  2 
numFeatures:  4
('Test Error = ', 0.2054298642533936)


## 用多项逻辑斯蒂回归解决 多分类 问题

In [221]:
tmTrainingDf,tmTestDf = rawDf.randomSplit([0.7,0.3])
labelInder = StringIndexer(inputCol='label',outputCol='indexedLabel').fit(rawDf)
featureIndex = VectorIndexer(inputCol='features',outputCol='indexedFeatures').fit(rawDf)
labelConverter = IndexToString(inputCol='prediction',outputCol='predictedLabel',labels=labelInder.labels)
threeMlrPipeline = Pipeline().setStages([labelInder,featureIndex,mlr,labelConverter])
threeMlrPipelineModel = threeMlrPipeline.fit(tmTrainingDf)
threeMlrPredictions = threeMLrPipelineModel.transform(tmTestDf)
result3 = threeMlrPredictions.select("predictedLabel", "label", "features", "probability").rdd\
.map(lambda x:("(%s, %s --> prob=%s, predicted Label=%s") %(x.predictedLabel,x.label,x.features,x.probability)).take(5)
for i in result3:
    print i
tmEvaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',predictionCol='prediction')
tmlrAccuracy = tmEvaluator.evaluate(threeMlrPredictions)
mlrModel = threeMlrPipelineModel.stages[2]
print "Coefficients: " , mlrModel.coefficientMatrix.toArray(),"\n"\
        "Intercept: ",mlrModel.interceptVector,"\n"\
        "numClasses: ",mlrModel.numClasses,"\n"\
        "numFeatures: ",mlrModel.numFeatures
tmlrAccuracy = tmEvaluator.evaluate(threeMlrPredictions)
print ('Test Error = ',1-tmlrAccuracy)


(Iris-setosa, Iris-setosa --> prob=[4.3,3.0,1.1,0.1], predicted Label=[0.475810984079,0.253077478581,0.27111153734]
(Iris-setosa, Iris-setosa --> prob=[4.5,2.3,1.3,0.3], predicted Label=[0.39021221799,0.28655254839,0.32323523362]
(Iris-setosa, Iris-setosa --> prob=[4.6,3.6,1.0,0.2], predicted Label=[0.519162280174,0.232177765704,0.248659954123]
(Iris-setosa, Iris-setosa --> prob=[4.7,3.2,1.3,0.2], predicted Label=[0.47405000579,0.252519744855,0.273430249355]
(Iris-setosa, Iris-setosa --> prob=[4.9,3.1,1.5,0.1], predicted Label=[0.468421196729,0.257009439772,0.274569363499]
Coefficients:  [[ 0.          0.34253428 -0.19136101 -0.39962172]
 [ 0.          0.          0.          0.        ]
 [ 0.          0.          0.          0.13842965]] 
Intercept:  [0.140052600264,0.0116661148,-0.151718715064] 
numClasses:  3 
numFeatures:  4
('Test Error = ', 0.5128205128205128)
