In [1]:
#### Loan data from Lending Club.
#### Goal: predict whether a loan will be Charged Off or Fully Paid.
#### The raw file has close to a million rows and 72 columns.
#### We also want to understand which factors in a client's record make them a good client.

### pyspark.sql functions, used later as `fn`
from pyspark.sql import functions as fn

# Load the CSV: first row is the header, column types are inferred
data = sqlContext.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/loan.csv')

# Peek at the id column
id1 = data.select('id')
id1.show(5)

# Columns not needed for the classification
unused_cols = ("member_id", "issue_d", "home_ownership", "emp_title", "url", "desc",
               "purpose", "title", "zip_code", "addr_state", "last_pymnt_d", "sub_grade")
data_clean = data.drop(*unused_cols)

# Remove loans that are still in progress (issued, current, or late)
for excluded_status in ('Issued', 'Current', 'Late (31-120 days)', 'Late (16-30 days)'):
    data_clean = data_clean.where("loan_status != '%s'" % excluded_status)

# Keep only finished loans: the two target classes
data_clean = data_clean.where("loan_status IN ('Charged Off','Fully Paid')")

# Remaining row count, then a preview of the filtered data
data_clean.count()
display(data_clean)

In [2]:
### Count NULL/NaN values per column. Columns with more than ~20% missing values
### are dropped in the next cell.

from pyspark.sql.functions import isnan, when, count, col

ncn1 = data_clean.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_clean.columns]
)
# Keep the counts DataFrame in ncn1; chaining .show() directly made ncn1 None.
ncn1.show()

In [3]:
## Drop columns that are mostly NULL (see the null-count check above) or judged to have
## negligible impact on the Charged Off / Fully Paid prediction.
## Dropping a name that is not present is a no-op in Spark, so the names already
## removed in the first cell are harmless here.
## NOTE(review): "total_pymnt_imv" matches no column. It is presumably a typo for
## "total_pymnt_inv", which is therefore NOT dropped and is used downstream. It is left
## as-is to preserve the downstream column set.
cols_to_drop = [
    "last_fico_range_high", "last_fico_range_low", "earliest_cr_line", "last_pymnt_amnt",
    "mths_since_last_major_derog", "mths_since_last_record", "next_pymnt_d", "policy_code",
    "pymnt_plan", "recoveries", "revol_bal", "revol_util", "title", "total_acc",
    "total_pymnt_imv", "total_rec_int", "total_rec_prncp", "open_rev_12m", "open_rev_24m",
    "open_il_12m", "mths_since_rcnt_il", "il_util", "inq_fi", "total_cu_tl", "inq_last_12m",
    "tot_coll_amt", "tot_cur_bal", "member_id", "issue_d", "home_ownership", "emp_title",
    "url", "desc", "purpose", "zip_code", "addr_state", "last_pymnt_d", "sub_grade",
    "total_rev_hi_lim", "all_util", "max_bal_bc", "open_rv_24m", "open_rv_12m",
    "total_bal_il", "open_il_24m", "open_acc_6m", "verification_status_joint", "dti_joint",
    "annual_inc_joint", "mths_since_last_delinq", "open_il_6m",
]
data_clean = data_clean.drop(*cols_to_drop)

from functools import reduce
from pyspark.sql import DataFrame

# Fixed seed so the sampled rows (and every downstream result) are reproducible.
SAMPLE_SEED = 42

# Rebalance the classes: keep every Charged Off loan but only ~30% of Fully Paid loans.
df123 = data_clean.where("loan_status IN ('Fully Paid')")
fd = data_clean.where("loan_status IN ('Charged Off')")
(df123take, dont) = df123.randomSplit([0.3, 0.7], seed=SAMPLE_SEED)


def unionAll(*dfs):
    """Stack any number of DataFrames row-wise (columns matched by position)."""
    return reduce(DataFrame.union, dfs)


data_clean = unionAll(df123take, fd)
# Number of Fully Paid loans kept by the sample
df123take.count()

In [4]:
## Which application types occur? Used to spot junk values, which are removed in the next cell.
application_types = data_clean.select(data_clean["application_type"]).distinct()
display(application_types)

data_clean.count()

