In [1]:
## Set python -Spark Environment
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
## Create Spark Context Spark Session
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext()
spark = SparkSession(sc)

In [3]:
sc

<pyspark.context.SparkContext at 0x25cda90>

In [4]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [5]:
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [6]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [7]:
## Importing Required Libraries
import numpy as np
import StringIO
import pandas as pd
import warnings

In [47]:
##Define the Schema
financialDistressSchema = StructType([
         StructField("Target", StringType(), True),
         StructField("Utilization", DoubleType(), True),
         StructField("age", IntegerType(), True),
         StructField("FD_ind1", IntegerType(), True),
         StructField("Debt_Ratio", DoubleType(), True),
         StructField("Monthly_Income", IntegerType(), True),
         StructField("FD_ind2", IntegerType(), True),
         StructField("FD_ind3", IntegerType(), True),        
         StructField("FD_ind4", IntegerType(), True),
         StructField("FD_ind5", IntegerType(), True),
         StructField("NumberOfDependents", IntegerType(), True)])

In [48]:
financialDistressSchema

StructType(List(StructField(Target,StringType,true),StructField(Utilization,DoubleType,true),StructField(age,IntegerType,true),StructField(FD_ind1,IntegerType,true),StructField(Debt_Ratio,DoubleType,true),StructField(Monthly_Income,IntegerType,true),StructField(FD_ind2,IntegerType,true),StructField(FD_ind3,IntegerType,true),StructField(FD_ind4,IntegerType,true),StructField(FD_ind5,IntegerType,true),StructField(NumberOfDependents,IntegerType,true)))

### 1. Read the data and create dataframe and verify data.

In [146]:
##Read the dataset into spark environment
financialDistressData = spark.read.format("csv")\
.option("header", "false")\
.option("inferschema",True)\
.option("nullvalue", "NA")\
.load("/user/datasets/B34PHD/Batch34_phdData.csv", schema = financialDistressSchema)


In [147]:
financialDistressData
financialDistressData.show(100)

+------+-----------+---+-------+-----------+--------------+-------+-------+-------+-------+------------------+
|Target|Utilization|age|FD_ind1| Debt_Ratio|Monthly_Income|FD_ind2|FD_ind3|FD_ind4|FD_ind5|NumberOfDependents|
+------+-----------+---+-------+-----------+--------------+-------+-------+-------+-------+------------------+
|     1|0.766126609| 45|      2|0.802982129|          9120|     13|      0|      6|      0|                 2|
|     0|0.957151019| 40|      0|0.121876201|          2600|      4|      0|      0|      0|                 1|
|     0| 0.65818014| 38|      1|0.085113375|          3042|      2|      1|      0|      0|                 0|
|     0|0.233809776| 30|      0|0.036049682|          3300|      5|      0|      0|      0|                 0|
|     0|  0.9072394| 49|      1|0.024925695|         63588|      7|      0|      1|      0|                 0|
|     0|0.213178682| 74|      0|0.375606969|          3500|      3|      0|      1|      0|                 1|
|

In [148]:
## Describes statistics
financialDistressData.describe().show()

+-------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-------------------+------------------+-------------------+------------------+
|summary|             Target|       Utilization|               age|           FD_ind1|        Debt_Ratio|    Monthly_Income|          FD_ind2|            FD_ind3|           FD_ind4|            FD_ind5|NumberOfDependents|
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-------------------+------------------+-------------------+------------------+
|  count|             150000|            150000|            150000|            150000|            150000|            120269|           150000|             150000|            150000|             150000|            146076|
|   mean|            0.06684| 6.048438054666852|52.295206666666665|0.4210333333333333|353.00507576387264| 6670.22123

### 2. Display the count of rows and columns

In [149]:
financialDistressData.dtypes
#financialDistressData.count()
print("Total records count is {}".format(financialDistressData.count())) ##Total records
print("Total Coloumns count is {}".format(len(financialDistressData.columns)))
print("\n\n Coloumns {}".format((financialDistressData.columns)))

Total records count is 150000
Total Coloumns count is 11


 Coloumns ['Target', 'Utilization', 'age', 'FD_ind1', 'Debt_Ratio', 'Monthly_Income', 'FD_ind2', 'FD_ind3', 'FD_ind4', 'FD_ind5', 'NumberOfDependents']


## 3. Give the percentage distribution of Target attribute and verify if it is a im-balance class problem or not

In [150]:
zero = financialDistressData.where(financialDistressData.Target == "0")
one = financialDistressData.where(financialDistressData.Target == "1")
print(zero.count())
print(one.count())
#percentageImbalance = (DoubleType(one.count())) / (DoubleType(zero.count())))
#print("Percentage of distribution of target variable is {}" .format(percentageImbalance))

