# Lab 16 Assignment 3 - Group Assignment - Group O-1-7

When creating ML models, the concept of efficiency has three sides:
1. The time dedicated by the analyst to build the model
2. The computer time and resources needed by the final model
3. The accuracy of the final model

Efficiency is a combination of all

In this assignment, you are asked to be efficient. Spark is the best tool to build models over massive datasets

If you need to create Spark+Python Machine Learning models that "run fast" on the  cluster, you must avoid using Python code or working with RRD+python. Try to use  the already existing methods that do what you need (do not reinvent the wheel).

Therefore try to use the implemented object+methods inside the Spark SQL and ML modules. They are very fast, because it is compiled Java/Scala code. Try to use: DataFrames, Feature Transfomers, Estimators, Pipelines, GridSearch, CV, ...

For this assignment, you are asked to create a classification model that:
1. Uses the variables in the dataset (train.csv) to predict label "loan_status"
2. Write a python scripts that:
    - Reads the "train.csv" and "test.csv" files, transform and select variables as you wish.
    - Train/fit your model using the "train.csv".
    - Predict your model on the "test.csv" ( you should generate a file with your predictions).
    - I will use a different test dataset (with the true loan_status).

Your work will be evaluated under the following scoring schema
- (40%) ETL process
- (40%) Model train process
- (10%) Code Readability 
- (10%) AUC on the test set (at least 50%)

Enjoy it and best of luck!!


This Assignment is based on kaggle competition https://www.kaggle.com/c/loan-default-prediction from where a sub-dataset has been taken.

### File Description
**train.csv** - the training set (to use for building a model)

**test.csv** - the test set (to use for applying predictings)

**sample_submission.csv** - a template for the submission file

### Data Description (contained in LendingClub_DataDescription.csv)
**ID**: A unique LC assigned ID for the loan listing.

**loan_amnt**: The listed amount of the loan applied for by the borrower. If at some point in time, the credit department reduces the loan amount, then it will be reflected in this value.

**loan_status**: Current status of the loan (**Target**: 1 = Charged Off, 0 = Fully Paid).

**term**: The number of payments on the loan. Values are in months and can be either 36 or 60.

**int_rate**: Interest Rate on the loan.

**installment**: The monthly payment owed by the borrower if the loan originates.

**emp_length**: Employment length in years. Possible values are between 0 and 10 where 0 means less than one year and 10 means ten or more years.

**home_ownership**: The home ownership status provided by the borrower during registration. Our values are: OTHER/NONE, MORTGAGE, OWN, RENT.

**annual_inc**: The self-reported annual income provided by the borrower during registration.

**purpose**: A category provided by the borrower for the loan request.

**title**: The loan title provided by the borrower.

**STATE**: The state provided by the borrower in the loan application.

**delinq_2yrs**: The number of 30+ days past-due incidences of delinquency in the borrower's credit file for the past 2 years.

**revol_bal**: Total credit revolving balance.

**revol_util**: Revolving line utilization rate, or the amount of credit the borrower is using relative to all available revolving credit.

**total_pymnt**: Indicates total payment at the end of the loan.

In [1]:
import os
import sys

os.environ['SPARK_HOME'] = "/Users/stavrostsentemeidis/Desktop/Install_Spark/spark-2.3.2-bin-hadoop2.7/"

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

#Add the following paths to the system path. Please check your installation
#to make sure that these zip files actually exist. The names might change
#as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.10.7-src.zip"))

#Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.functions import col
from pyspark.sql.functions import isnan, when, count
from pyspark.sql.functions import *
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.param import Params
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import MinMaxScaler
import pandas as pd


#Create a Spark Session
MySparkSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("MiPrimer") \
    .config("spark.executor.memory", "6g") \
    .config("spark.cores.max","4") \
    .getOrCreate()


#Get the Spark Context from Spark Session    
MySparkContext = MySparkSession.sparkContext

### Reading & Displaying Files

In [2]:
loanDF = MySparkSession.read.format('csv') \
                .option("inferSchema", "true") \
                .option("delimiter", ";") \
                .option('header','true') \
                .load('../data/train.csv') 

testDF = MySparkSession.read.format('csv') \
                .option("inferSchema", "true") \
                .option("delimiter", ";") \
                .option('header','true') \
                .load('../data/test.csv')

In [3]:
loanDF.limit(10).toPandas()

Unnamed: 0,ID,loan_amnt,term,int_rate,installment,emp_length,home_ownership,annual_inc,purpose,title,STATE,delinq_2yrs,revol_bal,revol_util,total_pymnt,loan_status
0,2,2500,60 months,15.27%,59.83,< 1 year,RENT,30000.0,car,bike,GA,0,1687,0.094,1014.53,1
1,4,10000,36 months,13.49%,339.31,10+ years,RENT,49200.0,other,personel,CA,0,5598,0.21,12231.89,0
2,5,3000,60 months,12.69%,67.79,1 year,RENT,80000.0,other,Personal,OR,0,27783,0.539,4066.908161,0
3,6,5000,36 months,7.90%,156.46,3 years,RENT,36000.0,wedding,My wedding loan I promise to pay back,AZ,0,7963,0.283,5632.21,0
4,7,7000,60 months,15.96%,170.08,8 years,RENT,47004.0,debt_consolidation,Loan,NC,0,17726,0.856,10137.84001,0
5,8,3000,36 months,18.64%,109.43,9 years,RENT,48000.0,car,Car Downpayment,CA,0,8221,0.875,3939.135294,0
6,10,5375,60 months,12.69%,121.45,< 1 year,RENT,15000.0,other,Building my credit history.,TX,0,9279,0.365,1484.59,1
7,11,6500,60 months,14.65%,153.45,5 years,OWN,72000.0,debt_consolidation,High intrest Consolidation,AZ,0,4032,0.206,7678.017673,0
8,12,12000,36 months,12.69%,402.54,10+ years,OWN,75000.0,debt_consolidation,Consolidation,CA,0,23336,0.671,13947.98916,0
9,14,3000,36 months,9.91%,96.68,3 years,RENT,15000.0,credit_card,citicard fund,IL,0,7323,0.431,3480.269999,0


