## Final Project - Logistic Regression

### Prepare the Data

First, import the libraries you will need and prepare the training and test data:

In [3]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator


### Read your CSV data from its Databricks table

In [5]:
# Pull every column of the `final_csv` table into a DataFrame
source_query = "SELECT * FROM final_csv"
csv = sqlContext.sql(source_query)

In [6]:
# Columns used as model inputs, in the order they are selected
feature_cols = [
    "tract_to_msamd_income",
    "population",
    "minority_population",
    "loan_amount_000s",
    "applicant_income_000s",
    "purchaser_type_name",
    "preapproval_name",
    "owner_occupancy_name",
    "loan_type_name",
    "lien_status_name",
    "co_applicant_sex_name",
    "co_applicant_race_name_1",
    "co_applicant_ethnicity_name",
    "agency_name",
    "agency_abbr",
]

# The outcome column is exposed to the rest of the notebook as "label"
label_col = col("action_taken_name").alias("label")

# select() accepts a single list mixing column names and Column expressions
data1 = csv.select(feature_cols + [label_col])

In [7]:
# Keep only complete rows: a row is discarded as soon as any one of its
# columns is null
data2 = data1.dropna(how="any")

In [8]:
# Split the data 70/30 into a training set and a test set.
# A fixed seed keeps the split (and therefore every metric reported below)
# repeatable across runs on the same input data; without it each run of the
# notebook samples a different train/test partition.
splits = data2.randomSplit([0.7, 0.3], seed=42)
train = splits[0]
# Rename the label in the test set so the known outcome is kept in a column
# named "trueLabel" when the fitted models are applied to it.
test = splits[1].withColumnRenamed("label", "trueLabel")

### Define the Pipeline
Now define a pipeline that creates a feature vector and trains a classification model

In [10]:
# Build two independent assembler -> logistic-regression pipelines, one per
# hyper-parameter search performed later in the notebook.
# NOTE(review): several inputs carry a `_name` / `_abbr` suffix; VectorAssembler
# only accepts numeric, boolean and vector columns, so confirm the table stores
# these already numerically encoded.
N_PIPELINES = 2
assembler_inputs = [
    "tract_to_msamd_income",
    "population",
    "minority_population",
    "loan_amount_000s",
    "applicant_income_000s",
    "purchaser_type_name",
    "preapproval_name",
    "owner_occupancy_name",
    "loan_type_name",
    "lien_status_name",
    "co_applicant_sex_name",
    "co_applicant_race_name_1",
    "co_applicant_ethnicity_name",
    "agency_name",
    "agency_abbr",
]

# One feature assembler per pipeline, all writing to the "features" column
assembler = [
    VectorAssembler(inputCols=list(assembler_inputs), outputCol="features")
    for _ in range(N_PIPELINES)
]
# One logistic-regression estimator per pipeline
lr = [
    LogisticRegression(labelCol="label", featuresCol="features")
    for _ in range(N_PIPELINES)
]
# pipeline[k] chains assembler[k] into lr[k]
pipeline = [
    Pipeline(stages=[stage_assembler, stage_lr])
    for stage_assembler, stage_lr in zip(assembler, lr)
]

### Train Validation Split with Threshold parameters
Build the best model using TrainValidationSplit

The first combination of parameters with (regParam: [0.01, 0.5, 2.0]), (threshold: [0.30, 0.35]), (maxIter: [1, 5])

In [13]:
# Holds the fitted models produced by the hyper-parameter searches below,
# in the order the searches are run
model = list()

In [14]:
# Hyper-parameter grid for the first search, keyed on the parameters of lr[0]
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(lr[0].regParam, [0.01, 0.5, 2.0])
grid_builder = grid_builder.addGrid(lr[0].threshold, [0.30, 0.35])
grid_builder = grid_builder.addGrid(lr[0].maxIter, [1, 5])
paramGrid = grid_builder.build()

In [15]:
# First search: fit pipeline[0] over paramGrid, holding out 20% of `train`
# for validation.
# NOTE(review): the default BinaryClassificationEvaluator scores areaUnderROC
# on the rawPrediction column, which presumably does not depend on
# `threshold`, so the threshold grid may not influence which model is
# selected here - confirm.
tvs = TrainValidationSplit(
    estimator=pipeline[0],
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    trainRatio=0.8,
)
# Store the winner of the first search at position 0
first_best_model = tvs.fit(train)
model.insert(0, first_best_model)