139974
10026


## 4. After you create the dataframe in the first step , Target attribute will be in the first column of the dataframe , make it as the last column of the dataframe

In [151]:
print(financialDistressData.columns)
#listCols
financialDistressData = financialDistressData.select(['Utilization', 'age','FD_ind1','Debt_Ratio', 'Monthly_Income', 'FD_ind2', 'FD_ind3', 'FD_ind4', 'FD_ind5', 'NumberOfDependents', 'Target'])
#print(financialDistressData.columns)
print(financialDistressData.columns)


['Target', 'Utilization', 'age', 'FD_ind1', 'Debt_Ratio', 'Monthly_Income', 'FD_ind2', 'FD_ind3', 'FD_ind4', 'FD_ind5', 'NumberOfDependents']
['Utilization', 'age', 'FD_ind1', 'Debt_Ratio', 'Monthly_Income', 'FD_ind2', 'FD_ind3', 'FD_ind4', 'FD_ind5', 'NumberOfDependents', 'Target']


## 5.  Find out which feature has how many numbers of missing values.

In [152]:
from pyspark.sql.functions import isnan, when, count, col, sum, avg

financialDistressData.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in financialDistressData.columns]).show()


+-----------+---+-------+----------+--------------+-------+-------+-------+-------+------------------+------+
|Utilization|age|FD_ind1|Debt_Ratio|Monthly_Income|FD_ind2|FD_ind3|FD_ind4|FD_ind5|NumberOfDependents|Target|
+-----------+---+-------+----------+--------------+-------+-------+-------+-------+------------------+------+
|          0|  0|      0|         0|         29731|      0|      0|      0|      0|              3924|     0|
+-----------+---+-------+----------+--------------+-------+-------+-------+-------+------------------+------+



## 6. Fill the missing values for features as given below (Do not delete the rows with Null/Missing values):
 ## Monthly_Income, NumberOfDependents has missing values

In [153]:
def fill_with_mean(df, exclude=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c in exclude
    ))
    return df.na.fill(stats.first().asDict())

financialDistressData = fill_with_mean(financialDistressData, ["Monthly_Income"])



In [154]:

financialDistressData =  financialDistressData.na.fill(0)

### 7. Once you fill out the missing values proceed for the steps to apply any Spark machie learning technique of your choice and try to see the accuracy of your models.


## Build model using Logistic Regression


In [155]:
numFeatures = ['Utilization', 'age', 'FD_ind1', 'Debt_Ratio', 'Monthly_Income', 'FD_ind2', 'FD_ind3', 'FD_ind4', 'FD_ind5', 'NumberOfDependents']
allFeat = numFeatures

In [156]:
assembler = VectorAssembler(inputCols=allFeat, outputCol='features')

#financialDistressData = assembler.transform(financialDistressData)
#financialDistressData = financialDistressData.withColumn( "label", financialDistressData.Target)
#financialDistressData.show(10, truncate=False)

In [145]:
#financialDistressData.select('features', 'label').show(5, truncate=False)

+--------------------------------------------------------------+-----+
|features                                                      |label|
+--------------------------------------------------------------+-----+
|[0.766126609,45.0,2.0,0.802982129,9120.0,13.0,0.0,6.0,0.0,2.0]|1    |
|[0.957151019,40.0,0.0,0.121876201,2600.0,4.0,0.0,0.0,0.0,1.0] |0    |
|[0.65818014,38.0,1.0,0.085113375,3042.0,2.0,1.0,0.0,0.0,0.0]  |0    |
|(10,[0,1,3,4,5],[0.233809776,30.0,0.036049682,3300.0,5.0])    |0    |
|[0.9072394,49.0,1.0,0.024925695,63588.0,7.0,0.0,1.0,0.0,0.0]  |0    |
+--------------------------------------------------------------+-----+
only showing top 5 rows



In [159]:
train,test=financialDistressData.randomSplit([0.7, 0.3])


In [161]:
labelIndexer = StringIndexer(inputCol="Target", outputCol="indexedLabel").fit(train)
labelIndexer

StringIndexer_449dbc8023c3e32a0152

In [162]:
from pyspark.ml.feature import IndexToString
labelConverter = IndexToString(inputCol="prediction",outputCol="predictedLabel",labels=labelIndexer.labels)
labelConverter


IndexToString_467f8ba0761ffc38abe4

In [163]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = LogisticRegression(maxIter=30,labelCol="indexedLabel", featuresCol="features")

In [166]:
stages=[labelIndexer,assembler]
stages.append(lr)
stages.append(labelConverter)

pipeline = Pipeline(stages=stages)

model = pipeline.fit(train)
predictions = model.transform(train)


+------------+--------------+----------+
|indexedLabel|predictedLabel|prediction|
+------------+--------------+----------+
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0 

