In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [2]:
creditDF=spark.sql("select * from temp.tcz_credit")
creditDF.show()

+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instpercent|sexmarried|guarantors|residenceduration|assets| age|conccredit|apartment|credits|occupation|dependents|hasphone|foreign|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+
|          1.0|    1.0|    18.0|    4.0|    2.0|1049.0|    1.0|       2.0|        4.0|       2.0|       1.0|              4.0|   2.0|21.0|       3.0|      1.0|    1.0|       3.0|       1.0|     1.0|    1.0|
|          1.0|    1.0|     9.0|    4.0|    0.0|2799.0|    1.0|       3.0|        2.0|       3.0|       1.0|              2.0|   1.0|36.0|       3.0|      1.0|    2.0|     

In [3]:
creditDF.cache()
creditDF.describe("balance").show()
creditDF.groupBy("creditability").avg("balance").show()
creditDF.describe("balance","amount","duration").show()

+-------+-----------------+
|summary|          balance|
+-------+-----------------+
|  count|             1000|
|   mean|            2.577|
| stddev|1.257637727110893|
|    min|              1.0|
|    max|              4.0|
+-------+-----------------+

+-------------+------------------+
|creditability|      avg(balance)|
+-------------+------------------+
|          0.0|1.9033333333333333|
|          1.0|2.8657142857142857|
+-------------+------------------+

+-------+-----------------+-----------------+------------------+
|summary|          balance|           amount|          duration|
+-------+-----------------+-----------------+------------------+
|  count|             1000|             1000|              1000|
|   mean|            2.577|         3271.248|            20.903|
| stddev|1.257637727110893|2822.751759895651|12.058814452756373|
|    min|              1.0|            250.0|               4.0|
|    max|              4.0|          18424.0|              72.0|
+-------+-------

In [4]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["balance", "duration", "history", "purpose", "amount",
      "savings", "employment", "instpercent", "sexmarried", "guarantors",
      "residenceduration", "assets", "age", "conccredit", "apartment",
      "credits", "occupation", "dependents", "hasphone", "foreign"],
    outputCol="features")

In [5]:
df2 = assembler.transform(creditDF)
print("Assembled columns   to vector column 'features'")
df2.show(truncate=False)

Assembled columns   to vector column 'features'
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+---------------------------------------------------------------------------------------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instpercent|sexmarried|guarantors|residenceduration|assets|age |conccredit|apartment|credits|occupation|dependents|hasphone|foreign|features                                                                               |
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+---------------------------------------------------------------------------------------+
|1.0          |1.0    |18.0    |4.0    |2.0    |1049.0|1.0    |2.0 

In [6]:
from pyspark.ml.feature import  StringIndexer

df3 = StringIndexer(inputCol="creditability", outputCol="label").fit(df2).transform(df2)


In [7]:
type(df3) 
df3.show(10)
df3.groupBy("creditability","label").count().show(truncate=False)

+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+
|creditability|balance|duration|history|purpose|amount|savings|employment|instpercent|sexmarried|guarantors|residenceduration|assets| age|conccredit|apartment|credits|occupation|dependents|hasphone|foreign|            features|label|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+
|          1.0|    1.0|    18.0|    4.0|    2.0|1049.0|    1.0|       2.0|        4.0|       2.0|       1.0|              4.0|   2.0|21.0|       3.0|      1.0|    1.0|       3.0|       1.0|     1.0|    1.0|[1.0,18.0,4.0,2.0...|  0.0|
|          1.0|    1.0|     9.0|    4.0|    0.0|2799.0|    1.0| 

In [8]:
splitSeed = 10
(trainingData, testData) = df3.randomSplit([0.7, 0.3],splitSeed)

In [9]:
trainingData.count()

701

In [10]:
testData.count()

299

In [12]:
# Train a RandomForest model.
from pyspark.ml.classification import  RandomForestClassifier
classifier = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=5,maxDepth=2,seed=10,impurity="gini",featureSubsetStrategy="auto")
model = classifier.fit(trainingData)
print(model.toDebugString)

