## Intro to Spark SQL

In [6]:
#If you are starting from this notebook, please ensure you have uncommented and run:
#If you need help running the command, please visit notebook "1_CML_Session_Basics.ipynb"
#!pip3 install -r requirements.txt

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [8]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns

In [9]:
spark = SparkSession\
    .builder\
    .appName("IntroToSparkSQL")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", os.environ["REGION"])\
    .config("spark.yarn.access.hadoopFileSystems", os.environ["STORAGE"])\
    .getOrCreate()
#.config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-2")\

KeyError: 'REGION'

In [9]:
df = spark.read.csv('data/LoanStats_2015_subset.csv')

AnalysisException: 'Path does not exist: file:/home/cdsw/data/LoanStats_2015_subset.csv;'

In [None]:
#Printing number of rows and columns:
print('Dataframe Shape')
print((df.count(), len(df.columns)))

#### Basic Inspection

In [None]:
#Count number of nulls for each column:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()

In [None]:
#It seems like some columns have a lot of nulls while others have very few:
nulls = df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()

In [None]:
null_cols = nulls.T[(nulls.T > 1000).any(axis=1)].index

In [None]:
df = df.drop(*null_cols)

In [None]:
#Printing number of rows:
print('Dataframe Shape')
print((df.count(), len(df.columns)))

In [None]:
df = df.filter(df.loan_status != '10500')

In [None]:
df.take(2)

## KPI Reporting

### What is the target variable and what does it define?

In [None]:
df.select("loan_status").distinct().show()

In [None]:
#Types of loan status
print(df.groupBy('loan_status').count().show())

In [None]:
df_plot = df.groupBy('loan_status').count().toPandas()
plt.figure(figsize=(8,3))
g = sns.barplot(x="loan_status", y="count", data=df_plot)
g.set_title('Loan Status Category Counts')
plt.show()

#### To predict defaults, we need to transform the target variable into a binary variable

In [None]:
from pyspark.sql.functions import when

In [None]:
df = df.withColumn("is_default", when((df["loan_status"] == "Charged Off")|(df["loan_status"] == "Default"), 1).otherwise(0))

In [None]:
#Checking that we have correctly replaced values
df.select("is_default").show()

In [None]:
df.select("is_default").dtypes

In [None]:
from pyspark.sql import functions as F

In [None]:
#Check the exact total of all loans labeled to default matches with the sum of the original two values used above (Charged Off and Default)
df.select(F.sum("is_default")).collect()[0][0]

### What is the monthly total loan volume in dollars and what is the monthly average loan size?

In [None]:
from pyspark.sql.functions import to_date

In [None]:
#The original issue date attribute
df.select("issue_d").show(4)

In [None]:
#We need to cast the issue date from string to month (all loan applications in the dataset occurred in 2015 so we don't need the year):
df.selectExpr("from_unixtime(unix_timestamp(issue_d,'MMM-yyyy'),'MM') as issue_month").show(4)

In [None]:
df = df.withColumn("issue_month",F.from_unixtime(F.unix_timestamp(F.col("issue_d"),'MMM-yyyy'),'MM'))

In [None]:
df.select("issue_month").distinct().show()

In [None]:
#how many loans defaulted for each month (all data is 2015):
df.groupby('issue_month').sum('is_default').na.drop().sort(F.asc('issue_month')).show()

In [None]:
defaults_date = df.groupby('issue_month').sum('is_default').na.drop().sort(F.asc('issue_month')).toPandas()

In [None]:
defaults_date

In [None]:
plt.figure(figsize=(8,3))
g = sns.barplot(x="issue_month", y="sum(is_default)", data=defaults_date)
g.set_title('Loan Defaults by Month')
g.set_ylabel('Total Loan Defaults')
g.set_xlabel('Month in 2015')
plt.show()

In [None]:
#Let's create more plots here. First we aggregate in different ways. Then we join, convert to Pandas df, and plot. 

In [None]:
from pyspark.sql.functions import sum as _sum

In [None]:
#by using like function
df.groupBy("issue_month","loan_status").\
count().\
filter(F.lower(F.col("loan_status")).like("late%")).\
groupby('issue_month').\
sum().\
sort(F.asc('issue_month')).\
show()

In [None]:
df_late = df.groupBy("issue_month","loan_status").\
count().\
filter(F.lower(F.col("loan_status")).like("late%")).\
groupby('issue_month').\
sum().\
sort(F.asc('issue_month'))

In [None]:
#by using like function
df_delinq = df.groupBy("issue_month").\
max("inq_last_6mths").\
na.drop().\
sort(F.asc('issue_month'))

