# Apache Spark for complex queries

## Data Management Homework 7

In this assignment we will use 
[Apache Spark](https://spark.apache.org/): 
a popular framework for efficient distributed processing of large amounts of data. 
The objective is to use Apache Spark to translate and execute some queries of the TPCx-BB big data benchmark.  
[TPCx-BB](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) 
or simply "Big Bench", is a common benchmark suite for evaluating system performance on big data analytics and machine learning workloads. 
We will focus on big data analytical queries, which are expressed in SQL. 

Spark is a framework available in multiple languages: Scala, Java, Python, R. In this exercise, we will use Python.

## Setup

### Jupyter Lab

If you are not familiar with the Jupyter Lab environment, check out these resources from the official website: 
[example notebook](https://jupyter.org/try), 
[docs](https://jupyterlab.readthedocs.io/en/stable/getting_started/overview.html). 

Quick reference:
- This is a cell. A cell can contain either Markdown text (such as this one) or code. Everything in a Jupyter notebook is a cell.
    - Click on the plus on the top bar to add a new cell
    - You can double-click on a text cell to edit it using Markdown
    - You can run a cell either with the "play" button on the top bar or with the "shift + enter" key combination
    - Running a code cell executes it
    - Running a text cell formats the text
- Once you run a cell, its results stay in memory! Code therefore depends on the order in which you execute cells, even if you execute a cell before one that sits above it
- General rule #1: arrange cells step by step from top to bottom. If anything breaks, re-execute every cell from the top
- General rule #2: if you are stuck or a cell hangs during execution, restart the kernel from the top bar menu
 
### Contents
You can navigate through this exercise contents with the file explorer on the left.  
The contents are "extracted" from the 
[TPCx-BB](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) 
benchmark source folder. 
Please refer to the link if you want a broader overview of TPCx-BB or additional information about it. 
Since this exercise differs from the actual benchmark, only a subset of its contents is included here:
- `queries/` contains 30 SQL/Spark queries, some of which are to be ported to Spark in this exercise. Every query `qxx/` folder (`xx` = number) contains
    - `engineLocalSettings.conf`: TPC related, disregard
    - `engineLocalSettings.sql`: TPC related, disregard
    - `explain_qxx.sql`: *query content* in "explanatory" format
    - `qxx.sql`: *query content* in TPC exec format
    - `run.sh`: TPC related, disregard
    - `results/qxx-result`: contains the expected result in plain text. You should compare this with your query output (example provided later)
- `spark_table_schemas`: contains schema information for every table in the dataset. Not relevant for the implementation
- `TPCx-BB-dataset`: contains all the tables, each in a separate folder. Refer to it for table names

**Do not modify** `spark_table_schemas` or `TPCx-BB-dataset` contents as it may compromise your solution.

### Guidelines
You must use the Spark SQL module to solve this exercise. Refer to the official documentation:
> Spark SQL: https://spark.apache.org/docs/latest/sql-programming-guide.html

We will work with *DataFrames*: a Spark data type used to represent collections of data, including database tables. 
We strongly recommend referring to the DataFrame API reference within the Spark SQL module while implementing the exercise. 
There you will find methods, functions and further data types that are equivalent to SQL operations. 
> DataFrame Reference: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html

The Spark DataFrame API resembles that of the Pandas library.
 
Reference: 
 [PySpark API documentation](https://spark.apache.org/docs/latest/api/python/reference/index.html)

#### Reading TPCx-BB queries
The SQL query files (`explain_qxx.sql` and `qxx.sql`) are taken directly from the TPCx-BB benchmark suite 
and may therefore contain "extra" SQL statements and comments 
that serve the original TPCx-BB benchmark (e.g. `hive` instructions, `EXPLAIN`, etc.). 
Your goal is to extract and translate only the SQL query, disregarding statements/instructions that are irrelevant to this exercise.  
Additionally, queries might contain *template* variables of the form `${qxx_variable_name}`. 
You can find the values of all templates in the `query/queryParameters.sql` file.

## Environment preparation
**Make sure to read through and run the following code cells before starting the exercise** 

### Install PySpark

The Python environment is set up through the `pyproject.toml` specification and 
[_poetry_](https://python-poetry.org/docs/)
in the project root directory.
Follow the `README.md` instructions to install the dependencies in a freshly created virtual environment.

### Import PySpark

Whenever working with Spark, you need to either start a Spark Session or join one.
The Spark Session Builder handles, under the hood, the framework architecture discussed in class 
and gives us an entry point for programming with Spark.
By default, it also starts an application UI at `localhost:4040`.
Open it to see information about the driver, executors and jobs of the current configuration.
 

In [1]:
from pyspark.sql import SparkSession
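# the wildcard import shadows Python built-ins such as sum, max, min and round
# with Spark's column functions: avoid reusing these names (or e.g. count, col) as variables
from pyspark.sql.functions import *

# master("local") runs Spark locally in a single JVM with one worker thread
# ("local[*]" would use one thread per core); no cluster manager is started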
spark = SparkSession.builder.master("local").appName("Homework 07").getOrCreate()

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/28 22:22:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Spark API

The building block of the Spark API is its Resilient Distributed Dataset (RDD) API.
In the RDD API, there are two types of operations: transformations, which define a new dataset based on previous ones,
and actions, which kick off a job to execute on a cluster.
On top of the RDD API, Spark provides higher-level APIs, e.g. the DataFrame API and the Machine Learning API.
We will focus on the former. 


# PySpark Dataframes

DataFrames are a data structure for data manipulation.
A [PySpark DataFrame](https://spark.apache.org/docs/latest/api/python/reference/index.html) 
is a 2-dimensional labeled data structure with columns of potentially different types,
similar to a spreadsheet or a SQL table.
Much of the functionality and API of the 
[Pandas](https://pandas.pydata.org/docs/)
data analysis library is offered by PySpark, with support for distributed collections of data.

To start, let's see how to create a dataframe, visualize the data in it and retrieve its basic properties.

In [2]:
# Creating a DataFrame from scratch
df = spark.createDataFrame(
    data=[(1, "Alice", 33), (2, "Bob", 45), (3, "Charlie", 50)],
    schema=["id", "name", "age"],
)

# visualize dataframe representation
df.show()  # by default shows first 20 rows


+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 33|
|  2|    Bob| 45|
|  3|Charlie| 50|
+---+-------+---+



                                                                                

In [3]:
print(
    f"Type: {type(df)}",  # a PySpark DataFrame is not a pandas DataFrame!
    f"First top n=2 rows: {df.head(2)}",  # rows returned in a list
    f"Column title names: {df.columns}",
    f"Column title names with its types: {df.dtypes}",
    f'Selecting a column: {df.select("name")}',  # it's still a DataFrame
    f'Avoid pandas notation (less features): {df["name"]}',  # returns a Column object, not a DataFrame
    f'Selecting more than one columns: {df.select(["name", "age"])}',  # it's still a DataFrame
    sep="\n\n",
)

Type: <class 'pyspark.sql.classic.dataframe.DataFrame'>

First top n=2 rows: [Row(id=1, name='Alice', age=33), Row(id=2, name='Bob', age=45)]

Column title names: ['id', 'name', 'age']

Column title names with its types: [('id', 'bigint'), ('name', 'string'), ('age', 'bigint')]

Selecting a column: DataFrame[name: string]

Avoid pandas notation (less features): Column<'name'>

Selecting more than one columns: DataFrame[name: string, age: bigint]


In [4]:
# print the values of the selected columns
df.select(["name", "age"]).show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 33|
|    Bob| 45|
|Charlie| 50|
+-------+---+



In [5]:
# compute summary statistics (count, mean, stddev, min, max) per column, similar to pandas' describe();
# computationally expensive, since it scans the whole DataFrame
df.describe().show()

+-------+---+-------+------------------+
|summary| id|   name|               age|
+-------+---+-------+------------------+
|  count|  3|      3|                 3|
|   mean|2.0|   NULL|42.666666666666664|
| stddev|1.0|   NULL| 8.736894948054106|
|    min|  1|  Alice|                33|
|    max|  3|Charlie|                50|
+-------+---+-------+------------------+



                                                                                

## Column operations on DataFrame

Column operations return a new DataFrame, which must be assigned to a variable to be kept:
DataFrames are immutable, so the original DataFrame is never modified.

The following column operations are presented:
- creation
- deletion
- renaming

The function `col('...')` returns a column given its name.


In [6]:
# add a column where 'age' is increased by 2
df_col_added = df.withColumn(colName="age in 2 years", col=col("age") + 2)
df_col_added.show()

+---+-------+---+--------------+
| id|   name|age|age in 2 years|
+---+-------+---+--------------+
|  1|  Alice| 33|            35|
|  2|    Bob| 45|            47|
|  3|Charlie| 50|            52|
+---+-------+---+--------------+



In [7]:
# remove the 'id' column
df_col_removed = df_col_added.drop("id")
df_col_removed.show()

+-------+---+--------------+
|   name|age|age in 2 years|
+-------+---+--------------+
|  Alice| 33|            35|
|    Bob| 45|            47|
|Charlie| 50|            52|
+-------+---+--------------+



In [8]:
# rename the 'name' column
df_col_renamed = df_col_removed.withColumnRenamed(
    existing="name", new="name_column_renamed"
)

df_col_renamed.show()

+-------------------+---+--------------+
|name_column_renamed|age|age in 2 years|
+-------------------+---+--------------+
|              Alice| 33|            35|
|                Bob| 45|            47|
|            Charlie| 50|            52|
+-------------------+---+--------------+



## Row operations on DataFrame

As with column operations, the result of these operations must be assigned to a new variable, 
since the original DataFrame is not modified.

The following row operations are presented:
- creation
- update
- deletion

The examples focus on handling null (missing) values in DataFrame records.

In [9]:
# update rows: set 'age' to null where it is >= 50
df_null = df.withColumn(
    colName="age", col=when(col("age") >= 50, None).otherwise(col("age"))
)

# set 'id' to null for Bob
df_null = df_null.withColumn(
    colName="id", col=when(col("name") == "Bob", None).otherwise(col("id"))
)

df_null.show()

+----+-------+----+
|  id|   name| age|
+----+-------+----+
|   1|  Alice|  33|
|NULL|    Bob|  45|
|   3|Charlie|NULL|
+----+-------+----+



Missing values are generally referred to as `NA` ("not available"), a sentinel value; in Spark they are represented as `null`.
Further details are in the 
[pandas](https://pandas.pydata.org/docs/user_guide/missing_data.html#missing-data) and
[PySpark](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.na.html?highlight=na#pyspark.sql.DataFrame.na)
documentation.

In [10]:
# remove rows that contain a null value in any column
df_without_null_rows = df_null.dropna()
# same as df_null.na.drop(), df_null.dropna(how='any'), df_null.na.drop(how='any')
df_without_null_rows.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 33|
+---+-----+---+



In [11]:
# remove rows only if the specified column contains a null value
df_without_id_null = df_null.dropna(subset=["id"])
df_without_id_null.show()

+---+-------+----+
| id|   name| age|
+---+-------+----+
|  1|  Alice|  33|
|  3|Charlie|NULL|
+---+-------+----+



In [12]:
# replace nulls with the given value; it is only applied to columns of a matching type (numeric here)
df_filled_nulls = df_without_id_null.fillna(-1)
df_filled_nulls.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 33|
|  3|Charlie| -1|
+---+-------+---+



## Filter operations

Filters are fundamental for expressing conditions on data.
We go back to our original, unmodified DataFrame.
The 3 fundamental boolean operators for filters are:
- `&` and
- `|` or
- `~` not

Wrap each condition in parentheses when a filter contains more than one,
e.g. `.filter((...) & (...) | (...))`: in Python, `&`, `|` and `~` bind more tightly than comparisons such as `==` or `<`.

In [13]:
df_people_forty = df.filter(condition=(40 <= df.age) & (df.age < 50))
df_people_forty.show()

+---+----+---+
| id|name|age|
+---+----+---+
|  2| Bob| 45|
+---+----+---+



In [14]:
# recall the select operation presented at the start, to choose only the relevant columns
df_people_forty_anonymized = df_people_forty.select(["id", "age"])
df_people_forty_anonymized.show()

+---+---+
| id|age|
+---+---+
|  2| 45|
+---+---+



## Join

A join combines the columns of two DataFrames by matching rows on a join condition; `join()` performs an inner join by default.

In [15]:
# new faculties dataframe
df_faculties = spark.createDataFrame(
    data=[(1, "INF"), (2, "ECO")],
    schema=["id", "faculty"],
)

df_faculties.show()

+---+-------+
| id|faculty|
+---+-------+
|  1|    INF|
|  2|    ECO|
+---+-------+



In [16]:
# add a faculty column to the original df to perform the join
df_faculty_assigned = df.withColumn(
    colName="faculty", col=when(df.name == "Bob", 2).otherwise(1)
)

df_faculty_assigned.show()

+---+-------+---+-------+
| id|   name|age|faculty|
+---+-------+---+-------+
|  1|  Alice| 33|      1|
|  2|    Bob| 45|      2|
|  3|Charlie| 50|      1|
+---+-------+---+-------+



In [17]:
# JOIN operation
df_joined = df_faculty_assigned.join(
    other=df_faculties, on=df_faculty_assigned.faculty == df_faculties.id
)

df_joined.show()

+---+-------+---+-------+---+-------+
| id|   name|age|faculty| id|faculty|
+---+-------+---+-------+---+-------+
|  1|  Alice| 33|      1|  1|    INF|
|  3|Charlie| 50|      1|  1|    INF|
|  2|    Bob| 45|      2|  2|    ECO|
+---+-------+---+-------+---+-------+



## Aggregates

Grouping is implemented by the `.groupBy()` method of the Spark DataFrame API.
It returns a grouped object on which `.agg()` can apply 
any of the aggregate operations you have already seen in SQL.
As in SQL, this splits the data into groups, applies a function to each group 
and combines the results.
Examples of available aggregate operations: `.count()`, `.sum()`, `.mean()`, `.max()`, `.min()`, ...
Prefer `.groupBy()` over its lowercase alias `.groupby()`, which exists for pandas compatibility.

In [18]:
# get the average age per faculty
df_faculty_avg_age = df_faculty_assigned.groupBy(df_faculty_assigned.faculty).agg(
    avg(df_faculty_assigned.age)
)

df_faculty_avg_age.show()

+-------+--------+
|faculty|avg(age)|
+-------+--------+
|      1|    41.5|
|      2|    45.0|
+-------+--------+



### Alias

Equivalent of the SQL `AS` keyword. 
It allows a DataFrame or a column to be referenced by an alternative name.
If you give a column an alias and want to use it in a later operation of the same expression 
(e.g. sorting on an aggregate computed in `.agg()`), 
reference that column with `col('aliasedName')`.

In [19]:
df_alias = df.alias("df_alias")
df_alias.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 33|
|  2|    Bob| 45|
|  3|Charlie| 50|
+---+-------+---+



In [20]:
# from previous example of aggregates
df_faculty_avg_age = df_faculty_assigned.groupBy(df_faculty_assigned.faculty).agg(
    avg(df_faculty_assigned.age).alias("Average Age")
)

df_faculty_avg_age.show()

+-------+-----------+
|faculty|Average Age|
+-------+-----------+
|      1|       41.5|
|      2|       45.0|
+-------+-----------+



## OrderBy

`orderBy()` returns a new DataFrame sorted by the specified column(s).


In [21]:
df_faculty_avg_age.orderBy(desc(col="Average Age")).show()

+-------+-----------+
|faculty|Average Age|
+-------+-----------+
|      2|       45.0|
|      1|       41.5|
+-------+-----------+



## Others

The most important SQL operations have a DataFrame API counterpart, 
and you should be able to complete the assignment with the ones listed above.
Since there are many ways to reach the same goal 
in the query translation exercise, 
check the documentation for further reference:
[PySpark Dataframe API](https://spark.apache.org/docs/latest/api/python/reference/index.html) 

## Define Helper Function

In [22]:
# load a table of the TPCx-BB dataset as a DataFrame (tables are stored in Parquet format)
def get_table(table):
    return spark.read.parquet(f"TPCx-BB-dataset/{table}.ptxt")

## Explore the dataset

You can use `get_table` to load the tables of the dataset. Each table is loaded as a *DataFrame* (see the reference in the exercise intro).
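If you prefer to list the available table names from Python rather than from the file explorer, a small helper could look like this. It assumes, as `get_table` does, that every table is stored as a `<table>.ptxt` entry directly inside `TPCx-BB-dataset/`; `list_tables` is a hypothetical helper, not part of the exercise.

```python
from pathlib import Path


def list_tables(dataset_dir="TPCx-BB-dataset"):
    """Return the table names accepted by get_table(), assuming one '<table>.ptxt' entry per table."""
    # Path.stem drops the '.ptxt' suffix, e.g. 'customer.ptxt' -> 'customer'
    return sorted(p.stem for p in Path(dataset_dir).glob("*.ptxt"))


print(list_tables())
```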

In [23]:
# load the customer table
customer = get_table("customer")
customer.show()

+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+-------------+--------------------+------------------+
|c_customer_sk|   c_customer_id|c_current_cdemo_sk|c_current_hdemo_sk|c_current_addr_sk|c_first_shipto_date_sk|c_first_sales_date_sk|c_salutation|c_first_name|c_last_name|c_preferred_cust_flag|c_birth_day|c_birth_month|c_birth_year|     c_birth_country|      c_login|     c_email_address|c_last_review_date|
+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+-------------+--------------------+------------------+
|            0|AAAAAAAAAAAAAAAA|           1824793|              3203|      

In [24]:
# show the 1st row of the customer table
customer.show(n=1)

+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+------------+--------------------+------------------+
|c_customer_sk|   c_customer_id|c_current_cdemo_sk|c_current_hdemo_sk|c_current_addr_sk|c_first_shipto_date_sk|c_first_sales_date_sk|c_salutation|c_first_name|c_last_name|c_preferred_cust_flag|c_birth_day|c_birth_month|c_birth_year|     c_birth_country|     c_login|     c_email_address|c_last_review_date|
+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+--------------------+------------+--------------------+------------------+
|            0|AAAAAAAAAAAAAAAA|           1824793|              3203|         

In [25]:
# display the table schema: in Spark, a StructType made of StructField(name, type, nullable) entries
customer.schema

StructType([StructField('c_customer_sk', LongType(), True), StructField('c_customer_id', StringType(), True), StructField('c_current_cdemo_sk', LongType(), True), StructField('c_current_hdemo_sk', LongType(), True), StructField('c_current_addr_sk', LongType(), True), StructField('c_first_shipto_date_sk', LongType(), True), StructField('c_first_sales_date_sk', LongType(), True), StructField('c_salutation', StringType(), True), StructField('c_first_name', StringType(), True), StructField('c_last_name', StringType(), True), StructField('c_preferred_cust_flag', StringType(), True), StructField('c_birth_day', LongType(), True), StructField('c_birth_month', LongType(), True), StructField('c_birth_year', LongType(), True), StructField('c_birth_country', StringType(), True), StructField('c_login', StringType(), True), StructField('c_email_address', StringType(), True), StructField('c_last_review_date', StringType(), True)])

In [26]:
# or print it as a readable tree
customer.printSchema()

root
 |-- c_customer_sk: long (nullable = true)
 |-- c_customer_id: string (nullable = true)
 |-- c_current_cdemo_sk: long (nullable = true)
 |-- c_current_hdemo_sk: long (nullable = true)
 |-- c_current_addr_sk: long (nullable = true)
 |-- c_first_shipto_date_sk: long (nullable = true)
 |-- c_first_sales_date_sk: long (nullable = true)
 |-- c_salutation: string (nullable = true)
 |-- c_first_name: string (nullable = true)
 |-- c_last_name: string (nullable = true)
 |-- c_preferred_cust_flag: string (nullable = true)
 |-- c_birth_day: long (nullable = true)
 |-- c_birth_month: long (nullable = true)
 |-- c_birth_year: long (nullable = true)
 |-- c_birth_country: string (nullable = true)
 |-- c_login: string (nullable = true)
 |-- c_email_address: string (nullable = true)
 |-- c_last_review_date: string (nullable = true)



## Sample query translation
Refer to `queries/q00/explain_q00.sql`. The code below is a valid translation of that query using Spark SQL. You can use any method of the Spark SQL DataFrame class to implement your solution.

### Query 0
Count the number of items sold per category.  
Only certain categories, sold in specific stores, are considered.


In [27]:
# look into the TPCx-BB-dataset/ directory to check all the available tables.
# gather tables needed

s = get_table("store_sales")
i = get_table("item")

q01_i_category_id_IN = [1, 2, 3]
q01_ss_store_sk_IN = [10, 20, 33, 40, 50]

query0_solution = (
    s.join(other=i, on=s.ss_item_sk == i.i_item_sk)
    .filter(condition=i.i_category_id.isin(q01_i_category_id_IN))
    .filter(condition=s.ss_store_sk.isin(q01_ss_store_sk_IN))
    .groupBy(i.i_category)
    .count()
    .select("i_category", "count")
)

query0_solution.show()

+--------------+-----+
|    i_category|count|
+--------------+-----+
|Home & Kitchen| 1975|
|         Books|14455|
|         Music|25060|
+--------------+-----+



The cell below is a shortcut to display the q00 results file without navigating to it.  
The `!` symbol followed by a bash command (`cat` in this case) runs that command in a terminal from within the cell.

In [28]:
## check the result
!cat queries/q00/results/q00-result

Home & Kitchen, 1975
Books, 14455
Music, 25060

# [YOUR SOLUTION BELOW]
Write the query description in a Markdown cell, followed by a code cell with the query implementation.  
Query descriptions can be found in the TPCx-BB specification, page 93: https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp
and in the _/queries_ folder of this exercise.

You should implement all the queries assigned in the homework sheet.

## 1) Query 07


In [29]:
# query parameters
q07_year = 2004
q07_month = 7
q07_min_count = 10  # HAVING: keep states with at least this many sales
q07_top_n = 10  # LIMIT: number of states returned
# (a variable named `max` would shadow pyspark's max() from the wildcard import)

item = get_table("item")

# average price per category, increased by 20%
avg_cat_prices = item.groupBy("i_category").agg(
    (avg("i_current_price") * 1.2).alias("avg_price_plus_20")
)

# items priced above that threshold within their own category
overpriced = (
    item.join(other=avg_cat_prices, on="i_category")
    .filter(condition=col("i_current_price") > col("avg_price_plus_20"))
    .select("i_item_sk")
)

# dates in the month of interest
d = get_table("date_dim")
date_range = d.filter(condition=(d.d_year == q07_year) & (d.d_moy == q07_month)).select("d_date_sk")

a = get_table("customer_address")
c = get_table("customer")
s = get_table("store_sales")

query7_solution = (
    a.join(other=c, on=a.ca_address_sk == c.c_current_addr_sk)  # customer + address
    .join(other=s, on=c.c_customer_sk == s.ss_customer_sk)  # + store sales
    .join(other=overpriced, on=s.ss_item_sk == overpriced.i_item_sk)  # + overpriced items
    .join(other=date_range, on=s.ss_sold_date_sk == date_range.d_date_sk)  # + month of interest
    .groupBy("ca_state")
    .count()
    .filter(condition=col("count") >= q07_min_count)
    .orderBy(desc("count"), "ca_state")
    .limit(q07_top_n)
)

query7_solution.show()

In [None]:
# check the result
!cat queries/q07/results/q07-result

## 2.a)

In [None]:
# implementation

## 2.b)

In [None]:
# implementation

## 2.c)

In [None]:
# implementation

## 2.d)

In [None]:
# implementation

## 3) Query 09


In [30]:
# implementation
ss1 = get_table("store_sales")
dd = get_table("date_dim")
ca1 = get_table("customer_address")
s = get_table("store")
cd = get_table("customer_demographics")

q09_year = 2001

q09_part1_ca_country = "United States"
q09_part1_ca_state_IN = ['KY', 'GA', 'NM']
q09_part1_net_profit_min = 0
q09_part1_net_profit_max = 2000
q09_part1_education_status = "4 yr Degree"
q09_part1_marital_status = "M"
q09_part1_sales_price_min = 100
q09_part1_sales_price_max = 150

q09_part2_ca_country = "United States"
q09_part2_ca_state_IN = ['MT', 'OR', 'IN']
q09_part2_net_profit_min = 150
q09_part2_net_profit_max = 3000
q09_part2_education_status = "4 yr Degree"
q09_part2_marital_status = "M"
q09_part2_sales_price_min = 50
q09_part2_sales_price_max = 200

q09_part3_ca_country = "United States"
q09_part3_ca_state_IN = ['WI', 'MO', 'WV']
q09_part3_net_profit_min = 50
q09_part3_net_profit_max = 25000
q09_part3_education_status = "4 yr Degree"
q09_part3_marital_status = "M"
q09_part3_sales_price_min = 150
q09_part3_sales_price_max = 200

# demographics and sales price: any of the three combinations.
# `&` and `|` bind more tightly than `==`, so every comparison needs its own parentheses;
# Column.between(lo, hi) is inclusive, like SQL BETWEEN
demographics_cond = (
    (
        (cd.cd_marital_status == q09_part1_marital_status)
        & (cd.cd_education_status == q09_part1_education_status)
        & ss1.ss_sales_price.between(q09_part1_sales_price_min, q09_part1_sales_price_max)
    )
    | (
        (cd.cd_marital_status == q09_part2_marital_status)
        & (cd.cd_education_status == q09_part2_education_status)
        & ss1.ss_sales_price.between(q09_part2_sales_price_min, q09_part2_sales_price_max)
    )
    | (
        (cd.cd_marital_status == q09_part3_marital_status)
        & (cd.cd_education_status == q09_part3_education_status)
        & ss1.ss_sales_price.between(q09_part3_sales_price_min, q09_part3_sales_price_max)
    )
)

# customer location and net profit: any of the three combinations
address_cond = (
    (
        (ca1.ca_country == q09_part1_ca_country)
        & ca1.ca_state.isin(q09_part1_ca_state_IN)
        & ss1.ss_net_profit.between(q09_part1_net_profit_min, q09_part1_net_profit_max)
    )
    | (
        (ca1.ca_country == q09_part2_ca_country)
        & ca1.ca_state.isin(q09_part2_ca_state_IN)
        & ss1.ss_net_profit.between(q09_part2_net_profit_min, q09_part2_net_profit_max)
    )
    | (
        (ca1.ca_country == q09_part3_ca_country)
        & ca1.ca_state.isin(q09_part3_ca_state_IN)
        & ss1.ss_net_profit.between(q09_part3_net_profit_min, q09_part3_net_profit_max)
    )
)

query9_solution = (
    ss1.join(other=dd.filter(col("d_year") == q09_year), on=ss1.ss_sold_date_sk == dd.d_date_sk)
    .join(other=ca1, on=ss1.ss_addr_sk == ca1.ca_address_sk)
    .join(other=s, on=s.s_store_sk == ss1.ss_store_sk)
    .join(other=cd, on=cd.cd_demo_sk == ss1.ss_cdemo_sk)
    .filter(demographics_cond & address_cond)
    .select(sum("ss_quantity"))  # pyspark's sum, from the wildcard import
)

query9_solution.show()


In [None]:
# check the result
!cat queries/q09/results/q09-result

## 4) Query 20


In [None]:
# implementation

In [None]:
# check the result
!cat queries/q20/results/q20-result-queryonly

In [None]:
# Resource saver: gracefully stop the spark session :)
# spark.stop()

# hope to see you using clusters with Spark in Databricks platform
# on the cloud as AWS, Azure, GCP
# for your Data Engineering projects :D

## 5) Query 26


In [None]:
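# implementation
q26_category = "Books"
q26_min_count = 5  # not named `count`, which would shadow pyspark's count() function
ss = get_table("store_sales")
i = get_table("item")

# sales of items in the category, with a known customer
join1 = (
    ss.join(other=i, on=ss.ss_item_sk == i.i_item_sk)
    .filter(condition=i.i_category == q26_category)
    .dropna(subset=["ss_customer_sk"])
)
# customers with at least q26_min_count such purchases
join2 = join1.groupBy(ss.ss_customer_sk).count().filter(condition=col("count") >= q26_min_count)
# join2.orderBy(asc(col="ss_customer_sk")).select("ss_customer_sk").show()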

In [None]:
# check the result
# !cat queries/q26/results/q26-result

## 6) Query 11


In [35]:
from pyspark.sql import functions as F  # module import, unaffected by names reused in earlier cells

q11_startDate = '2003-01-02'
q11_endDate = '2003-02-02'

# p: number of reviews and average rating per reviewed item
p = (
    get_table("product_reviews").dropna(subset=["pr_item_sk"])
    .groupBy("pr_item_sk")
    .agg(F.count("*").alias("r_count"), F.avg("pr_review_rating").alias("avg_rating"))
)

# dd: dates within the range of interest
d = get_table("date_dim")
dd = d.filter((d.d_date >= q11_startDate) & (d.d_date <= q11_endDate)).select("d_date_sk")

# s: web sales revenue per item; the LEFT SEMI JOIN keeps only the web_sales rows
# (and columns) whose sale date is in dd
ws = get_table("web_sales")
s = (
    ws.join(dd, on=ws.ws_sold_date_sk == dd.d_date_sk, how="left_semi")
    .dropna(subset=["ws_item_sk"])
    .groupBy("ws_item_sk")
    .agg(F.sum("ws_net_paid").alias("revenue"))
)

# correlation between review count and average rating of the items sold in the range
query11_solution = p.join(s, on=p.pr_item_sk == s.ws_item_sk).select(
    F.corr("r_count", "avg_rating").alias("corr(reviews_count, avg_rating)")
)
query11_solution.show()

SELECT corr(reviews_count,avg_rating)
FROM (
  SELECT
    p.pr_item_sk AS pid,
    p.r_count    AS reviews_count,
    p.avg_rating AS avg_rating,
    s.revenue    AS m_revenue
  FROM (
    SELECT
      pr_item_sk,
      count(*) AS r_count,
      avg(pr_review_rating) AS avg_rating
    FROM product_reviews
    WHERE pr_item_sk IS NOT NULL
    --this is GROUP BY 1 in original::same as pr_item_sk here::hive complains anyhow
    GROUP BY pr_item_sk
  ) p
  INNER JOIN (
    SELECT
      ws_item_sk,
      SUM(ws_net_paid) AS revenue
    FROM web_sales ws
    -- Select date range of interest
    LEFT SEMI JOIN (
      SELECT d_date_sk
      FROM date_dim d
      WHERE d.d_date >= '${q11_startDate}'
      AND   d.d_date <= '${q11_endDate}'
    ) dd ON ( ws.ws_sold_date_sk = dd.d_date_sk )
    WHERE ws_item_sk IS NOT null
    --this is GROUP BY 1 in original::same as ws_item_sk here::hive complains anyhow
    GROUP BY ws_item_sk
  ) s
  ON p.pr_item_sk = s.ws_item_sk
) 
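To sanity-check the DataFrame translation of this query, it helps to compute the same number by hand on a few rows. The sketch below is a plain-Python reference for the SQL above. It assumes small in-memory inputs, and `q11_reference` and `pearson` are hypothetical helpers that belong to neither Spark nor the benchmark kit. It mirrors the two subqueries: review count and average rating per item (`p`), and the set of items with at least one web sale in the date range (`s`). It then correlates count with average rating over the items that appear in both. Spark's `corr` aggregate computes the Pearson correlation coefficient, so that is what the sketch computes too.

```python
from collections import defaultdict
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length, non-empty sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")  # undefined when one side is constant
    return cov / sqrt(var_x * var_y)


def q11_reference(reviews, sales, date_of_date_sk, start_date, end_date):
    """Hypothetical reference for q11 on small in-memory data.

    reviews: iterable of (pr_item_sk, pr_review_rating); ratings assumed non-null
    sales: iterable of (ws_item_sk, ws_sold_date_sk, ws_net_paid)
    date_of_date_sk: dict d_date_sk -> d_date as an ISO 'YYYY-MM-DD' string
    """
    # Subquery p: number of reviews and average rating per item
    counts = defaultdict(int)
    rating_sums = defaultdict(float)
    for item, rating in reviews:
        if item is None:
            continue  # WHERE pr_item_sk IS NOT NULL
        counts[item] += 1
        rating_sums[item] += rating

    # Subquery s: items with a web sale inside [start_date, end_date].
    # The revenue column is computed by the SQL but not used by corr,
    # so only membership matters here.
    sold_items = set()
    for item, date_sk, _net_paid in sales:
        d_date = date_of_date_sk.get(date_sk)
        if item is not None and d_date is not None and start_date <= d_date <= end_date:
            sold_items.add(item)

    # INNER JOIN on the item key, then correlate count with average rating
    items = sorted(i for i in counts if i in sold_items)
    if not items:
        return None  # SQL aggregates return NULL on empty input
    xs = [counts[i] for i in items]
    ys = [rating_sums[i] / counts[i] for i in items]
    return pearson(xs, ys)
```

Spark's exact NULL/NaN behaviour for one-row or constant inputs may differ from this sketch. Compare only well-populated cases.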

In [None]:
# check the result
# !cat queries/q11/results/q11-result

## Chevney 

## 7) Query 06


In [33]:
from pyspark.sql.functions import col, desc

q06_LIMIT=100
q06_YEAR=2001


# Alias each DataFrame so that the qualified names used below
# ("store.*", "web.*", "c.*") can be resolved by col()
store = get_table('TEMP_TABLE1').alias("store")
web = get_table('TEMP_TABLE2').alias("web")
c = get_table('customer').alias("c")

# The customer table's key is c_customer_sk, not customer_sk
join = (
    web.join(store, on=(col("store.customer_sk") == col("web.customer_sk")))
    .join(c, on=(col("web.customer_sk") == col("c.c_customer_sk")))
)

# Keep customers whose web-sales growth ratio exceeds their store-sales growth ratio
# (named "filtered" to avoid shadowing Python's built-in filter)
filtered = join.filter(
    (col("web.second_year_total") / col("web.first_year_total")) >
    (col("store.second_year_total") / col("store.first_year_total"))
)

quary06_solution = filtered.select(
    (col("web.second_year_total") / col("web.first_year_total")).alias("web_sales_increase_ratio"),
    col("c.c_customer_sk"),
    col("c.c_first_name"),
    col("c.c_last_name"),
    col("c.c_preferred_cust_flag"),
    col("c.c_birth_country"),
    col("c.c_login"),
    col("c.c_email_address")
).orderBy(
    desc("web_sales_increase_ratio"),
    "c_customer_sk",
    "c_first_name",
    "c_last_name",
    "c_preferred_cust_flag",
    "c_birth_country",
    "c_login"
).limit(q06_LIMIT)

25/05/28 22:25:34 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: TPCx-BB-dataset/TEMP_TABLE1.ptxt.
java.io.FileNotFoundException: File TPCx-BB-dataset/TEMP_TABLE1.ptxt does not exist

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/janiswaser/Desktop/Data_Management/Homework7/TPCx-BB-dataset/TEMP_TABLE1.ptxt. SQLSTATE: 42K03
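The `PATH_NOT_FOUND` error shows that `get_table('TEMP_TABLE1')` tries to read a file `TPCx-BB-dataset/TEMP_TABLE1.ptxt`, and that file does not exist. In the benchmark's query script, `TEMP_TABLE1` (store) and `TEMP_TABLE2` (web) are helper tables that the script builds first. Each has one row per customer, with a `first_year_total` for the chosen year and a `second_year_total` for the following year. The sketch below shows that conditional aggregation in plain Python. It assumes each sale is already reduced to a `(customer_sk, sold_date_sk, amount)` tuple. `yearly_totals` is a hypothetical helper, and `amount` is a placeholder: the actual revenue expression must be copied from the query's SQL file. As an assumed guard for the later division, the sketch also drops customers with no first-year sales.

```python
from collections import defaultdict


def yearly_totals(sales, year_of_date_sk, year):
    """Hypothetical helper: per-customer totals for `year` and `year + 1`.

    sales: iterable of (customer_sk, sold_date_sk, amount) tuples
    year_of_date_sk: dict d_date_sk -> d_year (a stand-in for date_dim)
    Returns {customer_sk: (first_year_total, second_year_total)}.
    """
    first = defaultdict(float)
    second = defaultdict(float)
    for customer_sk, date_sk, amount in sales:
        if customer_sk is None:
            continue  # NULL keys can never match in the later joins
        d_year = year_of_date_sk.get(date_sk)
        if d_year == year:
            first[customer_sk] += amount
        elif d_year == year + 1:
            second[customer_sk] += amount
        # sales from any other year are ignored, like the date_dim filter
    customers = set(first) | set(second)
    # Assumption: keep only customers with positive first-year sales,
    # so that second_year_total / first_year_total is always defined
    return {
        c: (first.get(c, 0.0), second.get(c, 0.0))
        for c in customers
        if first.get(c, 0.0) > 0
    }
```

In PySpark the same pattern is usually written as a join of the sales table with `date_dim`, followed by `groupBy` and `F.sum(F.when(F.col("d_year") == q06_YEAR, amount).otherwise(0))` (and the same with `q06_YEAR + 1`) from `pyspark.sql.functions`. The resulting DataFrame can then be aliased and used directly as `store` or `web`, without going through `get_table`.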

## Query 13

In [34]:
from pyspark.sql.functions import col, desc

q13_Year=2001
q13_limit=100


# Alias each DataFrame so that qualified names such as "web.first_year_total" resolve
store = get_table('TEMP_TABLE1').alias("store")
web = get_table('TEMP_TABLE2').alias("web")
c = get_table('customer').alias("c")

# The customer table's key is c_customer_sk, not customer_sk
join = (
    web.join(store, on=(col("store.customer_sk") == col("web.customer_sk")))
    .join(c, on=(col("web.customer_sk") == col("c.c_customer_sk")))
)

# Keep customers whose web-sales growth ratio exceeds their store-sales growth ratio
filtered = join.filter(
    (col("web.second_year_total") / col("web.first_year_total")) >
    (col("store.second_year_total") / col("store.first_year_total"))
)

quary13_solution = filtered.select(
    (col("store.second_year_total") / col("store.first_year_total")).alias("storeSalesIncreaseRatio"),
    (col("web.second_year_total") / col("web.first_year_total")).alias("webSalesIncreaseRatio"),
    col("c.c_customer_sk"),
    col("c.c_first_name"),
    col("c.c_last_name")
).orderBy(
    desc("webSalesIncreaseRatio"),
    "c_customer_sk",
    "c_first_name",
    "c_last_name"
).limit(q13_limit)

25/05/28 22:27:59 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: TPCx-BB-dataset/TEMP_TABLE1.ptxt.
java.io.FileNotFoundException: File TPCx-BB-dataset/TEMP_TABLE1.ptxt does not exist

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/janiswaser/Desktop/Data_Management/Homework7/TPCx-BB-dataset/TEMP_TABLE1.ptxt. SQLSTATE: 42K03
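Once the helper tables exist, the join, ratio filter and ordering above can be checked against a small hand-made input. The sketch below is a plain-Python reference for the Query 13 logic. `q13_reference` is a hypothetical helper, and it assumes the per-customer totals have the `(first_year_total, second_year_total)` shape with positive first-year totals. Query 06 follows the same pattern with different output columns.

```python
def q13_reference(store_totals, web_totals, customers, limit=100):
    """Hypothetical reference for the Query 13 join/filter/order logic.

    store_totals, web_totals: {customer_sk: (first_year_total, second_year_total)}
    customers: {c_customer_sk: (c_first_name, c_last_name)}, names assumed non-null
    Returns rows (storeSalesIncreaseRatio, webSalesIncreaseRatio,
                  c_customer_sk, c_first_name, c_last_name).
    """
    rows = []
    for sk, (s_first, s_second) in store_totals.items():
        # Inner joins: the customer must appear in all three inputs
        if sk not in web_totals or sk not in customers:
            continue
        w_first, w_second = web_totals[sk]
        store_ratio = s_second / s_first
        web_ratio = w_second / w_first
        if web_ratio > store_ratio:
            first_name, last_name = customers[sk]
            rows.append((store_ratio, web_ratio, sk, first_name, last_name))
    # ORDER BY webSalesIncreaseRatio DESC, c_customer_sk, c_first_name, c_last_name
    rows.sort(key=lambda r: (-r[1], r[2], r[3], r[4]))
    return rows[:limit]
```

The Spark result can be compared with this by collecting a few rows of `quary13_solution` after running it on a reduced input.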