# Spark SQL API Tutorial

Spark SQL is Apache Spark's module for working with structured data. It lets us connect to and query multiple data sources, including Hive, Parquet, ORC, and more. This tutorial uses Spark SQL from Python through PySpark and walks through queries and functions of increasing complexity.

Two small datasets, 'books.csv' and 'prices.csv', are used to demonstrate Spark SQL functionality and to give a brief introduction to SQL functions, statements, and clauses.

In [1]:
%load_ext autoreload
%autoreload 2

# Getting Set Up

In [2]:
#Import libraries.
import logging
import os
import pyspark.sql
import pyspark.sql.functions as functions
import pyspark.sql.types as types
import shutil

# Configuration & Spark Session

In [3]:
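#Create the Spark session. Setting spark.sql.legacy.createHiveTableByDefault to false makes CREATE TABLE without a USING clause create native Spark tables instead of Hive tables.
#The --add-opens options give Spark access to java.nio internals, which newer JDKs (17+) otherwise block.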
spark = pyspark.sql.SparkSession.builder \
    .appName("API Example") \
    .config("spark.sql.legacy.createHiveTableByDefault", "false") \
    .config("spark.driver.extraJavaOptions", "--add-opens java.base/java.nio=ALL-UNNAMED") \
    .config("spark.executor.extraJavaOptions", "--add-opens java.base/java.nio=ALL-UNNAMED") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/15 19:53:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/15 19:53:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Working with the DataFrame API of Spark SQL

Before we can perform any operations, we need to create a dataframe. A dataframe can be created from an existing RDD, a Hive table, or Spark data sources. Here we manually create a small dataframe that contains each person's name, age, and number of books read.

In [41]:
#Create a dataframe in the Spark session.
df = spark.createDataFrame(
    [
        ("sue", 32, 12), #Each entry corresponds to one row in the dataframe.
        ("eli", 3, 1),
        ("bob", 75, 24),
        ("theo", 13, 5),
        ("mary", 25, 50)
    ],
    ["first_name", "age", "books_read"], #Column names, in the same order as the values in each tuple.
)
df.show()

+----------+---+----------+
|first_name|age|books_read|
+----------+---+----------+
|       sue| 32|        12|
|       eli|  3|         1|
|       bob| 75|        24|
|      theo| 13|         5|
|      mary| 25|        50|
+----------+---+----------+



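Now we can run some basic dataframe operations: selecting columns, computing new values and aggregates, and adding a column that indicates how much of a reading fanatic each person is, based on the number of books they have read.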

In [5]:
#Select and display the names of everyone in the dataframe.
df.select(df['first_name']).show()

+----------+
|first_name|
+----------+
|       sue|
|       eli|
|       bob|
|      theo|
|      mary|
+----------+



In [6]:
#Display each name with the number of books read plus one. Dataframes are immutable, so this does not change df; it only displays the computed values.
df.select(df['first_name'], df['books_read'] + 1).show() 

+----------+----------------+
|first_name|(books_read + 1)|
+----------+----------------+
|       sue|              13|
|       eli|               2|
|       bob|              25|
|      theo|               6|
|      mary|              51|
+----------+----------------+



In [7]:
#Compute and display the average number of books read across the dataset.
df.select(functions.avg("books_read")).show()

+---------------+
|avg(books_read)|
+---------------+
|           18.4|
+---------------+



In [8]:
#Use withColumn to add a new column indicating how much of a reading fanatic each person is, based on the number of books they have read.
df = df.withColumn(
    "reading_fanatic",
    functions.when(functions.col("books_read") <= 10, "beginner")
    .when(functions.col("books_read").between(11, 30), "intermediate")
    .otherwise("advanced"),
)
df.show()

+----------+---+----------+---------------+
|first_name|age|books_read|reading_fanatic|
+----------+---+----------+---------------+
|       sue| 32|        12|   intermediate|
|       eli|  3|         1|       beginner|
|       bob| 75|        24|   intermediate|
|      theo| 13|         5|       beginner|
|      mary| 25|        50|       advanced|
+----------+---+----------+---------------+



In [9]:
#Count and display the number of people at each reading_fanatic level.
df.groupBy("reading_fanatic").count().show()

+---------------+-----+
|reading_fanatic|count|
+---------------+-----+
|   intermediate|    2|
|       beginner|    2|
|       advanced|    1|
+---------------+-----+



# Querying with the SQL Function
We've run dataframe operations that are similar to simple SQL queries. Now let's use SparkSession's sql function to run more complex queries on a larger version of our dataset. See the Spark documentation for more [information](https://spark.apache.org/docs/latest/sql-data-sources-csv.html) on reading CSV files.


## Load & Prep Dataset

### Dataframe

In [42]:
#Load the books dataframe from the CSV file, display it, and print its schema. Without the inferSchema option, every column is read as a string.
books_df = spark.read.option("delimiter", ",").option("header", True).csv("books.csv")
books_df.show()

books_df.printSchema()

+----------+---+------------------+----------+
|first_name|age|             genre|books_read|
+----------+---+------------------+----------+
|       sue| 32|            sci-fi|         8|
|       sue| 32|         biography|         4|
|       eli|  3|           fantasy|         1|
|       bob| 75|           mystery|        12|
|       bob| 75|historical fiction|         4|
|       bob| 75|          thriller|         3|
|       bob| 75|         biography|         5|
|      theo| 13|            sci-fi|         2|
|      theo| 13|           fiction|         3|
|      mary| 25|historical fiction|         3|
|      mary| 25|           mystery|         7|
|      mary| 25|         dystopian|         8|
|      mary| 25|           romance|         8|
|      mary| 25|            satire|         4|
|      mary| 25|            sci-fi|        14|
|      mary| 25|            memoir|        16|
|      john| 50|         dystopian|         8|
|      john| 50|          thriller|        12|
|      john| 

### Create a Temp View
This view persists only as long as the SparkSession is active; once the session ends, the view is lost.

In [11]:
#Remove any existing view named 'books_view'. createOrReplaceTempView would also replace it, so this step is optional.
spark.catalog.dropTempView("books_view")
#Convert the dataframe to a view.
books_df.createOrReplaceTempView("books_view")

### Create a Table
saveAsTable writes the table's data files to Spark's warehouse directory. Without Hive support, Spark keeps table metadata in an in-memory catalog, so the table definition is lost when the SparkSession ends, but the data files stay on disk. Spark will not create a managed table over a leftover directory, so we remove the directory before re-creating the table. With Hive support enabled, the table is registered in the Hive metastore and persists across sessions.
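The cells in this tutorial hardcode /data/spark-warehouse as the warehouse location, which only works if spark.sql.warehouse.dir points there (by default it is a spark-warehouse directory under the current working directory). The helper below is a hypothetical sketch of a more portable way to build a managed table's directory from that setting, assuming a local-filesystem warehouse and the default database, whose tables sit directly under the warehouse directory. table_dir is not a Spark API.

```python
import os
from urllib.parse import urlparse


def table_dir(warehouse_uri: str, table_name: str) -> str:
    """Return the local directory of a managed table in the default database.

    Hypothetical helper: assumes the warehouse is on the local filesystem.
    """
    parsed = urlparse(warehouse_uri)
    # Spark usually reports the warehouse as a "file:" URI; a plain path has no scheme.
    if parsed.scheme not in ("", "file"):
        raise ValueError(f"not a local warehouse: {warehouse_uri}")
    return os.path.join(parsed.path, table_name)


# In the notebook this could replace the hardcoded path:
#   warehouse = spark.conf.get("spark.sql.warehouse.dir")
#   shutil.rmtree(table_dir(warehouse, "books_table"), ignore_errors=True)
print(table_dir("file:/data/spark-warehouse", "books_table"))
```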

In [43]:
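#If the table was created in the current or a previous SparkSession, drop it and remove its directory.
#Assumes the warehouse directory is /data/spark-warehouse; check spark.conf.get("spark.sql.warehouse.dir") for your setup.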
spark.sql("DROP TABLE IF EXISTS books_table")
shutil.rmtree("/data/spark-warehouse/books_table", ignore_errors=True)
spark.catalog.clearCache()
#Write the books_df to a table.
books_df.write.saveAsTable("books_table")

In [13]:
#DESC (DESCRIBE TABLE) returns the table's column names, data types, and comments. Every column is a string because the CSV file was read without schema inference.
spark.sql("desc books_table").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|first_name|   string|   null|
|       age|   string|   null|
|     genre|   string|   null|
|books_read|   string|   null|
+----------+---------+-------+



## Common Statements & Clauses
A query can include many SQL clauses. Here we explore how to use some of the most common ones with the sql function.
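To make the semantics of WHERE, ORDER BY, and LIMIT concrete, here is a plain-Python sketch using a subset of the rows in books.csv as (first_name, age, genre, books_read) tuples, with books_read kept as an integer. It only illustrates what each clause does; it is not how Spark executes a query.

```python
# A subset of books.csv rows: (first_name, age, genre, books_read).
rows = [
    ("sue", 32, "sci-fi", 8),
    ("sue", 32, "biography", 4),
    ("eli", 3, "fantasy", 1),
    ("bob", 75, "mystery", 12),
    ("theo", 13, "sci-fi", 2),
    ("theo", 13, "fiction", 3),
    ("mary", 25, "sci-fi", 14),
]

# WHERE books_read >= 5: keep only the rows that satisfy the predicate.
where_result = [r for r in rows if r[3] >= 5]

# ORDER BY first_name DESC, genre DESC: both keys descend, so one reverse
# sort on the tuple (first_name, genre) produces the same order.
ordered = sorted(rows, key=lambda r: (r[0], r[2]), reverse=True)

# LIMIT 5: keep the first five rows of the ordered result.
top5 = ordered[:5]

for row in top5:
    print(row)
```

The five rows printed are the same, in the same order, as those returned by the descending ORDER BY query in this section, because all of them happen to be in this subset. If the sort keys go in different directions, a single reverse sort no longer works; sort in stages instead, starting with the last key.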

In [14]:
#Display five names and their number of books read, using the LIMIT clause at the end of the query. Without ORDER BY, which five rows are returned is not guaranteed.
spark.sql("""
    SELECT first_name, books_read
    FROM books_table
    LIMIT 5
""").show()

+----------+----------+
|first_name|books_read|
+----------+----------+
|       sue|         8|
|       sue|         4|
|       eli|         1|
|       bob|        12|
|       bob|         4|
+----------+----------+



In [15]:
#Display rows where genre read is historical fiction and number of books read is greater than or equal to 10.
spark.sql("""
    SELECT *
    FROM books_table
    WHERE genre = 'historical fiction' AND books_read >= 10
""").show()

+----------+---+------------------+----------+
|first_name|age|             genre|books_read|
+----------+---+------------------+----------+
|      jane| 61|historical fiction|        12|
|    martin| 29|historical fiction|        10|
+----------+---+------------------+----------+



In [16]:
#Display the top 5 rows from the table ordered by first_name and genre ascending.
spark.sql("""
    SELECT *
    FROM books_table
    ORDER BY first_name, genre
    LIMIT 5
""").show()

#Display the top 5 rows from the table ordered by first_name and genre descending.
spark.sql("""
    SELECT *
    FROM books_table
    ORDER BY first_name DESC, genre DESC
    LIMIT 5
""").show()

+----------+---+------------------+----------+
|first_name|age|             genre|books_read|
+----------+---+------------------+----------+
|      alex| 19|            sci-fi|        14|
|      ally| 42|         biography|        14|
|      ally| 42|            memoir|         9|
|       bob| 75|         biography|         5|
|       bob| 75|historical fiction|         4|
+----------+---+------------------+----------+

+----------+---+---------+----------+
|first_name|age|    genre|books_read|
+----------+---+---------+----------+
|      theo| 13|   sci-fi|         2|
|      theo| 13|  fiction|         3|
|       sue| 32|   sci-fi|         8|
|       sue| 32|biography|         4|
|      mary| 25|   sci-fi|        14|
+----------+---+---------+----------+



## Aggregate Functions
When analyzing structured data, we often want to aggregate columns, sometimes subject to conditions. Spark SQL supports this with aggregate functions and the GROUP BY and HAVING clauses. Note that any non-aggregated column in the SELECT list must also appear in the GROUP BY clause.
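WHERE filters individual rows before they are grouped, while HAVING filters groups after the aggregates are computed. The plain-Python sketch below shows that order of operations for GROUP BY first_name with SUM(books_read), using rows from books.csv. It is an illustration only, and the threshold of 10 is an arbitrary choice.

```python
from collections import defaultdict

# (first_name, genre, books_read) rows from books.csv.
rows = [
    ("sue", "sci-fi", 8), ("sue", "biography", 4),
    ("eli", "fantasy", 1),
    ("bob", "mystery", 12), ("bob", "historical fiction", 4),
    ("bob", "thriller", 3), ("bob", "biography", 5),
    ("theo", "sci-fi", 2), ("theo", "fiction", 3),
]

# GROUP BY first_name with SUM(books_read): one running total per group.
totals = defaultdict(int)
for first_name, _genre, books_read in rows:
    totals[first_name] += books_read

# HAVING SUM(books_read) > 10: applied to the aggregated groups, not to rows.
having = {name: total for name, total in totals.items() if total > 10}

# WHERE books_read > 10 instead filters rows before grouping,
# so only bob's 12 mystery books survive to be summed.
where_totals = defaultdict(int)
for first_name, _genre, books_read in rows:
    if books_read > 10:
        where_totals[first_name] += books_read

print(dict(totals))
print(having)
print(dict(where_totals))
```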

In [17]:
#Find the total number of books each person read.
spark.sql("""
    SELECT first_name, age, cast(sum(books_read) as int) as total_books_read
    FROM books_table
    GROUP BY first_name, age
""").show()

+----------+---+----------------+
|first_name|age|total_books_read|
+----------+---+----------------+
|       sue| 32|              12|
|      alex| 19|              14|
|      josh| 22|              11|
|       eve|  8|               7|
|      john| 50|              23|
|      theo| 13|               5|
|      mary| 25|              60|
|      jane| 61|              32|
|    martin| 29|              14|
|      ally| 42|              23|
|       bob| 75|              24|
|       eli|  3|               1|
+----------+---+----------------+



In [18]:
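#Count how many people have read at least one book of each genre. COUNT(first_name) counts rows, so it equals the number of readers only if each person has at most one row per genre; COUNT(DISTINCT first_name) avoids that assumption.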
spark.sql("""
    SELECT genre, cast(count(first_name) as int) as total_readers
    FROM books_table
    GROUP BY genre
""").show()

+------------------+-------------+
|             genre|total_readers|
+------------------+-------------+
|         biography|            3|
|           fantasy|            2|
|           mystery|            3|
|           fiction|            1|
|         dystopian|            2|
|        true crime|            1|
|            memoir|            2|
|            satire|            2|
|historical fiction|            4|
|           romance|            2|
|          thriller|            3|
|            sci-fi|            5|
+------------------+-------------+



In [19]:
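#Identify the minimum and maximum number of books read for each genre.
#Note: books_read is a string column, so MIN and MAX compare values lexicographically ('14' < '5'); cast to INT for numeric results.
spark.sql("""
    SELECT genre,
        MIN(books_read) as min_books,
        MAX(books_read) as max_books
    FROM books_table
    GROUP BY genre
""").show()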

+------------------+---------+---------+
|             genre|min_books|max_books|
+------------------+---------+---------+
|         biography|       14|        5|
|         dystopian|        8|        8|
|           fantasy|        1|        7|
|           fiction|        3|        3|
|historical fiction|       10|        4|
|            memoir|       16|        9|
|           mystery|        1|        7|
|           romance|       20|        8|
|            satire|        3|        4|
|            sci-fi|       14|        8|
|          thriller|       12|        8|
|        true crime|        3|        3|
+------------------+---------+---------+
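The biography row above reports a minimum of 14 and a maximum of 5. Because the CSV was read without schema inference, books_read is a string column, and MIN and MAX on strings compare character by character. Python compares ASCII strings the same way, so the effect is easy to reproduce with the three biography values from books.csv (4, 5, and 14). In Spark SQL, aggregating a numeric value, for example MIN(CAST(books_read AS INT)), or reading the CSV with .option("inferSchema", True), gives numeric results.

```python
# books_read values for the biography genre, stored as strings in books_table.
biography_books_read = ["4", "5", "14"]

# Lexicographic comparison: "14" < "4" < "5" because '1' < '4' < '5'.
print(min(biography_books_read), max(biography_books_read))  # 14 5

# Converting to integers first gives the numeric answer.
as_ints = [int(v) for v in biography_books_read]
print(min(as_ints), max(as_ints))  # 4 14
```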



In [20]:
#Display the average age of readers for each genre where genre is not fiction and the avg_age is between 10 and 40.
spark.sql("""
    SELECT genre, cast(avg(age) as int) as avg_age
    FROM books_table
    WHERE genre != 'fiction'
    GROUP BY genre
    HAVING avg(age) BETWEEN 10 and 40
""").show()

+---------+-------+
|    genre|avg_age|
+---------+-------+
|dystopian|     37|
|   memoir|     33|
|   satire|     23|
|   sci-fi|     23|
+---------+-------+



## Case When Statement
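The CASE expression returns a specific value when a condition is met; think of it as an if-then-else statement. Its WHEN conditions are checked in order, the first one that is true determines the result, and ELSE supplies the value when none match. CASE can be used to create identifier columns, to apply conditions inside aggregations, and more. Let's see one implementation below.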
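The CASE expression in the next cell behaves like the chain of if statements sketched below in plain Python. The sketch makes the boundary handling visible: BETWEEN 10 AND 30 includes both 10 and 30, and a NULL value (None here) matches no WHEN branch, so it falls through to ELSE.

```python
def reading_fanatic(books_read):
    """Mirror the tutorial's CASE expression for a single books_read value."""
    # NULL makes every comparison unknown, so CASE falls through to ELSE.
    if books_read is None:
        return "N/A"
    # WHEN branches are checked top to bottom; the first true one wins.
    if books_read < 10:
        return "beginner"
    if 10 <= books_read <= 30:  # BETWEEN 10 AND 30 includes both ends
        return "intermediate"
    if books_read > 30:
        return "advanced"
    return "N/A"  # ELSE


for n in (9, 10, 30, 31, None):
    print(n, reading_fanatic(n))
```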

In [21]:
#Identify the 'reading_fanatic' level for a given person and genre.
#'BETWEEN ... AND ...' is inclusive on both ends.
spark.sql("""
    SELECT *,
        CASE WHEN books_read < 10 THEN 'beginner'
             WHEN books_read BETWEEN 10 AND 30 THEN 'intermediate'
             WHEN books_read > 30 THEN 'advanced'
             ELSE 'N/A'
             END AS reading_fanatic
    FROM books_table
""").show()

+----------+---+------------------+----------+---------------+
|first_name|age|             genre|books_read|reading_fanatic|
+----------+---+------------------+----------+---------------+
|       sue| 32|            sci-fi|         8|       beginner|
|       sue| 32|         biography|         4|       beginner|
|       eli|  3|           fantasy|         1|       beginner|
|       bob| 75|           mystery|        12|   intermediate|
|       bob| 75|historical fiction|         4|       beginner|
|       bob| 75|          thriller|         3|       beginner|
|       bob| 75|         biography|         5|       beginner|
|      theo| 13|            sci-fi|         2|       beginner|
|      theo| 13|           fiction|         3|       beginner|
|      mary| 25|historical fiction|         3|       beginner|
|      mary| 25|           mystery|         7|       beginner|
|      mary| 25|         dystopian|         8|       beginner|
|      mary| 25|           romance|         8|       be

## Window Functions
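While GROUP BY and HAVING are essential for aggregations, there are times when window functions are more useful. A window function computes an aggregate over a partition (group) of rows, but unlike GROUP BY it returns a value for every input row instead of collapsing each group into one row.
- The syntax is AGG(column) OVER (PARTITION BY column1, column2, ... ORDER BY column3, column4, ...).
- PARTITION BY and ORDER BY are both optional; AGG(column) OVER () aggregates over all rows. Adding ORDER BY to an aggregate turns it into a running aggregate from the start of the partition up to the current row, and ranking functions such as ROW_NUMBER() require ORDER BY.
- A window function can be used inside a CASE expression.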
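The difference from GROUP BY can be sketched in plain Python: SUM(books_read) OVER (PARTITION BY first_name) first computes one total per partition and then attaches that total to every row of the partition. The rows come from books.csv; the two-pass approach is an illustration of the semantics, not Spark's implementation.

```python
from collections import defaultdict

# (first_name, genre, books_read) rows from books.csv.
rows = [
    ("sue", "sci-fi", 8), ("sue", "biography", 4),
    ("eli", "fantasy", 1),
    ("theo", "sci-fi", 2), ("theo", "fiction", 3),
]

# Pass 1: SUM(books_read) for each partition defined by PARTITION BY first_name.
partition_sums = defaultdict(int)
for first_name, _genre, books_read in rows:
    partition_sums[first_name] += books_read

# Pass 2: attach the partition's sum to every row; no rows are collapsed.
windowed = [
    (first_name, genre, books_read, partition_sums[first_name])
    for first_name, genre, books_read in rows
]

for row in windowed:
    print(row)
```

Selecting only first_name and the windowed sum would still return one row per input row, with sue and theo each appearing twice. That is why the next query uses SELECT DISTINCT.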

In [22]:
#Identify the 'reading_fanatic' level for a given person and the total number of books they have read.
spark.sql("""
    SELECT DISTINCT first_name,
        CASE WHEN SUM(books_read) OVER (PARTITION BY first_name) < 10 THEN 'beginner'
             WHEN SUM(books_read) OVER (PARTITION BY first_name) BETWEEN 10 AND 30 THEN 'intermediate'
             WHEN SUM(books_read) OVER (PARTITION BY first_name) > 30 THEN 'advanced'
             ELSE 'N/A'
             END AS reading_fanatic
    FROM books_table
""").show()

+----------+---------------+
|first_name|reading_fanatic|
+----------+---------------+
|      alex|   intermediate|
|      ally|   intermediate|
|       bob|   intermediate|
|       eli|       beginner|
|       eve|       beginner|
|      jane|       advanced|
|      john|   intermediate|
|      josh|   intermediate|
|    martin|   intermediate|
|      mary|       advanced|
|       sue|   intermediate|
|      theo|       beginner|
+----------+---------------+



## User Defined Function
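We used a CASE expression to assign a reading_fanatic level; a user-defined function (UDF) can do something similar. This time, instead of a reading_fanatic level, we'll assign an age group. Built-in expressions such as CASE are generally faster than Python UDFs, which must move data between the JVM and a Python process.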

In [23]:
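#Create a UDF named age_group that labels a reader as an Adult (older than 30) or a Youth.
@functions.udf(returnType=types.StringType())
def age_group(age):
    #age is stored as a string; return NULL for missing ages instead of failing on int(None).
    if age is None:
        return None
    return "Adult" if int(age) > 30 else "Youth"

#Register the UDF so that SQL queries can call it by name.
spark.udf.register("age_group", age_group)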

spark.sql("""
    SELECT first_name, age_group(age) AS age_group 
    FROM books_table
""").show()

+----------+---------+
|first_name|age_group|
+----------+---------+
|       sue|    Adult|
|       sue|    Adult|
|       eli|    Youth|
|       bob|    Adult|
|       bob|    Adult|
|       bob|    Adult|
|       bob|    Adult|
|      theo|    Youth|
|      theo|    Youth|
|      mary|    Youth|
|      mary|    Youth|
|      mary|    Youth|
|      mary|    Youth|
|      mary|    Youth|
|      mary|    Youth|
|      mary|    Youth|
|      john|    Adult|
|      john|    Adult|
|      john|    Adult|
|      jane|    Adult|
+----------+---------+
only showing top 20 rows



## Insert Statement
Because books_table is a native Spark table, we can add rows to it with the INSERT INTO statement.

In [24]:
#Insert data for a new reader.
spark.sql("INSERT INTO books_table VALUES ('alice', 30, 'thriller', 10)")

DataFrame[]

In [25]:
#Display the row that corresponds to the new reader Alice.
spark.sql("""
    SELECT * 
    FROM books_table
    WHERE first_name = 'alice' 
""").show()

+----------+---+--------+----------+
|first_name|age|   genre|books_read|
+----------+---+--------+----------+
|     alice| 30|thriller|        10|
+----------+---+--------+----------+



# Joining Tables
Often one table alone does not have all the information we need. In our example, consider a second table that records how much our readers paid for their books.

## Load Prices Dataset

In [26]:
#Load and display prices dataframe from the csv file.
prices_df = spark.read.option("delimiter", ",").option("header", True).csv("prices.csv")
prices_df.show(5)

#Display the schema.
prices_df.printSchema()

+----------+---------+--------------+-------------------+
|first_name|    genre|price_per_book|num_books_purchased|
+----------+---------+--------------+-------------------+
|       sue|   sci-fi|           3.3|                  2|
|       sue|   sci-fi|          4.75|                  6|
|       sue|biography|          8.84|                  4|
|       eli|  fantasy|          null|                  1|
|       bob|  mystery|          5.73|                  6|
+----------+---------+--------------+-------------------+
only showing top 5 rows

root
 |-- first_name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- price_per_book: string (nullable = true)
 |-- num_books_purchased: string (nullable = true)



In [27]:
#If the table was created in the current or a previous SparkSession, drop it and remove its directory (assumes the warehouse directory is /data/spark-warehouse).
spark.sql("DROP TABLE IF EXISTS prices_table")
shutil.rmtree("/data/spark-warehouse/prices_table", ignore_errors=True)
spark.catalog.clearCache()
#Write the prices_df to a table.
prices_df.write.saveAsTable("prices_table")

In [28]:
#Describe the schema of the table.
spark.sql("desc prices_table").show()

+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|         first_name|   string|   null|
|              genre|   string|   null|
|     price_per_book|   string|   null|
|num_books_purchased|   string|   null|
+-------------------+---------+-------+



## Join books table with prices table
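We want to attach the cost information in the prices table to each reader's name and age in the books table. For every reader in the books table, we want the corresponding rows of the prices table. To do this, we JOIN the two tables ON the columns we expect to match: first_name and genre.

Common join types include:
- INNER: only rows that have a match in both tables
- LEFT (OUTER): every row of the left table, with NULLs where the right table has no match
- RIGHT (OUTER): every row of the right table, with NULLs where the left table has no match
- FULL (OUTER): every row of both tables, matched where possible

Spark SQL also supports CROSS, LEFT SEMI, and LEFT ANTI joins.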
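A LEFT JOIN keeps every row of the left table: each left row is repeated once per matching right row, and a left row with no match gets NULL in every right-hand column. The plain-Python sketch below illustrates those two rules with rows from this tutorial: alice, who has no pricing data, and bob's two mystery prices (stored as floats here for simplicity). It illustrates the semantics only and is not Spark's join implementation. An INNER JOIN would drop the alice row instead of padding it.

```python
from collections import defaultdict

# Left table: (first_name, genre, books_read).
books = [("alice", "thriller", 10), ("bob", "mystery", 12)]
# Right table: (first_name, genre, price_per_book, num_books_purchased).
prices = [("bob", "mystery", 5.73, 6), ("bob", "mystery", 6.24, 6)]

# Index the right table by the join key (first_name, genre).
prices_by_key = defaultdict(list)
for first_name, genre, price, purchased in prices:
    prices_by_key[(first_name, genre)].append((price, purchased))

left_joined = []
for first_name, genre, books_read in books:
    matches = prices_by_key.get((first_name, genre), [])
    if matches:
        # One output row per matching right-hand row, so a left row can repeat.
        for price, purchased in matches:
            left_joined.append((first_name, genre, books_read, price, purchased))
    else:
        # No match: keep the left row and fill the right-hand columns with None (NULL).
        left_joined.append((first_name, genre, books_read, None, None))

for row in left_joined:
    print(row)
```

The repetition rule is why alex's single sci-fi row in books_table appears three times in the join output, once for each of his three sci-fi prices.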

In [29]:
#Perform a left join between books_table and prices_table on first_name and genre.
spark.sql("""
    SELECT *
    FROM books_table b
    LEFT JOIN prices_table p
    ON b.first_name = p.first_name and b.genre = p.genre
    ORDER BY b.first_name
""").show()

+----------+---+------------------+----------+----------+------------------+--------------+-------------------+
|first_name|age|             genre|books_read|first_name|             genre|price_per_book|num_books_purchased|
+----------+---+------------------+----------+----------+------------------+--------------+-------------------+
|      alex| 19|            sci-fi|        14|      alex|            sci-fi|            12|                  2|
|      alex| 19|            sci-fi|        14|      alex|            sci-fi|          4.32|                  2|
|      alex| 19|            sci-fi|        14|      alex|            sci-fi|          3.32|                 10|
|     alice| 30|          thriller|        10|      null|              null|          null|               null|
|      ally| 42|         biography|        14|      ally|         biography|          9.34|                 14|
|      ally| 42|            memoir|         9|      ally|            memoir|          4.59|             

In [30]:
#Find the books_table rows that have no matching row in prices_table. For such rows, every prices_table column is NULL.
spark.sql("""
    SELECT *
    FROM books_table b
    LEFT JOIN prices_table p
    ON b.first_name = p.first_name and b.genre = p.genre
    WHERE p.first_name is null and p.genre is null and p.price_per_book is null and p.num_books_purchased is null
    ORDER BY b.first_name
""").show()

+----------+---+--------+----------+----------+-----+--------------+-------------------+
|first_name|age|   genre|books_read|first_name|genre|price_per_book|num_books_purchased|
+----------+---+--------+----------+----------+-----+--------------+-------------------+
|     alice| 30|thriller|        10|      null| null|          null|               null|
+----------+---+--------+----------+----------+-----+--------------+-------------------+



## Row Number Window Function
Sometimes we want each row to have a unique number within a certain grouping. For this, we can use the ROW_NUMBER() window function.
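ROW_NUMBER() sorts each partition by the window's ORDER BY and numbers its rows 1, 2, 3, and so on. The plain-Python sketch below (an illustration, not Spark's implementation) reproduces that for two groups from prices.csv. It also shows why the type of the sort key matters: price_per_book is a string column in prices_table, so ORDER BY price_per_book DESC compares text, ranking '9.49' above '11.1' and '4.32' above '12'. Ordering by CAST(price_per_book AS DOUBLE) instead gives a numeric order.

```python
from collections import defaultdict


def row_number_desc(rows, partition_key, order_key):
    """Emulate ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ... DESC).

    Rows whose order key is None (SQL NULL) are numbered last, matching
    Spark's default NULLS LAST for descending order.
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_key(row)].append(row)

    numbered = []
    for part_rows in partitions.values():
        present = [r for r in part_rows if order_key(r) is not None]
        missing = [r for r in part_rows if order_key(r) is None]
        ranked = sorted(present, key=order_key, reverse=True) + missing
        for row_num, row in enumerate(ranked, start=1):
            numbered.append(row + (row_num,))
    return numbered


def by_name_genre(row):
    return row[0], row[1]


# (first_name, genre, price_per_book, num_books_purchased) from prices.csv,
# with price_per_book kept as a string, as it is in prices_table.
purchases = [
    ("alex", "sci-fi", "12", 2), ("alex", "sci-fi", "4.32", 2), ("alex", "sci-fi", "3.32", 10),
    ("mary", "memoir", "9.49", 9), ("mary", "memoir", "11.1", 4), ("mary", "memoir", None, 3),
]

# Ordering by the raw string, as ORDER BY price_per_book DESC does here.
as_strings = row_number_desc(purchases, by_name_genre, lambda r: r[2])
# Ordering numerically, like ORDER BY CAST(price_per_book AS DOUBLE) DESC.
as_numbers = row_number_desc(
    purchases, by_name_genre, lambda r: None if r[2] is None else float(r[2])
)

for row in as_strings:
    print(row)
```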

In [31]:
#Readers like Bob and Mary bought books of the same genre at different prices. Assign a row number within each person and genre based on price and number of books purchased.
#Display rows for Mary's memoir and sci-fi genres and Bob's mystery genre.
#The greatest price (then the greatest number of books purchased) gets row 1. Both columns are strings, so the ordering is lexicographic: '9.49' ranks above '11.1'.
spark.sql("""
SELECT *, ROW_NUMBER() OVER (PARTITION BY first_name, genre ORDER BY price_per_book DESC, num_books_purchased DESC) AS row_num
FROM(
    SELECT b.first_name, b.age, p.genre, p.price_per_book as price_per_book, p.num_books_purchased
    FROM books_table b
    LEFT JOIN prices_table p
    ON b.first_name = p.first_name AND b.genre = p.genre
)
WHERE (first_name = 'bob' and genre = 'mystery') OR (first_name = 'mary' and genre IN ('memoir', 'sci-fi'))
""").show()

+----------+---+-------+--------------+-------------------+-------+
|first_name|age|  genre|price_per_book|num_books_purchased|row_num|
+----------+---+-------+--------------+-------------------+-------+
|       bob| 75|mystery|          6.24|                  6|      1|
|       bob| 75|mystery|          5.73|                  6|      2|
|      mary| 25| memoir|          9.49|                  9|      1|
|      mary| 25| memoir|          11.1|                  4|      2|
|      mary| 25| memoir|          null|                  3|      3|
|      mary| 25| sci-fi|          6.43|                  5|      1|
|      mary| 25| sci-fi|          3.45|                  9|      2|
+----------+---+-------+--------------+-------------------+-------+



# More Complex Queries - Aggregations on Joins
Now we can combine the clauses and functions covered so far to determine how much readers paid for their books.

## Finding the Total Price Paid per Genre & per Reader

In [32]:
#Display total price per genre per reader where the total cost per genre is > 50.
spark.sql("""
    SELECT *
    FROM (
        SELECT DISTINCT first_name, age, genre, 
               SUM(CAST(total_cost_per_pricing AS DECIMAL(10,2))) OVER (PARTITION BY first_name, age, genre) AS total_cost_per_genre,
               SUM(CAST(total_cost_per_pricing AS DECIMAL(10,2))) OVER (PARTITION BY first_name, age) AS total_cost_per_person
        FROM(
            SELECT b.first_name, b.age, p.genre, coalesce(p.price_per_book,0) as price_per_book, num_books_purchased,
                   COALESCE(p.price_per_book,0) * num_books_purchased as total_cost_per_pricing
            FROM books_table b
            LEFT JOIN prices_table p
            ON b.first_name = p.first_name AND b.genre = p.genre
        ) 
    )
    WHERE total_cost_per_genre > 50
    ORDER BY first_name, genre
""").show()

+----------+---+------------------+--------------------+---------------------+
|first_name|age|             genre|total_cost_per_genre|total_cost_per_person|
+----------+---+------------------+--------------------+---------------------+
|      alex| 19|            sci-fi|               65.84|                65.84|
|      ally| 42|         biography|              130.76|               172.07|
|       bob| 75|           mystery|               71.82|               143.12|
|      jane| 61|historical fiction|               77.40|               211.80|
|      jane| 61|           romance|              134.40|               211.80|
|      josh| 22|          thriller|               65.68|                86.65|
|      mary| 25|         dystopian|               71.44|               413.81|
|      mary| 25|            memoir|              129.81|               413.81|
|      mary| 25|           romance|               59.68|               413.81|
|      mary| 25|            sci-fi|               63

## Calculate Cost for a Reader using Highest Cost per Genre
We can combine the ROW_NUMBER() window function with nested subqueries to rank the prices within each reader's genre, so that row 1 holds the highest price per book.

In [33]:
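#Number the rows within each (first_name, genre) group so the highest price_per_book comes first, regardless of the number of books purchased.
#Show only the groups that have more than one price.
#Note: price_per_book is a string column, so this ordering is lexicographic ('4.32' ranks above '12'); ORDER BY CAST(price_per_book AS DOUBLE) DESC would order numerically.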
spark.sql("""
    SELECT first_name, age, genre, price_per_book, num_books_purchased, row_num
    FROM (
        SELECT first_name, age, genre, price_per_book, num_books_purchased,
              COUNT(*) OVER (PARTITION BY first_name, genre) as num_pricing_per_genre,
              ROW_NUMBER() OVER (PARTITION BY first_name, genre ORDER BY price_per_book DESC) AS row_num
        FROM(
            SELECT b.first_name, b.age, p.genre, coalesce(p.price_per_book,0) as price_per_book, num_books_purchased
            FROM books_table b
            LEFT JOIN prices_table p
            ON b.first_name = p.first_name AND b.genre = p.genre
        ) 
    )
    WHERE num_pricing_per_genre > 1 
""").show()

+----------+---+------------------+--------------+-------------------+-------+
|first_name|age|             genre|price_per_book|num_books_purchased|row_num|
+----------+---+------------------+--------------+-------------------+-------+
|      alex| 19|            sci-fi|          4.32|                  2|      1|
|      alex| 19|            sci-fi|          3.32|                 10|      2|
|      alex| 19|            sci-fi|            12|                  2|      3|
|       bob| 75|           mystery|          6.24|                  6|      1|
|       bob| 75|           mystery|          5.73|                  6|      2|
|      jane| 61|           romance|          8.21|                 10|      1|
|      jane| 61|           romance|          5.23|                 10|      2|
|    martin| 29|historical fiction|          2.39|                  3|      1|
|    martin| 29|historical fiction|             0|                  7|      2|
|      mary| 25|            memoir|          9.49|  

A more readable way to achieve this is a WITH clause combined with the FIRST_VALUE() function. A WITH clause (a common table expression) names each intermediate result; it does the same job as nested subqueries but is easier to follow, especially when the subqueries contain joins.
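The next query casts its result to DECIMAL(4,2), which holds at most 4 digits, 2 of them after the decimal point, so the largest representable value is 99.99. With spark.sql.ansi.enabled set to false (the default in Spark 3.x), a value that does not fit becomes NULL instead of raising an error, and because NULL > 50 is not true, those rows silently drop out of the WHERE filter. For example, jane's romance group has 20 books with a top price of 8.21, giving 164.20, and it is missing from this cell's output, as are the ally biography and mary memoir totals, which also exceed 99.99. The plain-Python check below, built on the decimal module, sketches the precision rule; DECIMAL(10,2), as used in the earlier queries, has ample room for these totals.

```python
from decimal import Decimal, ROUND_HALF_UP


def fits_decimal(value: Decimal, precision: int, scale: int) -> bool:
    """Return True if value fits in a SQL DECIMAL(precision, scale) column."""
    # Round to `scale` digits after the decimal point (half up).
    rounded = value.quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)
    # At most precision - scale digits may remain before the decimal point.
    return abs(rounded) < Decimal(10) ** (precision - scale)


jane_romance = Decimal("8.21") * 20  # top price x books purchased in the group
print(jane_romance)                     # 164.20
print(fits_decimal(jane_romance, 4, 2))   # False: larger than 99.99
print(fits_decimal(jane_romance, 10, 2))  # True
```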

In [34]:
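#Use the highest price per book in each person's genre to recompute total_cost_per_genre.
#Display the results where total_cost_per_genre is > 50.
#Caveats: price_per_book is a string, so FIRST_VALUE picks the lexicographically largest price, and DECIMAL(4,2) holds at most 99.99, so larger totals become NULL and are removed by the WHERE clause.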
spark.sql("""
WITH combined_tables as (
    SELECT b.first_name, b.age, p.genre, coalesce(p.price_per_book,0) as price_per_book, coalesce(p.num_books_purchased,0) as num_books_purchased
    FROM books_table b
    LEFT JOIN prices_table p
    ON b.first_name = p.first_name AND b.genre = p.genre
), aggregations as (
    SELECT first_name, age, genre, price_per_book, num_books_purchased,
        SUM(num_books_purchased) OVER (PARTITION BY first_name, genre) as num_books_per_genre,
        FIRST_VALUE(price_per_book) OVER (PARTITION BY first_name, genre ORDER BY price_per_book DESC) AS new_price_per_book   
    FROM combined_tables
), new_pricing as (
    SELECT DISTINCT first_name, age, genre, CAST(num_books_per_genre * new_price_per_book AS DECIMAL(4,2)) as total_cost_per_genre
    FROM aggregations
)
    SELECT * 
    FROM new_pricing 
    WHERE total_cost_per_genre > 50
    ORDER BY first_name, genre
""").show()

+----------+---+------------------+--------------------+
|first_name|age|             genre|total_cost_per_genre|
+----------+---+------------------+--------------------+
|      alex| 19|            sci-fi|               60.48|
|       bob| 75|           mystery|               74.88|
|      jane| 61|historical fiction|               77.40|
|      josh| 22|          thriller|               65.68|
|      mary| 25|         dystopian|               71.44|
|      mary| 25|           romance|               59.68|
|      mary| 25|            sci-fi|               90.02|
+----------+---+------------------+--------------------+



# Create & Partition Table using Spark SQL

We may want to save the results of a query, or a sequence of queries, to a table that can serve as a data source later. Let's create a new table from our results above. We'll create the same table twice:

1. Using a CREATE TABLE ... AS SELECT statement in the sql function, without any partitions.
2. Saving the query results to a DataFrame and writing the table with a DataFrame operation, partitioned by first_name.
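With partitionBy("first_name"), Spark writes the table's files into one subdirectory per distinct first_name value, named in the Hive style column=value (for example first_name=bob), and the partition value lives in the directory name rather than in the data files. A query that filters on the partition column can then skip every other directory. The sketch below models that layout and the pruning step in plain Python; partition_layout and prune are hypothetical helpers for illustration, not Spark APIs, and real Spark also escapes special characters in partition values.

```python
import os
from collections import defaultdict


def partition_layout(rows, table_dir, partition_col):
    """Group rows into Hive-style partition directories named <partition_col>=<value>."""
    layout = defaultdict(list)
    for row in rows:
        path = os.path.join(table_dir, f"{partition_col}={row[partition_col]}")
        # The partition value is encoded in the directory name, not stored in the row.
        layout[path].append({k: v for k, v in row.items() if k != partition_col})
    return dict(layout)


def prune(layout, partition_col, value):
    """Read only the directory matching <partition_col>=<value>, like partition pruning."""
    target = f"{partition_col}={value}"
    return [
        row
        for path, part_rows in layout.items()
        if os.path.basename(path) == target
        for row in part_rows
    ]


rows = [
    {"first_name": "alex", "genre": "sci-fi", "total_cost_per_genre": "65.84"},
    {"first_name": "bob", "genre": "biography", "total_cost_per_genre": "29.50"},
    {"first_name": "bob", "genre": "mystery", "total_cost_per_genre": "71.82"},
]

layout = partition_layout(rows, "cost_partitioned", "first_name")
for path in layout:
    print(path)
print(prune(layout, "first_name", "bob"))
```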

In [35]:
#If the tables were created in the current or a previous SparkSession, drop them and remove their directories (assumes the warehouse directory is /data/spark-warehouse).
spark.sql("DROP TABLE IF EXISTS cost")
spark.sql("DROP TABLE IF EXISTS cost_partitioned")
shutil.rmtree("/data/spark-warehouse/cost", ignore_errors=True)
shutil.rmtree("/data/spark-warehouse/cost_partitioned", ignore_errors=True)
spark.catalog.clearCache()

In [36]:
#Create table 'cost' which shows the accurate total cost for each reader by genre. 
spark.sql("""
CREATE TABLE cost AS(
    SELECT DISTINCT first_name, age, genre, 
           SUM(CAST(total_cost_per_pricing AS DECIMAL(10,2))) OVER (PARTITION BY first_name, age, genre) AS total_cost_per_genre,
           SUM(CAST(total_cost_per_pricing AS DECIMAL(10,2))) OVER (PARTITION BY first_name, age) AS total_cost_per_person
    FROM(
        SELECT b.first_name, b.age, p.genre, coalesce(p.price_per_book,0) as price_per_book, num_books_purchased,
               coalesce(p.price_per_book,0) * num_books_purchased as total_cost_per_pricing
        FROM books_table b
        LEFT JOIN prices_table p
        ON b.first_name = p.first_name AND b.genre = p.genre
    ) 
)
""")

DataFrame[]

In [37]:
#Create table 'cost_partitioned' with the same total cost for each reader by genre, writing it in overwrite mode and partitioned by first_name.
df = spark.sql("""
    SELECT DISTINCT first_name, age, genre, 
           SUM(CAST(total_cost_per_pricing AS DECIMAL(10,2))) OVER (PARTITION BY first_name, age, genre) AS total_cost_per_genre,
           SUM(CAST(total_cost_per_pricing AS DECIMAL(10,2))) OVER (PARTITION BY first_name, age) AS total_cost_per_person
    FROM(
        SELECT b.first_name, b.age, p.genre, coalesce(p.price_per_book,0) as price_per_book, num_books_purchased,
               coalesce(p.price_per_book,0) * num_books_purchased as total_cost_per_pricing
        FROM books_table b
        LEFT JOIN prices_table p
        ON b.first_name = p.first_name AND b.genre = p.genre
    ) 
""")
df.write.mode("overwrite").partitionBy("first_name").saveAsTable("cost_partitioned")

In [38]:
#Display all tables and views.
spark.sql("SHOW TABLES").show()

+---------+----------------+-----------+
|namespace|       tableName|isTemporary|
+---------+----------------+-----------+
|  default|     books_table|      false|
|  default|            cost|      false|
|  default|cost_partitioned|      false|
|  default|    prices_table|      false|
|         |      books_view|       true|
+---------+----------------+-----------+



In [39]:
#Display the schema of the new table. The partition column first_name is listed last and again under '# Partition Information'.
spark.sql("desc cost_partitioned").show()

+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|                 age|       string|   null|
|               genre|       string|   null|
|total_cost_per_genre|decimal(20,2)|   null|
|total_cost_per_pe...|decimal(20,2)|   null|
|          first_name|       string|   null|
|# Partition Infor...|             |       |
|          # col_name|    data_type|comment|
|          first_name|       string|   null|
+--------------------+-------------+-------+



In [40]:
#Query from the new table.
spark.sql("SELECT * FROM cost_partitioned WHERE first_name = 'bob'").show()

+---+------------------+--------------------+---------------------+----------+
|age|             genre|total_cost_per_genre|total_cost_per_person|first_name|
+---+------------------+--------------------+---------------------+----------+
| 75|         biography|               29.50|               143.12|       bob|
| 75|historical fiction|                8.80|               143.12|       bob|
| 75|           mystery|               71.82|               143.12|       bob|
| 75|          thriller|               33.00|               143.12|       bob|
+---+------------------+--------------------+---------------------+----------+

