# Apache Spark for complex queries

## Data Management Homework 7

In this assignment we will use 
[Apache Spark](https://spark.apache.org/), 
a popular framework for efficient distributed processing of large amounts of data. 
The objective is to use Apache Spark to translate and execute some queries of the TPCx-BB big data benchmark.  
[TPCx-BB](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp), 
or simply "Big Bench", is a common benchmark suite for evaluating system performance on big data analytics and machine learning algorithms. 
We will focus on the big data analytical queries, which are expressed in SQL. 

Spark is a framework available in multiple languages: Scala, Java, Python, R. In this exercise, we will use Python.

## Setup

### Jupyter Lab

If you are not familiar with the Jupyter Lab environment, check out these resources from the official website: 
[example notebook](https://jupyter.org/try), 
[docs](https://jupyterlab.readthedocs.io/en/stable/getting_started/overview.html). 

Quick reference:
- This is a cell. A cell contains either Markdown text (such as this one) or code. Everything in a Jupyter notebook is a cell.
    - Click the plus on the top bar to add a new cell
    - Double-click a text cell to edit it using Markdown
    - Run a cell either with the "play" button on the top bar or with the "Shift + Enter" key combination
    - Running a code cell executes it
    - Running a text cell renders the formatted text
- Once you run a cell, its results stay in memory! What matters is the order in which you executed the cells, not their position in the notebook: if you run a cell further down first, the cells above it will see its variables
- General rule #1: arrange cells step by step from top to bottom. If anything breaks, re-run every cell from the top
- General rule #2: if you are stuck or a cell hangs during execution, restart the kernel from the top bar menu
 
### Contents
You can browse the contents of this exercise with the file explorer on the left.  
The contents are "extracted" from the 
[TPCx-BB](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) 
benchmark source folder. 
Please refer to the link if you want a broader overview of and/or additional information on TPCx-BB. 
Since this exercise differs from the actual benchmark, only a subset of its contents is included here:
- `queries/` contains 30 SQL/Spark queries, some of which are to be ported to Spark in this exercise. Every query `qxx/` folder (`xx` = number) contains
    - `engineLocalSettings.conf`: TPC related, disregard
    - `engineLocalSettings.sql`: TPC related, disregard
    - `explain_qxx.sql`: *query content* in "explanatory" format
    - `qxx.sql`: *query content* in TPC exec format
    - `run.sh`: TPC related, disregard
    - `results/qxx-result`: contains the expected result in plain text. You should compare it with your query output (an example is provided later)
- `spark_table_schemas`: contains schema information for every table in the dataset. Not relevant for the implementation
- `TPCx-BB-dataset`: contains all the tables, each in a separate folder. Refer to it for the table names
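
Since the table names are the folder names inside `TPCx-BB-dataset`, you can also list them from Python instead of the file explorer. The sketch below assumes every table is stored as a `<name>.ptxt` entry, which is the path pattern read by the `get_table` helper defined later in this notebook; `list_tables` is a hypothetical helper name, not part of the provided material.

```python
from pathlib import Path


def list_tables(dataset_dir="TPCx-BB-dataset"):
    """Return the sorted names of the tables stored in `dataset_dir`.

    Assumes every table is a `<name>.ptxt` entry (the naming used by get_table()),
    so the table name is the entry name without the `.ptxt` suffix.
    """
    return sorted(entry.stem for entry in Path(dataset_dir).glob("*.ptxt"))
```

Called from the notebook's working directory, `list_tables()` should return names such as `customer`, `item` and `store_sales`, which you can pass directly to `get_table`.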

**Do not modify** `spark_table_schemas` or `TPCx-BB-dataset` contents as it may compromise your solution.

### Guidelines
You must use the Spark SQL module to solve this exercise. Refer to the official documentation:
> Spark SQL: https://spark.apache.org/docs/latest/sql-programming-guide.html

We will work with *DataFrames*: a Spark data type used to represent collections of data, including database tables. 
You are strongly encouraged to consult the DataFrame API reference within the Spark SQL module while implementing the exercise. 
There you will find methods, functions and further data types that are equivalent to SQL operations. 
> DataFrame Reference: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html

The Spark DataFrame API resembles that of the pandas library.
 
Reference: 
 [PySpark API documentation](https://spark.apache.org/docs/latest/api/python/reference/index.html)

#### Reading TPCx-BB queries
The SQL query files (`explain_qxx.sql` and `qxx.sql`) are taken directly from the TPCx-BB benchmark suite 
and therefore might contain "extra" SQL statements and comments 
that serve the original TPCx-BB benchmark (e.g. `hive` instructions, `EXPLAIN`, etc.). 
Your goal is to extract and translate only the SQL query, disregarding the statements/instructions that are irrelevant to this exercise.  
Additionally, queries might contain *template* variables in the form `${qxx_variable_name}`. 
You can find the values of all template variables in the `queries/queryParameters.sql` file.
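
A minimal sketch of filling in the template variables before reading or translating a query, using only the Python standard library. It assumes that the parameters file defines each variable on its own line as a Hive-style `set <name>=<value>;` statement; open `queryParameters.sql` and adapt `PARAM_LINE` if the format differs. `parse_parameters` and `fill_template` are hypothetical helper names.

```python
import re

# Assumption: one parameter per line, e.g. "set q01_example_IN=1, 2, 3;"
PARAM_LINE = re.compile(r"^\s*set\s+(\w+)\s*=\s*(.*?)\s*;?\s*$", re.IGNORECASE)
# template placeholders look like ${qxx_variable_name}
TEMPLATE_VAR = re.compile(r"\$\{(\w+)\}")


def parse_parameters(text):
    """Map each template variable name to its (string) value."""
    params = {}
    for line in text.splitlines():
        match = PARAM_LINE.match(line)
        if match:
            params[match.group(1)] = match.group(2)
    return params


def fill_template(sql, params):
    """Replace every ${name} with its value; unknown names are left untouched."""
    return TEMPLATE_VAR.sub(lambda m: params.get(m.group(1), m.group(0)), sql)
```

Usage: read the parameters file into a string, call `parse_parameters` on it, then pass each query text through `fill_template` to see the concrete values you need to use in your Spark translation.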

## Environment preparation
**Make sure to read through and run the following code cells before starting the exercise** 

### Install PySpark
Run the cell below to install Spark for Python. If it does not work, close and restart Jupyter, or run the same command without the leading `!` in a separate terminal.

In [1]:
!pip install pyspark



### Import PySpark

Whenever you work with Spark, you first need to start a Spark session.
The SparkSession builder handles, under the hood, the framework architecture discussed in class 
and gives us an entry point to programming with Spark.
 

In [6]:
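from pyspark.sql import SparkSession
# the wildcard import provides col(), count(), etc., but it also shadows
# Python built-ins such as sum(), min(), max() and round()
from pyspark.sql.functions import *
import os

# "local" runs Spark on this machine with a single worker thread
# (use "local[*]" for one worker thread per CPU core)
spark = SparkSession.builder \
        .master("local") \
        .appName("Homework 07") \
        .getOrCreate()

spark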

24/04/17 00:05:32 WARN Utils: Your hostname, gief-pc resolves to a loopback address: 127.0.1.1; using 192.168.1.25 instead (on interface wlp0s20f3)
24/04/17 00:05:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/17 00:05:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Define Helper Function

In [34]:
def get_table(name):
    """
    Load a table from the TPCx-BB dataset.
    The dataframe is read from the Parquet format, 
    which is a columnar storage format ideal for efficient storage and retrieval of data
    :param name: name of the table 
    :return: a Spark DataFrame of the table
    """
    df = spark.read.parquet(f"TPCx-BB-dataset/{name}.ptxt")
    # mark as non-nullable every column whose schema line has more than two tokens
    # NOTE: this only edits the Python-side copy of the schema cached in df.schema;
    # Spark itself is not told, which is why printSchema() still reports every
    # column as nullable
    with open(f"spark_table_schemas/{name}.schema", "r") as f:
        for line in f:
            l = line.split()
            if len(l) > 2:
                df.schema[l[0]].nullable = False

    return df

### Explore the dataset
You can use `get_table` to load the dataset tables. In Spark, a table is stored as a *DataFrame* - see the reference in the exercise introduction.

In [37]:
# load the customer table
customer = get_table("customer")
customer.show() # show the first 20 rows

+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+-------------+--------------------+------------------+
|c_customer_sk|   c_customer_id|c_current_cdemo_sk|c_current_hdemo_sk|c_current_addr_sk|c_first_shipto_date_sk|c_first_sales_date_sk|c_salutation|c_first_name|c_last_name|c_preferred_cust_flag|c_birth_day|c_birth_month|c_birth_year|     c_birth_country|      c_login|     c_email_address|c_last_review_date|
+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+-------------+--------------------+------------------+
|            0|AAAAAAAAAAAAAAAA|           1824793|              3203|      

In [52]:
# Creating a DataFrame from scratch
df = spark.createDataFrame(data=[
    (1, "Alice", 34),
    (2, "Bob", 45),
    (3, "Charlie", 56)
], schema=["id", "name", "age"])

# some useful methods to explore the table in dataframe
print(f'Type: {type(df) = }',    # a PySpark DataFrame is not a pandas DataFrame!
      f'First top n=2 rows: {df.head(2) = }',
      f'Column title names: {df.columns = }',
      f'Column title names with its types: {df.dtypes = }',
      f'Selecting a column: {df.select("name") = }', # it's still a DataFrame
      f'Avoid pandas notation (less features): {df["name"] = }', # returns a Column object, which supports fewer operations than a DataFrame
      f'Selecting more columns: {df.select(["name", "age"]) = }', # it's still a DataFrame
      sep='\n\n')

# print the values of the selected columns
df.select(["name", "age"]).show()

# compute summary statistics (count, mean, stddev, min, max), similar to pandas describe(); this scans the whole table, so it is expensive on large data
df.describe().show()

Type: type(df) = <class 'pyspark.sql.dataframe.DataFrame'>

First top n=2 rows: df.head(2) = [Row(id=1, name='Alice', age=34), Row(id=2, name='Bob', age=45)]

Column title names: df.columns = ['id', 'name', 'age']

Column title names with its types: df.dtypes = [('id', 'bigint'), ('name', 'string'), ('age', 'bigint')]

Selecting a column: df.select("name") = DataFrame[name: string]

Avoid pandas notation (less features): df["name"] = Column<'name'>

Selecting more columns: df.select(["name", "age"]) = DataFrame[name: string, age: bigint]
+-------+---+
|   name|age|
+-------+---+
|  Alice| 34|
|    Bob| 45|
|Charlie| 56|
+-------+---+

+-------+---+-------+----+
|summary| id|   name| age|
+-------+---+-------+----+
|  count|  3|      3|   3|
|   mean|2.0|   NULL|45.0|
| stddev|1.0|   NULL|11.0|
|    min|  1|  Alice|  34|
|    max|  3|Charlie|  56|
+-------+---+-------+----+



In [25]:
# read the customer table directly from Parquet, without the schema adjustments of get_table()
customer_raw = spark.read.parquet('TPCx-BB-dataset/customer.ptxt')
customer_raw

DataFrame[c_customer_sk: bigint, c_customer_id: string, c_current_cdemo_sk: bigint, c_current_hdemo_sk: bigint, c_current_addr_sk: bigint, c_first_shipto_date_sk: bigint, c_first_sales_date_sk: bigint, c_salutation: string, c_first_name: string, c_last_name: string, c_preferred_cust_flag: string, c_birth_day: bigint, c_birth_month: bigint, c_birth_year: bigint, c_birth_country: string, c_login: string, c_email_address: string, c_last_review_date: string]

## Column operations on DataFrame

Column operations return a new DataFrame, which must be assigned to a variable if you want to keep it.
DataFrames are immutable: an operation never changes the original DataFrame, it creates a new, modified one from it.

In [53]:
# add a column
df_added_column = df.withColumn(colName="age in 2 years", col=col("age") + 2)
df_added_column.show()

+---+-------+---+--------------+
| id|   name|age|age in 2 years|
+---+-------+---+--------------+
|  1|  Alice| 34|            36|
|  2|    Bob| 45|            47|
|  3|Charlie| 56|            58|
+---+-------+---+--------------+



In [54]:
# remove a column
df_removed_column = df_added_column.drop("id")
df_removed_column.show()

+-------+---+--------------+
|   name|age|age in 2 years|
+-------+---+--------------+
|  Alice| 34|            36|
|    Bob| 45|            47|
|Charlie| 56|            58|
+-------+---+--------------+



In [57]:
# rename a column
df_renamed_column = df_removed_column.withColumnRenamed(existing="name", new="name_column_renamed")
df_renamed_column.show()

+-------------------+---+--------------+
|name_column_renamed|age|age in 2 years|
+-------------------+---+--------------+
|              Alice| 34|            36|
|                Bob| 45|            47|
|            Charlie| 56|            58|
+-------------------+---+--------------+



In [20]:
# show the 1st row of the customer table
customer.show(1) 

+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+------------+--------------------+------------------+
|c_customer_sk|   c_customer_id|c_current_cdemo_sk|c_current_hdemo_sk|c_current_addr_sk|c_first_shipto_date_sk|c_first_sales_date_sk|c_salutation|c_first_name|c_last_name|c_preferred_cust_flag|c_birth_day|c_birth_month|c_birth_year|     c_birth_country|     c_login|     c_email_address|c_last_review_date|
+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+------------+--------------------+------------------+
|            0|AAAAAAAAAAAAAAAA|           1824793|              3203|         

In [55]:
# display the table schema, which in Spark is a StructType: a list of StructFields (column name, type, nullable)
customer.schema 

StructType([StructField('c_customer_sk', LongType(), False), StructField('c_customer_id', StringType(), False), StructField('c_current_cdemo_sk', LongType(), True), StructField('c_current_hdemo_sk', LongType(), True), StructField('c_current_addr_sk', LongType(), True), StructField('c_first_shipto_date_sk', LongType(), True), StructField('c_first_sales_date_sk', LongType(), True), StructField('c_salutation', StringType(), True), StructField('c_first_name', StringType(), True), StructField('c_last_name', StringType(), True), StructField('c_preferred_cust_flag', StringType(), True), StructField('c_birth_day', LongType(), True), StructField('c_birth_month', LongType(), True), StructField('c_birth_year', LongType(), True), StructField('c_birth_country', StringType(), True), StructField('c_login', StringType(), True), StructField('c_email_address', StringType(), True), StructField('c_last_review_date', StringType(), True)])

In [56]:
# or print it as a readable tree
customer.printSchema()

root
 |-- c_customer_sk: long (nullable = true)
 |-- c_customer_id: string (nullable = true)
 |-- c_current_cdemo_sk: long (nullable = true)
 |-- c_current_hdemo_sk: long (nullable = true)
 |-- c_current_addr_sk: long (nullable = true)
 |-- c_first_shipto_date_sk: long (nullable = true)
 |-- c_first_sales_date_sk: long (nullable = true)
 |-- c_salutation: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_preferred_cust_flag: string (nullable = true)
 |-- c_birth_day: long (nullable = true)
 |-- c_birth_month: long (nullable = true)
 |-- c_birth_year: long (nullable = true)
 |-- c_birth_country: string (nullable = true)
 |-- c_login: string (nullable = true)
 |-- c_email_address: string (nullable = true)
 |-- c_last_review_date: string (nullable = true)



## Sample query translation
Refer to `queries/q00/explain_q00.sql`. The code below is a valid translation of that query using Spark SQL. You can use any method of the Spark SQL DataFrame class to implement your solution.

### Query 0
Count the number of items sold per category.  
Only certain categories, sold in specific stores, are considered.


In [6]:
## query0

## look in TPCx-BB-dataset to check all the available tables.
s = get_table("store_sales")
i = get_table("item")

query0_solution = s.join(i, s.ss_item_sk == i.i_item_sk) \
                .filter(i.i_category_id < 3) \
                .filter(s.ss_store_sk.isin([10, 20, 33, 40, 50])) \
                .groupBy("i_category") \
                .count()

query0_solution.show()      

+--------------+-----+
|    i_category|count|
+--------------+-----+
|Home & Kitchen| 1975|
|         Music|25060|
+--------------+-----+


The cell below is a shortcut to display the q00 result file without navigating to it.  
A `!` followed by a shell command (`cat` in this case) runs that command in a terminal from within the cell.

In [9]:
## check the result
!cat queries/q00/results/q00-result

Home & Kitchen, 1975
Music, 25060
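
To compare results programmatically rather than by eye, a small pure-Python sketch can parse the result file and check it against the rows returned by `collect()`. It assumes one row per line with comma-separated fields, as in `q00-result` above; values containing commas are not handled, and values are compared as strings (so `15.0` and `15` would not match, and nulls become `None`). `read_expected` and `matches_expected` are hypothetical helper names.

```python
from pathlib import Path


def read_expected(path):
    """Parse a result file into a list of tuples of strings (one tuple per line)."""
    rows = []
    for line in Path(path).read_text().splitlines():
        if line.strip():
            rows.append(tuple(field.strip() for field in line.split(",")))
    return rows


def matches_expected(rows, path, ordered=False):
    """Compare collected Spark rows (Row objects are tuples) with a result file.

    Set ordered=True for queries with an ORDER BY; otherwise row order is ignored.
    """
    actual = [tuple(str(value) for value in row) for row in rows]
    expected = read_expected(path)
    if ordered:
        return actual == expected
    return sorted(actual) == sorted(expected)
```

For example, `matches_expected(query0_solution.collect(), "queries/q00/results/q00-result")` should return `True` for the query 0 output shown above.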

## [YOUR SOLUTION BELOW]
Write the query description in a Markdown cell, followed by a code cell with the query implementation.  
Query descriptions can be found in the TPCx-BB specification, page 93: https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp

You should implement all the queries assigned in the homework sheet, plus the optional ones if you wish.

### Query X
description

In [112]:
# implementation
print('hello')

hello


In [10]:
# check the result