# Interactive Querying with Spark SQL

## Assignment 8

In this assignment, you will create a small SQL-based data warehouse using [Spark SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html). You will then run basic SQL queries on the dataset. 

For this assignment, we will use a dataset derived from the [official Stardew Valley wiki](https://stardewvalleywiki.com/Stardew_Valley_Wiki) whose content is available under the [Creative Commons Attribution-NonCommercial-ShareAlike]( https://creativecommons.org/licenses/by-nc-sa/3.0/) license. 

As a first step, we load the CSV files into Pandas dataframes.  In a later stage, you will convert these to run as Spark dataframes. 

In [2]:
import pandas as pd
import numpy as np

base_github_repo_url = 'https://raw.githubusercontent.com/bellevue-university/dsc400/main'
family_csv_url = base_github_repo_url + '/data/stardew/family.csv'
friends_csv_url = base_github_repo_url + '/data/stardew/friends.csv'
gifts_csv_url = base_github_repo_url + '/data/stardew/gifts.csv'
villagers_csv_url = base_github_repo_url + '/data/stardew/villagers.csv'

pd_df_family = pd.read_csv(family_csv_url, index_col='id')
pd_df_gifts = pd.read_csv(gifts_csv_url, index_col='id')
pd_df_friends = pd.read_csv(friends_csv_url, index_col='id')
pd_df_villagers = pd.read_csv(villagers_csv_url, index_col='id')

pd_df_villagers['birthday'] = pd_df_villagers['birthday'].replace(np.nan, 'Unknown')
pd_df_villagers['address'] = pd_df_villagers['address'].replace(np.nan, 'Unknown')

The following code removes the `spark-warehouse` directory if it exists. This is done to prevent any issues associated with a previously created `spark-warehouse` folder. Run this code if you experience issues creating temporary tables in Spark. 

In [3]:
from pathlib import Path
import os 
import shutil

current_working_dir = Path(os.getcwd())
spark_warehouse_dir = current_working_dir.joinpath('spark-warehouse')
if spark_warehouse_dir.exists():
    shutil.rmtree(spark_warehouse_dir)

### Assignment 8.1

In the first part of the assignment, you will create Spark dataframes from the existing Pandas dataframes. Once you create the Spark dataframes, print the schema using `printSchema` and show the dataframe using `show`. 

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("DSC 400 Assignment 8") \
    .getOrCreate()

spark_context = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/12 14:45:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/12 14:45:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


The following is fully implemented code that converts the `pd_df_family` dataframe into a Spark dataframe, prints the schema, and shows the dataframe. 

In [5]:
df_family = spark.createDataFrame(pd_df_family)
df_family.printSchema()
df_family.show()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


root
 |-- villager_id: string (nullable = true)
 |-- family_member_id: string (nullable = true)
 |-- relationship: string (nullable = true)



                                                                                

+-----------+----------------+------------+
|villager_id|family_member_id|relationship|
+-----------+----------------+------------+
|    Abigail|          Pierre|      Father|
|    Abigail|        Caroline|      Mother|
|       Alex|          Evelyn| Grandmother|
|       Alex|          George| Grandfather|
|   Caroline|          Pierre|     Husband|
|   Caroline|         Abigail|    Daughter|
|  Demetrius|           Robin|        Wife|
|  Demetrius|            Maru|    Daughter|
|  Demetrius|       Sebastian|    Step-son|
|      Emily|           Haley|      Sister|
|     Evelyn|          George|     Husband|
|     Evelyn|            Alex|    Grandson|
|     George|          Evelyn|        Wife|
|     George|            Alex|    Grandson|
|      Haley|           Emily|      Sister|
|        Jas|          Marnie|        Aunt|
|        Jas|           Shane|   Godfather|
|       Jodi|            Kent|     Husband|
|       Jodi|             Sam|         Son|
|       Jodi|         Vincent|  

Repeat the process shown above for the remaining dataframes. 

In [9]:
# TODO: Create a PySpark dataframe `df_gifts` from `pd_df_gifts`

# TODO: Print the schema and show the dataframe
df_gifts = spark.createDataFrame(pd_df_gifts)
df_gifts.printSchema()
df_gifts.show()

root
 |-- gift_id: string (nullable = true)
 |-- villager_id: string (nullable = true)

+------------------+-----------+
|           gift_id|villager_id|
+------------------+-----------+
|          Amethyst|    Abigail|
|    Banana Pudding|    Abigail|
|Blackberry Cobbler|    Abigail|
|    Chocolate Cake|    Abigail|
|        Pufferfish|    Abigail|
|           Pumpkin|    Abigail|
|         Spicy Eel|    Abigail|
|Complete Breakfast|       Alex|
|     Salmon Dinner|       Alex|
|         Fish Taco|   Caroline|
|         Green Tea|   Caroline|
|    Summer Spangle|   Caroline|
|    Tropical Curry|   Caroline|
|          Amethyst|      Clint|
|        Aquamarine|      Clint|
|     Artichoke Dip|      Clint|
|           Emerald|      Clint|
|Fiddlehead Risotto|      Clint|
|          Gold Bar|      Clint|
|       Iridium Bar|      Clint|
+------------------+-----------+
only showing top 20 rows



In [13]:
# TODO: Create a PySpark dataframe `df_friends` from `pd_df_friends`
# TODO: Print the schema and show the dataframe
df_friends = spark.createDataFrame(pd_df_friends)
df_friends.printSchema()
df_friends.show()

root
 |-- villager_id: string (nullable = true)
 |-- friend_id: string (nullable = true)

+-----------+---------+
|villager_id|friend_id|
+-----------+---------+
|    Abigail|      Sam|
|    Abigail|Sebastian|
|       Alex|    Haley|
|   Caroline|     Jodi|
|      Clint|    Emily|
|    Elliott|     Leah|
|    Elliott|    Willy|
|      Emily|    Sandy|
|        Gil|   Marlon|
|        Gus|      Pam|
|      Haley|     Alex|
|     Harvey|     Maru|
|        Jas|      Leo|
|        Jas|  Vincent|
|       Jodi| Caroline|
|       Leah|  Elliott|
|        Leo|      Jas|
|        Leo|    Linus|
|        Leo|  Vincent|
|      Lewis|   Marnie|
+-----------+---------+
only showing top 20 rows



We need to create an explicit schema for the villager dataframe as Spark has difficulty infering this schema from the Pandas dataframe. 

In [14]:
from pyspark.sql.types import StructType, StringType, BooleanType, StructField

villager_schema = StructType([
    StructField("name", StringType(), True),
    StructField("birthday", StringType(), True),
    StructField("address", StringType(), True),
    StructField("is_marriable", BooleanType(), True),
    StructField("img_url", StringType(), True),
])

df_villagers = spark.createDataFrame(pd_df_villagers, villager_schema)

# TODO: Print the schema and show the dataframe

### Assignment 8.2

Now that we have loaded the dataframes, we will use the dataframes to create a temporary SQL-based data warehouse. In a production environment, we could persist these tables for later use. 

Register each of the dataframes as a Spark [Global Tempory View](http://spark.apache.org/docs/latest/sql-getting-started.html#global-temporary-view) using the view names, *family*, *friends*, *gifts*, and *villagers* for the dataframes *df_family*, *df_friends*, *df_gifts*, and *df_villagers* respectively.  

In [18]:
# TODO: Create a temporary `family` view using the `df_family` dataframe
df_family.registerTempTable("family")

In [21]:
# TODO: Create a temporary `friends` view using the `df_friends` dataframe
df_friends.registerTempTable("friends")

In [22]:
# TODO: Create a temporary `gifts` view using the `df_gifts` dataframe
df_gifts.registerTempTable("gifts")

In [25]:
# TODO: Create a temporary `villagers` view using the `df_villagers` dataframe
df_villagers.registerTempTable("villagers")

Verify that the views exist by using the following SQL queries. 

In [19]:
# This should output the first 20 rows of the family table
spark.sql("SELECT * FROM family").show()

+-----------+----------------+------------+
|villager_id|family_member_id|relationship|
+-----------+----------------+------------+
|    Abigail|          Pierre|      Father|
|    Abigail|        Caroline|      Mother|
|       Alex|          Evelyn| Grandmother|
|       Alex|          George| Grandfather|
|   Caroline|          Pierre|     Husband|
|   Caroline|         Abigail|    Daughter|
|  Demetrius|           Robin|        Wife|
|  Demetrius|            Maru|    Daughter|
|  Demetrius|       Sebastian|    Step-son|
|      Emily|           Haley|      Sister|
|     Evelyn|          George|     Husband|
|     Evelyn|            Alex|    Grandson|
|     George|          Evelyn|        Wife|
|     George|            Alex|    Grandson|
|      Haley|           Emily|      Sister|
|        Jas|          Marnie|        Aunt|
|        Jas|           Shane|   Godfather|
|       Jodi|            Kent|     Husband|
|       Jodi|             Sam|         Son|
|       Jodi|         Vincent|  

In [26]:
# This should output the first 20 rows of the friends table
spark.sql("SELECT * FROM friends").show()

+-----------+---------+
|villager_id|friend_id|
+-----------+---------+
|    Abigail|      Sam|
|    Abigail|Sebastian|
|       Alex|    Haley|
|   Caroline|     Jodi|
|      Clint|    Emily|
|    Elliott|     Leah|
|    Elliott|    Willy|
|      Emily|    Sandy|
|        Gil|   Marlon|
|        Gus|      Pam|
|      Haley|     Alex|
|     Harvey|     Maru|
|        Jas|      Leo|
|        Jas|  Vincent|
|       Jodi| Caroline|
|       Leah|  Elliott|
|        Leo|      Jas|
|        Leo|    Linus|
|        Leo|  Vincent|
|      Lewis|   Marnie|
+-----------+---------+
only showing top 20 rows



In [27]:
# This should output the first 20 rows of the gifts table
spark.sql("SELECT * FROM gifts").show()

+------------------+-----------+
|           gift_id|villager_id|
+------------------+-----------+
|          Amethyst|    Abigail|
|    Banana Pudding|    Abigail|
|Blackberry Cobbler|    Abigail|
|    Chocolate Cake|    Abigail|
|        Pufferfish|    Abigail|
|           Pumpkin|    Abigail|
|         Spicy Eel|    Abigail|
|Complete Breakfast|       Alex|
|     Salmon Dinner|       Alex|
|         Fish Taco|   Caroline|
|         Green Tea|   Caroline|
|    Summer Spangle|   Caroline|
|    Tropical Curry|   Caroline|
|          Amethyst|      Clint|
|        Aquamarine|      Clint|
|     Artichoke Dip|      Clint|
|           Emerald|      Clint|
|Fiddlehead Risotto|      Clint|
|          Gold Bar|      Clint|
|       Iridium Bar|      Clint|
+------------------+-----------+
only showing top 20 rows



In [28]:
# # This should output the first 20 rows of the villagers table
spark.sql("SELECT * FROM villagers").show()

+---------+---------+--------------------+------------+--------------------+
|     name| birthday|             address|is_marriable|             img_url|
+---------+---------+--------------------+------------+--------------------+
|  Abigail|  Fall 13|Pierre's General ...|        true|https://stardewva...|
|     Alex|Summer 13|        1 River Road|        true|https://stardewva...|
|   Birdie|  Unknown|  Hut on Island West|       false|https://stardewva...|
|  Bouncer|  Unknown|               Oasis|       false|https://stardewva...|
| Caroline| Winter 7|Pierre's General ...|       false|https://stardewva...|
|    Clint|Winter 26|          Blacksmith|       false|https://stardewva...|
|Demetrius|Summer 19|    24 Mountain Road|       false|https://stardewva...|
|    Dwarf|Summer 22|        Eastern Cave|       false|https://stardewva...|
|  Elliott|   Fall 5|     Elliott's Cabin|        true|https://stardewva...|
|    Emily|Spring 27|       2 Willow Lane|        true|https://stardewva...|

### Assignment 8.3

In the final part of the assignment, you will run some basic SQL queries. [Spark's SQL reference guide](http://spark.apache.org/docs/latest/sql-ref.html) and [Spark's SQL getting started guide](http://spark.apache.org/docs/latest/sql-getting-started.html) will help complete these queries. 

#### Assignment 8.3.a

Using a `SELECT` statement and `WHERE` clause, run a query that returns all Sebastian's friends. Sebastian's `villager_id` is Sebastian. Select only the `friend_id` column in the results to be returned. 

In [37]:
# TODO: Run SQL query to return a list of Sebastian's friends. 
result = spark.sql("SELECT friend_id FROM friends WHERE villager_id = 'Sebastian'")
result.show()

+---------+
|friend_id|
+---------+
|  Abigail|
|      Sam|
+---------+



#### Assignment 8.3.b

Group the `friends` table by `villager_id` and perform a count of number of friends for each villager. 

In [44]:
# TODO: Group the `friends` table by `villager_id` and perform a count of number of friends for each villager. 
#result = spark.sql("SELECT villager_id FROM friends")
result = spark.sql("SELECT friend_id, COUNT(*) AS cnt FROM friends GROUP BY friend_id")
result.show()

[Stage 22:>                                                         (0 + 8) / 8]

+---------+---+
|friend_id|cnt|
+---------+---+
|     Jodi|  1|
|      Sam|  3|
|Sebastian|  2|
|    Haley|  1|
|    Sandy|  1|
|    Willy|  1|
|    Emily|  2|
|     Leah|  1|
|     Alex|  1|
|     Maru|  2|
|      Pam|  1|
|   Marlon|  1|
|    Linus|  2|
|  Elliott|  2|
|   Marnie|  1|
|      Leo|  3|
|      Jas|  2|
|  Vincent|  2|
| Caroline|  1|
|   Wizard|  1|
+---------+---+
only showing top 20 rows



                                                                                