In [1]:
# The dataset is the Kaggle Lending Club loan dataset, available at:
# https://www.kaggle.com/wendykan/lending-club-loan-data

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import pandas as pd


In [2]:
# Wrap the SparkContext in a SQLContext so DataFrames can be read and queried.
# NOTE(review): `sc` is never created in this notebook; it is assumed to be the
# SparkContext pre-defined by the notebook kernel (e.g. Databricks) -- confirm.
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

In [3]:
# Load the raw Lending Club CSV: the first line supplies the column names and
# Spark infers each column's type from the data.
loan_csv_path = '/FileStore/tables/d20jc3dn1501724096519/loan.csv'
csv_reader = (sqlc.read
              .format('com.databricks.spark.csv')
              .options(inferschema='true', header='true'))
df = csv_reader.load(loan_csv_path)

In [4]:
# Inspect the column names and the types Spark inferred from the CSV.
df.printSchema()

In [5]:
# Total number of loan rows loaded.
df.count()

In [6]:
# Distribution of raw loan statuses; these values are mapped to the binary
# default target in the next cell.
df.groupBy("loan_status").count().show()

In [7]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def modify_values(r):
    """Map a Lending Club ``loan_status`` value to a binary default flag.

    Returns 1 for the statuses treated as bad loans (default, charged off,
    late by 16+ days, or charged off outside the credit policy) and 0 for
    every other value, including None.
    """
    bad_statuses = ("Default",
                    "Charged Off",
                    "Late (31-120 days)",
                    "Late (16-30 days)",
                    "Does not meet the credit policy. Status:Charged Off")
    return 1 if r in bad_statuses else 0


# Register modify_values as a Spark UDF and derive the binary target column.
# modify_values returns Python ints, so the UDF must declare an integer return
# type; declaring StringType silently turned the flag into "0"/"1" strings.
from pyspark.sql.types import IntegerType
ol_val = udf(modify_values, IntegerType())
new_df = df.withColumn("default_Binary",ol_val(df.loan_status))
  

In [8]:
# Class balance of the derived binary target (1 = bad loan, 0 = otherwise).
new_df.groupby("default_Binary").count().show()

In [9]:
# Keep only the target and the five candidate predictors.
# NOTE(review): the target column was created as "default_Binary"; selecting
# "Default_Binary" relies on Spark resolving column names case-insensitively.
modelling_cols = ["Default_Binary",
                  "loan_amnt", "int_rate",
                  "grade", "emp_length", "purpose"]
df_1 = new_df.select(*modelling_cols)

In [10]:
# Peek at the first five rows, transposed so each column becomes one row.
first_rows = df_1.take(5)
pd.DataFrame(first_rows, columns=df_1.columns).T

In [11]:
# Summary statistics for the selected columns.
df_1.describe().show()

In [12]:
# Numeric predictors: nulls in these columns are filled with the column mean.
numVars = ['loan_amnt',
           'int_rate']
def countNull(df_1,var):
    """Return how many rows of ``df_1`` have a null in column ``var``."""
    null_mask = df_1[var].isNull()
    null_rows = df_1.where(null_mask)
    return null_rows.count()
# Report how many nulls each numeric column has (displayed as the cell output),
# then replace those nulls with the column mean.
from pyspark.sql import functions as F
missing = {var: countNull(df_1,var) for var in numVars}
# A single aggregation job computes every mean (previously one job per column).
numeric_means = df_1.agg(*[F.mean(v).alias(v) for v in numVars]).first().asDict()
# An all-null column has no mean (None), and na.fill cannot use None as a fill
# value, so skip such columns instead of failing.
numeric_means = {k: m for k, m in numeric_means.items() if m is not None}
df_1 = df_1.na.fill(numeric_means)
missing

In [13]:
# Drop every row that still has a null in any column. The numeric columns were
# mean-imputed in the previous cell, so this mainly removes rows with missing
# categorical values.
df_1 = df_1.na.drop()

In [14]:
# Categorical predictors to be encoded as numeric indices.
catVars = ['grade','emp_length','purpose']
## Index every categorical variable with a per-column StringIndexer, chained in a Pipeline
def indexer(df_1,col):
    """Fit a StringIndexer that encodes ``col`` into ``<col>_indexed``.

    Returns the already-fitted StringIndexerModel rather than the estimator,
    so it can be used directly as a Pipeline stage.
    """
    out_col = col + '_indexed'
    estimator = StringIndexer(inputCol=col, outputCol=out_col)
    return estimator.fit(df_1)