In [4]:
testDF.limit(10).toPandas()

Unnamed: 0,ID,loan_amnt,term,int_rate,installment,emp_length,home_ownership,annual_inc,purpose,title,STATE,delinq_2yrs,revol_bal,revol_util,total_pymnt,loan_status
0,1,5000,36 months,10.65%,162.87,10+ years,RENT,24000.0,credit_card,Computer,AZ,0,13648,0.84,5863.155187,1
1,3,2400,36 months,15.96%,84.33,10+ years,RENT,12252.0,small_business,real estate business,IL,0,2956,0.99,3005.666844,1
2,9,5600,60 months,21.28%,152.39,4 years,OWN,40000.0,small_business,Expand Business & Buy Debt Portfolio,CA,0,5210,0.33,647.5,1
3,13,9000,36 months,13.49%,305.38,< 1 year,RENT,30000.0,debt_consolidation,freedom,VA,0,10452,0.92,2277.32,1
4,15,10000,36 months,10.65%,325.74,3 years,RENT,100000.0,other,Other Loan,CA,0,11997,0.56,7471.99,1
5,18,3600,36 months,6.03%,109.57,10+ years,MORTGAGE,110000.0,major_purchase,Holiday,CT,0,22836,0.16,3785.271965,1
6,26,15000,36 months,9.91%,483.38,2 years,MORTGAGE,92000.0,credit_card,No more credit card debt!,IL,0,13707,0.94,15823.99905,1
7,27,15000,36 months,14.27%,514.64,9 years,RENT,60000.0,debt_consolidation,consolidation,NY,0,5872,0.58,0.0,1
8,29,4000,36 months,11.71%,132.31,10+ years,MORTGAGE,106000.0,debt_consolidation,Debt Consolidation,FL,1,6110,0.38,4486.293519,1
9,31,4375,36 months,7.51%,136.11,7 years,MORTGAGE,17108.0,debt_consolidation,Debt Consolidation,NY,0,11210,0.87,4899.96,1


### EDA | Null Values | Cross Table Distribution | Covariances

#### Summary of Columns

