In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql import Row, functions
from pyspark.ml.linalg import Vector, Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel,BinaryLogisticRegressionSummary, \
LogisticRegression

In [3]:
def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))
    rel['label'] = str(x[4])
    return rel

spark = SparkSession.builder.master("local").appName("Logistic").getOrCreate()
data = spark.sparkContext.textFile("iris.txt").map(lambda line:line.split(',')).map(lambda p:Row(**f(p))).toDF()
data.show()

+-----------------+-----------+
|         features|      label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
|[5.4,3.9,1.7,0.4]|Iris-setosa|
|[4.6,3.4,1.4,0.3]|Iris-setosa|
|[5.0,3.4,1.5,0.2]|Iris-setosa|
|[4.4,2.9,1.4,0.2]|Iris-setosa|
|[4.9,3.1,1.5,0.1]|Iris-setosa|
|[5.4,3.7,1.5,0.2]|Iris-setosa|
|[4.8,3.4,1.6,0.2]|Iris-setosa|
|[4.8,3.0,1.4,0.1]|Iris-setosa|
|[4.3,3.0,1.1,0.1]|Iris-setosa|
|[5.8,4.0,1.2,0.2]|Iris-setosa|
|[5.7,4.4,1.5,0.4]|Iris-setosa|
|[5.4,3.9,1.3,0.4]|Iris-setosa|
|[5.1,3.5,1.4,0.3]|Iris-setosa|
|[5.7,3.8,1.7,0.3]|Iris-setosa|
|[5.1,3.8,1.5,0.3]|Iris-setosa|
+-----------------+-----------+
only showing top 20 rows



In [15]:
# 注册成sql语句
data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris where label != 'Iris-setosa'")
rel = df.rdd.map(lambda t: str(t[1]) + ":" + str(t[0])).collect()
for item in rel:
    print (item)
# 显示
#df.show().head()
#rel = df.map(lambda t:str(t[1]) + ":" + str(t[0])).collect()
#for item in rel:
#    print (item)

Iris-versicolor:[7.0,3.2,4.7,1.4]
Iris-versicolor:[6.4,3.2,4.5,1.5]
Iris-versicolor:[6.9,3.1,4.9,1.5]
Iris-versicolor:[5.5,2.3,4.0,1.3]
Iris-versicolor:[6.5,2.8,4.6,1.5]
Iris-versicolor:[5.7,2.8,4.5,1.3]
Iris-versicolor:[6.3,3.3,4.7,1.6]
Iris-versicolor:[4.9,2.4,3.3,1.0]
Iris-versicolor:[6.6,2.9,4.6,1.3]
Iris-versicolor:[5.2,2.7,3.9,1.4]
Iris-versicolor:[5.0,2.0,3.5,1.0]
Iris-versicolor:[5.9,3.0,4.2,1.5]
Iris-versicolor:[6.0,2.2,4.0,1.0]
Iris-versicolor:[6.1,2.9,4.7,1.4]
Iris-versicolor:[5.6,2.9,3.6,1.3]
Iris-versicolor:[6.7,3.1,4.4,1.4]
Iris-versicolor:[5.6,3.0,4.5,1.5]
Iris-versicolor:[5.8,2.7,4.1,1.0]
Iris-versicolor:[6.2,2.2,4.5,1.5]
Iris-versicolor:[5.6,2.5,3.9,1.1]
Iris-versicolor:[5.9,3.2,4.8,1.8]
Iris-versicolor:[6.1,2.8,4.0,1.3]
Iris-versicolor:[6.3,2.5,4.9,1.5]
Iris-versicolor:[6.1,2.8,4.7,1.2]
Iris-versicolor:[6.4,2.9,4.3,1.3]
Iris-versicolor:[6.6,3.0,4.4,1.4]
Iris-versicolor:[6.8,2.8,4.8,1.4]
Iris-versicolor:[6.7,3.0,5.0,1.7]
Iris-versicolor:[6.0,2.9,4.5,1.5]
Iris-versicolo

In [16]:
# 构建ML的pipeline
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)