# Fit one StringIndexer per categorical column, chain the fitted models in a
# Pipeline, and apply them all to df_1 in a single transform.
from pyspark.ml import Pipeline
indexers = [indexer(df_1,col) for col in catVars]
pipeline_1 = Pipeline(stages = indexers)
indexing_model = pipeline_1.fit(df_1)
df_2 = indexing_model.transform(df_1)

In [15]:
# Confirm the *_indexed columns were added alongside the originals.
df_2.printSchema()

In [16]:
# Show the grade -> index mapping learned by StringIndexer.
df_2.select("grade","grade_indexed").distinct().show()

In [17]:
# Select the modelling columns in a fixed order: the label first, then the
# numeric features, then the indexed categorical features. Row position 0 is
# therefore the label and positions 1.. are the features.
from pyspark.sql import Row

catVarsIndexed = [i + '_indexed' for i in catVars]
labelCol = ['Default_Binary']
featuresCol = numVars + catVarsIndexed
# Row template with the fields (label, features).
row = Row('label', 'features')
df_2 = df_2.select(labelCol + featuresCol)

In [18]:
# Preview the label and feature columns before they are packed into vectors.
df_2.show(5)

In [19]:
# Class counts in the modelling table after null handling.
df_2.groupby('Default_Binary').count().toPandas()

In [20]:
# Pack the numeric and indexed categorical columns into one `features` vector
# and rename the target to `label`. VectorAssembler keeps this work in the JVM
# instead of round-tripping every row through a Python lambda over the RDD.
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import col
assembler = VectorAssembler(inputCols=featuresCol, outputCol='features')
lf = assembler.transform(df_2).select(col(labelCol[0]).alias('label'), 'features')
# StringIndexer orders labels by frequency (most frequent -> 0.0). The
# stratified sampling below assumes index 1.0 means "default" (label '1'),
# so fail loudly if that encoding ever flips.
label_indexer = StringIndexer(inputCol = 'label',outputCol='index').fit(lf)
assert label_indexer.labels[1] == '1', label_indexer.labels
lf = label_indexer.transform(lf)
lf.show(5)

In [21]:
# Cache lf before splitting. randomSplit draws each split with its own pass
# over the parent, and both splits are reused by several fits below, so a
# materialised parent keeps the two splits consistent and avoids recomputation.
lf.cache()
(trainingData, testData) = lf.randomSplit([0.7, 0.3], seed = 100)
# print() with one argument behaves the same on Python 2 and Python 3.
print(trainingData.count())
print(testData.count())

In [22]:
# Baseline logistic regression (regParam=0.05, at most 10 iterations) trained
# against the StringIndexer-encoded `index` label.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='index', regParam=0.05, maxIter=10)
lrmodel = lr.fit(trainingData)

In [23]:
# Evaluate models by area under the ROC curve (the default metric of BinaryClassificationEvaluator)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
def testModel(model, testData = testData):
    """Score ``model`` on ``testData`` and return ``(auc, sample)``.

    ``auc`` is the BinaryClassificationEvaluator's default metric (area under
    ROC) against the ``index`` label column. ``sample`` is a pandas DataFrame
    with up to 5 randomly chosen (index, prediction, probability) rows for
    eyeballing.

    Note: the ``testData`` default is bound when this cell runs, so re-running
    the split cell requires re-running this cell as well.
    """
    from pyspark.sql.functions import rand
    pred = model.transform(testData)
    evaluator = BinaryClassificationEvaluator(labelCol = 'index')
    auc = evaluator.evaluate(pred)
    # Choose the random rows in Spark and collect only those. Calling toPandas()
    # first pulled every test prediction onto the driver just to keep 5, and
    # pandas .sample(n=5) raised when fewer than 5 rows existed.
    sample = (pred.select('index', 'prediction', 'probability')
                  .orderBy(rand())
                  .limit(5)
                  .toPandas())
    return auc, sample
# testModel returns (auc, sample_predictions). Print just the AUC under the AUC
# message and display the sampled rows as a table, instead of str()-ing the
# whole tuple (which dumped a pandas DataFrame into the AUC line).
lr_auc, lr_sample = testModel(lrmodel)
print('AUC ROC of Logistic Regression model is: ' + str(lr_auc))
lr_sample

In [24]:
# Fit a shallow decision tree and a 100-tree random forest on the same training
# split, then score them together with the logistic regression via testModel.
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

dt = DecisionTreeClassifier(maxDepth = 4, impurity='gini', labelCol ='index')
dtmodel = dt.fit(trainingData)
# Fixed seed so the forest's random sampling (and thus its results) is
# reproducible; the same `rf` estimator is reused by later cells.
rf = RandomForestClassifier(numTrees = 100, labelCol = 'index', seed = 100)
rfmodel = rf.fit(trainingData)

