# Give Me Some Credit predictive model
## Data set from:
- https://www.kaggle.com/c/GiveMeSomeCredit 

raul.arrabales - Apr. 17

In [2]:
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
# sqlContext = SQLContext(sc)
# sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())

In [3]:
# List the contents of the /FileStore/tables/ folder to locate the uploaded dataset
dbutils.fs.ls('/FileStore/tables/')

Dataset:
cs-training.csv available as: dbfs:/FileStore/tables/ca964k591492704726010

In [5]:
# Load the CSV data into an RDD of raw text lines and count them
# (150,000 records expected, not counting the header line)
training_set_csv = sc.textFile("dbfs:/FileStore/tables/ca964k591492704726010")
training_set_csv.count()

In [6]:
# Peek at the first five raw lines to check the CSV layout
training_set_csv.take(5)

In [7]:
# The first line of the RDD is taken as the CSV header holding the field names
header = training_set_csv.first()
# Display it to check the column names
header

In [8]:
# Isolate the CSV header: keep only the lines that mention the
# "SeriousDlqin2yrs" column name. This only selects the header; it is not
# removed from training_set_csv here.
creditHeader = training_set_csv.filter(lambda l: "SeriousDlqin2yrs" in l)
# Should be 1 if the header is the only line that matches
creditHeader.count()

In [9]:
# Drop the header line so that only data rows remain (150,000 expected).
# A plain filter is a narrow transformation: unlike RDD.subtract it needs
# no shuffle and no second RDD, and it keeps the original partitioning.
training_set_csv = training_set_csv.filter(lambda l: "SeriousDlqin2yrs" not in l)
training_set_csv.count()

In [10]:
# One nullable string field per column name found in the CSV header;
# the proper data types are set in a later step
column_names = header.split(',')
fields = [StructField(name, StringType(), True) for name in column_names]
fields

In [11]:
# Number of fields in the training set (one per comma-separated header entry)
len(fields)

In [12]:
# Amend field names and data types (every field starts out as a string)

# First column: the record identifier
fields[0].name = "_id"
fields[0].dataType = LongType()

# Second column: the binary target, declared as a nominal ML attribute
label_metadata = {
    "ml_attr": {
        "name": "label",
        "type": "nominal",
        "vals": ["0.0", "1.0"],
    }
}
fields[1] = StructField("label", DoubleType(), False, label_metadata)

# Real-valued predictors
for position in (2, 5, 6):
    fields[position].dataType = FloatType()

# Integer-valued predictors
for position in (3, 4, 7, 8, 9, 10, 11):
    fields[position].dataType = IntegerType()

fields

In [13]:
# Look at the RDD contents now that the header is gone; some fields hold NA values
training_set_csv.take(4)

In [14]:
# Collect the rows with missing values. This is a plain substring test, so
# any line containing "NA" anywhere is selected.
rowsWithNA = training_set_csv.filter(lambda l: "NA" in l)
# How many rows are affected
rowsWithNA.count()

In [15]:
# Keep only the complete rows, i.e. those without any "NA" marker.
# Filtering directly avoids the shuffle that RDD.subtract would trigger
# and uses the same substring test that defines the rows with NA.
training_set_csv = training_set_csv.filter(lambda l: "NA" not in l)
training_set_csv.count()

<pre>
Variable Name		                        Type
SeriousDlqin2yrs	 	                    Y/N
RevolvingUtilizationOfUnsecuredLines		percentage
age		                                    integer
NumberOfTime30-59DaysPastDueNotWorse	    integer
DebtRatio	                                percentage
MonthlyIncome		                        real
NumberOfOpenCreditLinesAndLoans	            integer
NumberOfTimes90DaysLate		                integer
NumberRealEstateLoansOrLines                integer
NumberOfTime60-89DaysPastDueNotWorse		integer
NumberOfDependents	                        integer
</pre>

In [17]:
def _parse_credit_row(line):
    """Convert one CSV line into a tuple typed to match the schema fields.

    The record id uses ``int`` rather than ``long``: ``long`` only exists in
    Python 2, whereas ``int`` works on both Python 2 and Python 3.
    """
    p = line.split(",")
    return (
        int(p[0]),     # record id
        float(p[1]),   # label
        float(p[2]),
        int(p[3]),
        int(p[4]),
        float(p[5]),
        float(p[6]),
        int(p[7]),
        int(p[8]),
        int(p[9]),
        int(p[10]),
        int(p[11]),
    )


# Parse every remaining text line into a typed tuple
temp = training_set_csv.map(_parse_credit_row)

