# Evaluating Risk for Loan Approvals

## Business Value

Accurately assessing the risk of a loan application can save a lender the cost of holding too many risky assets. Rather than relying only on a credit score or credit history, which track how reliable borrowers are, we will generate a score of how profitable a loan is likely to be compared with past loans. Combining credit scores, credit history, and this profitability score can help improve a financial institution's bottom line.

An interpretable model that a loan officer can use before performing a full underwriting provides an immediate estimate and response for the borrower and an informative view for the lender.

<a href="https://ibb.co/cuQYr6"><img src="https://preview.ibb.co/jNxPym/Image.png" alt="Image" border="0"></a>

This notebook has been tested with *DBR 5.4 ML Beta, Python 3*

## The Data

The data used is public data from Lending Club. It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full view of the data please view the data dictionary available [here](https://resources.lendingclub.com/LCDataDictionary.xlsx).


![Loan_Data](https://preview.ibb.co/d3tQ4R/Screen_Shot_2018_02_02_at_11_21_51_PM.png)

https://www.kaggle.com/wendykan/lending-club-loan-data

### Databricks MLflow Integration
Uncomment the code in the next cell to enable the Databricks MLflow integration. Note that this does not currently work in Databricks Community Edition.

In [4]:
# import mlflow
# print(mlflow.__version__)

# spark.conf.set("spark.databricks.mlflow.trackMLlib.enabled", "true")

In [5]:
# Configure location of loanstats_2012_2017.parquet
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"

# Read loanstats_2012_2017.parquet
data = spark.read.parquet(lspq_path)

# Reduce the amount of data (to run on DBCE)
(loan_stats_ce, loan_stats_rest) = data.randomSplit([0.025, 0.975], seed=123)

# Select only the columns needed
loan_stats_ce = loan_stats_ce.select("loan_status", "int_rate", "revol_util", "issue_d", "earliest_cr_line", "emp_length", "verification_status", "total_pymnt", "loan_amnt", "grade", "annual_inc", "dti", "addr_state", "term", "home_ownership", "purpose", "application_type", "delinq_2yrs", "total_acc")

# Print out number of loans
print(str(loan_stats_ce.count()) + " loans opened by Lending Club...")

In [6]:
from pyspark.sql.functions import *

print("------------------------------------------------------------------------------------------------")
print("Create bad loan label: charged off and defaulted loans are bad, fully paid loans are good...")
loan_stats_ce = loan_stats_ce.filter(loan_stats_ce.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))\
                       .withColumn("bad_loan", (~(loan_stats_ce.loan_status == "Fully Paid")).cast("string"))


print("------------------------------------------------------------------------------------------------")
print("Turning string interest rate and revolving util columns into numeric columns...")
loan_stats_ce = loan_stats_ce.withColumn('int_rate', regexp_replace('int_rate', '%', '').cast('float')) \
                       .withColumn('revol_util', regexp_replace('revol_util', '%', '').cast('float')) \
                       .withColumn('issue_year',  substring(loan_stats_ce.issue_d, 5, 4).cast('double') ) \
                       .withColumn('earliest_year', substring(loan_stats_ce.earliest_cr_line, 5, 4).cast('double'))
loan_stats_ce = loan_stats_ce.withColumn('credit_length_in_years', (loan_stats_ce.issue_year - loan_stats_ce.earliest_year))


print("------------------------------------------------------------------------------------------------")
print("Converting emp_length column into numeric...")
loan_stats_ce = loan_stats_ce.withColumn('emp_length', trim(regexp_replace(loan_stats_ce.emp_length, "([ ]*+[a-zA-Z].*)|(n/a)", "") ))
loan_stats_ce = loan_stats_ce.withColumn('emp_length', trim(regexp_replace(loan_stats_ce.emp_length, "< 1", "0") ))
loan_stats_ce = loan_stats_ce.withColumn('emp_length', trim(regexp_replace(loan_stats_ce.emp_length, "10\\+", "10") ).cast('float'))
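
The cleaning steps in the cell above can be easier to follow on single values. The sketch below mirrors them in plain Python, without Spark. The helper names `parse_percent`, `parse_year`, and `parse_emp_length` are ours, not part of the notebook, and the raw string formats (`"6.92%"`, `"Mar-2015"`, `"10+ years"`, `"< 1 year"`, `"n/a"`) are assumptions about how the source columns look before cleaning.

```python
import re


def parse_percent(raw):
    """'6.92%' -> 6.92, like regexp_replace(col, '%', '').cast('float')."""
    return float(raw.replace("%", "").strip())


def parse_year(raw):
    """'Mar-2015' -> 2015.0, like substring(col, 5, 4).cast('double').

    Spark's substring is 1-based, so position 5 with length 4 is raw[4:8] here.
    """
    return float(raw[4:8])


def parse_emp_length(raw):
    """'10+ years' -> 10.0, '< 1 year' -> 0.0, 'n/a' -> None."""
    # Drop everything from the first letter onwards (plus the spaces before it)
    cleaned = re.sub(r"( *[a-zA-Z].*)|(n/a)", "", raw).strip()
    cleaned = cleaned.replace("< 1", "0").strip()
    cleaned = cleaned.replace("10+", "10").strip()
    # An empty string has no numeric value; Spark's cast would yield null
    return float(cleaned) if cleaned else None


# Credit length for a loan issued Mar-2015 with earliest credit line Mar-2002
credit_length_in_years = parse_year("Mar-2015") - parse_year("Mar-2002")
print(credit_length_in_years)
print(parse_emp_length("10+ years"), parse_emp_length("< 1 year"), parse_emp_length("n/a"))
```

Note that an applicant with `n/a` employment length ends up with a missing value rather than zero, which is why an `Imputer` stage is needed later in the pipeline.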

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Easily Convert Parquet to Delta Lake format
With Delta Lake, you can easily transform your Parquet data into Delta Lake format.

In [8]:
# Configure Path
DELTALAKE_GOLD_PATH = "/ml/loan_stats.delta"

# Remove table if it exists
dbutils.fs.rm(DELTALAKE_GOLD_PATH, recurse=True)

# Save table as Delta Lake
loan_stats_ce.write.format("delta").mode("overwrite").save(DELTALAKE_GOLD_PATH)

# Re-read as Delta Lake
loan_stats = spark.read.format("delta").load(DELTALAKE_GOLD_PATH)

# Review data
display(loan_stats)

loan_status,int_rate,revol_util,issue_d,earliest_cr_line,emp_length,verification_status,total_pymnt,loan_amnt,grade,annual_inc,dti,addr_state,term,home_ownership,purpose,application_type,delinq_2yrs,total_acc,bad_loan,issue_year,earliest_year,credit_length_in_years
Fully Paid,6.92,19.9,Mar-2015,Mar-2002,5.0,Verified,1043.1275299432,1000.0,A,127000.0,6.47,CA,36 months,MORTGAGE,credit_card,INDIVIDUAL,0.0,48.0,False,2015.0,2002.0,13.0
Fully Paid,8.18,13.0,Jun-2015,May-1988,10.0,Not Verified,1092.47,1000.0,B,45000.0,17.41,OH,36 months,RENT,vacation,INDIVIDUAL,0.0,50.0,False,2015.0,1988.0,27.0
Fully Paid,8.59,62.4,Aug-2016,Mar-2007,0.0,Not Verified,1001.68,1000.0,A,40000.0,3.42,TX,36 months,MORTGAGE,home_improvement,INDIVIDUAL,0.0,5.0,False,2016.0,2007.0,9.0
Fully Paid,8.9,49.6,Nov-2013,Jul-2000,10.0,Source Verified,1143.081070461,1000.0,A,165000.0,14.24,CT,36 months,MORTGAGE,major_purchase,INDIVIDUAL,0.0,40.0,False,2013.0,2000.0,13.0
Fully Paid,9.16,22.6,May-2016,Oct-1988,8.0,Not Verified,1065.0380387449,1000.0,B,50000.0,34.74,IL,36 months,MORTGAGE,credit_card,INDIVIDUAL,0.0,29.0,False,2016.0,1988.0,28.0
Fully Paid,10.99,63.2,Jul-2014,Apr-2003,1.0,Not Verified,1176.7371061369,1000.0,B,52800.0,22.5,CA,36 months,MORTGAGE,home_improvement,INDIVIDUAL,0.0,26.0,False,2014.0,2003.0,11.0
Fully Paid,10.99,43.6,Feb-2015,Aug-2002,10.0,Not Verified,1095.49,1000.0,B,50000.0,15.51,MI,36 months,MORTGAGE,debt_consolidation,INDIVIDUAL,0.0,20.0,False,2015.0,2002.0,13.0
Charged Off,11.47,93.1,May-2016,Apr-1996,0.0,Source Verified,229.52,1000.0,B,31200.0,34.81,IL,36 months,RENT,credit_card,INDIVIDUAL,0.0,34.0,True,2016.0,1996.0,20.0
Fully Paid,11.99,34.6,Mar-2014,Jul-1994,0.0,Verified,1182.38,1000.0,B,75000.0,14.46,CA,36 months,RENT,debt_consolidation,INDIVIDUAL,0.0,22.0,False,2014.0,1994.0,20.0
Fully Paid,11.99,77.7,Jan-2015,Jan-1999,0.0,Not Verified,1044.48,1000.0,B,105000.0,25.94,NH,36 months,MORTGAGE,other,INDIVIDUAL,0.0,37.0,False,2015.0,1999.0,16.0


In [10]:
print("------------------------------------------------------------------------------------------------")
print("Map multiple levels into one factor level for verification_status...")
loan_stats = loan_stats.withColumn('verification_status', trim(regexp_replace(loan_stats.verification_status, 'Source Verified', 'Verified')))

print("------------------------------------------------------------------------------------------------")
print("Calculate the total amount of money earned or lost per loan...")
loan_stats = loan_stats.withColumn('net', round( loan_stats.total_pymnt - loan_stats.loan_amnt, 2))

## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Schema Evolution
With the `mergeSchema` option, you can evolve your Delta Lake table schema; here the overwrite adds the new `net` column.

In [12]:
# Add the mergeSchema option
loan_stats.write.option("mergeSchema","true").format("delta").mode("overwrite").save(DELTALAKE_GOLD_PATH)

In [13]:
# Original Schema
loan_stats_ce.printSchema()

In [14]:
# New Schema
loan_stats.printSchema()

### ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Review Delta Lake Table History
All transactions for this table are recorded in the table's history, including the initial inserts, updates, deletes, merges, and inserts with schema modifications.

In [16]:
spark.sql("DROP TABLE IF EXISTS loan_stats")
spark.sql("CREATE TABLE loan_stats USING DELTA LOCATION '" + DELTALAKE_GOLD_PATH + "'")

In [17]:
%sql
DESCRIBE HISTORY loan_stats

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend
1,2019-06-10T14:49:14.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(3148169),0610-133810-masts50,0.0,WriteSerializable,False
0,2019-06-10T14:48:59.000+0000,100599,denny.lee@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(3148169),0610-133810-masts50,,WriteSerializable,False


In [18]:
display(loan_stats)

loan_status,int_rate,revol_util,issue_d,earliest_cr_line,emp_length,verification_status,total_pymnt,loan_amnt,grade,annual_inc,dti,addr_state,term,home_ownership,purpose,application_type,delinq_2yrs,total_acc,bad_loan,issue_year,earliest_year,credit_length_in_years,net
Fully Paid,6.62,0.8,Apr-2014,Mar-1998,2.0,Verified,1095.3034651799,1000.0,A,150000.0,3.08,CA,36 months,RENT,major_purchase,INDIVIDUAL,0.0,18.0,False,2014.0,1998.0,16.0,95.3
Fully Paid,7.9,60.0,Sep-2012,May-1971,,Verified,1126.405535003999,1000.0,A,22000.0,8.02,MN,36 months,MORTGAGE,car,INDIVIDUAL,0.0,28.0,False,2012.0,1971.0,41.0,126.41
Fully Paid,8.9,0.0,Feb-2012,Nov-1997,10.0,Verified,1143.08107046561,1000.0,A,33982.0,1.87,OR,36 months,MORTGAGE,home_improvement,INDIVIDUAL,0.0,7.0,False,2012.0,1997.0,15.0,143.08
Fully Paid,13.11,86.5,Mar-2013,Sep-1987,10.0,Not Verified,1214.8667631475,1000.0,B,45000.0,15.39,CA,36 months,RENT,credit_card,INDIVIDUAL,0.0,21.0,False,2013.0,1987.0,26.0,214.87
Fully Paid,17.27,1.0,Sep-2012,Jun-2000,6.0,Not Verified,1282.1699999997,1000.0,C,28000.0,6.17,WV,36 months,MORTGAGE,other,INDIVIDUAL,1.0,29.0,False,2012.0,2000.0,12.0,282.17
Fully Paid,18.49,70.6,Jun-2013,Apr-2001,10.0,Not Verified,1310.337030729204,1000.0,D,43000.0,20.57,NC,36 months,MORTGAGE,vacation,INDIVIDUAL,0.0,15.0,False,2013.0,2001.0,12.0,310.34
Fully Paid,18.49,45.8,Jun-2013,Jul-2002,0.0,Verified,1306.560000001,1000.0,D,23750.0,8.24,KS,36 months,RENT,moving,INDIVIDUAL,0.0,12.0,False,2013.0,2002.0,11.0,306.56
Charged Off,19.47,69.1,Apr-2014,May-1989,10.0,Verified,368.25,1000.0,D,61115.0,23.29,TN,36 months,MORTGAGE,vacation,INDIVIDUAL,1.0,25.0,True,2014.0,1989.0,25.0,-631.75
Fully Paid,9.67,6.4,Apr-2014,Aug-2001,4.0,Not Verified,1271.6190749399,1100.0,B,40000.0,23.55,FL,36 months,MORTGAGE,other,INDIVIDUAL,0.0,42.0,False,2014.0,2001.0,13.0,171.62
Fully Paid,7.62,48.1,Jun-2012,Oct-1982,10.0,Verified,1346.1499998546,1200.0,A,150000.0,4.94,KY,36 months,MORTGAGE,moving,INDIVIDUAL,0.0,35.0,False,2012.0,1982.0,30.0,146.15


In [19]:
display(loan_stats.groupBy("addr_state").agg((count(col("annual_inc"))).alias("ratio")))

addr_state,ratio
AZ,395
SC,204
LA,190
MN,304
NJ,609
DC,37
OR,215
VA,489
RI,65
KY,163


In [20]:
display(loan_stats)
# display(loan_stats.groupBy("bad_loan", "grade").agg((sum(col("net"))).alias("sum_net")))

loan_status,int_rate,revol_util,issue_d,earliest_cr_line,emp_length,verification_status,total_pymnt,loan_amnt,grade,annual_inc,dti,addr_state,term,home_ownership,purpose,application_type,delinq_2yrs,total_acc,bad_loan,issue_year,earliest_year,credit_length_in_years,net
Fully Paid,6.62,0.8,Apr-2014,Mar-1998,2.0,Verified,1095.3034651799,1000.0,A,150000.0,3.08,CA,36 months,RENT,major_purchase,INDIVIDUAL,0.0,18.0,False,2014.0,1998.0,16.0,95.3
Fully Paid,7.9,60.0,Sep-2012,May-1971,,Verified,1126.405535003999,1000.0,A,22000.0,8.02,MN,36 months,MORTGAGE,car,INDIVIDUAL,0.0,28.0,False,2012.0,1971.0,41.0,126.41
Fully Paid,8.9,0.0,Feb-2012,Nov-1997,10.0,Verified,1143.08107046561,1000.0,A,33982.0,1.87,OR,36 months,MORTGAGE,home_improvement,INDIVIDUAL,0.0,7.0,False,2012.0,1997.0,15.0,143.08
Fully Paid,13.11,86.5,Mar-2013,Sep-1987,10.0,Not Verified,1214.8667631475,1000.0,B,45000.0,15.39,CA,36 months,RENT,credit_card,INDIVIDUAL,0.0,21.0,False,2013.0,1987.0,26.0,214.87
Fully Paid,17.27,1.0,Sep-2012,Jun-2000,6.0,Not Verified,1282.1699999997,1000.0,C,28000.0,6.17,WV,36 months,MORTGAGE,other,INDIVIDUAL,1.0,29.0,False,2012.0,2000.0,12.0,282.17
Fully Paid,18.49,70.6,Jun-2013,Apr-2001,10.0,Not Verified,1310.337030729204,1000.0,D,43000.0,20.57,NC,36 months,MORTGAGE,vacation,INDIVIDUAL,0.0,15.0,False,2013.0,2001.0,12.0,310.34
Fully Paid,18.49,45.8,Jun-2013,Jul-2002,0.0,Verified,1306.560000001,1000.0,D,23750.0,8.24,KS,36 months,RENT,moving,INDIVIDUAL,0.0,12.0,False,2013.0,2002.0,11.0,306.56
Charged Off,19.47,69.1,Apr-2014,May-1989,10.0,Verified,368.25,1000.0,D,61115.0,23.29,TN,36 months,MORTGAGE,vacation,INDIVIDUAL,1.0,25.0,True,2014.0,1989.0,25.0,-631.75
Fully Paid,9.67,6.4,Apr-2014,Aug-2001,4.0,Not Verified,1271.6190749399,1100.0,B,40000.0,23.55,FL,36 months,MORTGAGE,other,INDIVIDUAL,0.0,42.0,False,2014.0,2001.0,13.0,171.62
Fully Paid,7.62,48.1,Jun-2012,Oct-1982,10.0,Verified,1346.1499998546,1200.0,A,150000.0,4.94,KY,36 months,MORTGAGE,moving,INDIVIDUAL,0.0,35.0,False,2012.0,1982.0,30.0,146.15


In [21]:
display(loan_stats.select("net","verification_status","int_rate", "revol_util", "issue_year", "earliest_year", "bad_loan", "credit_length_in_years", "emp_length"))

net,verification_status,int_rate,revol_util,issue_year,earliest_year,bad_loan,credit_length_in_years,emp_length
95.3,Verified,6.62,0.8,2014.0,1998.0,False,16.0,2.0
126.41,Verified,7.9,60.0,2012.0,1971.0,False,41.0,
143.08,Verified,8.9,0.0,2012.0,1997.0,False,15.0,10.0
214.87,Not Verified,13.11,86.5,2013.0,1987.0,False,26.0,10.0
282.17,Not Verified,17.27,1.0,2012.0,2000.0,False,12.0,6.0
310.34,Not Verified,18.49,70.6,2013.0,2001.0,False,12.0,10.0
306.56,Verified,18.49,45.8,2013.0,2002.0,False,11.0,0.0
-631.75,Verified,19.47,69.1,2014.0,1989.0,True,25.0,10.0
171.62,Not Verified,9.67,6.4,2014.0,2001.0,False,13.0,4.0
146.15,Verified,7.62,48.1,2012.0,1982.0,False,30.0,10.0


In [22]:

print("------------------------------------------------------------------------------------------------")
print("Setting variables to predict bad loans")
myY = "bad_loan"
categoricals = ["term", "home_ownership", "purpose", "addr_state",
                "verification_status","application_type"]
numerics = ["loan_amnt","emp_length", "annual_inc","dti",
            "delinq_2yrs","revol_util","total_acc",
            "credit_length_in_years"]
myX = categoricals + numerics

loan_stats2 = loan_stats.select(myX + [myY, "int_rate", "net", "issue_year"])
train = loan_stats2.filter(loan_stats2.issue_year <= 2015).cache()
valid = loan_stats2.filter(loan_stats2.issue_year > 2015).cache()

# train.count()
# valid.count()

In [23]:
%sql
USE default;
DROP TABLE IF EXISTS loanstats_train;
DROP TABLE IF EXISTS loanstats_valid;

In [24]:
# Save training and validation tables for future use
train.write.saveAsTable("loanstats_train")
valid.write.saveAsTable("loanstats_valid")

### Logistic Regression Notes
* We will be using the GLM and GBTClassifier models that come pre-installed with Apache Spark in this notebook.
* **GLM** refers to *generalized linear models*; the Apache Spark *logistic regression* model is a special case of a [generalized linear model](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#logistic-regression).
* We will also use BinaryClassificationEvaluator, CrossValidator, and ParamGridBuilder to tune our models.
* The max F1 threshold is the classification threshold that maximizes the F1 score (also called the F-score or F-measure), which we use as the measure of our logistic regression model's accuracy; more information can be found at [F1 score](https://en.wikipedia.org/wiki/F1_score).
* **GBTClassifier** refers to the *gradient-boosted tree classifier*, a popular classification and regression method using ensembles of decision trees; more information can be found at [Gradient Boosted Tree Classifier](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#gradient-boosted-tree-classifier).
* In a subsequent notebook, we will use XGBoost, an optimized distributed gradient boosting library.
  * Under the covers, we will use *XGBoost4J-Spark*, a project aiming to seamlessly integrate XGBoost and Apache Spark by fitting XGBoost to Apache Spark's MLlib framework. More information can be found at [XGBoost4J-Spark Tutorial](https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html).
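
To illustrate what "max F1 threshold" means, here is a minimal plain-Python sketch that scans candidate thresholds and keeps the one with the highest F1 score. The function names and the six scores and labels are hypothetical, chosen only for illustration; the notebook itself reads these values from the logistic regression training summary rather than computing them this way. The loops avoid the built-in `sum` and `max` on purpose, because `from pyspark.sql.functions import *` earlier in the notebook shadows them.

```python
def f1_at_threshold(scores, labels, threshold):
    """F1 of the rule 'predict bad loan (1) when score >= threshold'."""
    tp = fp = fn = 0
    for score, label in zip(scores, labels):
        predicted_bad = score >= threshold
        if predicted_bad and label == 1:
            tp += 1
        elif predicted_bad and label == 0:
            fp += 1
        elif not predicted_bad and label == 1:
            fn += 1
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def best_f1_threshold(scores, labels):
    """Return (threshold, f1) for the candidate threshold with the highest F1."""
    best_threshold, best_f1 = None, -1.0
    for threshold in sorted(set(scores), reverse=True):
        f1 = f1_at_threshold(scores, labels, threshold)
        if f1 > best_f1:
            best_threshold, best_f1 = threshold, f1
    return best_threshold, best_f1


# Hypothetical predicted probabilities of a bad loan and the true labels
example_scores = [0.9, 0.8, 0.6, 0.4, 0.3, 0.1]
example_labels = [1, 1, 0, 1, 0, 0]

threshold, f1 = best_f1_threshold(example_scores, example_labels)
print(threshold, f1)
```

On this toy data the best threshold is 0.4: lowering it that far catches all three bad loans at the cost of one false alarm, which balances precision and recall better than any stricter cut-off.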

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.feature import StandardScaler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# StringIndexer's handleInvalid option accepts 'error', 'keep', or 'skip'
indexers = map(lambda c: StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid = 'keep'), categoricals)
ohes = map(lambda c: OneHotEncoder(inputCol=c + "_idx", outputCol=c+"_class"),categoricals)
imputers = Imputer(inputCols = numerics, outputCols = numerics)

# Establish features columns
featureCols = list(map(lambda c: c+"_class", categoricals)) + numerics

# Build the stages for the ML pipeline
model_matrix_stages = list(indexers) + list(ohes) + [imputers] + \
                     [VectorAssembler(inputCols=featureCols, outputCol="features"), StringIndexer(inputCol="bad_loan", outputCol="label")]

# Apply StandardScaler to create scaledFeatures
scaler = StandardScaler(inputCol="features",
                        outputCol="scaledFeatures",
                        withStd=True,
                        withMean=True)

# Use logistic regression 
lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "scaledFeatures")

# Build our ML pipeline
pipeline = Pipeline(stages=model_matrix_stages+[scaler]+[lr])

# Build the parameter grid for model tuning
paramGrid = ParamGridBuilder() \
              .addGrid(lr.regParam, [0.1, 0.01]) \
              .build()

# Execute CrossValidator for model tuning
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)

# Train the tuned model and establish our best model
cvModel = crossval.fit(train)
glm_model = cvModel.bestModel

# Return ROC
lr_summary = glm_model.stages[-1].summary
display(lr_summary.roc)

FPR,TPR
0.0,0.0
0.0041903342955582,0.030899830220713
0.0092187354502281,0.0587436332767402
0.0153645590837135,0.0825127334465195
0.0220690939566067,0.1042444821731748
0.028494273209796,0.1269949066213921
0.035757519322097,0.1466893039049236
0.0426482912747928,0.1677419354838709
0.0502840115466989,0.1860780984719864
0.057267902039296,0.2067911714770798


In [27]:
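# Find the threshold that maximizes the F-measure on the training summary
fMeasure = lr_summary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
maxFMeasure = maxFMeasure['max(F-Measure)']
fMeasure = fMeasure.toPandas()
# Take the first matching row in case several thresholds tie for the maximum
bestThreshold = float(fMeasure[fMeasure['F-Measure'] == maxFMeasure]["threshold"].iloc[0])
# Note: this sets the threshold on the lr estimator, so it only applies to models fitted afterwards
lr.setThreshold(bestThreshold)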

In [28]:
from pyspark.ml.classification import GBTClassifier

# Establish stages for our GBT model
indexers = map(lambda c: StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid = 'keep'), categoricals)
imputers = Imputer(inputCols = numerics, outputCols = numerics)
featureCols = list(map(lambda c: c+"_idx", categoricals)) + numerics

# Define vector assemblers
model_matrix_stages = list(indexers) + [imputers] + \
                     [VectorAssembler(inputCols=featureCols, outputCol="features"), StringIndexer(inputCol="bad_loan", outputCol="label")]

# Define a GBT model.
gbt = GBTClassifier(featuresCol="features",
                    labelCol="label",
                    lossType = "logistic",
                    maxBins = 52,
                    maxIter=20,
                    maxDepth=5)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=model_matrix_stages+[gbt])

# Train model.  This also runs the indexer.
gbt_model = pipeline.fit(train)

In [29]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.linalg import Vectors

def extract(row):
  return (row.net,) + tuple(row.probability.toArray().tolist()) +  (row.label,) + (row.prediction,)

def score(model,data):
  pred = model.transform(data).select("net", "probability", "label", "prediction")
  pred = pred.rdd.map(extract).toDF(["net", "p0", "p1", "label", "prediction"])
  return pred 

def auc(pred):
  metric = BinaryClassificationMetrics(pred.select("p1", "label").rdd)
  return metric.areaUnderROC

glm_train = score(glm_model, train)
glm_valid = score(glm_model, valid)
gbt_train = score(gbt_model, train)
gbt_valid = score(gbt_model, valid)

glm_train.createOrReplaceTempView("glm_train")
glm_valid.createOrReplaceTempView("glm_valid")
gbt_train.createOrReplaceTempView("gbt_train")
gbt_valid.createOrReplaceTempView("gbt_valid")


print("GLM Training AUC: " + str(auc(glm_train)))
print("GLM Validation AUC: " + str(auc(glm_valid)))
print("GBT Training AUC: " + str(auc(gbt_train)))
print("GBT Validation AUC: " + str(auc(gbt_valid)))
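
As a reminder of what the AUC values printed above measure, the following plain-Python sketch computes the area under the ROC curve as the fraction of (bad loan, good loan) pairs in which the bad loan receives the higher score, counting ties as half. This is only an illustration on hypothetical data: `pairwise_auc` is our own helper, and Spark's `BinaryClassificationMetrics` does not compute the value this way internally.

```python
def pairwise_auc(scores, labels):
    """AUC as the share of positive/negative pairs ranked correctly (ties count half)."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical predicted probabilities of a bad loan and the true labels
toy_scores = [0.9, 0.8, 0.6, 0.4, 0.3, 0.1]
toy_labels = [1, 1, 0, 1, 0, 0]

toy_auc = pairwise_auc(toy_scores, toy_labels)
print(toy_auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to perfect separation, so a large gap between training and validation AUC is a hint that the model is overfitting.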

In [30]:
%scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit

// Compute ROC curve points (100 bins) for a scored DataFrame and tag them with a model id
def roc(pred: DataFrame, model_id: String): DataFrame = {
  val testScoreAndLabel = pred.select("p1", "label").map { case Row(p: Double, l: Double) => (p, l) }
  val metrics = new BinaryClassificationMetrics(testScoreAndLabel.rdd, 100)
  metrics.roc().toDF().withColumn("model", lit(model_id))
}

val glm_train = roc( spark.table("glm_train"), "glm_train")
val glm_valid = roc( spark.table("glm_valid"), "glm_valid")
val gbt_train = roc( spark.table("gbt_train"), "gbt_train")
val gbt_valid = roc( spark.table("gbt_valid"), "gbt_valid")

val roc_curves = glm_train.union(glm_valid).union(gbt_train).union(gbt_valid)

display(roc_curves)

_1,_2,model
0.0,0.0,glm_train
0.0041903342955582,0.030899830220713,glm_train
0.0092187354502281,0.0587436332767402,glm_train
0.0153645590837135,0.0825127334465195,glm_train
0.0220690939566067,0.1042444821731748,glm_train
0.028494273209796,0.1269949066213921,glm_train
0.035757519322097,0.1466893039049236,glm_train
0.0426482912747928,0.1677419354838709,glm_train
0.0502840115466989,0.1860780984719864,glm_train
0.057267902039296,0.2067911714770798,glm_train


In [31]:
gbt_valid_table = spark.table("gbt_valid")
gbt_valid_table.createOrReplaceTempView("gbt_valid_table")

In [32]:
%sql
select * from gbt_valid_table

net,p0,p1,label,prediction
1.68,0.8354936940137068,0.1645063059862932,0.0,0.0
65.04,0.7970671574736953,0.2029328425263047,0.0,0.0
-770.48,0.755552706628749,0.244447293371251,1.0,0.0
150.37,0.6896990805689914,0.3103009194310085,0.0,0.0
0.5,0.882076236972316,0.117923763027684,0.0,0.0
8.03,0.6575777374631043,0.3424222625368956,0.0,0.0
36.81,0.8327943679614492,0.1672056320385507,0.0,0.0
94.28,0.8541584878572737,0.1458415121427263,0.0,0.0
93.38,0.8957725651286328,0.1042274348713672,0.0,0.0
25.77,0.6760165709124683,0.3239834290875317,0.0,0.0


## Quantify the Business Value

A great way to quickly understand the business value of this model is to create a confusion matrix.  The definition of our matrix is as follows:

* Prediction=1, Label=1 (Blue) : Correctly found bad loans. sum_net = loss avoided.
* Prediction=1, Label=0 (Orange) : Incorrectly labeled bad loans. sum_net = profit forfeited.
* Prediction=0, Label=1 (Green) : Incorrectly labeled good loans. sum_net = loss still incurred.
* Prediction=0, Label=0 (Red) : Correctly found good loans. sum_net = profit retained.

The following code snippet calculates this confusion matrix.

In [34]:
display(glm_valid.groupBy("label", "prediction").agg((sum(col("net"))).alias("sum_net")))

label,prediction,sum_net
1.0,1.0,-243849.94000000003
0.0,1.0,20739.870000000003
1.0,0.0,-6435345.999999997
0.0,0.0,2257551.6400000006
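
To turn the four quadrants into a single figure, the arithmetic below reuses the `sum_net` values from the GLM validation output above. It is only a sketch: the variable names are ours, and it assumes that every loan predicted bad (prediction = 1) would simply have been declined.

```python
# sum_net values copied from the glm_valid output above, keyed by (label, prediction)
sum_net = {
    (1.0, 1.0): -243849.94000000003,  # bad loans correctly flagged
    (0.0, 1.0): 20739.870000000003,   # good loans incorrectly flagged
    (1.0, 0.0): -6435345.999999997,   # bad loans that were missed
    (0.0, 0.0): 2257551.6400000006,   # good loans correctly approved
}

# Declining a flagged bad loan avoids its loss; declining a flagged good loan forfeits its profit
loss_avoided = -sum_net[(1.0, 1.0)]
profit_forfeited = sum_net[(0.0, 1.0)]
net_benefit = loss_avoided - profit_forfeited

# Portfolio net if every loan is funded, versus only those with prediction = 0
net_without_model = (sum_net[(1.0, 1.0)] + sum_net[(0.0, 1.0)]
                     + sum_net[(1.0, 0.0)] + sum_net[(0.0, 0.0)])
net_with_model = sum_net[(1.0, 0.0)] + sum_net[(0.0, 0.0)]

print("Net benefit of the model: {:.2f}".format(net_benefit))
print("Net without model: {:.2f}".format(net_without_model))
print("Net with model: {:.2f}".format(net_with_model))
```

Under that assumption the model is worth roughly 223,110 on this validation sample: the losses avoided on correctly flagged bad loans minus the profit forfeited on good loans flagged by mistake.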


## Using the MLflow Runs Sidebar

If you enabled the MLflow integration by uncommenting the snippet in the *Databricks MLflow Integration* section earlier in this notebook, you can view your MLflow runs using the [MLflow Runs Sidebar](https://databricks.com/blog/2019/04/30/introducing-mlflow-run-sidebar-in-databricks-notebooks.html). *Note that this feature is not currently available in Databricks Community Edition.*

![](https://pages.databricks.com/rs/094-YMS-629/images/db-mlflow-integration.gif)