In [None]:
#This time we need to cast the attribute we are working with to numeric before we can create a similar dataframe:
df = df.withColumn('loan_amnt', F.col('loan_amnt').cast('int'))

In [None]:
#by using like function
df_ann_inc = df.groupBy("issue_month").\
mean("loan_amnt").\
na.drop().\
sort(F.asc('issue_month'))

In [None]:
df_delinq.alias('a').join(df_ann_inc.alias('b'),F.col('b.issue_month') == F.col('a.issue_month')).\
join(df_late.alias('c'), F.col('b.issue_month') == F.col('c.issue_month')).\
select(F.col('a.issue_month'), F.col('a.max(inq_last_6mths)'), F.col('b.avg(loan_amnt)'), F.col('c.sum(count)').alias('default_count')).\
show()

In [None]:
df_stats_pd = df_delinq.alias('a').join(df_ann_inc.alias('b'),F.col('b.issue_month') == F.col('a.issue_month')).\
join(df_late.alias('c'), F.col('b.issue_month') == F.col('c.issue_month')).\
select(F.col('a.issue_month'), F.col('a.max(inq_last_6mths)'), F.col('b.avg(loan_amnt)'), F.col('c.sum(count)').alias('default_count')).\
toPandas()

In [None]:
df_stats_pd.head()

In [None]:
f, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(7, 5), sharex=True)

sns.barplot(x=df_stats_pd['issue_month'], y=df_stats_pd['max(inq_last_6mths)'], palette="rocket", ax=ax1)
ax1.axhline(0, color="k", clip_on=False)
ax1.set_ylabel("Max Months Since Deliq")
ax1.set_xlabel("")

sns.scatterplot(x=df_stats_pd['issue_month'], y=df_stats_pd['avg(loan_amnt)'], palette="vlag", ax=ax2)
ax2.axhline(0, color="k", clip_on=False)
ax2.set_ylabel("Average Loan Amount")

sns.barplot(x=df_stats_pd['issue_month'], y=df_stats_pd['default_count'], palette="deep", ax=ax3)
ax3.axhline(0, color="k", clip_on=False)
ax3.set_ylabel("Count of Defaults")
ax1.set_xlabel("Month")

sns.despine(bottom=True)
#plt.setp(f.axes)
plt.tight_layout(h_pad=2)

plt.show()

In [None]:
#do map based on zipcode?

In [None]:
#df = df.filter(df.loan_status != '10500')

### Spark SQL

In [None]:
##Registering the dataframe as a temporary table:
#df.registerTempTable("LC_Loans_2015")

In [51]:
spark.sql("show databases").show()

+------------------+
|      databaseName|
+------------------+
|        big12stats|
|           default|
|          finance2|
|           flights|
|           indexed|
|information_schema|
|          omop_cdm|
|  omop_cdm_parquet|
|   prescribing_dev|
|     prescribing_o|
|     prescribing_p|
|   prescribing_p_e|
|        retaildemo|
|               sys|
|              test|
|       ukcrime_dev|
+------------------+



In [52]:
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|             concept|      false|
| default|     concept_synonym|      false|
| default|     device_exposure|      false|
| default|       drug_strength|      false|
| default|   druide_kafka_demo|      false|
| default|flight_not_partit...|      false|
| default|  flight_partitioned|      false|
| default|      lc_predictions|      false|
| default|lc_predictions_la...|      false|
| default|            lc_smote|      false|
| default|   lc_smote_complete|      false|
| default|         lc_smote_k2|      false|
| default|         lc_smote_k3|      false|
| default|         lc_smote_k4|      false|
| default|         lc_smote_k5|      false|
| default|            location|      false|
| default|            metadata|      false|
| default|              myview|      false|
| default|             myview2|      false|
| default|  observation_period| 

In [53]:
#Looks like revol_bal, tax_liens and tot_cur_bal should be numeric. Revol_util should also be numeric but we'll have to remove the % character

In [None]:
#df.write.format('parquet').mode("overwrite").saveAsTable('default.LC_table')

In [None]:
#Running SQL like queries on the dataframe 
group_by_grade = spark.sql("SELECT grade, MEAN(loan_amnt) FROM LC_table WHERE grade IS NOT NULL GROUP BY grade ORDER BY grade")

In [None]:
group_by_grade.show()

In [None]:
#Transforming to pandas
group_by_grade_pd = group_by_grade.toPandas()

In [None]:
#group_by_grade_pd.set_index('grade', inplace=True)