In [5]:
## application_type contains junk values, so keep only the INDIVIDUAL and JOINT rows.
valid_application_types = "application_type IN ('INDIVIDUAL','JOINT')"
data_clean = data_clean.where(valid_application_types)

### Final row count after the filter
data_clean.count()

In [6]:
#### Data cleaning: turn the text employment length into numeric strings.
#### Assumptions: "n/a" -> 0, "< 1 year" -> 0.5, "10+ years" -> 11, "N year(s)" -> N.
#### The column stays a string here; it is cast to a number together with the other
#### columns in a later cell.

from pyspark.sql.functions import *  # NOTE(review): star import shadows Python builtins
                                     # (sum, min, max, abs, round, ...). Kept because later
                                     # cells may rely on names it brings in.

# (regex pattern, replacement) pairs, applied in this order. The order matters:
#  * "10+ years" is handled before the digit rules, with "+" escaped. Unescaped,
#    "10+ years" means "1 followed by one or more 0s, then ' years'" and never
#    matches the literal "10+ years".
#  * "< 1 year" (and a bare "< 1") is handled before the plain "1 year" rule,
#    which would otherwise turn "< 1 year" into "< 1".
EMP_LENGTH_RULES = [
    ("n/a", "0"),
    ("10\\+ years", "11"),
    ("< 1( year)?", "0.5"),
    ("([2-9]) years", "$1"),
    ("1 year", "1"),
]

for pattern, replacement in EMP_LENGTH_RULES:
    data_clean = data_clean.withColumn('emp_length', regexp_replace('emp_length', pattern, replacement))

# The earlier version ran a .count() after every replacement. Only a cell's last
# expression is displayed, so those counts were never shown, yet each one ran a full
# Spark job over the data.
display(data_clean)

In [7]:
## Dummy (0/1 indicator) columns for the categorical fields verification_status,
## application_type and term, plus the target loan_status.
## One indicator is created per distinct value observed in data_clean. The redundant
## reference indicators are dropped in a later cell.

import pyspark.sql.functions as F


def _distinct_values(column_name):
    # Collect the distinct values of one data_clean column to the driver as a list.
    return data_clean.select(column_name).distinct().rdd.flatMap(lambda x: x).collect()


def _dummy_exprs(source_col, prefix, values):
    # One 1/0 indicator expression per value, named prefix + value.
    return [F.when(F.col(source_col) == value, 1).otherwise(0).alias(prefix + value)
            for value in values]


verification = _distinct_values("verification_status")
status = _distinct_values("loan_status")
application = _distinct_values("application_type")
term = _distinct_values("term")

veri_expr = _dummy_exprs("Verification_status", "V_Status_", verification)
Loan_expr = _dummy_exprs("loan_status", "Loan_Stat_", status)
app_expr = _dummy_exprs("application_type", "App_Type_", application)
term_expr = _dummy_exprs("term", "term_Type_", term)

# Raw categorical columns + id, followed by the indicators (loan status indicators last)
all_dummy_exprs = veri_expr + app_expr + term_expr + Loan_expr
dummy = data_clean.select("Verification_status", "application_type", "term", "id", "loan_status",
                          *all_dummy_exprs)
display(dummy)

In [8]:
## Attach the dummy columns to data_clean.
## The indicator expressions are selected directly on data_clean rather than built as
## a separate frame and self-joined on `id`. This gives the same columns after the next
## cell's drop, but avoids a join/shuffle. It also avoids duplicate id / loan_status /
## application_type / term columns in the result.
## NOTE(review): the inner join would also have dropped rows with a NULL id and
## duplicated rows with a repeated id; ids are presumably unique and non-null — confirm.
df = data_clean.select("*", *(veri_expr + app_expr + term_expr + Loan_expr))

display(df)

