In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('722-model').getOrCreate()

In [2]:
data1= spark.read.csv("./data_china.csv", header=True, inferSchema=True)
data1.printSchema()

root
 |-- adm0_id: integer (nullable = true)
 |-- adm0_name: string (nullable = true)
 |-- adm1_id: integer (nullable = true)
 |-- adm1_name: string (nullable = true)
 |-- mkt_id: integer (nullable = true)
 |-- mkt_name: string (nullable = true)
 |-- cm_id: integer (nullable = true)
 |-- cm_name: string (nullable = true)
 |-- cur_id: integer (nullable = true)
 |-- cur_name: string (nullable = true)
 |-- pt_id: integer (nullable = true)
 |-- pt_name: string (nullable = true)
 |-- um_id: integer (nullable = true)
 |-- um_name: string (nullable = true)
 |-- mp_month: integer (nullable = true)
 |-- mp_year: integer (nullable = true)
 |-- mp_price_RMB: double (nullable = true)



In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
  inputCols=['mp_price_RMB'],
              outputCol="features")
output = assembler.transform(data1)
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="mkt_name", outputCol="PrivateIndex")
output_fixed = indexer.fit(output).transform(output)
final_data = output_fixed.select("features",'PrivateIndex')

In [4]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline
dtc = DecisionTreeClassifier(labelCol='PrivateIndex',featuresCol='features')
train_data,test_data = final_data.randomSplit([0.7,0.3])
dtc_model = dtc.fit(train_data)
dtc_predictions = dtc_model.transform(test_data)

In [5]:
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'PrivateIndex')

In [6]:
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

DTC
0.3850041946308725


In [7]:
dtc_predictions.show()
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol="PrivateIndex", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))


+--------+------------+--------------------+--------------------+----------+
|features|PrivateIndex|       rawPrediction|         probability|prediction|
+--------+------------+--------------------+--------------------+----------+
| [1.315]|         5.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|[1.3375]|         5.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
| [1.445]|         5.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
| [1.472]|         5.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|[1.5025]|         5.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|[1.5325]|         7.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
| [1.545]|         5.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|[1.5567]|         4.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|[1.5613]|         4.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|[1.5625]|         5.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
final_data.show()
data1.select('mkt_name').distinct().show()
final_data.select('PrivateIndex').distinct().show()

+--------+------------+
|features|PrivateIndex|
+--------+------------+
|[2.6567]|         6.0|
|  [2.61]|         6.0|
| [2.616]|         6.0|
|  [2.57]|         6.0|
|  [2.54]|         6.0|
| [2.516]|         6.0|
|  [2.54]|         6.0|
|[2.5775]|         6.0|
|  [2.62]|         6.0|
|[2.6175]|         6.0|
| [2.594]|         6.0|
| [2.595]|         6.0|
|   [2.6]|         6.0|
|  [2.58]|         6.0|
| [2.582]|         6.0|
| [2.595]|         6.0|
| [2.552]|         6.0|
|  [2.47]|         6.0|
|  [2.45]|         6.0|
| [2.444]|         6.0|
+--------+------------+
only showing top 20 rows

+------------+
|    mkt_name|
+------------+
|       Hubei|
|       Wuhan|
|   Zhengzhou|
|Heilongjiang|
|       Jilin|
|       Linyi|
|    Jiujiang|
|    Yuncheng|
|    Hangzhou|
| Sijiazhuang|
|    Shandong|
+------------+

+------------+
|PrivateIndex|
+------------+
|         8.0|
|         0.0|
|         7.0|
|         1.0|
|         4.0|
|         3.0|
|         2.0|
|        10.0|
|      

In [9]:
print(dtc_model.toDebugString)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4bd7944fb6860fa3242e) of depth 5 with 47 nodes
  If (feature 0 <= 2.5867)
   If (feature 0 <= 2.255)
    If (feature 0 <= 1.7925)
     If (feature 0 <= 1.63)
      Predict: 5.0
     Else (feature 0 > 1.63)
      If (feature 0 <= 1.6933)
       Predict: 4.0
      Else (feature 0 > 1.6933)
       Predict: 4.0
    Else (feature 0 > 1.7925)
     If (feature 0 <= 1.922)
      Predict: 7.0
     Else (feature 0 > 1.922)
      Predict: 4.0
   Else (feature 0 > 2.255)
    If (feature 0 <= 2.4019999999999997)
     If (feature 0 <= 2.3125)
      Predict: 9.0
     Else (feature 0 > 2.3125)
      If (feature 0 <= 2.3725)
       Predict: 9.0
      Else (feature 0 > 2.3725)
       Predict: 9.0
    Else (feature 0 > 2.4019999999999997)
     If (feature 0 <= 2.492)
      If (feature 0 <= 2.455)
       Predict: 9.0
      Else (feature 0 > 2.455)
       Predict: 6.0
     Else (feature 0 > 2.492)
      If (feature 0 <= 2.552)
       Predict: 6.0
