## SPARK SQL QUERIES

This notebook contains Spark SQL queries.

The following SQL functions were executed using PySpark syntax:

* Basic SQL Functions (Select, Group By, Where, Order by)
* Sampling using Table Sample
* Window Functions
* Datetime Manipulation
* Case Statements

### Import Libraries and Loading Dataset

In [1]:
#Before running anything on Spark, import SparkSession
from pyspark.sql import SparkSession

In [2]:
#Create a spark variable name for the session
spark = SparkSession.builder.appName('customer_complaint').getOrCreate()
spark

In [3]:
#read the dataset
complaint_df = spark.read.option('header', 'true').csv('C:\\Users\\User\\customer_complaints.csv', inferSchema = True)

#check schema (dtypes)
complaint_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Sub_Product: string (nullable = true)
 |-- Dispute_Amount: integer (nullable = true)
 |-- Issue: string (nullable = true)
 |-- Sub_Issue: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- Zip_Code: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Consent_Provided: string (nullable = true)
 |-- Submitted_Via: string (nullable = true)
 |-- Response_Stage: string (nullable = true)
 |-- Timely_Response: string (nullable = true)
 |-- Consumer_Disputed: string (nullable = true)
 |-- Complaint_ID: integer (nullable = true)



In [4]:
#show first 10 rows of dataset (reusing the DataFrame loaded above instead of reading the file again)
complaint_df.show(10)

+---------+--------------------+--------------------+--------------+--------------------+----------------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+
|     Date|        Product_Name|         Sub_Product|Dispute_Amount|               Issue|       Sub_Issue|             Company|State_Code|Zip_Code|          Tags|Consent_Provided|Submitted_Via|      Response_Stage|Timely_Response|Consumer_Disputed|Complaint_ID|
+---------+--------------------+--------------------+--------------+--------------------+----------------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+
|7/29/2013|       Consumer Loan|        Vehicle loan|         17113|Managing the loan...|            null|Wells Fargo & Com...|        VA|   24540|          null|             N/A|        Phone|Closed with expla...|

### SQL Queries

Before writing SQL queries in Spark, the DataFrame has to be registered as a temporary view so that it can be referenced by name in SQL. Every query is then passed as a string to the spark.sql( ) command.

In [5]:
#creating temp table
complaint_df.createOrReplaceTempView('complaint_df')

#### Query 1: Selecting all from Table

In [6]:
spark.sql('select * from complaint_df').show()

+---------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+
|     Date|        Product_Name|         Sub_Product|Dispute_Amount|               Issue|           Sub_Issue|             Company|State_Code|Zip_Code|          Tags|Consent_Provided|Submitted_Via|      Response_Stage|Timely_Response|Consumer_Disputed|Complaint_ID|
+---------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+
|7/29/2013|       Consumer Loan|        Vehicle loan|         17113|Managing the loan...|                null|Wells Fargo & Com...|        VA|   24540|          null|             N/A|        Phone|Close

In [7]:
#counting number of rows in table
complaint_df.count()

65499

#### Query 2 - Selecting a Sample of Rows from the Table

The TABLESAMPLE clause is used to select a sample of rows from a table.
It supports the following sampling methods:

TABLESAMPLE (x ROWS): Sample the table down to the given number of rows

TABLESAMPLE (x PERCENT): Sample the table down to approximately the given percentage of rows

TABLESAMPLE (BUCKET x OUT OF y): Sample the table down to an x out of y fraction

In [8]:
spark.sql('select * from complaint_df Tablesample (0.1 Percent)').show()

+----------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+
|      Date|        Product_Name|         Sub_Product|Dispute_Amount|               Issue|           Sub_Issue|             Company|State_Code|Zip_Code|          Tags|Consent_Provided|Submitted_Via|      Response_Stage|Timely_Response|Consumer_Disputed|Complaint_ID|
+----------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+
| 7/30/2013|         Credit card|                null|          2586|Advertising and m...|                null|First National Ba...|        VT|    5069|          null|             N/A|          Web|C

In [9]:
#saving Query 2 into a df
sample_df = spark.sql('select * from complaint_df Tablesample (0.1 Percent)')

#counting number of rows in table
sample_df.count()

54

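##### Observation

The full complaint_df has 65499 rows, while sample_df (a 0.1% sample of complaint_df) has 54 rows in this run. The sample is drawn at random, so the count only approximates 0.1% of 65499 (about 65 rows) and can differ each time the query is re-run.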
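Row counts from percentage sampling vary if each row is kept independently with the given probability, which is how this kind of sampling is commonly implemented. The sketch below imitates that in plain Python rather than in Spark. The function name `bernoulli_sample_count` and the seeds are illustrative assumptions, not part of the notebook.

```python
import random

def bernoulli_sample_count(n_rows, percent, seed):
    """Count how many of n_rows survive when each row is kept
    independently with probability percent / 100."""
    rng = random.Random(seed)
    keep_probability = percent / 100.0
    return sum(1 for _ in range(n_rows) if rng.random() < keep_probability)

# 65499 rows and 0.1 percent, as in the notebook; the seeds are arbitrary.
counts = [bernoulli_sample_count(65499, 0.1, seed) for seed in range(5)]
expected = 65499 * 0.1 / 100.0  # average number of rows kept

print("expected rows on average:", round(expected, 1))
print("rows kept in each simulated run:", counts)
```

Each simulated run lands near the average without being pinned to it. When a reproducible sample is needed, fixing the seed is the usual remedy.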

#### Query 3 - Identifying the count of each complaint type using the Group By Clause

In [10]:
spark.sql('select Product_Name, count(*) as N_Complaint from complaint_df group by Product_Name order by N_Complaint desc').show()

+--------------------+-----------+
|        Product_Name|N_Complaint|
+--------------------+-----------+
|            Mortgage|      18734|
|     Debt collection|      16106|
|    Credit reporting|      12092|
|Bank account or s...|       6540|
|         Credit card|       6472|
|       Consumer Loan|       2343|
|        Student loan|       1827|
|         Payday loan|        633|
|     Money transfers|        507|
|        Prepaid card|        190|
|Other financial s...|         55|
+--------------------+-----------+



In [11]:
#saving Query 3 into a df
n_complaints = spark.sql('select Product_Name, count(*) as N_Complaint from complaint_df group \
by Product_Name order by N_Complaint desc')

n_complaints.show()

+--------------------+-----------+
|        Product_Name|N_Complaint|
+--------------------+-----------+
|            Mortgage|      18734|
|     Debt collection|      16106|
|    Credit reporting|      12092|
|Bank account or s...|       6540|
|         Credit card|       6472|
|       Consumer Loan|       2343|
|        Student loan|       1827|
|         Payday loan|        633|
|     Money transfers|        507|
|        Prepaid card|        190|
|Other financial s...|         55|
+--------------------+-----------+



#### Query 4 - Dropping rows with null values in a Column ('State_Code')

In [12]:
#dropping all rows where State_Code is null
complaint_df2 = complaint_df.na.drop(how = 'any', subset = ['State_Code'])

#no of rows in new df
complaint_df2.count()

65021

In [13]:
#creating temp table
complaint_df2.createOrReplaceTempView('complaint_df2')

**Note** - The result of a Spark SQL query can be saved into a DataFrame once the required results have been generated

#### Query 5 - Filtering for only Equifax complaints using the WHERE clause

In [14]:
spark.sql('Select Date, Complaint_ID, Product_Name, Issue, Dispute_Amount, Company, State_Code From complaint_df2 \
Where Company = "Equifax"').show(5)

+---------+------------+----------------+--------------------+--------------+-------+----------+
|     Date|Complaint_ID|    Product_Name|               Issue|Dispute_Amount|Company|State_Code|
+---------+------------+----------------+--------------------+--------------+-------+----------+
|7/29/2013|      469201|Credit reporting|Incorrect informa...|         16667|Equifax|        CA|
|7/29/2013|      479282|Credit reporting|Improper use of m...|         11260|Equifax|        MO|
|7/30/2013|      469792|Credit reporting|Incorrect informa...|         13566|Equifax|        NJ|
|7/31/2013|      471000|Credit reporting|Incorrect informa...|         14461|Equifax|        MI|
|7/31/2013|      480943|Credit reporting|Incorrect informa...|         14847|Equifax|        MO|
+---------+------------+----------------+--------------------+--------------+-------+----------+
only showing top 5 rows



In [15]:
equifax = spark.sql('Select Date, Complaint_ID, Product_Name, Issue, Dispute_Amount,Company, State_Code From complaint_df2 \
Where Company = "Equifax"')

#### Query 6 - SparkSQL Window Functions

##### 6.1 - Row_Number in PySpark

When working with window functions in PySpark, the necessary classes and functions need to be imported. The DataFrame is also usually partitioned, and ordered within each partition, before calling the required window function.

In [16]:
#importing library
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

#window specification: partition by State_Code, order by Dispute_Amount within each partition
windowPartition = Window.partitionBy("State_Code").orderBy("Dispute_Amount")

In [17]:
equifax.withColumn("row_number",row_number().over(windowPartition)).show()

+----------+------------+----------------+--------------------+--------------+-------+----------+----------+
|      Date|Complaint_ID|    Product_Name|               Issue|Dispute_Amount|Company|State_Code|row_number|
+----------+------------+----------------+--------------------+--------------+-------+----------+----------+
| 8/25/2014|      999116|Credit reporting|Incorrect informa...|          1772|Equifax|        AE|         1|
| 4/22/2014|      819293|Credit reporting|Credit monitoring...|         11325|Equifax|        AE|         2|
|  2/8/2014|      704784|Credit reporting|Credit reporting ...|         13676|Equifax|        AE|         3|
| 5/30/2014|      874047|Credit reporting|Credit reporting ...|          2722|Equifax|        AK|         1|
| 5/11/2014|      847015|Credit reporting|Unable to get cre...|          6469|Equifax|        AK|         2|
| 5/23/2014|      865850|Credit reporting|Incorrect informa...|          6532|Equifax|        AK|         3|
|  8/1/2013|      4
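To make the numbering above concrete, here is a plain-Python sketch of what row_number computes over a window partitioned by State_Code and ordered by Dispute_Amount. It uses the AE and AK rows shown in the output and does not call Spark. The helper name `add_row_number` is a hypothetical one.

```python
from itertools import groupby
from operator import itemgetter

# (State_Code, Dispute_Amount, Complaint_ID) copied from the output above,
# deliberately shuffled to show that the window imposes the order.
rows = [
    ("AK", 6469, 847015),
    ("AE", 13676, 704784),
    ("AK", 2722, 874047),
    ("AE", 1772, 999116),
    ("AK", 6532, 865850),
    ("AE", 11325, 819293),
]

def add_row_number(records):
    """Number rows 1..n inside each state, ordered by dispute amount."""
    # Sort by partition key first, then by the ordering key.
    ordered = sorted(records, key=itemgetter(0, 1))
    numbered = []
    for _, group in groupby(ordered, key=itemgetter(0)):
        # The counter restarts at 1 for every partition.
        for position, record in enumerate(group, start=1):
            numbered.append(record + (position,))
    return numbered

numbered_rows = add_row_number(rows)
for row in numbered_rows:
    print(row)
```

The counter restarts for every state, which is why both AE and AK begin at 1 in the output.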

##### 6.2 - Cumulative Distribution

In [18]:
# importing cume_dist() from pyspark.sql.functions
from pyspark.sql.functions import cume_dist
 
# applying window function with
# the help of DataFrame.withColumn
equifax.withColumn("cume_dist",cume_dist().over(windowPartition)).show()

+----------+------------+----------------+--------------------+--------------+-------+----------+--------------------+
|      Date|Complaint_ID|    Product_Name|               Issue|Dispute_Amount|Company|State_Code|           cume_dist|
+----------+------------+----------------+--------------------+--------------+-------+----------+--------------------+
| 8/25/2014|      999116|Credit reporting|Incorrect informa...|          1772|Equifax|        AE|  0.3333333333333333|
| 4/22/2014|      819293|Credit reporting|Credit monitoring...|         11325|Equifax|        AE|  0.6666666666666666|
|  2/8/2014|      704784|Credit reporting|Credit reporting ...|         13676|Equifax|        AE|                 1.0|
| 5/30/2014|      874047|Credit reporting|Credit reporting ...|          2722|Equifax|        AK| 0.16666666666666666|
| 5/11/2014|      847015|Credit reporting|Unable to get cre...|          6469|Equifax|        AK|  0.3333333333333333|
| 5/23/2014|      865850|Credit reporting|Incorr
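The cume_dist value of a row is the number of rows in its partition whose ordering value is less than or equal to its own, divided by the partition size. The plain-Python sketch below applies that definition to the three AE rows shown above. The helper name `cume_dist_by_partition` is an assumption made for illustration.

```python
from collections import defaultdict

# (State_Code, Dispute_Amount) for the three AE rows shown above.
rows = [("AE", 1772), ("AE", 11325), ("AE", 13676)]

def cume_dist_by_partition(records):
    """Return (state, amount, cume_dist) with cume_dist computed per state."""
    partitions = defaultdict(list)
    for state, amount in records:
        partitions[state].append(amount)

    result = []
    for state, amounts in partitions.items():
        size = len(amounts)
        for amount in sorted(amounts):
            # Rows at or below the current value; ties share the same result.
            at_or_below = sum(1 for other in amounts if other <= amount)
            result.append((state, amount, at_or_below / size))
    return result

distribution = cume_dist_by_partition(rows)
for row in distribution:
    print(row)
```

With three rows in the partition the values are 1/3, 2/3 and 1.0, matching the AE rows in the output.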

#### Query 7 - SparkSQL Datetime Functions

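By default, Spark expects dates and timestamps in an ISO-like form such as **"yyyy-MM-dd HH:mm:ss"**. The Date column in this dataset (for example 7/29/2013) does not follow that form, so an explicit pattern has to be passed when converting it. The time parser policy also has to be set to LEGACY here, or else the conversion fails or returns null values for single-digit months and days. The necessary functions also have to be imported.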

In [19]:
#importing all functions from pyspark.sql.functions (col, expr, dayofweek, ...)
from pyspark.sql.functions import *

#setting the time parser policy to LEGACY
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [20]:
#converting Date column from String to Date data type
from pyspark.sql.functions import to_date, date_format, to_timestamp

complaint_df3 = complaint_df2.withColumn('New_Date', to_date('Date', 'MM/dd/yyyy'))
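As a cross-check outside Spark, the same string-to-date conversion can be sketched with Python's standard `datetime` module. `%m/%d/%Y` is the strptime counterpart of the 'MM/dd/yyyy' pattern, and returning None for unparseable text is a choice made here to mirror a null value. The helper name `parse_complaint_date` is hypothetical.

```python
from datetime import datetime

def parse_complaint_date(text):
    """Parse a month/day/year string; return None when it does not match."""
    try:
        return datetime.strptime(text, "%m/%d/%Y").date()
    except ValueError:
        return None

# Date strings copied from the outputs shown earlier in the notebook.
samples = ["7/29/2013", "8/25/2014", "2/8/2014"]
parsed = [parse_complaint_date(s) for s in samples]

for raw, value in zip(samples, parsed):
    print(raw, "->", value.isoformat())
```

The ISO form printed here has the same shape as the values that appear in the New_Date column.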

In [21]:
#schema shows the New_Date column has been added with the date type
complaint_df3.show(5)
complaint_df3.printSchema()

+---------+--------------------+--------------------+--------------+--------------------+---------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+----------+
|     Date|        Product_Name|         Sub_Product|Dispute_Amount|               Issue|Sub_Issue|             Company|State_Code|Zip_Code|          Tags|Consent_Provided|Submitted_Via|      Response_Stage|Timely_Response|Consumer_Disputed|Complaint_ID|  New_Date|
+---------+--------------------+--------------------+--------------+--------------------+---------+--------------------+----------+--------+--------------+----------------+-------------+--------------------+---------------+-----------------+------------+----------+
|7/29/2013|       Consumer Loan|        Vehicle loan|         17113|Managing the loan...|     null|Wells Fargo & Com...|        VA|   24540|          null|             N/A|        Phone|Closed with expl

In [22]:
#creating temp table
complaint_df3.createOrReplaceTempView('complaint_df3')

##### 7.1 - Extracting Datetime Elements

In [23]:
#sampling complaint_df3 table
sample_df2 = spark.sql('select * from complaint_df3 Tablesample (0.2 Percent)')

In [24]:
sample_df2.select(col("New_Date"), dayofweek(col("New_Date")).alias("dayofweek"),
                     dayofmonth(col("New_Date")).alias("dayofmonth"), dayofyear(col("New_Date")).alias("dayofyear") ).show()

+----------+---------+----------+---------+
|  New_Date|dayofweek|dayofmonth|dayofyear|
+----------+---------+----------+---------+
|2013-08-01|        5|         1|      213|
|2013-07-23|        3|        23|      204|
|2013-08-13|        3|        13|      225|
|2013-07-25|        5|        25|      206|
|2013-08-03|        7|         3|      215|
|2013-09-09|        2|         9|      252|
|2013-09-03|        3|         3|      246|
|2013-09-13|        6|        13|      256|
|2013-09-05|        5|         5|      248|
|2013-10-17|        5|        17|      290|
|2013-10-17|        5|        17|      290|
|2013-10-17|        5|        17|      290|
|2013-10-17|        5|        17|      290|
|2013-10-18|        6|        18|      291|
|2013-10-31|        5|        31|      304|
|2013-11-22|        6|        22|      326|
|2013-11-22|        6|        22|      326|
|2013-10-27|        1|        27|      300|
|2013-11-24|        1|        24|      328|
|2013-10-23|        4|        23
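The dayofweek values above follow a numbering where Sunday is 1 and Saturday is 7; for instance, 2013-08-01 was a Thursday and shows 5. The sketch below reproduces the three extracted fields with Python's standard library for a few dates from the output. The helper name `spark_style_parts` is an illustrative assumption.

```python
from datetime import date

def spark_style_parts(day):
    """Return (dayofweek, dayofmonth, dayofyear) with Sunday counted as 1."""
    # isoweekday() gives Monday=1 ... Sunday=7; shift it to Sunday=1 ... Saturday=7.
    day_of_week = day.isoweekday() % 7 + 1
    return day_of_week, day.day, day.timetuple().tm_yday

# Dates copied from the New_Date column in the output above.
parts = {
    iso: spark_style_parts(date.fromisoformat(iso))
    for iso in ["2013-08-01", "2013-08-03", "2013-10-27"]
}

for iso, values in parts.items():
    print(iso, values)
```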

##### 7.2 - Number of Complaints by Month

In [25]:
spark.sql('select month(New_Date), count(*) as N_Complaint from complaint_df3 group by month(New_Date)').show()

+---------------+-----------+
|month(New_Date)|N_Complaint|
+---------------+-----------+
|             12|       5164|
|              1|       6272|
|              6|       3081|
|              3|       6929|
|              9|       5382|
|              4|       6706|
|              8|       5641|
|              7|       4103|
|             10|       5486|
|             11|       5117|
|              2|       6494|
|              5|       4646|
+---------------+-----------+



##### 7.3 - Yearly Dispute Sum

In [26]:
spark.sql('select year(New_Date) as Year, sum(Dispute_Amount) as Dispute_Sum from complaint_df3 group by \
           year(New_Date) order by year(New_Date)').show()

+----+-----------+
|Year|Dispute_Sum|
+----+-----------+
|2013|  133981908|
|2014|  397938198|
|2015|  156077900|
+----+-----------+



#### Query 8 - Case When Statements

In [27]:
#show() returns None, so the DataFrame is assigned first and displayed separately
category_df = equifax.withColumn("Amount_Category", expr("CASE WHEN Dispute_Amount <= 5000 THEN 'Low' " +
               "WHEN Dispute_Amount BETWEEN 5001 AND 15000 THEN 'Mid' WHEN Dispute_Amount > 15000 THEN 'High' " +
               "ELSE 'Unknown' END"))
category_df.show()

+---------+------------+----------------+--------------------+--------------+-------+----------+---------------+
|     Date|Complaint_ID|    Product_Name|               Issue|Dispute_Amount|Company|State_Code|Amount_Category|
+---------+------------+----------------+--------------------+--------------+-------+----------+---------------+
|7/29/2013|      469201|Credit reporting|Incorrect informa...|         16667|Equifax|        CA|           High|
|7/29/2013|      479282|Credit reporting|Improper use of m...|         11260|Equifax|        MO|            Mid|
|7/30/2013|      469792|Credit reporting|Incorrect informa...|         13566|Equifax|        NJ|            Mid|
|7/31/2013|      471000|Credit reporting|Incorrect informa...|         14461|Equifax|        MI|            Mid|
|7/31/2013|      480943|Credit reporting|Incorrect informa...|         14847|Equifax|        MO|            Mid|
|7/31/2013|      471166|Credit reporting|Incorrect informa...|         13409|Equifax|        CO|
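A CASE expression returns the result of the first branch whose condition is true, and BETWEEN includes both endpoints. The plain-Python sketch below mirrors the thresholds used above to show where boundary values land. The function name `amount_category` is hypothetical, and treating a missing amount as 'Unknown' reflects the ELSE branch, since a comparison with NULL is never true.

```python
def amount_category(dispute_amount):
    """Mirror the CASE expression: the first matching branch wins."""
    if dispute_amount is None:
        return "Unknown"  # no comparison with NULL is true, so ELSE applies
    if dispute_amount <= 5000:
        return "Low"
    if 5001 <= dispute_amount <= 15000:  # BETWEEN is inclusive on both ends
        return "Mid"
    if dispute_amount > 15000:
        return "High"
    return "Unknown"

# The first two amounts come from the output above; the rest probe the boundaries.
checks = {amount: amount_category(amount) for amount in [16667, 11260, 5000, 5001, 15000, 15001]}
for amount, label in checks.items():
    print(amount, label)
print(None, amount_category(None))
```

An amount of exactly 15000 is labelled 'Mid' because the BETWEEN branch is evaluated before the 'High' branch.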