In [17]:
# 随机分成7:3
trainingData, testData = df.randomSplit([0.7, 0.3])

In [18]:
lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
print ("LogisticRegression parameters:\n" + lr.explainParams())

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: indexedFeatures)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: indexedLabel)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on inte

In [19]:
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

In [20]:
lrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
lrPipelineModel = lrPipeline.fit(trainingData)

In [21]:
lrPredictions = lrPipelineModel.transform(testData)

In [22]:
preRel = lrPredictions.select("predictedLabel", "label", "features", "probability").collect()
for item in preRel:
    print(str(item['label'])+','+str(item['features'])+'-->prob='+str(item['probability'])+',predictedLabel'+str(item['predictedLabel']))

Iris-versicolor,[5.0,2.0,3.5,1.0]-->prob=[0.5350723044156922,0.46492769558430785],predictedLabelIris-versicolor
Iris-versicolor,[5.2,2.7,3.9,1.4]-->prob=[0.464314176578193,0.535685823421807],predictedLabelIris-virginica
Iris-versicolor,[5.5,2.4,3.8,1.1]-->prob=[0.5218548036093305,0.4781451963906695],predictedLabelIris-versicolor
Iris-versicolor,[5.5,2.6,4.4,1.2]-->prob=[0.5036542015649411,0.4963457984350589],predictedLabelIris-versicolor
Iris-versicolor,[5.6,2.7,4.2,1.3]-->prob=[0.48643376851475223,0.5135662314852477],predictedLabelIris-virginica
Iris-virginica,[5.6,2.8,4.9,2.0]-->prob=[0.3625569354879262,0.6374430645120738],predictedLabelIris-virginica
Iris-versicolor,[5.6,3.0,4.5,1.5]-->prob=[0.45016748376609933,0.5498325162339006],predictedLabelIris-virginica
Iris-virginica,[5.8,2.7,5.1,1.9]-->prob=[0.38142589496304513,0.6185741050369549],predictedLabelIris-virginica
Iris-versicolor,[6.0,2.2,4.0,1.0]-->prob=[0.5449150196200674,0.45508498037993267],predictedLabelIris-versicolor
Iris-

####  4.模型评估

In [23]:
evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("Test Error = " + str(1.0 - lrAccuracy))

Test Error = 0.4566542948038176


In [24]:
lrModel = lrPipelineModel.stages[2]
print ("Coefficients: " + str(lrModel.coefficients) + "Intercept: " + str(lrModel.intercept) + "numClasses: " + str(lrModel.numClasses) + "numFeatures: " + str(lrModel.numFeatures))

Coefficients: [-0.03962569755041504,0.0,0.0,0.07285788417034716]Intercept: 0.057608501677179784numClasses: 2numFeatures: 4


In [25]:
trainingSummary = lrModel.summary
objectiveHistory = trainingSummary.objectiveHistory
for item in objectiveHistory:
    print (item)

0.6914160776171184
0.6889940543074073
0.686210648150745
0.6761120900424713
0.6736214030819339
0.673397203036357
0.6722844426271781
0.6719542976141935
0.6716802162637169
0.6714720930987288
0.6702863815764213


In [26]:
print (trainingSummary.areaUnderROCUnderROC)

0.9796006944444444


In [29]:
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.select(functions.max("F-Measure")).head()[0]
bestThreshold = fMeasure.where(fMeasure["F-Measure"] == maxFMeasure).select("threshold").head()[0]
lr.setThreshold(bestThreshold)

LogisticRegression_6e63f982ee86

In [31]:
mlr =  LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setFamily("multinomial")
mlrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, mlr, labelConverter])
mlrPipelineModel = mlrPipeline.fit(trainingData) 
mlrPredictions = mlrPipelineModel.transform(testData)
mlrPreRel = mlrPredictions.select("predictedLabel", "label", "features", "probability").collect()
for item in mlrPreRel:
    print('('+str(item['label'])+','+str(item['features'])+')-->prob='+str(item['probability'])+',predictLabel='+str(item['predictedLabel']))