In [9]:
## Drop the following columns, which are redundant for modelling:
##   - the identifier;
##   - the raw categorical columns, now encoded as dummies;
##   - one reference dummy per category;
##   - a few remaining non-numeric columns.
## Intended encoding (from the original notes):
##   App_Type_JOINT          1 = JOINT,        0 = INDIVIDUAL
##   Loan_Stat_Charged Off   1 = Charged Off,  0 = Fully Paid   (the label)
##   V_Status_Source Verified / V_Status_Not Verified   both 0 = Verified
## NOTE(review): "term_Type_36 months" has no space after the underscore. The term dummies
## are named "term_Type_ 36 months" / "term_Type_ 60 months" (see the model feature
## selections), so this name matches nothing and BOTH term dummies survive. They are
## perfectly collinear. Left unchanged because Model 2 selects "term_Type_ 36 months".
redundant_cols = [
    "id", "term_Type_36 months", "App_Type_INDIVIDUAL", "Loan_Stat_Fully Paid",
    "V_Status_Verified", "Verification_status", "loan_status", "application_type",
    "term", "grade", "initial_list_status", "last_credit_pull_d",
]
df = df.drop(*redundant_cols)

df.count()

In [10]:
## Cast every column of df to a number so each row can be packed into a numeric vector.
## The original cell built this select but discarded the result, so df was never cast
## (e.g. emp_length stayed a string). "double" is used rather than 32-bit "float"
## because Vectors.dense stores float64 anyway; a float cast would only lose precision.
## NOTE(review): values that cannot be parsed as numbers become NULL after the cast;
## the null check in the next cell will surface them.
df = df.select(*(col(c).cast("double").alias(c) for c in df.columns))
display(df)

In [11]:
# Count NULL/NaN values per column of the filtered, dummy-encoded dataset.
ncn2 = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
# Keep the counts DataFrame in ncn2; chaining .show() directly made ncn2 None.
ncn2.show()

In [12]:
## Drop the rows whose collections_12_mths_ex_med is missing (the author counted only
## 56 such rows), then re-check the missing values in the resulting frame.
dfnan = df.na.drop(subset=["collections_12_mths_ex_med"])

ncn3 = dfnan.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dfnan.columns])
# Keep the counts DataFrame in ncn3; chaining .show() directly made ncn3 None.
ncn3.show()

In [13]:
### Build the model-input frame: per row, the label (the LAST column of dfnan, i.e. the
### loan-status dummy created last in the dummy cell) and a dense vector of all other columns.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors


def transData(df):
    """Return a (label, features1) DataFrame built from ``df``.

    The last column of each row becomes ``label``; every preceding column is packed,
    in order, into a dense vector stored in ``features1``.
    """
    def to_label_and_vector(row):
        return [row[-1], Vectors.dense(row[:-1])]

    return df.rdd.map(to_label_and_vector).toDF(['label', 'features1'])


data = transData(dfnan)
display(data)

In [14]:
### Like transData, but the feature vector keeps EVERY column of the row, including the
### label (last column) itself. Used for the correlation matrix, so the label's correlation
### with each feature is visible, and for the exploratory PCA.
### This used to redefine `transData` with these different semantics, silently shadowing
### the earlier definition; it now has its own name.
### NOTE(review): because the label is inside features1, nothing trained on data1 should
### be used as a classifier — it would leak the target.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors


def transData_with_label(df):
    """Return a (label, features1) DataFrame whose features1 contains all columns of ``df``.

    ``label`` is the last column; ``features1`` is a dense vector of every column,
    in order (so it also contains the label).
    """
    return df.rdd.map(lambda r: [r[-1], Vectors.dense(r[:])]).toDF(['label', 'features1'])


data1 = transData_with_label(dfnan)
display(data1)

In [15]:
## Min-max scale every feature of data1 (features1 -> features) before computing the
## correlation matrix and PCA. The target range is the scaler's default, printed below.
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="features1", outputCol="features")

# Learn per-feature min/max from data1, then rescale
scalerModel1 = scaler.fit(data1)
scaledData1 = scalerModel1.transform(data1)

print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

scaledData1.select("features1", "features").show()
display(scaledData1)

In [16]:
### Spearman correlation matrix over the scaled full-feature vectors
from pyspark.mllib.stat import Statistics

# df2 was previously only created in a later cell, so this cell failed on a fresh
# kernel. Define it here; the later cell recomputes the same frame.
df2 = scaledData1.drop("features1")

rdd = df2.rdd
# Statistics.corr (RDD-based mllib API) expects one vector per row. The "features"
# column holds pyspark.ml DenseVectors, so convert each to a numpy array. The earlier
# `row[1:]` passed a 1-tuple wrapping the ml vector instead.
features = rdd.map(lambda row: row["features"].toArray())

