In [1]:
#Parts 1, 2 and 3 of the assignment

In [2]:
# Echo the `spark` object so the cell output shows it; the next cell uses it
# (via spark.read) to load the CSV.
spark

In [3]:
# Load the loan data CSV into a DataFrame.
# header="true"      -> take column names from the first line of the file
# inferSchema="true" -> let Spark infer each column's data type
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/FileStore/tables/CleanedLoanData.csv")
)

In [4]:
#Check the inferred data type of each column for anomalies
df.dtypes

In [5]:
# Preview the loaded table with all of its columns.
display(df)

In [6]:
# Q1: Which distinct grades appear in the data?
(
    df.select("grade")
    .distinct()
    .show()
)

In [7]:
# Q2: How many rows fall into each grade? Sorted by count, largest first.
display(
    df.groupBy("grade")
    .count()
    .orderBy("count", ascending=False)
)

In [8]:
# Q3: What is the maximum interest rate in each issue year?
# Sorted by issue_year in ascending order.
display(
    df.groupBy("issue_year")
    .max("int_rate")
    .orderBy("issue_year", ascending=True)
)

In [9]:
# Q4: What is the total current balance (sum of tot_cur_bal) for each loan
# purpose? Sorted by purpose.
display(
    df.groupBy("purpose")
    .sum("tot_cur_bal")
    .orderBy("purpose")
)

In [10]:
# Q5: What is the highest annual income for each loan term?
display(
    df.groupBy("term")
    .max("annual_inc")
)

In [11]:
# Q6: What is the average annual income for each addr_state value?
display(
    df.groupBy("addr_state")
    .avg("annual_inc")
)

In [12]:
# Q7: How many rows are there for each loan purpose?
# Sorted by count, smallest first.
display(
    df.groupBy("purpose")
    .count()
    .orderBy("count", ascending=True)
)

In [13]:
# Q8: What is the minimum loan amount for each loan status?
display(
    df.groupBy("loan_status")
    .min("loan_amnt")
)

In [14]:
# Q9: What is the average risk score for direct payment?
# The query reports the average Risk_Score for every disbursement_method
# value, so the answer is read from the relevant row of the result.
display(
    df.groupBy("disbursement_method")
    .avg("Risk_Score")
)

In [15]:
# Q10: What was the lowest interest rate in each issue year?
display(
    df.groupBy("issue_year")
    .min("int_rate")
)

In [16]:
# Columns kept for the modelling part of the project. "int_rate" is the column
# the regressor below uses as its label.
model_data = df.select(
    'loan_amnt',
    'term',
    'emp_length',
    'home_ownership',
    'annual_inc',
    'verification_status',
    'delinq_2yrs',
    'Risk_Score',
    'inq_last_6mths',
    'open_acc',
    'revol_bal',
    'revol_util',
    'total_acc',
    'mths_since_last_major_derog',
    'funded_amnt_inv',
    'installment',
    'pub_rec',
    'dti',
    'addr_state',
    'int_rate'
)

# Random train/test split with weights 0.9 / 0.1; the fixed seed keeps the
# split repeatable between runs.
train, test = model_data.randomSplit([0.9, 0.1], seed=12345)

In [17]:
#View the training split produced by randomSplit above
display(train)

In [18]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

# Model inputs: every selected column except the label. "int_rate" is the
# label of the regressor, so leaving it in the feature list would let the
# model see the value it is supposed to predict (target leakage) and make
# the reported error meaningless.
featuresCols = [c for c in model_data.columns if c != "int_rate"]

# NOTE(review): VectorAssembler only accepts numeric, boolean and vector
# input columns. If any selected column (e.g. term, home_ownership,
# addr_state) was inferred as a string, it has to be indexed/encoded before
# this stage -- confirm against df.dtypes.

# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features in "rawFeatures" and indexes them, writing the result to "features".
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features")


In [19]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted-tree regressor that reads the "features" column (the
# estimator's default, spelled out here) and learns to predict "int_rate".
gbt_reg = GBTRegressor(
    featuresCol="features",
    labelCol="int_rate"
)

In [20]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Hyperparameter grid explored during model tuning:
#   - maxDepth: max depth of each decision tree in the GBT ensemble
#   - maxIter:  iterations, i.e. number of trees in each GBT ensemble
# Two values each gives four candidate settings. The values are kept small
# here; deeper trees and more iterations are the usual way to trade training
# time for accuracy.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt_reg.maxDepth, [2, 5])
    .addGrid(gbt_reg.maxIter, [10, 100])
    .build()
)

# Evaluation metric: RMSE between the regressor's label column and its
# prediction column. CrossValidator uses it to compare the candidates.
evaluator = RegressionEvaluator(
    labelCol=gbt_reg.getLabelCol(),
    predictionCol=gbt_reg.getPredictionCol(),
    metricName="rmse"
)

# CrossValidator runs the model tuning over the grid with that metric.
cv = CrossValidator(
    estimator=gbt_reg,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator
)

In [21]:
from pyspark.ml import Pipeline

# Chain the stages in execution order: assemble the raw feature vector,
# index categorical features, then run the cross-validated GBT regressor.
pipeline = Pipeline(
    stages=[
        vectorAssembler,
        vectorIndexer,
        cv
    ]
)

In [22]:
#Train model: fit every pipeline stage (assembler, indexer, cross-validator) on the training split
pipelineModel = pipeline.fit(train)

In [23]:
#Predict on the test split by applying the fitted pipeline to it
predictions = pipelineModel.transform(test)

In [24]:
#View the predictions
# Show the actual label, the model's prediction and the input columns side by
# side. featuresCols may itself contain "int_rate"; it is filtered out here so
# the label column is not selected twice.
display(
    predictions.select(
        "int_rate",
        "prediction",
        *[c for c in featuresCols if c != "int_rate"]
    )
)

In [25]:
#Check model error on the test predictions (RMSE, the metric configured on `evaluator`)
rmse = evaluator.evaluate(predictions)
# print() in call form runs on both Python 2 and Python 3; the bare
# `print "..."` statement is a SyntaxError on Python 3.
print("RMSE on our test set: %g" % rmse)

In [26]:
#View Output: loan amount next to the model's prediction
# NOTE(review): the model's label is "int_rate", so "prediction" is a predicted
# interest rate, not a loan amount -- confirm this pairing is what is intended.
display(predictions.select("loan_amnt", "prediction"))