In [1]:
import pyspark

ImportError: No module named 'pyspark'

In [5]:
import findspark

In [6]:
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')

In [7]:
import pyspark

In [8]:
from pyspark.sql import SparkSession

## Creating a Spark Session

In [9]:
spark = SparkSession.builder.appName('ccd').getOrCreate()

## Loading the CSV File

In [10]:
df = spark.read.csv('credit-card-default-1000.csv',inferSchema=True,header=True)

In [11]:
df.select('CUSTID').show()

+------+
|CUSTID|
+------+
|   530|
|    38|
|    43|
|    47|
|    70|
|    79|
|    99|
|   104|
|   135|
|   170|
|   173|
|   179|
|   198|
|   200|
|   292|
|   398|
|   519|
|   653|
|   664|
|   703|
+------+
only showing top 20 rows



## Data Cleaning and Augmentation

### 1. Removing the junk characters from some of the rows using FILTER operation

In [12]:
df = df.filter(df['CUSTID'] != 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

In [13]:
df.select(df['SEX']).show()

+---+
|SEX|
+---+
|  2|
|  2|
|  1|
|  2|
|  1|
|  2|
|  F|
|  2|
|  2|
|  2|
|  2|
|  2|
|  2|
|  2|
|  2|
|  2|
|  2|
|  2|
|  1|
|  2|
+---+
only showing top 20 rows



In [14]:
from pyspark.sql.functions import when

### 2. Here some of values in the sex column are M or 1 for Male & F or 2 for Female so coverting all the values to numeric values of 1 or 2 using the WHEN operation. Also creating another column SEX_FINAL using WITHCOLUMN operation.

In [15]:
df = df.withColumn("SEX_NEW",when(df["SEX"] == 'F', 2).otherwise(df['SEX']))

In [16]:
df = df.withColumn("SEX_FINAL",when(df["SEX_NEW"] == 'M', 1).otherwise(df['SEX_NEW']))

In [17]:
df.columns

['CUSTID',
 'LIMIT_BAL',
 'SEX',
 'EDUCATION',
 'MARRIAGE',
 'AGE',
 'PAY_1',
 'PAY_2',
 'PAY_3',
 'PAY_4',
 'PAY_5',
 'PAY_6',
 'BILL_AMT1',
 'BILL_AMT2',
 'BILL_AMT3',
 'BILL_AMT4',
 'BILL_AMT5',
 'BILL_AMT6',
 'PAY_AMT1',
 'PAY_AMT2',
 'PAY_AMT3',
 'PAY_AMT4',
 'PAY_AMT5',
 'PAY_AMT6',
 'DEFAULTED',
 'SEX_NEW',
 'SEX_FINAL']

In [18]:
from pyspark.sql.functions import format_number

In [19]:
df = df.select('CUSTID',
 'LIMIT_BAL',
 'SEX',
 'EDUCATION',
 'MARRIAGE',
 'AGE',
 'PAY_1',
 'PAY_2',
 'PAY_3',
 'PAY_4',
 'PAY_5',
 'PAY_6',
 'BILL_AMT1',
 'BILL_AMT2',
 'BILL_AMT3',
 'BILL_AMT4',
 'BILL_AMT5',
 'BILL_AMT6',
 'PAY_AMT1',
 'PAY_AMT2',
 'PAY_AMT3',
 'PAY_AMT4',
 'PAY_AMT5',
 'PAY_AMT6',
 'DEFAULTED',
 'SEX_NEW',
 'SEX_FINAL',format_number(df['AGE'],2).alias('AGE_NEW'))

In [20]:
df.select(df['AGE_NEW']).show()

+-------+
|AGE_NEW|
+-------+
|  21.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
|  22.00|
+-------+
only showing top 20 rows



### 3. Creating a new age variable AGE_FINAL where the age is rounded off to the 10's

In [21]:
df = df.withColumn("AGE_NEW1", df['AGE_NEW']/10.0)

In [22]:
import pyspark.sql.functions as func 

### Here FUNC operation is used to round the value in the AGE_NEW1 column

In [23]:
df = df.withColumn("AGE_NEW2", func.round(df['AGE_NEW1']))

In [24]:
df = df.withColumn("AGE_FINAL",df['AGE_NEW2']*10)

### 4. Creating new columns for averages such as AVG_BILL, AVG_PAY & PER_PAY using the WITHCOLUMN operation

In [25]:
df = df.withColumn("AVG_BILL",(df["BILL_AMT1"]+df["BILL_AMT2"]+df["BILL_AMT3"]+df["BILL_AMT4"]+df["BILL_AMT5"]+df["BILL_AMT6"])/6)

In [26]:
df = df.withColumn("AVG_PAY",(df["PAY_AMT1"]+df["PAY_AMT2"]+df["PAY_AMT3"]+df["PAY_AMT4"]+df["PAY_AMT5"]+df["PAY_AMT6"])/6)

In [27]:
df = df.withColumn("PER_PAY",df["AVG_BILL"]/df["AVG_PAY"]*100)

### Converting negative values to positive values using using the FUNC operation

In [28]:
df = df.withColumn("PAY_NEW1", func.abs(df['PAY_1']))

In [29]:
df = df.withColumn("PAY_NEW2", func.abs(df['PAY_2']))

In [30]:
df = df.withColumn("PAY_NEW3", func.abs(df['PAY_3']))

In [31]:
df = df.withColumn("PAY_NEW4", func.abs(df['PAY_4']))

In [32]:
df = df.withColumn("PAY_NEW5", func.abs(df['PAY_5']))

In [33]:
df = df.withColumn("PAY_NEW6", func.abs(df['PAY_6']))

In [34]:
df = df.withColumn("AVG_DUR",(df["PAY_NEW1"]+df["PAY_NEW2"]+df["PAY_NEW3"]+df["PAY_NEW4"]+df["PAY_NEW5"]+df["PAY_NEW6"])/6)

In [35]:
df.count()

1000

In [36]:
df.columns

['CUSTID',
 'LIMIT_BAL',
 'SEX',
 'EDUCATION',
 'MARRIAGE',
 'AGE',
 'PAY_1',
 'PAY_2',
 'PAY_3',
 'PAY_4',
 'PAY_5',
 'PAY_6',
 'BILL_AMT1',
 'BILL_AMT2',
 'BILL_AMT3',
 'BILL_AMT4',
 'BILL_AMT5',
 'BILL_AMT6',
 'PAY_AMT1',
 'PAY_AMT2',
 'PAY_AMT3',
 'PAY_AMT4',
 'PAY_AMT5',
 'PAY_AMT6',
 'DEFAULTED',
 'SEX_NEW',
 'SEX_FINAL',
 'AGE_NEW',
 'AGE_NEW1',
 'AGE_NEW2',
 'AGE_FINAL',
 'AVG_BILL',
 'AVG_PAY',
 'PER_PAY',
 'PAY_NEW1',
 'PAY_NEW2',
 'PAY_NEW3',
 'PAY_NEW4',
 'PAY_NEW5',
 'PAY_NEW6',
 'AVG_DUR']

In [37]:
df.show()

+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+-------+---------+-------+--------+--------+---------+------------------+------------------+------------------+--------+--------+--------+--------+--------+--------+------------------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_1|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULTED|SEX_NEW|SEX_FINAL|AGE_NEW|AGE_NEW1|AGE_NEW2|AGE_FINAL|          AVG_BILL|           AVG_PAY|           PER_PAY|PAY_NEW1|PAY_NEW2|PAY_NEW3|PAY_NEW4|PAY_NEW5|PAY_NEW6|           AVG_DUR|
+------+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+---------+-------+-------

In [38]:
from pyspark.ml.feature import VectorAssembler

In [39]:
df.columns

['CUSTID',
 'LIMIT_BAL',
 'SEX',
 'EDUCATION',
 'MARRIAGE',
 'AGE',
 'PAY_1',
 'PAY_2',
 'PAY_3',
 'PAY_4',
 'PAY_5',
 'PAY_6',
 'BILL_AMT1',
 'BILL_AMT2',
 'BILL_AMT3',
 'BILL_AMT4',
 'BILL_AMT5',
 'BILL_AMT6',
 'PAY_AMT1',
 'PAY_AMT2',
 'PAY_AMT3',
 'PAY_AMT4',
 'PAY_AMT5',
 'PAY_AMT6',
 'DEFAULTED',
 'SEX_NEW',
 'SEX_FINAL',
 'AGE_NEW',
 'AGE_NEW1',
 'AGE_NEW2',
 'AGE_FINAL',
 'AVG_BILL',
 'AVG_PAY',
 'PER_PAY',
 'PAY_NEW1',
 'PAY_NEW2',
 'PAY_NEW3',
 'PAY_NEW4',
 'PAY_NEW5',
 'PAY_NEW6',
 'AVG_DUR']

In [40]:
df.printSchema()

root
 |-- CUSTID: string (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: string (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_1: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULTED: integer (nullable =

### From the above Schema we can see that SEX_FINAL is a string so coverting it to an integer using the CAST(INTERGERTYPE) operation.

In [41]:
from pyspark.sql.types import IntegerType
df = df.withColumn("SEX_LAST",df["SEX_FINAL"].cast(IntegerType()))

In [42]:
df.printSchema()

root
 |-- CUSTID: string (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: string (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_1: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULTED: integer (nullable =

In [43]:
df = df.withColumn("AGE",df["AGE_FINAL"].cast(IntegerType()))

In [44]:
df = df.withColumn("BILL",df["AVG_BILL"].cast(IntegerType()))

In [45]:
df = df.withColumn("PAY",df["AVG_PAY"].cast(IntegerType()))

In [46]:
df = df.withColumn("PERCENTAGE_PAY",df["PER_PAY"].cast(IntegerType()))

In [47]:
df = df.withColumn("DURATION",df["AVG_DUR"].cast(IntegerType()))

### Creating a new data frame using the important features to be used for machine learning

In [48]:
new_df = df.select(['LIMIT_BAL',
 'EDUCATION',
 'MARRIAGE',
 'SEX_LAST',
 'AGE',
 'BILL',
 'PAY',
 'PERCENTAGE_PAY','DEFAULTED'])

In [49]:
new_df.count()

1000

### Some of the values are null so filling those values by 0

In [50]:
new = new_df.fillna(0, subset=['LIMIT_BAL',
 'EDUCATION',
 'MARRIAGE',
 'SEX_LAST',
 'AGE',
 'BILL',
 'PAY',
 'PERCENTAGE_PAY','DEFAULTED'])

In [51]:
new.show()

+---------+---------+--------+--------+---+----+-----+--------------+---------+
|LIMIT_BAL|EDUCATION|MARRIAGE|SEX_LAST|AGE|BILL|  PAY|PERCENTAGE_PAY|DEFAULTED|
+---------+---------+--------+--------+---+----+-----+--------------+---------+
|    20000|        2|       2|       2| 20|   0|27000|             0|        0|
|    60000|        2|       2|       2| 20|   0|  262|             0|        0|
|    10000|        2|       2|       1| 20|   0|  250|             0|        0|
|    20000|        1|       2|       2| 20| 431|21969|             1|        0|
|    20000|        4|       2|       1| 20|3349|28651|            11|        0|
|    30000|        2|       2|       2| 20|1025| 7358|            13|        0|
|    50000|        3|       1|       2| 20| 117|  829|            14|        0|
|    50000|        3|       2|       2| 20| 473| 3328|            14|        0|
|    30000|        2|       2|       2| 20|  61|  359|            17|        0|
|    50000|        2|       2|       2| 

### Using VECTORASSEMBLER converting the column values in a list called features of each row

In [52]:
assembler = VectorAssembler(inputCols=['LIMIT_BAL',
 'EDUCATION',
 'MARRIAGE',
 'SEX_LAST',
 'AGE',
 'BILL',
 'PAY',
 'PERCENTAGE_PAY'],outputCol='features')

### TRANSFORM operation is used to carry out the action metioned above as Apache Spark supports lazy evaluation

In [53]:
final_data = assembler.transform(new)

In [54]:
final_data.select('features','DEFAULTED').show()

+--------------------+---------+
|            features|DEFAULTED|
+--------------------+---------+
|[20000.0,2.0,2.0,...|        0|
|[60000.0,2.0,2.0,...|        0|
|[10000.0,2.0,2.0,...|        0|
|[20000.0,1.0,2.0,...|        0|
|[20000.0,4.0,2.0,...|        0|
|[30000.0,2.0,2.0,...|        0|
|[50000.0,3.0,1.0,...|        0|
|[50000.0,3.0,2.0,...|        0|
|[30000.0,2.0,2.0,...|        0|
|[50000.0,2.0,2.0,...|        0|
|[50000.0,2.0,2.0,...|        0|
|[20000.0,2.0,2.0,...|        0|
|[20000.0,1.0,2.0,...|        0|
|[30000.0,3.0,2.0,...|        0|
|[50000.0,2.0,2.0,...|        0|
|[50000.0,2.0,2.0,...|        0|
|[10000.0,2.0,2.0,...|        0|
|[20000.0,1.0,2.0,...|        0|
|[50000.0,1.0,2.0,...|        0|
|[10000.0,3.0,2.0,...|        0|
+--------------------+---------+
only showing top 20 rows



### Creating a new DataFrame data containing only features and the final label

In [55]:
data = final_data.select('features','DEFAULTED')

In [56]:
data.count()

1000

### Splitting the data into training set and testing set

In [57]:
train_data,test_data = data.randomSplit([0.7,0.3])

In [58]:
train_data.describe().show()

+-------+------------------+
|summary|         DEFAULTED|
+-------+------------------+
|  count|               709|
|   mean|0.4118476727785614|
| stddev|0.4925152777300845|
|    min|                 0|
|    max|                 1|
+-------+------------------+



In [59]:
test_data.describe().show()

+-------+-------------------+
|summary|          DEFAULTED|
+-------+-------------------+
|  count|                291|
|   mean|0.38144329896907214|
| stddev| 0.4865777529901479|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [60]:
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier

### Creating the Decision Tree Classifier and Random Forest Classifier model

In [61]:
dtc = DecisionTreeClassifier(labelCol='DEFAULTED',featuresCol='features')

In [62]:
rfc = RandomForestClassifier(numTrees=100, labelCol='DEFAULTED',featuresCol='features')

### Fitting the models on the train data

In [63]:
dtc_model = dtc.fit(train_data)

In [64]:
rfc_model = rfc.fit(train_data)

### Carrying out the prediction on the test data

In [65]:
dtc_preds = dtc_model.transform(test_data)

In [66]:
rfc_preds = rfc_model.transform(test_data)

In [67]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

### Finding the ROC value using BINARYCLASSIFICATIONEVALUATOR

In [68]:
my_eval = BinaryClassificationEvaluator(labelCol='DEFAULTED')

In [69]:
print('DTC:')
print(my_eval.evaluate(dtc_preds))

DTC:
0.6531531531531531


In [70]:
print('RFC:')
print(my_eval.evaluate(rfc_preds))

RFC:
0.7796296296296293


In [71]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Finding the ACCURACY using MULTICLASSCLASSIFICATIONEVALUATOR

In [72]:
multi_eval = MulticlassClassificationEvaluator(metricName='accuracy',labelCol='DEFAULTED')

In [73]:
print('DTC:')
print(multi_eval.evaluate(dtc_preds))

DTC:
0.7044673539518901


In [74]:
print('RFC:')
print(multi_eval.evaluate(rfc_preds))

RFC:
0.7285223367697594