corr_mat = Statistics.corr(features, method="spearman")

In [17]:
## Raw correlation matrix as returned by Statistics.corr
## (the former `type(corr_mat)` line was never displayed, since it was not the last expression)
corr_mat

In [18]:
## Displaying the matrix for correlation matrix 
import pandas as pd
pd.set_option('display.max_columns', 26)

col_names = ["loan_amnt","funded_amnt"
,"funded_amnt_inv"
,"int_rate"
,"installment"
,"emp_length"
,"annual_inc"
,"dti"
,"delinq_2yrs"
,"inq_last_6mths"
,"open_acc"
,"pub_rec"
,"out_prncp"
,"out_prncp_inv"
,"total_pymnt"
,"total_pymnt_inv"
,"total_rec_late_fee"
,"collection_recovery_fee"
,"collections_12_mths_ex_med"
,"acc_now_delinq"
,"V_Status_Source Verified"
,"V_Status_Not Verified"
,"App_Type_JOINT"
,"term_Type_ 36 months"
,"term_Type_ 60 months"
]

corr_df = pd.DataFrame(
                    corr_mat, 
                    index=col_names, 
                    columns=col_names)

corr_df

In [19]:
#################################################################################################################################################
## Three candidate feature sets, chosen from the correlation matrix.
## Each set is converted to a (label, features1) frame and min-max scaled into "features".
## In every selection the LAST column is the label "Loan_Stat_Charged Off".
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler

LABEL_COL = "Loan_Stat_Charged Off"

## Model 1: interest rate, employment length, "Not Verified" flag
dfnan_m1 = dfnan.select("int_rate", "emp_length", "V_Status_Not Verified", LABEL_COL)

## Model 2: interest rate, total late fees received, loan amount, annual income,
##          debt-to-income ratio, "Not Verified" flag, 36-month-term flag
dfnan_m2 = dfnan.select("int_rate", "total_rec_late_fee", "loan_amnt", "annual_inc", "dti",
                        "V_Status_Not Verified", "term_Type_ 36 months", LABEL_COL)

## Model 3: total payment, inquiries in the last 6 months, "Source Verified" flag, open accounts
dfnan_m3 = dfnan.select("total_pymnt", "inq_last_6mths", "V_Status_Source Verified", "open_acc",
                        LABEL_COL)


def transData(df):
    """(label, features1) frame: last column -> label, every other column -> dense vector."""
    return df.rdd.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).toDF(['label', 'features1'])


data_m1, data_m2, data_m3 = (transData(frame) for frame in (dfnan_m1, dfnan_m2, dfnan_m3))

# One shared scaler definition; each feature set gets its own fitted model (own min/max)
scaler = MinMaxScaler(inputCol="features1", outputCol="features")
scalerModel_m1, scalerModel_m2, scalerModel_m3 = (
    scaler.fit(frame) for frame in (data_m1, data_m2, data_m3)
)

scaledData_m1 = scalerModel_m1.transform(data_m1)
scaledData_m2 = scalerModel_m2.transform(data_m2)
scaledData_m3 = scalerModel_m3.transform(data_m3)

print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData_m1.select("features1", "features").show()
display(scaledData_m1)

In [20]:
### Keep only the label and the scaled "features" vector for each model frame, and
### for the full-feature frame used in PCA.
m1, m2, m3 = (scaled.drop("features1") for scaled in (scaledData_m1, scaledData_m2, scaledData_m3))
df2 = scaledData1.drop("features1")

display(df2)


In [21]:
# Principal Component Analysis (10 components) on the scaled full-feature vectors, for
# dimensionality reduction. The earlier version fit on `df1`, which is never defined in
# this notebook (only mentioned in a commented-out line). df2 is the scaled frame.
# NOTE(review): df2's feature vectors were built with the label column included.
from pyspark.ml.feature import PCA

pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df2)

result = model.transform(df2).select("pcaFeatures")
result.show(truncate=False)

