In [1]:
# 'local[*]' - instructing to use all the cores (*) on a local machine
# 'appName' - will be given a random name unless specified, like Docker container image name
from pyspark.sql import SparkSession
spark = SparkSession.builder. \
        master('local[*]').\
        appName('credit risk').\
        getOrCreate()

In [10]:
# Data source CSV file must reside in the same folder
# The 'german.data-numeric' file delimits columns with 2 spaces ('  ')
#   Without data scrubbing, "IllegalArgumentException: 'Delimiter cannot be more than one character:   '"
df = spark.read.csv('german.data',sep=' ',inferSchema=False,header=False)

In [11]:
print((df.count(),len(df.columns)))
df.printSchema()
df.show(5)

(1000, 21)
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)

+---+---+---+---+----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3| _c4|_c5|_c6|_c7|_c8| _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|
+---+---+---+---+----+---+---+---+---+-

All the columns with numbers are "**ordinal**" i.e., the magnitude in each column/feature means "something" (e.g., age, duration, credit amount and installment rate). Thus, transformation and encoding for them is not needed and they will be converted to non-string.

In [34]:
df= df.withColumnRenamed('1', '_c0')
df= df.withColumnRenamed('2', '_c1')

In [12]:
from pyspark.sql.types import ShortType # a signed 16-bit integer vs. IntegerType (a signed 32-bit integer)

col_indices_of_ordinal_numbers = [2, 5, 8, 11, 13, 16, 18, 21]

for col_name in df.schema.names:
    # change the name of each column e.g., "_c0" -> "1"
    new_col_num = int(col_name.replace('_c', '')) + 1
    df = df.withColumnRenamed(col_name, str(new_col_num))
    
    if new_col_num in col_indices_of_ordinal_numbers:  
        df = df.withColumn(str(new_col_num), df[str(new_col_num)].cast(ShortType()))

In [5]:
df.show(5)
df.dtypes
#df = df.withColumnRenamed('21', 'target')

+---+---+---+---+----+---+---+---+---+----+---+----+---+----+----+---+----+---+----+----+---+
|  1|  2|  3|  4|   5|  6|  7|  8|  9|  10| 11|  12| 13|  14|  15| 16|  17| 18|  19|  20| 21|
+---+---+---+---+----+---+---+---+---+----+---+----+---+----+----+---+----+---+----+----+---+
|A11|  6|A34|A43|1169|A65|A75|  4|A93|A101|  4|A121| 67|A143|A152|  2|A173|  1|A192|A201|  1|
|A12| 48|A32|A43|5951|A61|A73|  2|A92|A101|  2|A121| 22|A143|A152|  1|A173|  1|A191|A201|  2|
|A14| 12|A34|A46|2096|A61|A74|  2|A93|A101|  3|A121| 49|A143|A152|  1|A172|  2|A191|A201|  1|
|A11| 42|A32|A42|7882|A61|A74|  2|A93|A103|  4|A122| 45|A143|A153|  1|A173|  2|A191|A201|  1|
|A11| 24|A33|A40|4870|A61|A73|  3|A93|A101|  4|A124| 53|A143|A153|  2|A173|  2|A191|A201|  2|
+---+---+---+---+----+---+---+---+---+----+---+----+---+----+----+---+----+---+----+----+---+
only showing top 5 rows



[('1', 'string'),
 ('2', 'smallint'),
 ('3', 'string'),
 ('4', 'string'),
 ('5', 'smallint'),
 ('6', 'string'),
 ('7', 'string'),
 ('8', 'smallint'),
 ('9', 'string'),
 ('10', 'string'),
 ('11', 'smallint'),
 ('12', 'string'),
 ('13', 'smallint'),
 ('14', 'string'),
 ('15', 'string'),
 ('16', 'smallint'),
 ('17', 'string'),
 ('18', 'smallint'),
 ('19', 'string'),
 ('20', 'string'),
 ('21', 'smallint')]

In [13]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder


COL_SUFFIX_IDX = "_INDEX"
COL_SUFFIX_VEC = "_VEC"
TARGET_COL_NAME = "target"

df = df.withColumnRenamed(str(len(df.columns)), TARGET_COL_NAME)

# Unforutnately need to loop through all the columns, as there is no 'dtype' attribute for column
def get_dtype(df, col_name):
    return [dtype for name, dtype in df.dtypes if name == col_name][0]