(Iris-versicolor,[5.0,2.0,3.5,1.0])-->prob=[0.5527875036384094,0.44721249636159055],predictLabel=Iris-versicolor
(Iris-versicolor,[5.2,2.7,3.9,1.4])-->prob=[0.467473958789406,0.532526041210594],predictLabel=Iris-virginica
(Iris-versicolor,[5.5,2.4,3.8,1.1])-->prob=[0.5371764674125327,0.46282353258746733],predictLabel=Iris-versicolor
(Iris-versicolor,[5.5,2.6,4.4,1.2])-->prob=[0.5152235273744041,0.48477647262559587],predictLabel=Iris-versicolor
(Iris-versicolor,[5.6,2.7,4.2,1.3])-->prob=[0.49446693501344907,0.5055330649865509],predictLabel=Iris-virginica
(Iris-virginica,[5.6,2.8,4.9,2.0])-->prob=[0.3455611996383086,0.6544388003616914],predictLabel=Iris-virginica
(Iris-versicolor,[5.6,3.0,4.5,1.5])-->prob=[0.4505945103519814,0.5494054896480186],predictLabel=Iris-virginica
(Iris-virginica,[5.8,2.7,5.1,1.9])-->prob=[0.3680720032644509,0.6319279967355491],predictLabel=Iris-virginica
(Iris-versicolor,[6.0,2.2,4.0,1.0])-->prob=[0.5651666001732658,0.4348333998267341],predictLabel=Iris-versicol

In [33]:
mlrAccuracy = evaluator.evaluate(mlrPredictions)
print ("Test Error = " + str(1.0 - mlrAccuracy))
 
mlrModel = mlrPipelineModel.stages[2]
 
print("Multinomial coefficients: " +str(mlrModel.coefficientMatrix)+"Multinomial intercepts: "+str(mlrModel.interceptVector)+"numClasses: "+str(mlrModel.numClasses)+
"numFeatures: "+str(mlrModel.numFeatures))

Test Error = 0.33147773279352233
Multinomial coefficients: DenseMatrix([[ 0.02510889,  0.        ,  0.        , -0.04403394],
             [-0.02510889,  0.        ,  0.        ,  0.04403394]])Multinomial intercepts: [-0.019574561219578925,0.019574561219578925]numClasses: 2numFeatures: 4


In [34]:
mlrPreRel = mlrPredictions.select("predictedLabel", "label", "features", "probability").collect()
for item in mlrPreRel:
    print('('+str(item['label'])+','+str(item['features'])+')-->prob='+str(item['probability'])+',predictLabel='+str(item['predictedLabel']))

(Iris-versicolor,[5.0,2.0,3.5,1.0])-->prob=[0.5527875036384094,0.44721249636159055],predictLabel=Iris-versicolor
(Iris-versicolor,[5.2,2.7,3.9,1.4])-->prob=[0.467473958789406,0.532526041210594],predictLabel=Iris-virginica
(Iris-versicolor,[5.5,2.4,3.8,1.1])-->prob=[0.5371764674125327,0.46282353258746733],predictLabel=Iris-versicolor
(Iris-versicolor,[5.5,2.6,4.4,1.2])-->prob=[0.5152235273744041,0.48477647262559587],predictLabel=Iris-versicolor
(Iris-versicolor,[5.6,2.7,4.2,1.3])-->prob=[0.49446693501344907,0.5055330649865509],predictLabel=Iris-virginica
(Iris-virginica,[5.6,2.8,4.9,2.0])-->prob=[0.3455611996383086,0.6544388003616914],predictLabel=Iris-virginica
(Iris-versicolor,[5.6,3.0,4.5,1.5])-->prob=[0.4505945103519814,0.5494054896480186],predictLabel=Iris-virginica
(Iris-virginica,[5.8,2.7,5.1,1.9])-->prob=[0.3680720032644509,0.6319279967355491],predictLabel=Iris-virginica
(Iris-versicolor,[6.0,2.2,4.0,1.0])-->prob=[0.5651666001732658,0.4348333998267341],predictLabel=Iris-versicol