In [187]:
predictions.select("indexedLabel","predictedLabel", "prediction").show(100,False)


+------------+--------------+----------+
|indexedLabel|predictedLabel|prediction|
+------------+--------------+----------+
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0             |0.0       |
|0.0         |0 

In [188]:
predictions.prediction

Column<prediction>

In [171]:

true_positive = predictions[(predictions.indexedLabel == 1) & (predictions.prediction == 1.0)].count()
true_negative = predictions[(predictions.indexedLabel == 0) & (predictions.prediction == 0.0)].count()
false_positive = predictions[(predictions.indexedLabel == 0) & (predictions.prediction == 1.0)].count()
false_negative = predictions[(predictions.indexedLabel == 1) & (predictions.prediction == 0.0)].count()
print "True Positives:", true_positive
print "True Negatives:", true_negative
print "False Positives:", false_positive
print "False Negatives:", false_negative
print "Total Train", predictions.count()



True Positives: 281
True Negatives: 97976
False Positives: 215
False Negatives: 6755
Total Train 105227


In [172]:

def recallValue(tp, fn):
    rec = float(tp)/float(tp+fn)
    return rec

In [173]:
recall = recallValue(true_positive,false_negative)
print("training recall value is {}".format(recall))

training recall value is 0.0399374644684


In [183]:
pred_test=model.transform(test)
true_positive_test = pred_test[(pred_test.indexedLabel == 1) & (pred_test.prediction == 1.0)].count()
true_negative_test = pred_test[(pred_test.indexedLabel == 0) & (pred_test.prediction == 0.0)].count()
false_positive_test = pred_test[(pred_test.indexedLabel == 0) & (pred_test.prediction == 1.0)].count()
false_negative_test = pred_test[(pred_test.indexedLabel == 1) & (pred_test.prediction == 0.0)].count()
print "True Positives _test:", true_positive_test
print "True Negatives _test:", true_negative_test
print "False Positives _test:", false_positive_test
print "False Negatives_test:", false_negative_test
print "Total _test", pred_test.count()

True Positives _test: 123
True Negatives _test: 41690
False Positives _test: 93
False Negatives_test: 2867
Total _test 44773


In [186]:
recall_test = recallValue(true_positive_test, false_negative_test)
print("test recall value is {}".format(recall_test))

test recall value is 0.0411371237458


### Random Forest

In [176]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features",numTrees=100,maxDepth=12,maxBins=8)


In [178]:
stages1=[labelIndexer,assembler]
stages1.append(rf)
stages1.append(labelConverter)

pipeline_rf = Pipeline(stages=stages1)

model_rf = pipeline.fit(train)
predictions_rf = model.transform(train)

In [180]:
true_positive_rf = predictions_rf[(predictions_rf.indexedLabel == 1) & (predictions_rf.prediction == 1.0)].count()
true_negative_rf = predictions_rf[(predictions_rf.indexedLabel == 0) & (predictions_rf.prediction == 0.0)].count()
false_positive_rf = predictions_rf[(predictions_rf.indexedLabel == 0) & (predictions_rf.prediction == 1.0)].count()
false_negative_rf = predictions_rf[(predictions_rf.indexedLabel == 1) & (predictions_rf.prediction == 0.0)].count()
print "True Positives:", true_positive_rf
print "True Negatives:", true_negative_rf
print "False Positives:", false_positive_rf
print "False Negatives:", false_negative_rf
print "Total RF", predictions_rf.count()

True Positives: 281
True Negatives: 97976
False Positives: 215
False Negatives: 6755
Total RF 105227


In [181]:

pred_rf_test=model.transform(test)
true_positive_rft = pred_rf_test[(pred_rf_test.indexedLabel == 1) & (pred_rf_test.prediction == 1.0)].count()
true_negative_rft = pred_rf_test[(pred_rf_test.indexedLabel == 0) & (pred_rf_test.prediction == 0.0)].count()
false_positive_rft = pred_rf_test[(pred_rf_test.indexedLabel == 0) & (pred_rf_test.prediction == 1.0)].count()
false_negative_rft = pred_rf_test[(pred_rf_test.indexedLabel == 1) & (pred_rf_test.prediction == 0.0)].count()
print "True Positives:", true_positive_rft
print "True Negatives:", true_negative_rft
print "False Positives:", false_positive_rft
print "False Negatives:", false_negative_rft
print "Total", pred_rf_test.count()



True Positives: 123
True Negatives: 41690
False Positives: 93
False Negatives: 2867
Total 44773


In [182]:
print("Recall Test RF is {}".format(recallValue(true_positive_rft, false_negative_rft)))




Recall Test RF is 0.0411371237458



Both the models have approximately same Recall value