### Train Validation Split with elastic-net parameters
The second combination of parameters with (regParam: [0.01, 0.5, 2.0]), (elasticNetParam: [0.0, 0.5, 1]), (maxIter: [1, 5])

In [17]:
# Hyper-parameter grid for the second search (regParam x elasticNetParam x
# maxIter).
# The grid must be keyed on the parameters of lr[1], because it is used with
# pipeline[1], which wraps lr[1]. Params are matched by their owning estimator,
# so a grid built from lr[0] does not apply to lr[1] and the search would not
# explore these values on the estimator actually being fit.
# elasticNetParam is a double-valued parameter, so all its candidates are
# written as floats.
paramGrid2 = (ParamGridBuilder()
              .addGrid(lr[1].regParam, [0.01, 0.5, 2.0])
              .addGrid(lr[1].elasticNetParam, [0.0, 0.5, 1.0])
              .addGrid(lr[1].maxIter, [1, 5])
              .build())

In [18]:
# Second search: fit pipeline[1] over paramGrid2, holding out 20% of `train`
# for validation
tvs2 = TrainValidationSplit(
    estimator=pipeline[1],
    estimatorParamMaps=paramGrid2,
    evaluator=BinaryClassificationEvaluator(),
    trainRatio=0.8,
)

# Store the winner of the second search at position 1
second_best_model = tvs2.fit(train)
model.insert(1, second_best_model)

### Test the Model
Now you're ready to apply the model to the test data.

In [20]:
# Score the test set with each of the two fitted models.
#   prediction[k] - full output of model[k].transform(test)
#   predicted[k]  - the same rows narrowed to the columns reviewed below
prediction, predicted = [], []
for idx in range(2):
    scored = model[idx].transform(test)
    prediction.append(scored)

    reviewed = scored.select("features", "prediction", "probability", "trueLabel")
    predicted.append(reviewed)

    # Preview the first 30 scored rows for this model
    reviewed.show(30)

### Review the Area Under ROC
Another way to assess the performance of a classification model is to measure the area under a ROC curve for the model. The spark.ml library includes a **BinaryClassificationEvaluator** class that you can use to compute this.

In [22]:
# Report the area under the ROC curve of each fitted model on the test set.
# The evaluator reads the model's "rawPrediction" column: scoring the hard
# 0/1 "prediction" column instead collapses the ROC curve to a single
# operating point, and would also differ from the evaluator used to select
# the models during TrainValidationSplit.
evaluator = []
for i in range(2):
    evaluator.append(
        BinaryClassificationEvaluator(
            labelCol="trueLabel",
            rawPredictionCol="rawPrediction",
            metricName="areaUnderROC",
        )
    )
    auc = evaluator[i].evaluate(prediction[i])
    # A single pre-formatted string prints the same text under Python 2 and 3
    print("AUC  %s  =  %s" % (i, auc))

### Review the Recall And Precision

In [24]:
# Compute precision and recall for each model from its confusion-matrix counts
for i in range(2):
    # Each count() runs a separate Spark job over predicted[i]
    tp = float(predicted[i].filter("prediction == 1.0 AND truelabel == 1").count())
    fp = float(predicted[i].filter("prediction == 1.0 AND truelabel == 0").count())
    # tn is not used by the two metrics below; it is still computed so the
    # full set of confusion-matrix counts remains available
    tn = float(predicted[i].filter("prediction == 0.0 AND truelabel == 0").count())
    fn = float(predicted[i].filter("prediction == 0.0 AND truelabel == 1").count())

    # Precision is undefined when the model predicts no positives, and recall
    # is undefined when the test set holds no positives. Report NaN in those
    # cases instead of raising ZeroDivisionError.
    predicted_positives = tp + fp
    actual_positives = tp + fn
    precision = tp / predicted_positives if predicted_positives > 0 else float("nan")
    recall = tp / actual_positives if actual_positives > 0 else float("nan")

    metrics = spark.createDataFrame(
        [("Precision", precision), ("Recall", recall)],
        ["metric", "value"])
    metrics.show()