In [36]:
mlrAccuracy = evaluator.evaluate(mlrPredictions)
print("Test Error = " + str(1.0 - mlrAccuracy))
#Test Error = 0.48730158730158735
mlrModel = mlrPipelineModel.stages[2]
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix)+"Multinomial intercepts: "+str(mlrModel.interceptVector)+"numClasses: "+str(mlrModel.numClasses)+"numFeatures: "+str(mlrModel.numFeatures))

Test Error = 0.33147773279352233
Multinomial coefficients: DenseMatrix([[ 0.02510889,  0.        ,  0.        , -0.04403394],
             [-0.02510889,  0.        ,  0.        ,  0.04403394]])Multinomial intercepts: [-0.019574561219578925,0.019574561219578925]numClasses: 2numFeatures: 4


### 决策树

In [37]:
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

In [38]:
def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
    rel['label'] = str(x[4])
    return rel

data = spark.sparkContext.textFile("iris.txt").map(lambda line:line.split(',')).map(lambda p: Row(**f(p))).toDF()

In [39]:
data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris")
 
rel = df.rdd.map(lambda t : str(t[1])+":"+str(t[0])).collect()
for item in rel:
    print(item)

Iris-setosa:[5.1,3.5,1.4,0.2]
Iris-setosa:[4.9,3.0,1.4,0.2]
Iris-setosa:[4.7,3.2,1.3,0.2]
Iris-setosa:[4.6,3.1,1.5,0.2]
Iris-setosa:[5.0,3.6,1.4,0.2]
Iris-setosa:[5.4,3.9,1.7,0.4]
Iris-setosa:[4.6,3.4,1.4,0.3]
Iris-setosa:[5.0,3.4,1.5,0.2]
Iris-setosa:[4.4,2.9,1.4,0.2]
Iris-setosa:[4.9,3.1,1.5,0.1]
Iris-setosa:[5.4,3.7,1.5,0.2]
Iris-setosa:[4.8,3.4,1.6,0.2]
Iris-setosa:[4.8,3.0,1.4,0.1]
Iris-setosa:[4.3,3.0,1.1,0.1]
Iris-setosa:[5.8,4.0,1.2,0.2]
Iris-setosa:[5.7,4.4,1.5,0.4]
Iris-setosa:[5.4,3.9,1.3,0.4]
Iris-setosa:[5.1,3.5,1.4,0.3]
Iris-setosa:[5.7,3.8,1.7,0.3]
Iris-setosa:[5.1,3.8,1.5,0.3]
Iris-setosa:[5.4,3.4,1.7,0.2]
Iris-setosa:[5.1,3.7,1.5,0.4]
Iris-setosa:[4.6,3.6,1.0,0.2]
Iris-setosa:[5.1,3.3,1.7,0.5]
Iris-setosa:[4.8,3.4,1.9,0.2]
Iris-setosa:[5.0,3.0,1.6,0.2]
Iris-setosa:[5.0,3.4,1.6,0.4]
Iris-setosa:[5.2,3.5,1.5,0.2]
Iris-setosa:[5.2,3.4,1.4,0.2]
Iris-setosa:[4.7,3.2,1.6,0.2]
Iris-setosa:[4.8,3.1,1.6,0.2]
Iris-setosa:[5.4,3.4,1.5,0.4]
Iris-setosa:[5.2,4.1,1.5,0.1]
Iris-setos

In [41]:
#分别获取标签列和特征列，进行索引，并进行了重命名。
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(df)
#这里我们设置一个labelConverter，目的是把预测的类别重新转化成字符型的。
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
#接下来，我们把数据集随机分成训练集和测试集，其中训练集占70%。
trainingData, testData = data.randomSplit([0.7, 0.3])

In [42]:
from pyspark.ml.classification import DecisionTreeClassificationModel,DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dtClassifier = DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
pipelinedClassifier = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
# 决策树模型
modelClassifier = pipelinedClassifier.fit(trainingData)
# 进行预测
predictionsClassifier = modelClassifier.transform(testData)
# 查看部分预测的结果
predictionsClassifier.select("predictedLabel", "label", "features").show(20)