In [18]:
# Preview the first parsed tuples to check the type conversions
temp.take(4)

In [19]:
# Schema for the table, built from the list of typed fields
schema = StructType(fields)

In [20]:
# Create the dataframe from the parsed tuples and the schema,
# then count its rows
training_df = sqlContext.createDataFrame(temp,schema)
training_df.count()

In [21]:
# Preview the first rows of the dataframe
training_df.take(4)

In [22]:
# Check that each dataframe column got the data type declared in the schema
training_df.dtypes

In [23]:
# Print the dataframe schema to double-check the column definitions
training_df.printSchema()

In [24]:
# Missing values? The column is typed as an integer, so comparing it with
# the empty string can never match; a missing value shows up as NULL instead.
training_df.filter(training_df.NumberRealEstateLoansOrLines.isNull()).count()

In [25]:
# Checking number of delinquents. The schema names the target column
# "label"; the raw CSV name SeriousDlqin2yrs is not a dataframe column,
# so grouping by it would fail.
training_df.groupBy("label").count().show()

In [26]:
# Register a temp table to be used with the SQL API.
# NOTE(review): this runs before the columns are renamed further down, so the
# "traindata" table presumably keeps the pre-rename column names — confirm.
training_df.registerTempTable("traindata")

In [27]:
# Shorten the most verbose column names, applying the renames one by one.
# NOTE(review): the schema built earlier already names the target column
# 'label', so the last rename looks like a no-op — confirm.
column_renames = [
    ('NumberOfTime60-89DaysPastDueNotWorse', 'Num6089'),
    ('NumberRealEstateLoansOrLines', 'NumLoans'),
    ('RevolvingUtilizationOfUnsecuredLines', 'Revolving'),
    ('NumberOfTime30-59DaysPastDueNotWorse', 'Num3059'),
    ('SeriousDlqin2yrs', 'label'),
]
for old_name, new_name in column_renames:
    training_df = training_df.withColumnRenamed(old_name, new_name)

In [28]:
# Summary statistics for every column. describe() only builds a new
# dataframe; show() is what actually computes and prints the statistics.
training_df.describe().show()

In [29]:
# Pull a filtered slice of the Spark dataframe into a pandas dataframe:
# delinquent borrowers (label 1) older than 50
import pandas as pd
is_delinquent = training_df["label"] == 1
is_mature = training_df["age"] > 50
matureDelinquents = training_df.filter(is_delinquent & is_mature).toPandas()

In [30]:
# Show the first rows of the pandas dataframe
matureDelinquents.head()

In [31]:
# Using Spark ML (not MLLIB) for building a Random Forest model
# from pyspark.mllib.tree import RandomForest, RandomForestModel
# from pyspark.mllib.util import MLUtils
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [32]:
# Index labels, adding metadata to the label column.
# The indexed values are written to a new column named indexedLabel.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training_df)
# Display the label values found by the indexer
labelIndexer.labels

In [33]:
# NOTE(review): 'intl_plan' is not a column used anywhere else in this
# notebook and plan_indexer is never referenced again — this looks like a
# leftover from another notebook; confirm before removing.
plan_indexer = StringIndexer(inputCol = 'intl_plan', outputCol = 'intl_plan_indexed')

In [34]:
# Assemble the ten predictor columns into a single 'features' vector column
feature_columns = [
    'Revolving',
    'age',
    'Num3059',
    'DebtRatio',
    'MonthlyIncome',
    'NumberOfOpenCreditLinesAndLoans',
    'NumberOfTimes90DaysLate',
    'NumLoans',
    'Num6089',
    'NumberOfDependents',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [35]:
# Training the model: a random forest fitted through a three-stage pipeline
# (label indexing -> feature assembly -> classifier).
# NOTE(review): the classifier reads 'label' directly, so the 'indexedLabel'
# column produced by labelIndexer appears to be unused — confirm this is intended.
classifier = RandomForestClassifier(featuresCol='features', labelCol='label')
stages = [labelIndexer, assembler, classifier]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(training_df)

In [36]:
# Evaluating the model
# The score is computed on a held-out split: evaluating on the very rows the
# model was fitted on gives an optimistically biased AUROC.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Fixed seed so that the split, and therefore the metric, is reproducible
train_df, test_df = training_df.randomSplit([0.8, 0.2], seed=42)

# Re-fit the pipeline on the training split only, then score the unseen rows
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

evaluator = BinaryClassificationEvaluator()
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

In [37]:
# Inspect the first three scored rows
predictions.take(3)

In [38]:
# Display the area under the ROC curve returned by the evaluator
auroc