In [22]:
# Split Model 1 data into training (70%), validation (20%) and test (10%) sets.
# A fixed seed makes the split, and every accuracy reported from it, reproducible.
# Then show per-label row counts for each split and for the whole set.
SPLIT_SEED = 42
(training_m1, valid_m1, test_m1) = m1.randomSplit([0.7, 0.2, 0.1], seed=SPLIT_SEED)
training_m1.groupBy('label').agg(fn.count('*')).show()
valid_m1.groupBy('label').agg(fn.count('*')).show()
test_m1.groupBy('label').agg(fn.count('*')).show()
m1.groupBy('label').agg(fn.count('*')).show()



In [23]:
# Split Model 2 data into training (70%), validation (20%) and test (10%) sets.
# A fixed seed makes the split, and every accuracy reported from it, reproducible.
# Then show per-label row counts for each split and for the whole set.
SPLIT_SEED = 42
(training_m2, valid_m2, test_m2) = m2.randomSplit([0.7, 0.2, 0.1], seed=SPLIT_SEED)
training_m2.groupBy('label').agg(fn.count('*')).show()
valid_m2.groupBy('label').agg(fn.count('*')).show()
test_m2.groupBy('label').agg(fn.count('*')).show()
m2.groupBy('label').agg(fn.count('*')).show()

In [24]:
# Split Model 3 data into training (70%), validation (20%) and test (10%) sets.
# A fixed seed makes the split, and every accuracy reported from it, reproducible.
# Then show per-label row counts for each split and for the whole set.
SPLIT_SEED = 42
(training_m3, valid_m3, test_m3) = m3.randomSplit([0.7, 0.2, 0.1], seed=SPLIT_SEED)
training_m3.groupBy('label').agg(fn.count('*')).show()
valid_m3.groupBy('label').agg(fn.count('*')).show()
test_m3.groupBy('label').agg(fn.count('*')).show()
m3.groupBy('label').agg(fn.count('*')).show()

In [25]:
## Logistic regression (regParam=0.3) on the Model 1 training split.
## Prints the learned coefficient matrix and intercept vector.
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(regParam=0.3)
lrModel = lr.fit(training_m1)

coefficients = lrModel.coefficientMatrix
intercept = lrModel.interceptVector
print("Coefficients: \n" + str(coefficients))
print("Intercept: " + str(intercept))

In [26]:
# Validation accuracy for Model 1, using an unregularised logistic regression
# (regParam=0, elasticNetParam=0, at most 100 iterations).
# Accuracy is the mean of a 1.0/0.0 "prediction == label" indicator over the validation
# split. The author recorded 63.91% (the value depends on the random split).
lr = LogisticRegression(
    labelCol='label',
    featuresCol='features',
    regParam=0.0,
    maxIter=100,
    elasticNetParam=0.,
)
LRmodelm1 = lr.fit(training_m1)

is_correct = fn.expr('float(prediction = label)').alias('correct')
LRmodelm1.transform(valid_m1).select(is_correct).select(fn.avg('correct')).show()

In [27]:
## Logistic regression (regParam=0.3) on the Model 2 training split.
## Prints the learned coefficient matrix and intercept vector.
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(regParam=0.3)
lrModel = lr.fit(training_m2)

coefficients = lrModel.coefficientMatrix
intercept = lrModel.interceptVector
print("Coefficients: \n" + str(coefficients))
print("Intercept: " + str(intercept))

In [28]:
# Validation accuracy for Model 2 (the earlier comment and variable name said "model1").
# Uses an unregularised logistic regression (regParam=0, elasticNetParam=0, at most 100
# iterations). Accuracy is the mean of a 1.0/0.0 "prediction == label" indicator over
# the validation split. The author recorded 66.58% (the value depends on the random split).
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression().\
    setLabelCol('label').\
    setFeaturesCol('features').\
    setRegParam(0.0).\
    setMaxIter(100).\
    setElasticNetParam(0.)
LRmodelm2 = lr.fit(training_m2)
LRmodelm2.transform(valid_m2).\
    select(fn.expr('float(prediction = label)').alias('correct')).\
    select(fn.avg('correct')).show()

In [29]:
## Logistic regression (regParam=0.3) on the Model 3 training split.
## Prints the learned coefficient matrix and intercept vector.
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(regParam=0.3)
lrModel = lr.fit(training_m3)

coefficients = lrModel.coefficientMatrix
intercept = lrModel.interceptVector
print("Coefficients: \n" + str(coefficients))
print("Intercept: " + str(intercept))