RandomForestClassificationModel (uid=RandomForestClassifier_4aa18178fb1a322bf15a) with 5 trees
  Tree 0 (weight 1.0):
    If (feature 2 <= 1.0)
     If (feature 12 <= 49.0)
      Predict: 1.0
     Else (feature 12 > 49.0)
      Predict: 1.0
    Else (feature 2 > 1.0)
     If (feature 0 <= 2.0)
      Predict: 0.0
     Else (feature 0 > 2.0)
      Predict: 0.0
  Tree 1 (weight 1.0):
    If (feature 6 <= 3.0)
     If (feature 11 <= 1.0)
      Predict: 0.0
     Else (feature 11 > 1.0)
      Predict: 0.0
    Else (feature 6 > 3.0)
     If (feature 0 <= 3.0)
      Predict: 0.0
     Else (feature 0 > 3.0)
      Predict: 0.0
  Tree 2 (weight 1.0):
    If (feature 0 <= 2.0)
     If (feature 5 <= 1.0)
      Predict: 1.0
     Else (feature 5 > 1.0)
      Predict: 0.0
    Else (feature 0 > 2.0)
     If (feature 12 <= 22.0)
      Predict: 0.0
     Else (feature 12 > 22.0)
      Predict: 0.0
  Tree 3 (weight 1.0):
    If (feature 11 <= 1.0)
     If (feature 10 <= 1.0)
      Predict: 0.0
     Else (f

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator =  BinaryClassificationEvaluator().setLabelCol("label")
predictions = model.transform(testData)

In [16]:
predictions.show(10,truncate=False)
predictions.groupBy("creditability","label","prediction").count().show(10,truncate=False)

+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------------------------------------------------------------------------+-----+---------------------------------------+----------------------------------------+----------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instpercent|sexmarried|guarantors|residenceduration|assets|age |conccredit|apartment|credits|occupation|dependents|hasphone|foreign|features                                                                              |label|rawPrediction                          |probability                             |prediction|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+----------

In [17]:
accuracy = evaluator.evaluate(predictions)
print("accuracy before pipeline fitting %g"%(accuracy))

accuracy before pipeline fitting 0.710701


In [18]:
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.sql import Row
Predict_pair = Row('prediction', 'label') 

rm = RegressionMetrics(
      predictions.select("prediction", "label").rdd.map(lambda r: Predict_pair(*r))
    )

In [19]:
print("explainedVariance %g"%(rm.explainedVariance))
print("meanAbsoluteError %g"%(rm.meanAbsoluteError))
print("meanSquaredError %g"%(rm.meanSquaredError))
print("rootMeanSquaredError %g"%(rm.rootMeanSquaredError))
print("r2 %g"%(rm.r2))


explainedVariance 0.098075
meanAbsoluteError 0.280936
meanSquaredError 0.280936
rootMeanSquaredError 0.530034
r2 -0.34382


In [20]:
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder() \
        .addGrid(classifier.maxBins,[25,31]) \
        .addGrid(classifier.maxDepth,[5,10]) \
        .addGrid(classifier.numTrees, [20, 60]) \
        .addGrid(classifier.impurity, ["entropy", "gini"]) \
        .build()

In [21]:
from pyspark.ml import Pipeline

pipeline=Pipeline().setStages(classifier)

In [22]:
from pyspark.ml.tuning import CrossValidator
#from pyspark.ml.classification import LogisticRegression
#lr = LogisticRegression()
cv = CrossValidator(estimator=classifier, estimatorParamMaps=paramGrid, evaluator=evaluator,numFolds=10)  


In [24]:
pipelineFittedModel = cv.fit(trainingData)

In [25]:
print(pipelineFittedModel.bestModel.toDebugString)

RandomForestClassificationModel (uid=RandomForestClassifier_4aa18178fb1a322bf15a) with 60 trees
  Tree 0 (weight 1.0):
    If (feature 6 <= 2.0)
     If (feature 1 <= 21.0)
      If (feature 0 <= 2.0)
       If (feature 12 <= 56.0)
        If (feature 3 <= 0.0)
         If (feature 10 <= 2.0)
          If (feature 8 <= 2.0)
           Predict: 1.0
          Else (feature 8 > 2.0)
           If (feature 2 <= 2.0)
            Predict: 1.0
           Else (feature 2 > 2.0)
            Predict: 0.0
         Else (feature 10 > 2.0)
          If (feature 4 <= 1262.0)
           If (feature 4 <= 662.0)
            Predict: 0.0
           Else (feature 4 > 662.0)
            Predict: 1.0
          Else (feature 4 > 1262.0)
           Predict: 0.0
        Else (feature 3 > 0.0)
         If (feature 18 <= 1.0)
          If (feature 15 <= 1.0)
           If (feature 2 <= 1.0)
            Predict: 1.0
           Else (feature 2 > 1.0)
            If (feature 4 <= 1382.0)
             If (feature 1

In [26]:
predictions2 = pipelineFittedModel.transform(testData)


In [27]:
predictions2.show(10,truncate=False)
predictions2.groupBy("creditability","label","prediction").count().show(truncate=False)

+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------------------------------------------------------------------------+-----+---------------------------------------+----------------------------------------+----------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instpercent|sexmarried|guarantors|residenceduration|assets|age |conccredit|apartment|credits|occupation|dependents|hasphone|foreign|features                                                                              |label|rawPrediction                          |probability                             |prediction|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+----------

In [28]:
accuracy2 = evaluator.evaluate(predictions2)
print("accuracy before pipeline fitting %g"%(accuracy2))

accuracy before pipeline fitting 0.730819


In [29]:
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.sql import Row
Predict_pair = Row('prediction', 'label') 

rm2 = RegressionMetrics(
      predictions2.select("prediction", "label").rdd.map(lambda r: Predict_pair(*r))
    )

In [30]:
print("explainedVariance %g"%(rm2.explainedVariance))
print("meanAbsoluteError %g"%(rm2.meanAbsoluteError))
print("meanSquaredError %g"%(rm2.meanSquaredError))
print("rootMeanSquaredError %g"%(rm2.rootMeanSquaredError))
print("r2 %g"%(rm2.r2))


explainedVariance 0.163041
meanAbsoluteError 0.267559
meanSquaredError 0.267559
rootMeanSquaredError 0.517261
r2 -0.279829