for col_name in df.schema.names:
    # Transform and encode all the non-string columns EXCEPT the last column of target/label 
    if (get_dtype(df, col_name) == 'string') and (col_name != TARGET_COL_NAME):
        # STAGE 1 of 2: TRANSFORMING

        # Class StringIndexer (an abstract of Estimator)
        stringIndexer = StringIndexer(inputCol=col_name, outputCol=col_name + COL_SUFFIX_IDX)

        # Fit a model to the input dataset, i.e., mapping categorical values to indices
        model = stringIndexer.fit(df)

        # transform converts the words column into feature vectors, adding a new column with those vectors to the DataFrame. 
        col_indexed_df = model.transform(df)

        # STAGE 2 of 2: ENCODING
        # Class OneHotEncoder
        oneHotEncoder = OneHotEncoder(inputCol=col_name + COL_SUFFIX_IDX, outputCol=col_name + COL_SUFFIX_VEC, dropLast=False)
 
        # NO need to fit()
        df = oneHotEncoder.transform(col_indexed_df) # Assgin back to 'df', using the col_indexed_df

In [14]:
df.printSchema # terser output from that of `df.printSchema()`

<bound method DataFrame.printSchema of DataFrame[1: string, 2: smallint, 3: string, 4: string, 5: smallint, 6: string, 7: string, 8: smallint, 9: string, 10: string, 11: smallint, 12: string, 13: smallint, 14: string, 15: string, 16: smallint, 17: string, 18: smallint, 19: string, 20: string, target: smallint, 1_INDEX: double, 1_VEC: vector, 3_INDEX: double, 3_VEC: vector, 4_INDEX: double, 4_VEC: vector, 6_INDEX: double, 6_VEC: vector, 7_INDEX: double, 7_VEC: vector, 9_INDEX: double, 9_VEC: vector, 10_INDEX: double, 10_VEC: vector, 12_INDEX: double, 12_VEC: vector, 14_INDEX: double, 14_VEC: vector, 15_INDEX: double, 15_VEC: vector, 17_INDEX: double, 17_VEC: vector, 19_INDEX: double, 19_VEC: vector, 20_INDEX: double, 20_VEC: vector]>

In [15]:
from pyspark.ml.feature import VectorAssembler

FEATURE_VECTOR_COLUMN_NAME = "features"
feature_columns = []

for col_name in df.schema.names:
    # Transform and encod all the non-string columns EXCEPT the last column of target/label
    if ((get_dtype(df, col_name) == 'smallint') or (get_dtype(df, col_name) == 'vector')) \
        and (col_name != TARGET_COL_NAME):

        feature_columns.append(col_name)

#feature_columns
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol=FEATURE_VECTOR_COLUMN_NAME) # inputCols, NOT inputCol
df = vectorAssembler.transform(df)

# Spark mandates all the features are bunched up into a single vector for each row, in a single column plus target column
df_final = df.select([FEATURE_VECTOR_COLUMN_NAME, TARGET_COL_NAME])

In [16]:
df_final.show(5)