In [30]:
# Validation accuracy for Model 3 (the earlier comment and variable name said "model1").
# Uses an unregularised logistic regression (regParam=0, elasticNetParam=0, at most 100
# iterations). Accuracy is the mean of a 1.0/0.0 "prediction == label" indicator over
# the validation split. The author recorded 74.24% (the value depends on the random split).
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression().\
    setLabelCol('label').\
    setFeaturesCol('features').\
    setRegParam(0.0).\
    setMaxIter(100).\
    setElasticNetParam(0.)
LRmodelm3 = lr.fit(training_m3)
LRmodelm3.transform(valid_m3).\
    select(fn.expr('float(prediction = label)').alias('correct')).\
    select(fn.avg('correct')).show()

In [31]:
## Decision tree classifier for Model 1.
## Pipeline: label StringIndexer -> VectorIndexer -> DecisionTreeClassifier.
## Fitted on the Model 1 training split and scored on the validation split.

from pyspark import SparkContext, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only (not on the whole dataset)
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m1)

# Features with at most 4 distinct values in training are indexed as categorical;
# the rest stay continuous.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m1)

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Fitting the pipeline runs the indexers and trains the tree
model = pipeline.fit(training_m1)

predictions = model.transform(valid_m1)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
print(accuracy)

treeModel = model.stages[2]
print(treeModel)  # summary only

In [32]:
## Decision tree classifier for Model 2.
## Pipeline: label StringIndexer -> VectorIndexer -> DecisionTreeClassifier.
## Fitted on the Model 2 training split and scored on the validation split.

from pyspark import SparkContext, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only (not on the whole dataset)
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m2)

# Features with at most 4 distinct values in training are indexed as categorical;
# the rest stay continuous.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m2)

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Fitting the pipeline runs the indexers and trains the tree
model = pipeline.fit(training_m2)

predictions = model.transform(valid_m2)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
print(accuracy)

treeModel = model.stages[2]
print(treeModel)  # summary only

In [33]:
## Decision tree classifier for Model 3.
## Pipeline: label StringIndexer -> VectorIndexer -> DecisionTreeClassifier.
## Fitted on the Model 3 training split and scored on the validation split.

from pyspark import SparkContext, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only (not on the whole dataset)
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m3)

# Features with at most 4 distinct values in training are indexed as categorical;
# the rest stay continuous.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m3)

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Fitting the pipeline runs the indexers and trains the tree
model = pipeline.fit(training_m3)

predictions = model.transform(valid_m3)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
print(accuracy)

treeModel = model.stages[2]
print(treeModel)  # summary only

In [34]:
## Random forest classifier (10 trees) for Model 1.
## Pipeline: StringIndexer -> VectorIndexer -> RandomForest -> IndexToString,
## so predictions are also reported in the original label values.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m1)

# Features with at most 4 distinct values in training are treated as categorical
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m1)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Map the indexed prediction back to the original label value
labelConverter = IndexToString(
    inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels
)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(training_m1)

predictions = model.transform(valid_m1)
predictions.select("predictedLabel", "label", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print(accuracy)

rfModel = model.stages[2]
print(rfModel)  # summary only

In [35]:
## Random forest classifier (10 trees) for Model 2.
## Pipeline: StringIndexer -> VectorIndexer -> RandomForest -> IndexToString,
## so predictions are also reported in the original label values.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m2)

# Features with at most 4 distinct values in training are treated as categorical
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m2)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Map the indexed prediction back to the original label value
labelConverter = IndexToString(
    inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels
)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(training_m2)

predictions = model.transform(valid_m2)
predictions.select("predictedLabel", "label", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print(accuracy)

rfModel = model.stages[2]
print(rfModel)  # summary only

In [36]:
## Random forest classifier (10 trees) for Model 3.
## Pipeline: StringIndexer -> VectorIndexer -> RandomForest -> IndexToString,
## so predictions are also reported in the original label values.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m3)

# Features with at most 4 distinct values in training are treated as categorical
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m3)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Map the indexed prediction back to the original label value
labelConverter = IndexToString(
    inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels
)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(training_m3)

