In [1]:
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Obtain the SparkSession used by every later cell: getOrCreate() returns the
# already-active session when there is one and builds a new one otherwise.
spark = SparkSession.builder.getOrCreate()

In [4]:
def f(x):
    """Convert one split record into the keyword fields for a ``Row``.

    Args:
        x: Indexable record. Items 0-3 must be convertible to ``float``;
            item 4 is stringified and used as the label.

    Returns:
        dict: ``'features'`` mapped to a dense vector built from the four
        floats and ``'label'`` mapped to ``str(x[4])``, inserted in that
        order.
    """
    # Explicit indices (rather than a slice) so a too-short record fails
    # exactly where the positional lookups would.
    measurements = [float(x[i]) for i in range(4)]
    return {
        'features': Vectors.dense(measurements),
        'label': str(x[4]),
    }

In [5]:
# Load the comma-separated iris file and turn every record into a Row via f().
# Blank lines (for example a trailing newline at the end of the file) are
# dropped first: such a line splits into [''] and f() would fail on float('').
data = (
    spark.sparkContext.textFile("../../../dataset/iris.txt")
    .filter(lambda line: line.strip())
    .map(lambda line: line.split(','))
    .map(lambda p: Row(**f(p)))
    .toDF()
)

  return f(*args, **kwds)
  return f(*args, **kwds)


In [6]:
# Register the DataFrame as the temporary view "iris" and read it back through SQL.
data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris")
# Render each row as "<column 1>:<column 0>" and collect the strings to the driver.
rel = df.rdd.map(lambda row: "{}:{}".format(row[1], row[0])).collect()

In [7]:
# Print every collected "label:features" string on its own line.
for sample_text in rel:
    print(sample_text)

Iris-setosa:[5.1,3.5,1.4,0.2]
Iris-setosa:[4.9,3.0,1.4,0.2]
Iris-setosa:[4.7,3.2,1.3,0.2]
Iris-setosa:[4.6,3.1,1.5,0.2]
Iris-setosa:[5.0,3.6,1.4,0.2]
Iris-setosa:[5.4,3.9,1.7,0.4]
Iris-setosa:[4.6,3.4,1.4,0.3]
Iris-setosa:[5.0,3.4,1.5,0.2]
Iris-setosa:[4.4,2.9,1.4,0.2]
Iris-setosa:[4.9,3.1,1.5,0.1]
Iris-setosa:[5.4,3.7,1.5,0.2]
Iris-setosa:[4.8,3.4,1.6,0.2]
Iris-setosa:[4.8,3.0,1.4,0.1]
Iris-setosa:[4.3,3.0,1.1,0.1]
Iris-setosa:[5.8,4.0,1.2,0.2]
Iris-setosa:[5.7,4.4,1.5,0.4]
Iris-setosa:[5.4,3.9,1.3,0.4]
Iris-setosa:[5.1,3.5,1.4,0.3]
Iris-setosa:[5.7,3.8,1.7,0.3]
Iris-setosa:[5.1,3.8,1.5,0.3]
Iris-setosa:[5.4,3.4,1.7,0.2]
Iris-setosa:[5.1,3.7,1.5,0.4]
Iris-setosa:[4.6,3.6,1.0,0.2]
Iris-setosa:[5.1,3.3,1.7,0.5]
Iris-setosa:[4.8,3.4,1.9,0.2]
Iris-setosa:[5.0,3.0,1.6,0.2]
Iris-setosa:[5.0,3.4,1.6,0.4]
Iris-setosa:[5.2,3.5,1.5,0.2]
Iris-setosa:[5.2,3.4,1.4,0.2]
Iris-setosa:[4.7,3.2,1.6,0.2]
Iris-setosa:[4.8,3.1,1.6,0.2]
Iris-setosa:[5.4,3.4,1.5,0.4]
Iris-setosa:[5.2,4.1,1.5,0.1]
Iris-setos

In [8]:
# Prepare the label/feature transformers and split the data.
# Fit an indexer that maps the string "label" column to the numeric
# "indexedLabel" column.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)

# Fit an indexer over the "features" vector column, writing "indexedFeatures";
# maxCategories is set to 4.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(df)

# Converter that turns the numeric "prediction" column back into the original
# string labels learned by labelIndexer.
labelConverter = IndexToString(
    inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels
)

# Randomly split the data set: 70% for training, 30% for testing.
# A fixed seed keeps the split (and therefore the reported metrics)
# reproducible across kernel restarts.
trainingData, testData = data.randomSplit([0.7, 0.3], seed=42)

In [9]:
# Build the decision-tree classification model.
# Imports needed for this section.
from pyspark.ml.classification import DecisionTreeClassificationModel,DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Configure the classifier. Parameters can be given as constructor keywords,
# through setters, or with a ParamMap (see the Spark MLlib docs);
# explainParams() lists everything that can be tuned.
dtClassifier = DecisionTreeClassifier(
    labelCol="indexedLabel", featuresCol="indexedFeatures"
)
# Assemble the pipeline: index labels, index features, classify, convert
# predictions back to string labels.
pipelinedClassifier = Pipeline(
    stages=[labelIndexer, featureIndexer, dtClassifier, labelConverter]
)
# Train on the training split.
modelClassifier = pipelinedClassifier.fit(trainingData)
# Predict on the test split.
predictionsClassifier = modelClassifier.transform(testData)
# Show a sample of the predictions next to the true labels.
predictionsClassifier.select("predictedLabel", "label", "features").show(20)