In [5]:
loanDF.printSchema()
testDF.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- revol_bal: integer (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- total_pymnt: double (nullable = true)
 |-- loan_status: integer (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- purp

#### Renaming | Describing | Changing Data Type

In [6]:
loanDF = loanDF.withColumn('int_rate', regexp_replace('int_rate', '%', ''))
loanDF = loanDF.withColumn('title', regexp_replace('title', '.', ''))
loanDF = loanDF.withColumn("int_rate", loanDF["int_rate"].cast("decimal(10,0)"))
loanDF = loanDF.withColumn("revol_util", loanDF["revol_util"].cast("decimal(10,0)"))

testDF = testDF.withColumn('int_rate', regexp_replace('int_rate', '%', ''))
testDF = testDF.withColumn('title', regexp_replace('title', '.', ''))
testDF = testDF.withColumn("int_rate", testDF["int_rate"].cast("decimal(10,0)"))
testDF = testDF.withColumn("revol_util", testDF["revol_util"].cast("decimal(10,0)"))

In [7]:
loanDF = loanDF.withColumnRenamed("ID","id")
loanDF = loanDF.withColumnRenamed("loan_amnt","loan_amount")
loanDF = loanDF.withColumnRenamed("term","term")
loanDF = loanDF.withColumnRenamed("home_ownership","home_ownership")
loanDF = loanDF.withColumnRenamed("int_rate","interest_rate")
loanDF = loanDF.withColumnRenamed("installment","monthly_payment")
loanDF = loanDF.withColumnRenamed("emp_length","employment_time")
loanDF = loanDF.withColumnRenamed("delinq_2yrs","deliquency_past_2years")
loanDF = loanDF.withColumnRenamed("revol_bal","revolving_balance")
loanDF = loanDF.withColumnRenamed("revol_util","revolving_utilization_rate")
loanDF = loanDF.withColumnRenamed("total_pymnt","total_payment")
loanDF = loanDF.withColumnRenamed("purpose","loan_purpose")
loanDF = loanDF.withColumnRenamed("annual_inc","annual_income")
loanDF = loanDF.withColumnRenamed("STATE","state")
loanDF = loanDF.withColumnRenamed("installment","installment")
loanDF = loanDF.withColumnRenamed("loan_status","loan_status")


testDF = testDF.withColumnRenamed("ID","id")
testDF = testDF.withColumnRenamed("loan_amnt","loan_amount")
testDF = testDF.withColumnRenamed("term","term")
testDF = testDF.withColumnRenamed("home_ownership","home_ownership")
testDF = testDF.withColumnRenamed("int_rate","interest_rate")
testDF = testDF.withColumnRenamed("installment","monthly_payment")
testDF = testDF.withColumnRenamed("emp_length","employment_time")
testDF = testDF.withColumnRenamed("delinq_2yrs","deliquency_past_2years")
testDF = testDF.withColumnRenamed("revol_bal","revolving_balance")
testDF = testDF.withColumnRenamed("revol_util","revolving_utilization_rate")
testDF = testDF.withColumnRenamed("total_pymnt","total_payment")
testDF = testDF.withColumnRenamed("purpose","loan_purpose")
testDF = testDF.withColumnRenamed("annual_inc","annual_income")
testDF = testDF.withColumnRenamed("STATE","state")
testDF = testDF.withColumnRenamed("installment","installment")
testDF = testDF.withColumnRenamed("loan_status","loan_status")

In [8]:
numeric_df = ['loan_amount','interest_rate', 'monthly_payment','annual_income', 'deliquency_past_2years',
              'total_payment','revolving_balance','revolving_utilization_rate']

categorical_Df = ['term','employment_time', 'home_ownership', 'loan_purpose', 'title','state']

loanDF.describe('loan_amount','term','interest_rate','title','employment_time','home_ownership').show()
loanDF.describe('annual_income','loan_purpose','monthly_payment','state','deliquency_past_2years').show()
loanDF.describe('revolving_balance','revolving_utilization_rate','total_payment','loan_status').show()

+-------+------------------+----------+------------------+-----+---------------+--------------+
|summary|       loan_amount|      term|     interest_rate|title|employment_time|home_ownership|
+-------+------------------+----------+------------------+-----+---------------+--------------+
|  count|             29755|     29755|             29755|28517|          29755|         29755|
|   mean|11218.509494202655|      null|           12.0301| null|           null|          null|
| stddev| 7431.662873498601|      null|3.7181574007403158| null|           null|          null|
|    min|               500| 36 months|                 5|     |         1 year|      MORTGAGE|
|    max|             35000| 60 months|                25|    |            n/a|          RENT|
+-------+------------------+----------+------------------+-----+---------------+--------------+

+-------+----------------+------------+------------------+-----+----------------------+
|summary|   annual_income|loan_purpose|   month

In [9]:
display(testDF.describe())

DataFrame[summary: string, id: string, loan_amount: string, term: string, interest_rate: string, monthly_payment: string, employment_time: string, home_ownership: string, annual_income: string, loan_purpose: string, title: string, state: string, deliquency_past_2years: string, revolving_balance: string, revolving_utilization_rate: string, total_payment: string, loan_status: string]

In [10]:
print(loanDF.count())
print(testDF.count())

29755
10024


#### Checking Null Values


In [11]:
remove_loanDF = loanDF.na.drop() 
remove_testDF = testDF.na.drop()  
print(remove_loanDF.count())
print(remove_testDF.count())

27773
9116


#### Cross Table Distribution


In [12]:
# cross tables distribution
loanDF.stat.crosstab('term','employment_time').show()
loanDF.stat.crosstab('term','home_ownership').show()
loanDF.stat.crosstab('term','loan_purpose').show()
loanDF.stat.crosstab('term','state').show()
loanDF.stat.crosstab('employment_time','home_ownership').show()
loanDF.stat.crosstab('employment_time','loan_purpose').show()
loanDF.stat.crosstab('employment_time','state').show()
loanDF.stat.crosstab('home_ownership','loan_purpose').show()
loanDF.stat.crosstab('home_ownership','state').show()
loanDF.stat.crosstab('loan_purpose','state').show()


+--------------------+------+---------+-------+-------+-------+-------+-------+-------+-------+-------+--------+---+
|term_employment_time|1 year|10+ years|2 years|3 years|4 years|5 years|6 years|7 years|8 years|9 years|< 1 year|n/a|
+--------------------+------+---------+-------+-------+-------+-------+-------+-------+-------+-------+--------+---+
|           60 months|   497|     2284|    746|    748|    664|    724|    495|    389|    333|    268|     694|207|
|           36 months|  1943|     4321|   2524|   2314|   1927|   1773|   1175|    943|    777|    672|    2752|585|
+--------------------+------+---------+-------+-------+-------+-------+-------+-------+-------+-------+--------+---+

+-------------------+--------+----+-----+----+-----+
|term_home_ownership|MORTGAGE|NONE|OTHER| OWN| RENT|
+-------------------+--------+----+-----+----+-----+
|          60 months|    4315|   0|    0| 569| 3165|
|          36 months|    8973|   3|   69|1703|10958|
+-------------------+--------+--

+---------------------------+---+-----------+------------------+-----------+----------------+-----+--------------+-------+------+----+-----+----------------+--------------+--------+-------+
|home_ownership_loan_purpose|car|credit_card|debt_consolidation|educational|home_improvement|house|major_purchase|medical|moving|null|other|renewable_energy|small_business|vacation|wedding|
+---------------------------+---+-----------+------------------+-----------+----------------+-----+--------------+-------+------+----+-----+----------------+--------------+--------+-------+
|                      OTHER|  1|          9|                29|          2|               3|    1|             6|      2|     0|   3|    8|               0|             5|       0|      0|
|                        OWN|115|        214|               960|         17|             240|   22|           162|     47|    17|  78|  265|               2|            74|      20|     39|
|                   MORTGAGE|558|       1632|     

#### Covariance


In [13]:
# covariance
for i in numeric_df:
    for j in numeric_df:
        print('Covariance of ' + i + ' and '+ j )
        print(loanDF.stat.cov(i, j))
        print("")
    


Covariance of loan_amount and loan_amount
55229613.06533747

Covariance of loan_amount and interest_rate
8549.319317863159

Covariance of loan_amount and monthly_payment
1436013.2198559118

Covariance of loan_amount and annual_income
128790952.5486099

Covariance of loan_amount and deliquency_past_2years
-111.91643190399427

Covariance of loan_amount and total_payment
56278750.18753406

Covariance of loan_amount and revolving_balance
35052983.98744884

Covariance of loan_amount and revolving_utilization_rate
205.5056356256766

Covariance of interest_rate and loan_amount
8549.31931786316

Covariance of interest_rate and interest_rate
13.824694456679978

Covariance of interest_rate and monthly_payment
220.26613109833838

Covariance of interest_rate and annual_income
12508.30669074606

Covariance of interest_rate and deliquency_past_2years
0.27668654772190204

Covariance of interest_rate and total_payment
9885.688703886828

Covariance of interest_rate and revolving_balance
5542.1693831681

### ETL summary |  Spark code for Imputing 

Below we present the steps we decided to follow during our EDA in order to prepare our dataset for our machine learning implementation. It is worth mentioning that some of the steps had to be included in the previous part of the assignment in order to have a complete overview of the *cross table distributions* and the *covariances*.

1. **interest_rate** : remove special % character and change datatype from *string* to *decimal*.
2. **revol_util** : change datatype from *string* to *decimal*.
3. **Rename** columns to be better represented.
4. **Trim** the variable title as there are multiple unnecessary dots.
5. **Checkin for Na**: 
    1. Number of rows with NA for *loanDF*:    29755
    2. Number of rows without NA for *loanDF*: 27773
    3. Number of rows with NA for *testDF*:    10024
    4. Number of rows without NA for *testDF*: 9116
6. **Filling Na values**:
    1. **title** :                      unknown
    2. **loan_purpose** :               unknown
    3. **state** :                      unknown
    4. **deliquency_past_2years** :     -1
    5. **revolving_balance** :          13350.529071398512 (avg)
    6. **revolving_utilization_rate** : 0.5054 (avg)
    7. **total_payment** :              12143.791490200982 (avg)
7. **Drop** variables *title* and *state* cause they have too many unique values that cannot be grouped in order to apply one hot coding.


####  Imputing Null Values

In [14]:
loanDF = loanDF.na.fill({'title': 'uknown', 'loan_purpose': 'unknown', 'state': 'unknown',
                         'deliquency_past_2years':-1,'revolving_balance':13350.529071398512,
                         'revolving_utilization_rate':0.5054 , 'total_payment': 12143.791490200982})

testDF = testDF.na.fill({'title': 'unknown', 'loan_purpose': 'uknown', 'state': 'unknown',
                         'deliquency_past_2years':-1,'revolving_balance':13350.529071398512,
                         'revolving_utilization_rate':0.5054 , 'total_payment': 12143.791490200982})
print(loanDF.count())
print(testDF.count())

29755
10024


####  Scaling

In [15]:
# assembler_scale = VectorAssembler().setInputCols(numeric_df).setOutputCol('scaled_features')
# transformed = assembler_scale.transform(loanDF)
# scaler_loan_amount = MinMaxScaler(inputCol="loan_amount", outputCol="scaled_loan_amount")
# scalerModel_loan_amount = scaler_loan_amount.fit(transformed.select('scaled_features'))
# # scalerModel_loan_amount = scaler_loan_amount.fit(testDF)

# scaledData= scalerModel_loan_amount.transform(transformed)
# # testDF = scalerModel_loan_amount.transform(testDF)

# # #############################################################################################################
# # scaler_interest_rate = MinMaxScaler(inputCol="interest_rate", outputCol="scaled_interest_rate")
# # scalerModel_interest_rate = scaler_interest_rate.fit(loanDF)
# # scalerModel_interest_rate = scaler_interest_rate.fit(testDF)

# # scaledData_interest_rate = scalerModel_interest_rate.transform(loanDF)
# # scaledData_interest_rate = scalerModel_interest_rate.transform(testDF)

# # #############################################################################################################
# # scaler_annual_income = MinMaxScaler(inputCol="annual_income", outputCol="scaled_annual_income")
# # scalerModel_annual_income = scaler_annual_income.fit(loanDF)
# # scalerModel_annual_income = scaler_annual_income.fit(testDF)

# # scaledData_annual_income = scalerModel_annual_income.transform(loanDF)
# # scaledData_annual_income = scalerModel_annual_income.transform(testDF)

# # #############################################################################################################
# # scaler_revolving_balance = MinMaxScaler(inputCol="revolving_balance", outputCol="scaled_revolving_balance")
# # scalerModel_revolving_balance = scaler_revolving_balance.fit(loanDF)
# # scalerModel_revolving_balance = scaler_revolving_balance.fit(testDF)

# # scaledData_revolving_balance = scalerModel_revolving_balance.transform(loanDF)
# # scaledData_revolving_balance = scalerModel_revolving_balance.transform(testDF)

# # #############################################################################################################
# # scaler_total_payment = MinMaxScaler(inputCol="total_payment", outputCol="scaled_total_payment")
# # scalerModel_total_payment = scaler_total_payment.fit(loanDF)
# # scalerModel_total_payment = scaler_total_payment.fit(testDF)

# # scaledData_total_payment = scalerModel_total_payment.transform(loanDF)
# # scaledData_total_payment = scalerModel_total_payment.transform(testDF)

# # #############################################################################################################

# # print(loanDF.count())
# # print(testDF.count())

# loanDF.describe('loan_amount').show()

####  Double checking final clean dataset

Checking all our columns to verify that we have:
   * Distinct user id so we do not need to group by.
   * Clean data format.
   * Clean cell values, (for example no Na and nan, which mean the same thing) 

In [16]:
loanDF.describe('loan_amount','term','interest_rate','title','employment_time','home_ownership').show()
loanDF.describe('annual_income','loan_purpose','monthly_payment','state','deliquency_past_2years').show()
loanDF.describe('revolving_balance','revolving_utilization_rate','total_payment','loan_status').show()

+-------+------------------+----------+------------------+-----+---------------+--------------+
|summary|       loan_amount|      term|     interest_rate|title|employment_time|home_ownership|
+-------+------------------+----------+------------------+-----+---------------+--------------+
|  count|             29755|     29755|             29755|29755|          29755|         29755|
|   mean|11218.509494202655|      null|           12.0301| null|           null|          null|
| stddev| 7431.662873498601|      null|3.7181574007403158| null|           null|          null|
|    min|               500| 36 months|                 5|     |         1 year|      MORTGAGE|
|    max|             35000| 60 months|                25|    |            n/a|          RENT|
+-------+------------------+----------+------------------+-----+---------------+--------------+

+-------+----------------+------------+------------------+-------+----------------------+
|summary|   annual_income|loan_purpose|   mon

In [17]:
# we have unique id = number of rows of dataset, so we do not need to group by.
for i in loanDF.columns:
    print(loanDF.select(i).distinct().count())
    print(loanDF.select(i).distinct().show())


29755
+----+
|  id|
+----+
| 496|
| 833|
|1088|
|1238|
|1342|
|1580|
|1645|
|1829|
|1959|
|2122|
|2366|
|2866|
|3749|
|3918|
|4101|
|4519|
|4818|
|4900|
|4935|
|5156|
+----+
only showing top 20 rows

None
824
+-----------+
|loan_amount|
+-----------+
|       5300|
|      18800|
|       3175|
|       4900|
|       9900|
|      21700|
|      11500|
|      16500|
|      19200|
|      14075|
|       3475|
|       3000|
|       7850|
|      11800|
|       6825|
|      11025|
|       1650|
|      26375|
|      15575|
|       8650|
+-----------+
only showing top 20 rows

None
2
+----------+
|      term|
+----------+
| 36 months|
| 60 months|
+----------+

None
21
+-------------+
|interest_rate|
+-------------+
|           19|
|           22|
|            7|
|           25|
|            6|
|            9|
|           17|
|            5|
|           10|
|           12|
|            8|
|           11|
|           13|
|           18|
|           14|
|           21|
|           15|
|           23|

At this stage we check the final structure of the table and we also decide to drop the **title** and **state** feature as they have so many different values, which makes it difficult to use in our model.

In [18]:
loanDF = loanDF.drop("title", 'state','id')
testDF = testDF.drop("title", 'state','id')
loanDF.printSchema()
testDF.printSchema()

root
 |-- loan_amount: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- interest_rate: decimal(10,0) (nullable = true)
 |-- monthly_payment: double (nullable = true)
 |-- employment_time: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: double (nullable = true)
 |-- loan_purpose: string (nullable = false)
 |-- deliquency_past_2years: integer (nullable = false)
 |-- revolving_balance: integer (nullable = true)
 |-- revolving_utilization_rate: decimal(10,0) (nullable = true)
 |-- total_payment: double (nullable = false)
 |-- loan_status: integer (nullable = true)

root
 |-- loan_amount: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- interest_rate: decimal(10,0) (nullable = true)
 |-- monthly_payment: double (nullable = true)
 |-- employment_time: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: double (nullable = true)
 |-- loan_purpose: string (nullable = false)
 |--

### Create Pipeline | Split train/test

In [19]:
loanDF.printSchema()
testDF.printSchema()

root
 |-- loan_amount: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- interest_rate: decimal(10,0) (nullable = true)
 |-- monthly_payment: double (nullable = true)
 |-- employment_time: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: double (nullable = true)
 |-- loan_purpose: string (nullable = false)
 |-- deliquency_past_2years: integer (nullable = false)
 |-- revolving_balance: integer (nullable = true)
 |-- revolving_utilization_rate: decimal(10,0) (nullable = true)
 |-- total_payment: double (nullable = false)
 |-- loan_status: integer (nullable = true)

root
 |-- loan_amount: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- interest_rate: decimal(10,0) (nullable = true)
 |-- monthly_payment: double (nullable = true)
 |-- employment_time: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: double (nullable = true)
 |-- loan_purpose: string (nullable = false)
 |--

At this stage we make the following adjustments:

1. We keep the **total_amount** column and we discard the **monthly payment**, because of high correlation between the 2. The reason for keeipng the total amount is based more on a business perspective.
2. We keep the **interest_rate** column and we discard the **revolving_utilization_rate**, as they are totally correlated.

In [20]:
categoricalColumns = ['term', 'employment_time', 'home_ownership', 'loan_purpose']
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    
label_stringIdx = StringIndexer(inputCol = 'loan_status', outputCol = 'label')

stages += [label_stringIdx]

numericCols = ['loan_amount', 'interest_rate', 'annual_income', 'deliquency_past_2years', 
               'revolving_balance','total_payment']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler]



Now it is time for use to define the pipeline we are going to use for all the models that are about to be tested including:
1. **Logistic Regression**
2. **Random Forest**
3. **Gradient Tree Boosting**


In [21]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(loanDF)
loanDF = pipelineModel.transform(loanDF)
selectedCols = ['features' , 'label']
loanDF = loanDF.select(selectedCols)
loanDF.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [22]:
pd.DataFrame(loanDF.take(5), columns=loanDF.columns).transpose()

Unnamed: 0,0,1,2,3,4
features,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
label,1,0,0,0,0


In [23]:
(trainingData, validationData) = loanDF.randomSplit([0.7, 0.3], seed=100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(validationData.count()))

Training Dataset Count: 20882
Test Dataset Count: 8873


### Logistic Regresion Model

#### Write a function "metrics" which has a LogisticRegressionModel.summary as input attribute and produces an output of: 
1. Area under ROC
2. False Positive Rate By Label
3. True Positive Rate By Label
4. Precision By Label
5. Recall By Label
6. fMeasure By Label
7. Accuracy
8. False Positive Rate
9. True Positive Rate
10. fMeasure
11. Precision
12. Recall

In [24]:
def metrics(trainingSummary):  
    my_formatted_list_FP = [ '%.2f' % elem for elem in trainingSummary.truePositiveRateByLabel]
    my_formatted_list_TP = [ '%.2f' % elem for elem in trainingSummary.precisionByLabel]
    my_formatted_list_P = [ '%.2f' % elem for elem in trainingSummary.falsePositiveRateByLabel]
    my_formatted_list_R = [ '%.2f' % elem for elem in trainingSummary.recallByLabel]
    
    print("AUC: " + str("%.2f" % trainingSummary.areaUnderROC))
    print('')
    print("False Positive Rate by Label: " + str(my_formatted_list_FP))
    print('')
    print("True Positive Rate by Label: " + str(my_formatted_list_P))
    print('')
    print("Precision by Label: " + str(my_formatted_list_TP))
    print('')
    print("Recall by Label: " + str(my_formatted_list_R))
    print('')
    print("fMeasure by Label: " + str(trainingSummary.fMeasureByLabel))
    print('')
    print("Accuracy: " + str("%.2f" %trainingSummary.accuracy))
    print('')
    print("False Positive Rate: " + str("%.2f" %trainingSummary.weightedFalsePositiveRate))
    print('')
    print("True Positive Rate: " + str("%.2f" %trainingSummary.weightedTruePositiveRate)) 
    print('')
    print("fMeasure: " + str(trainingSummary.weightedFMeasure))
    print('')
    print("Precision: " + str("%.2f" %trainingSummary.weightedPrecision))
    print('')
    print("Recall: " + str("%.2f" %trainingSummary.weightedRecall))

#### Apply a Logistic Regresion Base Model and show the metrics by the function above

In [25]:
evaluator = BinaryClassificationEvaluator() \
                .setLabelCol("label") \
                .setRawPredictionCol("rawPrediction")

print("We are using metric: " + evaluator.getMetricName())

We are using metric: areaUnderROC


In [26]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(trainingData)
trainingSummary = lrModel.summary
metrics(trainingSummary)

AUC: 0.90

False Positive Rate by Label: ['0.99', '0.52']

True Positive Rate by Label: ['0.48', '0.01']

Precision by Label: ['0.92', '0.88']

Recall by Label: ['0.99', '0.52']

fMeasure by Label: <bound method LogisticRegressionSummary.fMeasureByLabel of <pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary object at 0x11c6aec18>>

Accuracy: 0.92

False Positive Rate: 0.42

True Positive Rate: 0.92

fMeasure: <bound method LogisticRegressionSummary.weightedFMeasure of <pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary object at 0x11c6aec18>>

Precision: 0.92

Recall: 0.92


In [27]:
predictions_lr = lrModel.transform(validationData)
predictions_lr.select('rawPrediction', 'prediction', 'probability').show(10)

+--------------------+----------+--------------------+
|       rawPrediction|prediction|         probability|
+--------------------+----------+--------------------+
|[1.67531490572297...|       0.0|[0.84228315027341...|
|[0.89174508484855...|       0.0|[0.70925016549909...|
|[3.12213984377515...|       0.0|[0.95779680987968...|
|[1.99896352972117...|       0.0|[0.88068821228925...|
|[1.86675170595399...|       0.0|[0.86608197517861...|
|[2.03352338506302...|       0.0|[0.88427213176162...|
|[2.02128702119117...|       0.0|[0.88301402429537...|
|[1.67041077700678...|       0.0|[0.84163058070899...|
|[2.80277319756258...|       0.0|[0.94282549862876...|
|[1.76208839570405...|       0.0|[0.85347102412012...|
+--------------------+----------+--------------------+
only showing top 10 rows



In [28]:
print('Test Area Under ROC','%.4f' %  evaluator.evaluate(predictions_lr))

Test Area Under ROC 0.8993


#### We are going to try to improve our model:
1. Using a `weight column` in our Logistic Regression Model (Take into account we are working with a unbalanced dataset)
2. Define a `ParamGridBuilder` with `regParam`, `elasticNetParam` and `maxIter` at least
3. Define an `BinaryClassificationEvaluator`
4. Using Cross Validation with a 5-fold `CrossValidator`

Questions to answer:
1. Have we improved the ROC-AUC?
2. Which are the average ROC-AUC measurements in the different cross validation runs?
3. Which are the parameters of the best model in the 5 k-fold runs?
4. Which are the metrics of the best model (training) in the 5 k-fold runs? (Use the function above)
5. Which is the ROC-AUC on validation dataset?


In [29]:
evaluator = BinaryClassificationEvaluator() \
                .setLabelCol("label") \
                .setRawPredictionCol("rawPrediction")

print("We are using metric: " + evaluator.getMetricName())

We are using metric: areaUnderROC


By setting the **paramGrid_lr** variable we define the set of different parameters we would like to test in order to tune our algorithm.

In [30]:
paramGrid_lr = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.5, 2.0]) \
                .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
                .addGrid(lr.maxIter, [1, 5, 10])\
                .build()

print("Param Grid: " + str(paramGrid_lr))


cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid_lr, evaluator=evaluator, numFolds=5)
cv_Model = cv.fit(trainingData)
predictions_cv = cv_Model.transform(validationData)

Param Grid: [{Param(parent='LogisticRegression_409084c8f7e4fc0950c0', name='regParam', doc='regularization parameter (>= 0).'): 0.1, Param(parent='LogisticRegression_409084c8f7e4fc0950c0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_409084c8f7e4fc0950c0', name='maxIter', doc='max number of iterations (>= 0).'): 1}, {Param(parent='LogisticRegression_409084c8f7e4fc0950c0', name='regParam', doc='regularization parameter (>= 0).'): 0.1, Param(parent='LogisticRegression_409084c8f7e4fc0950c0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_409084c8f7e4fc0950c0', name='maxIter', doc='max number of iterations (>= 0).'): 5}, {Param(parent='LogisticRegression_409084c8f7e4fc0950c0', name='re

At this point we apply a **Cross Validation** strategy.

In [31]:
# Before we had 0.6822 and now
print("AUC: " + str('%.2f' % evaluator.evaluate(predictions_cv)))

# Means of model accuracy
my_formatted_list_0 = [ '%.4f' % elem for elem in cv_Model.avgMetrics]
print("Means of metrics: " + str(my_formatted_list_0))

AUC: 0.84
Means of metrics: ['0.7870', '0.8385', '0.8440', '0.7960', '0.8012', '0.8013', '0.5000', '0.5000', '0.5000', '0.7870', '0.8071', '0.8055', '0.5000', '0.5000', '0.5000', '0.5000', '0.5000', '0.5000', '0.7870', '0.7939', '0.7923', '0.5000', '0.5000', '0.5000', '0.5000', '0.5000', '0.5000']


### 4. Random Forest Model
1. Define a `ParamGridBuilder` with `maxDepth`, `numTrees` and `maxIter` at least
2. Define an `BinaryClassificationEvaluator` (You can use the above one)
3. Using Cross Validation with a 5-fold `CrossValidator`

Questions to answer:

1. Have we improved the ROC-AUC?
2. Which are the average ROC-AUC measurements in the different cross validation runs?
3. Which are the parameters of the best model in the 5 k-fold runs?
4. Which is the importance of the features?
5. Print full description of model.
6. Which is the ROC-AUC on validation dataset?

In [32]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')

rfModel = rf.fit(trainingData)

predictions = rfModel.transform(validationData)

predictions.select('rawPrediction', 'prediction', 'probability').show(10)

+--------------------+----------+--------------------+
|       rawPrediction|prediction|         probability|
+--------------------+----------+--------------------+
|[17.5036492929811...|       0.0|[0.87518246464905...|
|[15.9001453105630...|       0.0|[0.79500726552815...|
|[18.5729831633053...|       0.0|[0.92864915816526...|
|[17.5648713232494...|       0.0|[0.87824356616247...|
|[17.8065920678000...|       0.0|[0.89032960339000...|
|[17.6247963053591...|       0.0|[0.88123981526795...|
|[17.7153718041939...|       0.0|[0.88576859020969...|
|[17.7893924023610...|       0.0|[0.88946962011805...|
|[18.0690399272640...|       0.0|[0.90345199636320...|
|[17.7307207284763...|       0.0|[0.88653603642381...|
+--------------------+----------+--------------------+
only showing top 10 rows



In [33]:
print('Test Area Under ROC','%.4f' %  evaluator.evaluate(predictions))

Test Area Under ROC 0.8418


By setting the **paramGrid_rf** variable we define the set of different parameters we would like to test in order to tune our algorithm.

In [34]:
paramGrid_rf = ParamGridBuilder() \
                .addGrid(rf.maxDepth, [10,20]) \
                .addGrid(rf.numTrees, [10,20]) \
                .build()

print("Param Grid: " + str(paramGrid_rf))


cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid_rf, evaluator=evaluator, numFolds=5)
cv_Model_rf = cv_rf.fit(trainingData)
predictions_cv_rf = cv_Model_rf.transform(validationData)

Param Grid: [{Param(parent='RandomForestClassifier_4f3c859fd37a5bb0fee0', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 10, Param(parent='RandomForestClassifier_4f3c859fd37a5bb0fee0', name='numTrees', doc='Number of trees to train (>= 1).'): 10}, {Param(parent='RandomForestClassifier_4f3c859fd37a5bb0fee0', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 10, Param(parent='RandomForestClassifier_4f3c859fd37a5bb0fee0', name='numTrees', doc='Number of trees to train (>= 1).'): 20}, {Param(parent='RandomForestClassifier_4f3c859fd37a5bb0fee0', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 20, Param(parent='RandomForestClassifier_4f3c859fd37a5bb0fee0', name='numTrees', doc='Number of trees to train (>= 1).'): 10}, {Param(pa

At this point we apply a **Cross Validation** strategy.

In [35]:
# Before we had 0.6822 and now
print("AUC: " + str('%.2f' % evaluator.evaluate(predictions_cv_rf)))

# Means of model accuracy
my_formatted_list_1 = [ '%.4f' % elem for elem in cv_Model_rf.avgMetrics]
print("Means of metrics: " + str(my_formatted_list_1))

AUC: 0.92
Means of metrics: ['0.8819', '0.8940', '0.8969', '0.9139']


In [36]:
#Best LR Model
best_rf = cv_Model_rf.bestModel
best_rf

RandomForestClassificationModel (uid=RandomForestClassifier_4f3c859fd37a5bb0fee0) with 20 trees

### 5. Gradient Boosting Model
1. Defining a `ParamGridBuilder` with `maxDepth`, `numTrees` and `maxIter` at least (You can use the above one)
2. Define an `BinaryClassificationEvaluator` (You can use the above one)
3. Using Cross Validation with a 5-fold `CrossValidator`

Questions to answer:

1. Have we improved the ROC-AUC?
2. Which are the average ROC-AUC measurements in the different cross validation runs?
3. Which are the parameters of the best model in the 5 k-fold runs?
4. Which is the importance of the features?
5. Print full description of model.
6. Which is the ROC-AUC on validation dataset?

In [37]:
gbt = GBTClassifier(maxIter=10)

gbtModel = gbt.fit(trainingData)

predictions_gbt = gbtModel.transform(validationData)

predictions_gbt.select('rawPrediction', 'prediction', 'probability').show(10)

+--------------------+----------+--------------------+
|       rawPrediction|prediction|         probability|
+--------------------+----------+--------------------+
|[1.21978536359241...|       0.0|[0.91979542530482...|
|[0.68210981968341...|       0.0|[0.79644464384408...|
|[1.21377218285156...|       0.0|[0.91890372787360...|
|[1.18802767301666...|       0.0|[0.91498308530803...|
|[1.09383018413349...|       0.0|[0.89913592208174...|
|[1.08212367091458...|       0.0|[0.89699265172697...|
|[1.08212367091458...|       0.0|[0.89699265172697...|
|[0.96125805639737...|       0.0|[0.87241875031202...|
|[1.22489228877388...|       0.0|[0.92054569596139...|
|[0.92417951653237...|       0.0|[0.86393431953011...|
+--------------------+----------+--------------------+
only showing top 10 rows



In [38]:
print('Test Area Under ROC','%.4f' %  evaluator.evaluate(predictions_gbt))

Test Area Under ROC 0.9339


By setting the **paramGrid_gbt** variable we define the set of different parameters we would like to test in order to tune our algorithm.

In [39]:
paramGrid_gbt = ParamGridBuilder() \
                    .addGrid(GBTClassifier.maxDepth, [2,5])\
                    .addGrid(GBTClassifier.maxIter, [10,20])\
                    .build()

print("Param Grid: " + str(paramGrid_gbt))


cv_gbt = CrossValidator(estimator = gbt, estimatorParamMaps = paramGrid_gbt, evaluator = evaluator, numFolds=5)
cv_Model_gbt = cv_gbt.fit(trainingData)
predictions_cv_gbt = cv_Model_gbt.transform(validationData)

Param Grid: [{Param(parent='undefined', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2, Param(parent='undefined', name='maxIter', doc='max number of iterations (>= 0).'): 10}, {Param(parent='undefined', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2, Param(parent='undefined', name='maxIter', doc='max number of iterations (>= 0).'): 20}, {Param(parent='undefined', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='undefined', name='maxIter', doc='max number of iterations (>= 0).'): 10}, {Param(parent='undefined', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='undefined', name='maxIter', doc='max 

At this point we apply a **Cross Validation** strategy.

In [40]:
# Before we had 0.6822 and now
print("AUC: " + str('%.2f' % evaluator.evaluate(predictions_cv_gbt)))

# Means of model accuracy
my_formatted_list_2 = [ '%.4f' % elem for elem in cv_Model_gbt.avgMetrics]
print("Means of metrics: " + str(my_formatted_list_2))

AUC: 0.93
Means of metrics: ['0.9249', '0.9249', '0.9249', '0.9249']


In [41]:
# Best GBT Model
best_gbt_model = cv_Model_gbt.bestModel
best_gbt_model

GBTClassificationModel (uid=GBTClassifier_48c5ae11d972d687d0e9) with 10 trees

### Apply your best model to send the predictions on test

In [42]:
pipeline = Pipeline(stages = stages)
pipelineModel_final = pipeline.fit(testDF)
testDF = pipelineModel_final.transform(testDF)
selectedCols_final = testDF.columns
testDF = testDF.select(selectedCols_final)
testDF.printSchema()

root
 |-- loan_amount: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- interest_rate: decimal(10,0) (nullable = true)
 |-- monthly_payment: double (nullable = true)
 |-- employment_time: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: double (nullable = true)
 |-- loan_purpose: string (nullable = false)
 |-- deliquency_past_2years: integer (nullable = false)
 |-- revolving_balance: integer (nullable = true)
 |-- revolving_utilization_rate: decimal(10,0) (nullable = true)
 |-- total_payment: double (nullable = false)
 |-- loan_status: integer (nullable = true)
 |-- termIndex: double (nullable = false)
 |-- termclassVec: vector (nullable = true)
 |-- employment_timeIndex: double (nullable = false)
 |-- employment_timeclassVec: vector (nullable = true)
 |-- home_ownershipIndex: double (nullable = false)
 |-- home_ownershipclassVec: vector (nullable = true)
 |-- loan_purposeIndex: double (nullable = false)
 |-- loan_purposeclass

In [43]:
gbt = GBTClassifier(maxIter=10)

gbtModel_final = gbt.fit(loanDF)

predictions_final = gbtModel_final.transform(testDF)


In [44]:
predictions_final.toPandas()

Py4JJavaError: An error occurred while calling o39894.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6824.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6824.0 (TID 13568, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: 35 not in [0,34)
	at breeze.linalg.SparseVector$mcD$sp.apply$mcD$sp(SparseVector.scala:74)
	at breeze.linalg.SparseVector$mcD$sp.apply(SparseVector.scala:73)
	at breeze.linalg.SparseVector$mcD$sp.apply(SparseVector.scala:49)
	at breeze.linalg.TensorLike$class.apply$mcID$sp(Tensor.scala:107)
	at breeze.linalg.SparseVector.apply$mcID$sp(SparseVector.scala:49)
	at org.apache.spark.ml.linalg.Vector$class.apply(Vectors.scala:102)
	at org.apache.spark.ml.linalg.SparseVector.apply(Vectors.scala:561)
	at org.apache.spark.ml.tree.ContinuousSplit.shouldGoLeft(Split.scala:161)
	at org.apache.spark.ml.tree.InternalNode.predictImpl(Node.scala:171)
	at org.apache.spark.ml.classification.GBTClassificationModel$$anonfun$5.apply(GBTClassifier.scala:325)
	at org.apache.spark.ml.classification.GBTClassificationModel$$anonfun$5.apply(GBTClassifier.scala:325)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.ml.classification.GBTClassificationModel.margin(GBTClassifier.scala:325)
	at org.apache.spark.ml.classification.GBTClassificationModel.predictRaw(GBTClassifier.scala:280)
	at org.apache.spark.ml.classification.GBTClassificationModel.predictRaw(GBTClassifier.scala:212)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3200)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3197)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3197)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.IndexOutOfBoundsException: 35 not in [0,34)
	at breeze.linalg.SparseVector$mcD$sp.apply$mcD$sp(SparseVector.scala:74)
	at breeze.linalg.SparseVector$mcD$sp.apply(SparseVector.scala:73)
	at breeze.linalg.SparseVector$mcD$sp.apply(SparseVector.scala:49)
	at breeze.linalg.TensorLike$class.apply$mcID$sp(Tensor.scala:107)
	at breeze.linalg.SparseVector.apply$mcID$sp(SparseVector.scala:49)
	at org.apache.spark.ml.linalg.Vector$class.apply(Vectors.scala:102)
	at org.apache.spark.ml.linalg.SparseVector.apply(Vectors.scala:561)
	at org.apache.spark.ml.tree.ContinuousSplit.shouldGoLeft(Split.scala:161)
	at org.apache.spark.ml.tree.InternalNode.predictImpl(Node.scala:171)
	at org.apache.spark.ml.classification.GBTClassificationModel$$anonfun$5.apply(GBTClassifier.scala:325)
	at org.apache.spark.ml.classification.GBTClassificationModel$$anonfun$5.apply(GBTClassifier.scala:325)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.ml.classification.GBTClassificationModel.margin(GBTClassifier.scala:325)
	at org.apache.spark.ml.classification.GBTClassificationModel.predictRaw(GBTClassifier.scala:280)
	at org.apache.spark.ml.classification.GBTClassificationModel.predictRaw(GBTClassifier.scala:212)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
	... 16 more


In [None]:
MySparkContext.stop()