+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.3,3.0,1.1,0.1]|
|    Iris-setosa|    Iris-setosa|[4.4,3.0,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[4.5,2.3,1.3,0.3]|
|Iris-versicolor| Iris-virginica|[4.9,2.5,4.5,1.7]|
|Iris-versicolor|Iris-versicolor|[5.0,2.0,3.5,1.0]|
|    Iris-setosa|    Iris-setosa|[5.0,3.2,1.2,0.2]|
|    Iris-setosa|    Iris-setosa|[5.0,3.5,1.3,0.3]|
|    Iris-setosa|    Iris-setosa|[5.0,3.5,1.6,0.6]|
|    Iris-setosa|    Iris-setosa|[5.1,3.3,1.7,0.5]|
|    Iris-setosa|    Iris-setosa|[5.1,3.5,1.4,0.3]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.5,0.3]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.6,0.2]|
|Iris-versicolor|Iris-versicolor|[5.2,2.7,3.9,1.4]|
|    Iris-setosa|    Iris-setosa|[5.2,3.5,1.5,0.2]|
|Iris-versicolor|Iris-versicolor|[5.5,2.6,4.4,1.2]|
|    Iris-setosa|    Iris-setosa|[5.5,3.5,1.3,0.2]|
|Iris-versic

In [43]:
# 评估决策树的类型
evaluatorClassifier = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
print("Test Error = " + str(1.0 - accuracy))
treeModelClassifier = modelClassifier.stages[2]
print("Learned classification tree model:\n" + str(treeModelClassifier.toDebugString))

Test Error = 0.09615384615384615
Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_ea24d08dd2da) of depth 4 with 9 nodes
  If (feature 2 <= 2.45)
   Predict: 0.0
  Else (feature 2 > 2.45)
   If (feature 3 <= 1.75)
    Predict: 1.0
   Else (feature 3 > 1.75)
    If (feature 2 <= 4.85)
     If (feature 0 <= 5.95)
      Predict: 1.0
     Else (feature 0 > 5.95)
      Predict: 2.0
    Else (feature 2 > 4.85)
     Predict: 2.0



In [45]:
# 构建决策树分类模型
from pyspark.ml.regression import DecisionTreeRegressionModel,DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
#训练决策树模型
dtRegressor = DecisionTreeRegressor().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
#在pipeline中进行设置
pipelineRegressor = Pipeline().setStages([labelIndexer, featureIndexer, dtRegressor, labelConverter])
#训练决策树模型
modelRegressor = pipelineRegressor.fit(trainingData)
#进行预测
predictionsRegressor = modelRegressor.transform(testData)
#查看部分预测结果
predictionsRegressor.select("predictedLabel", "label", "features").show(20)

+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.3,3.0,1.1,0.1]|
|    Iris-setosa|    Iris-setosa|[4.4,3.0,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[4.5,2.3,1.3,0.3]|
|Iris-versicolor| Iris-virginica|[4.9,2.5,4.5,1.7]|
|Iris-versicolor|Iris-versicolor|[5.0,2.0,3.5,1.0]|
|    Iris-setosa|    Iris-setosa|[5.0,3.2,1.2,0.2]|
|    Iris-setosa|    Iris-setosa|[5.0,3.5,1.3,0.3]|
|    Iris-setosa|    Iris-setosa|[5.0,3.5,1.6,0.6]|
|    Iris-setosa|    Iris-setosa|[5.1,3.3,1.7,0.5]|
|    Iris-setosa|    Iris-setosa|[5.1,3.5,1.4,0.3]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.5,0.3]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.6,0.2]|
|Iris-versicolor|Iris-versicolor|[5.2,2.7,3.9,1.4]|
|    Iris-setosa|    Iris-setosa|[5.2,3.5,1.5,0.2]|
|Iris-versicolor|Iris-versicolor|[5.5,2.6,4.4,1.2]|
|    Iris-setosa|    Iris-setosa|[5.5,3.5,1.3,0.2]|
|Iris-versic