models = {'LogisticRegression': lrmodel,
          'DecisionTree': dtmodel,
          'RandomForest': rfmodel}

# .items() works on both Python 2 and 3 (.iteritems() is Python 2 only).
modelPerf = {k: testModel(v) for k, v in models.items()}

print(modelPerf)

In [25]:
# Stratified sampling to rebalance the classes: keep ~10% of the index 0.0 rows
# (presumably the majority, non-default class, since StringIndexer gives 0.0 to
# the most frequent label) and every index 1.0 row. The keys are floats to match
# the double-valued `index` column, and the seed makes the sample reproducible.
stratified_data = lf.sampleBy("index", fractions={0.0: 0.1, 1.0: 1.0}, seed=100).cache()

stratified_data.groupby("index").count().toPandas()

In [26]:
# 70/30 train/test split of the rebalanced data (same seed as the first split).
trainingData2, testData2 = stratified_data.randomSplit(weights=[0.7, 0.3], seed=100)

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Refit the baseline logistic regression on the rebalanced training split and
# score the rebalanced test split.
lrModel2 = lr.fit(trainingData2)
predictions = lrModel2.transform(testData2)

# Area under ROC (the evaluator's default metric) against the `index` label.
evaluator = BinaryClassificationEvaluator(labelCol='index')
evaluator.evaluate(predictions)

In [28]:
# Re-score the same stratified logistic-regression predictions. This repeats
# the previous cell (same evaluator settings, same data, same AUC).
# NOTE(review): likely removable; the cross-validation cell below only needs
# `evaluator`, which the previous cell already defines.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='index')
auc_lr_stratified = evaluator.evaluate(predictions)
auc_lr_stratified

In [29]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Grid over regularisation strength, elastic-net mixing and iteration budget:
# 3 x 3 x 3 = 27 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())
# 5-fold CV scored with the `evaluator` defined above; the seed fixes the fold
# assignment so the selected hyper-parameters are reproducible.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=5, seed=100)

# Run cross validation. This fits 27 settings x 5 folds = 135 models (plus a
# final refit of the best setting), so expect it to take a while.
cvModel = cv.fit(trainingData2)
predictions = cvModel.transform(testData2)
evaluator.evaluate(predictions)

In [30]:
# Single decision tree on the rebalanced data; print its learned split rules.
dtModel2 = dt.fit(trainingData2)
tree_rules = dtModel2.toDebugString
predictions = dtModel2.transform(testData2)
print(tree_rules)

In [31]:
# Evaluate model: area under ROC of the stratified decision tree (dtModel2)
# on the held-out testData2 predictions from the previous cell.
evaluator = BinaryClassificationEvaluator(labelCol = 'index')
evaluator.evaluate(predictions)

In [32]:
# Decision-tree grid: 4 depths x 3 bin counts = 12 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1,2,6,10])
             .addGrid(dt.maxBins, [20,40,80])
             .build())
# Create 5-fold CrossValidator; the seed fixes the fold assignment so the
# selected tree is reproducible.
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=5, seed=100)

# Run cross validations
cvModel = cv.fit(trainingData2)

# Score the held-out split so the metric reflects data the model never saw.
# (The evaluator reports area under ROC, not accuracy.)
predictions = cvModel.transform(testData2)
# Evaluate best model
evaluator.evaluate(predictions)

In [33]:
# getting random forest feature importances
rfModel2 = rf.fit(trainingData2)
predictions = rfModel2.transform(testData2)
print (rfModel2.featureImportances)


In [34]:
# Evaluate model: area under ROC of the stratified random forest (rfModel2)
# on the held-out testData2 predictions from the previous cell.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol = 'index')
evaluator.evaluate(predictions)

In [35]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Random-forest grid: 3 depths x 2 bin counts x 2 forest sizes = 12 settings.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())
# Create 5-fold CrossValidator; the seed fixes the fold assignment so the
# selected forest is reproducible.
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=5, seed=100)

cvModel = cv.fit(trainingData2)
# AUC of the best forest on the held-out split.
predictions = cvModel.transform(testData2)
evaluator.evaluate(predictions)

In [36]:
bestModel = cvModel.bestModel
# Scores for every row of the rebalanced data. This includes the rows the model
# was trained on, because trainingData2 is a subset of stratified_data.
finalPredictions = bestModel.transform(stratified_data)
# Report AUC on held-out rows only. Evaluating finalPredictions would mix in the
# ~70% training rows and overstate out-of-sample performance.
evaluator.evaluate(bestModel.transform(testData2))