In [None]:
group_by_subgrade = spark.sql("SELECT sub_grade, MEAN(loan_amnt), MEAN(annual_inc), SUM(is_default) FROM LC_table GROUP BY sub_grade ORDER BY sub_grade")

In [None]:
#cache what you are going to use across queries (and early and often up to available memory)
group_by_subgrade.cache()

In [None]:
%time group_by_grade.show()

In [None]:
%time group_by_subgrade.show()

In [None]:
#caching should reduce loading time for smaller dataframe -- check 
group_by_subgrade.cache()

In [None]:
%time group_by_subgrade.show()

In [None]:
group_by_subgrade_pd = group_by_subgrade.toPandas()

In [None]:
#group_by_subgrade_pd = group_by_subgrade_pd.rename(columns={'avg(CAST(funded_amnt AS DOUBLE))':'avg(funded_amnt)'})

In [None]:
group_by_grade_pd.plot(kind='bar', figsize=(4,2))
plt.title('Avg Loan Amount by Grade')
plt.gca().legend_.remove()
plt.show()
#adjust styling here

In [None]:
f, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(7, 5), sharex=True)

sns.barplot(x=group_by_subgrade_pd['sub_grade'], y=group_by_subgrade_pd['avg(annual_inc)'], palette="hls", ax=ax1)
ax1.axhline(0, color="k", clip_on=False)
ax1.set_ylabel("Mean Annual Income")
ax1.set_xlabel("")

sns.barplot(x=group_by_subgrade_pd['sub_grade'], y=group_by_subgrade_pd['avg(loan_amnt)'], palette="vlag", ax=ax2)
ax2.axhline(0, color="k", clip_on=False)
ax2.set_ylabel("Mean Requested Amnt")
ax1.set_xlabel("")

sns.barplot(x=group_by_subgrade_pd['sub_grade'], y=group_by_subgrade_pd['sum(is_default)'], palette="rocket", ax=ax3)
ax3.axhline(0, color="k", clip_on=False)
ax3.set_ylabel("Total Number of Defaults")

#sns.despine(bottom=True)
#plt.setp(f.axes)
#plt.tight_layout(h_pad=2)

plt.show()

In [None]:
#removing from cache
group_by_grade.unpersist()
group_by_subgrade.unpersist()

Data Quality Checks

In [None]:
#Check dataframe columnns
#df_new.columns

Checking that correct data types were inferred

In [None]:
df.dtypes 

In [None]:
#The following attributes are strings but they potentially should be numeric. Let's take a look at a sample.
df.select('revol_bal', 'revol_util', 'tax_liens', 'tot_cur_bal', 'int_rate', 'emp_length').show()

In [None]:
#from pyspark.sql.functions import substring
#df_ml = df_ml.withColumn("manufacturer", substring(col("manufacturer"), 0, 5))

In [None]:
#telco_data\
#  .write.format("parquet")\
#  .mode("overwrite")\
#  .saveAsTable(
#    'default.telco_churn'
#)

In [None]:
#Removing the % character from revol_util
df = df.withColumn("revol_util", F.expr("substring(revol_util, 1, length(revol_util)-1)"))

In [None]:
#Removing the % character from revol_util
df = df.withColumn("int_rate", F.expr("substring(int_rate, 1, length(revol_util)-1)"))

In [None]:
#Notice that we are not casting emp_length to numeric as the time periods it represents are different - it will have to be one hot encoded
integer = ["revol_bal", "tax_liens", "tot_cur_bal", "funded_amnt"]

In [None]:
for c in integer:
    df = df.withColumn(c, df[c].cast("int"))

In [None]:
#Updating revol_util to double:
df = df.withColumn('revol_util', F.col('revol_util').cast('double'))

In [None]:
#Updating int_rate to double:
df = df.withColumn('int_rate', F.col('int_rate').cast('double'))

In [None]:
for c in integer:
    df = df.withColumn(c, F.col(c).cast('int'))

In [None]:
##Registering the dataframe as a temporary table:
#Temporary views in Spark SQL are session-scoped and will disappear if the session that creates it terminates. 
#If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, 
#you can create a global temporary view

df.createOrReplaceTempView("LC_Glob_Temp_View")

In [None]:
spark.sql("show tables").show()

In [None]:
spark.catalog.dropTempView("LC_Glob_Temp_View")

In [None]:
spark.sql("show tables").show()

In [None]:
#spark.catalog.dropGlobalTempView("LC_Loans_2015_GlobalTempView")

In [None]:
spark.sql("SELECT * FROM default.LC_table")

Congratulations! You have learned a lot about the Spark SQL API!