predictions = model.transform(valid_m3)
predictions.select("predictedLabel", "label", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print(accuracy)

rfModel = model.stages[2]
print(rfModel)  # summary only

In [37]:
## Gradient-boosted trees (maxIter=10) for Model 1.
## Pipeline: StringIndexer -> VectorIndexer -> GBTClassifier.

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m1)

# Features with at most 4 distinct values in training are treated as categorical
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m1)

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
model = pipeline.fit(training_m1)

predictions = model.transform(valid_m1)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
print(accuracy)

In [38]:
## Gradient-boosted trees (maxIter=10) for Model 2.
## Pipeline: StringIndexer -> VectorIndexer -> GBTClassifier.

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m2)

# Features with at most 4 distinct values in training are treated as categorical
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m2)

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
model = pipeline.fit(training_m2)

predictions = model.transform(valid_m2)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
print(accuracy)

In [39]:
## Gradient-boosted trees (maxIter=10) for Model 3.
## Pipeline: StringIndexer -> VectorIndexer -> GBTClassifier.

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label index, fitted on the TRAINING split only
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_m3)

# Features with at most 4 distinct values in training are treated as categorical
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(training_m3)

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
model = pipeline.fit(training_m3)

predictions = model.transform(valid_m3)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Accuracy on the validation split; "Test Error" is 1 - that accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
print(accuracy)

In [40]:
## Naive Bayes classifier for Model 1

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of "prediction" against the raw "label" column.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Multinomial Naive Bayes with additive smoothing of 1.0 on the "features" vector.
# NOTE(review): multinomial NB requires non-negative feature values — confirm this
# holds for the Model 1 features.
nb = NaiveBayes(featuresCol="features", modelType="multinomial", smoothing=1.0)
model = nb.fit(training_m1)

# Score the validation split and preview the scored rows (default 20).
predictions = model.transform(valid_m1)
predictions.show()

accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

In [41]:
## Naive Bayes classifier for Model 2

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of "prediction" against the raw "label" column.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Multinomial Naive Bayes with additive smoothing of 1.0 on the "features" vector.
# NOTE(review): multinomial NB requires non-negative feature values — confirm this
# holds for the Model 2 features.
nb = NaiveBayes(featuresCol="features", modelType="multinomial", smoothing=1.0)
model = nb.fit(training_m2)

# Score the validation split and preview the scored rows (default 20).
predictions = model.transform(valid_m2)
predictions.show()

accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

In [42]:
## Naive Bayes classifier for Model 3

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of "prediction" against the raw "label" column.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Multinomial Naive Bayes with additive smoothing of 1.0 on the "features" vector.
# NOTE(review): multinomial NB requires non-negative feature values — confirm this
# holds for the Model 3 features.
nb = NaiveBayes(featuresCol="features", modelType="multinomial", smoothing=1.0)
model = nb.fit(training_m3)

# Score the validation split and preview the scored rows (default 20).
predictions = model.transform(valid_m3)
predictions.show()

accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

In [43]:
## Multilayer perceptron (feed-forward neural network) for Model 1

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Layer sizes from input to output: 3 inputs, two hidden layers of 2 neurons, 2 outputs.
# The first entry must match the length of the "features" vector and the last the
# number of label classes.
# NOTE(review): assumes Model 1 features have exactly 3 slots and the label is binary — confirm.
layers = [3, 2, 2, 2]

trainer = MultilayerPerceptronClassifier(
    featuresCol="features",
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=1234,
)
model = trainer.fit(training_m1)

# Keep just the columns the evaluator reads (default labelCol="label",
# predictionCol="prediction") and report accuracy on the validation split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
result = model.transform(valid_m1)
predictionAndLabels = result.select("prediction", "label")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

In [44]:
## Multilayer perceptron (feed-forward neural network) for Model 2

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Layer sizes from input to output: 7 inputs, hidden layers of 4 and 3 neurons, 2 outputs.
# The first entry must match the length of the "features" vector and the last the
# number of label classes.
# NOTE(review): assumes Model 2 features have exactly 7 slots and the label is binary — confirm.
layers = [7, 4, 3, 2]

trainer = MultilayerPerceptronClassifier(
    featuresCol="features",
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=1234,
)
model = trainer.fit(training_m2)