+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.3,3.0,1.1,0.1]|
|    Iris-setosa|    Iris-setosa|[4.4,3.2,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[5.0,3.2,1.2,0.2]|
|    Iris-setosa|    Iris-setosa|[5.2,4.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[5.3,3.7,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.4,3.4,1.5,0.4]|
|    Iris-setosa|    Iris-setosa|[5.4,3.4,1.7,0.2]|
|Iris-versicolor|Iris-versicolor|[5.8,2.7,4.1,1.0]|
|    Iris-setosa|    Iris-setosa|[5.8,4.0,1.2,0.2]|
|Iris-versicolor|Iris-versicolor|[5.9,3.0,4.2,1.5]|
| Iris-virginica|Iris-versicolor|[5.9,3.2,4.8,1.8]|
|Iris-versicolor|Iris-versicolor|[6.1,2.8,4.7,1.2]|
|Iris-versicolor|Iris-versicolor|[6.1,2.9,4.7,1.4]|
| Iris-virginica|Iris-versicolor|[6.3,2.5,4.9,1.5]|
|Iris-versicolor|Iris-versicolor|[6.4,3.2,4.5,1.5]|
|Iris-versicolor|Iris-versicolor|[6.6,2.9,4.6,1.3]|
| Iris-virgi

In [12]:
# Evaluate the decision-tree classification model.
evaluatorClassifier = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)

accuracy = evaluatorClassifier.evaluate(predictionsClassifier)

# Report the error rate, i.e. the complement of the accuracy.
print("Test Error = {}".format(1.0 - accuracy))

# Index 2 is where the classifier sits in the pipeline's stage list.
treeModelClassifier = modelClassifier.stages[2]

print("Learned classification tree model:\n{}".format(treeModelClassifier.toDebugString))

Test Error = 0.07894736842105265
Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_d8a31a27586f) of depth 5 with 11 nodes
  If (feature 2 <= 2.45)
   Predict: 2.0
  Else (feature 2 > 2.45)
   If (feature 3 <= 1.65)
    If (feature 2 <= 4.85)
     Predict: 0.0
    Else (feature 2 > 4.85)
     If (feature 1 <= 3.05)
      If (feature 0 <= 6.05)
       Predict: 0.0
      Else (feature 0 > 6.05)
       Predict: 1.0
     Else (feature 1 > 3.05)
      Predict: 0.0
   Else (feature 3 > 1.65)
    Predict: 1.0



#### 构建决策树回归模型

In [13]:
# Imports needed for this section.
from pyspark.ml.regression import DecisionTreeRegressionModel,DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Configure the decision-tree regressor on the indexed label/feature columns.
dtRegressor = DecisionTreeRegressor(
    labelCol="indexedLabel", featuresCol="indexedFeatures"
)
# Assemble the pipeline: index labels, index features, regress, convert
# predictions back to string labels.
# NOTE(review): the regressor emits continuous predictions (the tree printed
# for this notebook contains a 0.5 leaf), while labelConverter expects class
# indices. Confirm how fractional values are mapped before trusting the
# "predictedLabel" column.
pipelineRegressor = Pipeline(
    stages=[labelIndexer, featureIndexer, dtRegressor, labelConverter]
)
# Train on the training split.
modelRegressor = pipelineRegressor.fit(trainingData)
# Predict on the test split.
predictionsRegressor = modelRegressor.transform(testData)
# Show a sample of the predictions next to the true labels.
predictionsRegressor.select("predictedLabel", "label", "features").show(20)

+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.3,3.0,1.1,0.1]|
|    Iris-setosa|    Iris-setosa|[4.4,3.2,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[5.0,3.2,1.2,0.2]|
|    Iris-setosa|    Iris-setosa|[5.2,4.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[5.3,3.7,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.4,3.4,1.5,0.4]|
|    Iris-setosa|    Iris-setosa|[5.4,3.4,1.7,0.2]|
|Iris-versicolor|Iris-versicolor|[5.8,2.7,4.1,1.0]|
|    Iris-setosa|    Iris-setosa|[5.8,4.0,1.2,0.2]|
|Iris-versicolor|Iris-versicolor|[5.9,3.0,4.2,1.5]|
| Iris-virginica|Iris-versicolor|[5.9,3.2,4.8,1.8]|
|Iris-versicolor|Iris-versicolor|[6.1,2.8,4.7,1.2]|
|Iris-versicolor|Iris-versicolor|[6.1,2.9,4.7,1.4]|
| Iris-virginica|Iris-versicolor|[6.3,2.5,4.9,1.5]|
|Iris-versicolor|Iris-versicolor|[6.4,3.2,4.5,1.5]|
|Iris-versicolor|Iris-versicolor|[6.6,2.9,4.6,1.3]|
| Iris-virgi

In [14]:
# Evaluate the decision-tree regression model.
evaluatorRegressor = RegressionEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="rmse"
)

rmse = evaluatorRegressor.evaluate(predictionsRegressor)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))

# Index 2 is where the regressor sits in the pipeline's stage list.
treeModelRegressor = modelRegressor.stages[2]

print("Learned regression tree model:\n{}".format(treeModelRegressor.toDebugString))

Root Mean Squared Error (RMSE) on test data = 0.28097574347450816
Learned regression tree model:
DecisionTreeRegressionModel (uid=DecisionTreeRegressor_f714cd151b86) of depth 5 with 11 nodes
  If (feature 2 <= 2.45)
   Predict: 2.0
  Else (feature 2 > 2.45)
   If (feature 3 <= 1.65)
    If (feature 2 <= 4.85)
     Predict: 0.0
    Else (feature 2 > 4.85)
     If (feature 1 <= 3.05)
      If (feature 0 <= 6.05)
       Predict: 0.5
      Else (feature 0 > 6.05)
       Predict: 1.0
     Else (feature 1 > 3.05)
      Predict: 0.0
   Else (feature 3 > 1.65)
    Predict: 1.0