+--------------------+------+
|            features|target|
+--------------------+------+
|(61,[0,1,2,3,4,5,...|     1|
|(61,[0,1,2,3,4,5,...|     2|
|(61,[0,1,2,3,4,5,...|     1|
|(61,[0,1,2,3,4,5,...|     1|
|(61,[0,1,2,3,4,5,...|     2|
+--------------------+------+
only showing top 5 rows



In [17]:
df_final.collect()[0]['features']

SparseVector(61, {0: 6.0, 1: 1169.0, 2: 4.0, 3: 4.0, 4: 67.0, 5: 2.0, 6: 1.0, 8: 1.0, 12: 1.0, 16: 1.0, 27: 1.0, 32: 1.0, 36: 1.0, 40: 1.0, 44: 1.0, 47: 1.0, 50: 1.0, 53: 1.0, 58: 1.0, 59: 1.0})

In [18]:
t = df_final.collect()[0]['features']
print(t[7], t.indices, t.indices.shape[0])

#19

0.0 [ 0  1  2  3  4  5  6  8 12 16 27 32 36 40 44 47 50 53 58 59] 20


In [19]:
t.indices[18]

58

In [20]:
t[int(t.indices[18])]

1.0

In [21]:
for col_name in feature_columns:
    print(col_name)
feature_columns.count
#20 columns

2
5
8
11
13
16
18
1_VEC
3_VEC
4_VEC
6_VEC
7_VEC
9_VEC
10_VEC
12_VEC
14_VEC
15_VEC
17_VEC
19_VEC
20_VEC


<function list.count(value, /)>

1='A11', 2=6, 3='A34', 4='A43', 5=1169, 6='A65', 7='A75', 8=4, 9='A93', 10='A101', 11=4, 12='A121', 13=67, 14='A143', 15='A152', 16=2, 17='A173', 18=1, 19='A192', 20='A201', target=1, 1_INDEX=1.0, 1_VEC=SparseVector(3, {1: 1.0}), 3_INDEX=1.0, 3_VEC=SparseVector(4, {1: 1.0}), 4_INDEX=0.0, 4_VEC=SparseVector(9, {0: 1.0}), 6_INDEX=1.0, 6_VEC=SparseVector(4, {1: 1.0}), 7_INDEX=1.0, 7_VEC=SparseVector(4, {1: 1.0}), 9_INDEX=0.0, 9_VEC=SparseVector(3, {0: 1.0}), 10_INDEX=0.0, 10_VEC=SparseVector(2, {0: 1.0}), 12_INDEX=1.0, 12_VEC=SparseVector(3, {1: 1.0}), 14_INDEX=0.0, 14_VEC=SparseVector(2, {0: 1.0}), 15_INDEX=0.0, 15_VEC=SparseVector(2, {0: 1.0}), 17_INDEX=0.0, 17_VEC=SparseVector(3, {0: 1.0}), 19_INDEX=1.0, 19_VEC=SparseVector(1, {}), 20_INDEX=0.0, 20_VEC=SparseVector(1, {0: 1.0})

In [27]:
for col_name in feature_columns:
    print(df.collect()[0][col_name])
#20

6
1169
4
4
67
2
1
(4,[1],[1.0])
(5,[1],[1.0])
(10,[0],[1.0])
(5,[1],[1.0])
(5,[1],[1.0])
(4,[0],[1.0])
(3,[0],[1.0])
(4,[1],[1.0])
(3,[0],[1.0])
(3,[0],[1.0])
(4,[0],[1.0])
(2,[1],[1.0])
(2,[0],[1.0])


In [24]:
features_list = []

for idx in range(t.indices.shape[0]):
    # without int(), TypeError: Indices must be of type integer, got type <class 'numpy.int32'>
    key = int(t.indices[idx])
    
    features_list.append(int(t[key]))


    
# features_values = tuple(features_list)
print(tuple(features_list))


# WHy 19????

(6, 1169, 4, 4, 67, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)


In [26]:
t2 = []
t2.append(tuple(features_list))
t2

[(6, 1169, 4, 4, 67, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)]

In [None]:




>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]

>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]



In [12]:
df.collect()[0]

Row(1='A11', 2=6, 3='A34', 4='A43', 5=1169, 6='A65', 7='A75', 8=4, 9='A93', 10='A101', 11=4, 12='A121', 13=67, 14='A143', 15='A152', 16=2, 17='A173', 18=1, 19='A192', 20='A201', target=1, 1_INDEX=1.0, 1_VEC=SparseVector(3, {1: 1.0}), 3_INDEX=1.0, 3_VEC=SparseVector(4, {1: 1.0}), 4_INDEX=0.0, 4_VEC=SparseVector(9, {0: 1.0}), 6_INDEX=1.0, 6_VEC=SparseVector(4, {1: 1.0}), 7_INDEX=1.0, 7_VEC=SparseVector(4, {1: 1.0}), 9_INDEX=0.0, 9_VEC=SparseVector(3, {0: 1.0}), 10_INDEX=0.0, 10_VEC=SparseVector(2, {0: 1.0}), 12_INDEX=1.0, 12_VEC=SparseVector(3, {1: 1.0}), 14_INDEX=0.0, 14_VEC=SparseVector(2, {0: 1.0}), 15_INDEX=0.0, 15_VEC=SparseVector(2, {0: 1.0}), 17_INDEX=0.0, 17_VEC=SparseVector(3, {0: 1.0}), 19_INDEX=1.0, 19_VEC=SparseVector(1, {}), 20_INDEX=0.0, 20_VEC=SparseVector(1, {0: 1.0}), features=SparseVector(48, {0: 6.0, 1: 1169.0, 2: 4.0, 3: 4.0, 4: 67.0, 5: 2.0, 6: 1.0, 8: 1.0, 11: 1.0, 14: 1.0, 24: 1.0, 28: 1.0, 31: 1.0, 34: 1.0, 37: 1.0, 39: 1.0, 41: 1.0, 43: 1.0, 47: 1.0}))

In [None]:
import pandas as pd





**SparseVector**, that is! 

The preivous `for` loop iterates all the columns and puts column data only in 'smallint' and 'vector' datatypes into the 'feature_columns' list. If you closely look at the first row [0] from left to right above, the code picks up 'smallint' first i.e., the column numbers in the col_indices_of_ordinal_numbers list, then the added vector-type columns with suffix of "_VEC".

2=6, 5=1169, 8=4, 11=4, 13=67, 16=2, 18=1, target=1,
{0: 6.0, 1: 1169.0, 2: 4.0, 3: 4.0, 4: 67.0, 5: 2.0, 6: 1.0, 8: 1.0,                       13: 1.0, 

GET RID OF TARGET COLUMN target = 1



This means there are 968 (967 + 1) unique values all across the features, of which this particular sample, the first row [0], 

In [None]:
X_train, Y_test = df_final.randomSplit(0.80, 0.20)

In [3]:
#df["_c0"].isNull

from pyspark.sql.functions import trim


names = df.schema.names

for name in names:
    trim(df[name])
    
    print(name + ': ' + str(df.where(df[name].isNull()).count())) # + ' & ' + str(df.where(df[name] == 'NoneType').count())


#duh = df.filter(df["_c0"] == ' ').collect()
#duh = df.filter(df["_c0"].isNull()).collect()


_c0: 0
_c1: 0
_c2: 0
_c3: 0
_c4: 0
_c5: 0
_c6: 0
_c7: 0
_c8: 0
_c9: 0
_c10: 0
_c11: 0
_c12: 0
_c13: 0
_c14: 0
_c15: 0
_c16: 0
_c17: 0
_c18: 0
_c19: 0
_c20: 0


In [23]:
from pyspark.sql.functions import translate
from pyspark.sql.functions import when
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import column
from pyspark.sql.functions import substring

for name in names:
    suffix = 'A' + str(int(name.replace('_c', '')) + 1)
    
#    print(suffix + ": " + name)
    # translate(name, 'A' + str(int(name.replace('_c', '')) + 1), '')

    #df.name = df.select(regexp_replace(column(name), '^' + suffix, '')) #.alias(name)
    #df.name = df.withColumn(name, regexp_replace(name, '^' + suffix, ''))
    
    
    #targetDf = df.withColumn(name, \
    #          when(df["session"] == 0, 999).otherwise(df["timestamp1"]))
    
# use VectorAssembler for One hot encoding or indexer

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string, _c20: string]

In [8]:
import pandas as pd
# import numpy as np
#import seaborn as sns
import matplotlib.pyplot as plt

df_pair_plot = df.select("*").toPandas()

#g = pd.plotting.scatter_matrix(df_pair_plot, figsize=(10,10), marker = 'o', hist_kwds = {'bins': 10}, s = 60, alpha = 0.8)
#g = pd.plotting.scatter_matrix(df_pair_plot, figsize=(15, 15), marker='o', hist_kwds={'bins': 20}, alpha=.8)
g = pd.plotting.scatter_matrix(df_pair_plot, 0.8)

plt.show()

ValueError: num must be 1 <= num <= 0, not 1

<Figure size 432x288 with 0 Axes>

In [8]:
df.groupBy('affairs').count().show()

+-------+-----+
|affairs|count|
+-------+-----+
|      1| 2053|
|      0| 4313|
+-------+-----+



In [9]:
df.groupBy('rate_marriage').count().show()

+-------------+-----+
|rate_marriage|count|
+-------------+-----+
|            1|   99|
|            3|  993|
|            5| 2684|
|            4| 2242|
|            2|  348|
+-------------+-----+



In [10]:
df.groupBy('rate_marriage','affairs').count().orderBy('rate_marriage','affairs','count',ascending=True).show()

+-------------+-------+-----+
|rate_marriage|affairs|count|
+-------------+-------+-----+
|            1|      0|   25|
|            1|      1|   74|
|            2|      0|  127|
|            2|      1|  221|
|            3|      0|  446|
|            3|      1|  547|
|            4|      0| 1518|
|            4|      1|  724|
|            5|      0| 2197|
|            5|      1|  487|
+-------------+-------+-----+



In [11]:
df.groupBy('religious','affairs').count().orderBy('religious','affairs','count',ascending=True).show()

+---------+-------+-----+
|religious|affairs|count|
+---------+-------+-----+
|        1|      0|  613|
|        1|      1|  408|
|        2|      0| 1448|
|        2|      1|  819|
|        3|      0| 1715|
|        3|      1|  707|
|        4|      0|  537|
|        4|      1|  119|
+---------+-------+-----+



In [12]:
df.groupBy('children','affairs').count().orderBy('children','affairs','count',ascending=True).show()

+--------+-------+-----+
|children|affairs|count|
+--------+-------+-----+
|     0.0|      0| 1912|
|     0.0|      1|  502|
|     1.0|      0|  747|
|     1.0|      1|  412|
|     2.0|      0|  873|
|     2.0|      1|  608|
|     3.0|      0|  460|
|     3.0|      1|  321|
|     4.0|      0|  197|
|     4.0|      1|  131|
|     5.5|      0|  124|
|     5.5|      1|   79|
+--------+-------+-----+



In [11]:
df.groupBy('affairs').mean().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------+
|affairs|avg(rate_marriage)|          avg(age)|  avg(yrs_married)|     avg(children)|    avg(religious)|avg(affairs)|
+-------+------------------+------------------+------------------+------------------+------------------+------------+
|      1|3.6473453482708234|30.537018996590355|11.152459814905017|1.7289332683877252| 2.261568436434486|         1.0|
|      0| 4.329700904242986| 28.39067934152562| 7.989334569904939|1.2388128912589844|2.5045212149316023|         0.0|
+-------+------------------+------------------+------------------+------------------+------------------+------------+



In [13]:
from pyspark.ml.feature import VectorAssembler

In [14]:
df_assembler = VectorAssembler(inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'], outputCol="features")
df = df_assembler.transform(df)

In [15]:
df.printSchema()

root
 |-- rate_marriage: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- yrs_married: double (nullable = true)
 |-- children: double (nullable = true)
 |-- religious: integer (nullable = true)
 |-- affairs: integer (nullable = true)
 |-- features: vector (nullable = true)



In [16]:
df.select(['features','affairs']).show(10,False)

+-----------------------+-------+
|features               |affairs|
+-----------------------+-------+
|[5.0,32.0,6.0,1.0,3.0] |0      |
|[4.0,22.0,2.5,0.0,2.0] |0      |
|[3.0,32.0,9.0,3.0,3.0] |1      |
|[3.0,27.0,13.0,3.0,1.0]|1      |
|[4.0,22.0,2.5,0.0,1.0] |1      |
|[4.0,37.0,16.5,4.0,3.0]|1      |
|[5.0,27.0,9.0,1.0,1.0] |1      |
|[4.0,27.0,9.0,0.0,2.0] |1      |
|[5.0,37.0,23.0,5.5,2.0]|1      |
|[5.0,37.0,23.0,5.5,2.0]|1      |
+-----------------------+-------+
only showing top 10 rows



In [16]:
#select data for building model
model_df=df.select(['features','affairs'])

In [17]:
train_df,test_df=model_df.randomSplit([0.75,0.25])

In [18]:
train_df.count()

4800

In [19]:
train_df.groupBy('affairs').count().show()

+-------+-----+
|affairs|count|
+-------+-----+
|      1| 1574|
|      0| 3226|
+-------+-----+



In [20]:
test_df.groupBy('affairs').count().show()

+-------+-----+
|affairs|count|
+-------+-----+
|      1|  479|
|      0| 1087|
+-------+-----+



In [21]:
from pyspark.ml.classification import RandomForestClassifier

In [22]:
rf_classifier=RandomForestClassifier(labelCol='affairs',numTrees=50).fit(train_df)

In [23]:
rf_predictions=rf_classifier.transform(test_df)

In [24]:
rf_predictions.show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|affairs|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[1.0,22.0,2.5,1.0...|      1|[18.7524598757082...|[0.37504919751416...|       1.0|
|[1.0,27.0,2.5,0.0...|      1|[21.0021478623554...|[0.42004295724710...|       1.0|
|[1.0,27.0,6.0,0.0...|      0|[20.1166611727778...|[0.40233322345555...|       1.0|
|[1.0,27.0,6.0,2.0...|      1|[19.0905206218080...|[0.38181041243616...|       1.0|
|[1.0,27.0,6.0,3.0...|      0|[16.3348579592130...|[0.32669715918426...|       1.0|
|[1.0,27.0,9.0,4.0...|      0|[13.3128973003485...|[0.26625794600697...|       1.0|
|[1.0,32.0,13.0,0....|      1|[16.7812008990910...|[0.33562401798182...|       1.0|
|[1.0,32.0,13.0,2....|      1|[12.6966189294366...|[0.25393237858873...|       1.0|
|[1.0,32.0,13.0,2....|      1|[12.6766379097881...|[0.25353275819576...|    

In [25]:
rf_predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 1261|
|       1.0|  305|
+----------+-----+



In [26]:
rf_predictions.select(['probability','affairs','prediction']).show(10,False)

+----------------------------------------+-------+----------+
|probability                             |affairs|prediction|
+----------------------------------------+-------+----------+
|[0.37504919751416455,0.6249508024858356]|1      |1.0       |
|[0.42004295724710805,0.579957042752892] |1      |1.0       |
|[0.40233322345555694,0.597666776544443] |0      |1.0       |
|[0.3818104124361619,0.6181895875638381] |1      |1.0       |
|[0.32669715918426007,0.6733028408157399]|0      |1.0       |
|[0.26625794600697006,0.7337420539930299]|0      |1.0       |
|[0.3356240179818214,0.6643759820181787] |1      |1.0       |
|[0.25393237858873335,0.7460676214112667]|1      |1.0       |
|[0.2535327581957624,0.7464672418042376] |1      |1.0       |
|[0.3319327677415531,0.6680672322584469] |1      |1.0       |
+----------------------------------------+-------+----------+
only showing top 10 rows



In [28]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [30]:
rf_accuracy=MulticlassClassificationEvaluator(labelCol='affairs',metricName='accuracy').evaluate(rf_predictions)

In [31]:
print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))

The accuracy of RF on test data is 73%


In [32]:
print(rf_accuracy)

0.7279693486590039


In [33]:
rf_precision=MulticlassClassificationEvaluator(labelCol='affairs',metricName='weightedPrecision').evaluate(rf_predictions)

In [34]:
print('The precision rate on test data is {0:.0%}'.format(rf_precision))

The precision rate on test data is 71%


In [35]:
rf_precision

0.7085017563673452

In [37]:
rf_auc=BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)

In [38]:
print(rf_auc)

0.7421702296835062


In [39]:
# Feature importance

In [40]:
rf_classifier.featureImportances

SparseVector(5, {0: 0.5536, 1: 0.0364, 2: 0.2358, 3: 0.0803, 4: 0.0939})

In [41]:
df.schema["features"].metadata["ml_attr"]["attrs"]

{'numeric': [{'idx': 0, 'name': 'rate_marriage'},
  {'idx': 1, 'name': 'age'},
  {'idx': 2, 'name': 'yrs_married'},
  {'idx': 3, 'name': 'children'},
  {'idx': 4, 'name': 'religious'}]}

In [49]:
# Save the model 

In [50]:
pwd

'/home/jovyan/work'

In [52]:
rf_classifier.save("/home/jovyan/work/RF_model")

In [54]:
from pyspark.ml.classification import RandomForestClassificationModel

In [55]:
rf=RandomForestClassificationModel.load("/home/jovyan/work/RF_model")

In [58]:
model_preditions=rf.transform(test_df)

In [60]:
model_preditions.show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|affairs|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[1.0,22.0,2.5,1.0...|      1|[188.676639360932...|[0.37735327872186...|       1.0|
|[1.0,27.0,2.5,0.0...|      1|[195.425833792250...|[0.39085166758450...|       1.0|
|[1.0,27.0,6.0,0.0...|      0|[193.138478579040...|[0.38627695715808...|       1.0|
|[1.0,27.0,6.0,2.0...|      1|[185.424877645536...|[0.37084975529107...|       1.0|
|[1.0,27.0,6.0,3.0...|      0|[164.685852316351...|[0.32937170463270...|       1.0|
|[1.0,27.0,9.0,4.0...|      0|[142.006095001922...|[0.28401219000384...|       1.0|
|[1.0,32.0,13.0,0....|      1|[176.885399312490...|[0.35377079862498...|       1.0|
|[1.0,32.0,13.0,2....|      1|[128.585405941664...|[0.25717081188332...|       1.0|
|[1.0,32.0,13.0,2....|      1|[126.963464206019...|[0.25392692841203...|    