# Keep just the columns the evaluator reads (default labelCol="label",
# predictionCol="prediction") and report accuracy on the validation split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
result = model.transform(valid_m2)
predictionAndLabels = result.select("prediction", "label")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

In [45]:
## Multilayer perceptron (feed-forward neural network) for Model 3

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Layer sizes from input to output: 4 inputs, hidden layers of 3 and 2 neurons, 2 outputs.
# The first entry must match the length of the "features" vector and the last the
# number of label classes.
# NOTE(review): assumes Model 3 features have exactly 4 slots and the label is binary — confirm.
layers = [4, 3, 2, 2]

trainer = MultilayerPerceptronClassifier(
    featuresCol="features",
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=1234,
)
model = trainer.fit(training_m3)

# Keep just the columns the evaluator reads (default labelCol="label",
# predictionCol="prediction") and report accuracy on the validation split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
result = model.transform(valid_m3)
predictionAndLabels = result.select("prediction", "label")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

In [46]:
## Logistic regression for Model 1, tuned with 10-fold cross-validation.
## (A plain LogisticRegression: a linear model with no hidden layers.)

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# NOTE(review): Vectors is not used in this cell; kept in case another cell relies on it.
from pyspark.ml.linalg import Vectors
# Imported here so the cell does not depend on an import made in some other cell.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LogisticRegression()

# Candidate optimizer budgets. maxIter=0 would run no optimizer iterations at all
# (an untrained model) and maxIter=1 a single step, so use real budgets instead.
grid = ParamGridBuilder().addGrid(lr.maxIter, [10, 100]).build()

# Default metric is areaUnderROC, computed from "rawPrediction" against "label".
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=10)
cvModel = cv.fit(training_m1)

# Only a cell's last expression is rendered, so print the mean cross-validation
# metric for each grid candidate explicitly (same order as `grid`).
print("Mean CV areaUnderROC per grid candidate = " + str(cvModel.avgMetrics))

# Validation-set areaUnderROC of the best model (refit by CrossValidator on all of training_m1).
evaluator.evaluate(cvModel.transform(valid_m1))


In [47]:

## Logistic regression for Model 2, tuned with 10-fold cross-validation.
## (A plain LogisticRegression: a linear model with no hidden layers.)

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# NOTE(review): Vectors is not used in this cell; kept in case another cell relies on it.
from pyspark.ml.linalg import Vectors
# Imported here so the cell does not depend on an import made in some other cell.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LogisticRegression()

# Candidate optimizer budgets. maxIter=0 would run no optimizer iterations at all
# (an untrained model) and maxIter=1 a single step, so use real budgets instead.
grid = ParamGridBuilder().addGrid(lr.maxIter, [10, 100]).build()

# Default metric is areaUnderROC, computed from "rawPrediction" against "label".
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=10)
cvModel = cv.fit(training_m2)

# Only a cell's last expression is rendered, so print the mean cross-validation
# metric for each grid candidate explicitly (same order as `grid`).
print("Mean CV areaUnderROC per grid candidate = " + str(cvModel.avgMetrics))

# Validation-set areaUnderROC of the best model (refit by CrossValidator on all of training_m2).
evaluator.evaluate(cvModel.transform(valid_m2))


In [48]:
## Logistic regression for Model 3, tuned with 10-fold cross-validation.
## (A plain LogisticRegression: a linear model with no hidden layers.)

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# NOTE(review): Vectors is not used in this cell; kept in case another cell relies on it.
from pyspark.ml.linalg import Vectors
# Imported here so the cell does not depend on an import made in some other cell.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LogisticRegression()

# Candidate optimizer budgets. maxIter=0 would run no optimizer iterations at all
# (an untrained model) and maxIter=1 a single step, so use real budgets instead.
grid = ParamGridBuilder().addGrid(lr.maxIter, [10, 100]).build()

# Default metric is areaUnderROC, computed from "rawPrediction" against "label".
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=10)
cvModel = cv.fit(training_m3)

# Only a cell's last expression is rendered, so print the mean cross-validation
# metric for each grid candidate explicitly (same order as `grid`).
print("Mean CV areaUnderROC per grid candidate = " + str(cvModel.avgMetrics))

# Validation-set areaUnderROC of the best model (refit by CrossValidator on all of training_m3).
evaluator.evaluate(cvModel.transform(valid_m3))