# <center>Big Data for Engineers &ndash; Exercises &ndash; Solution</center>
## <center>Spring 2021 &ndash; Week 10 &ndash; ETH Zurich</center>
## <center>Spark Dataframes and SparkSQL</center>

## Introduction
For this exercise please create a Spark cluster on Azure as in the previous exercises. 
- If you have performance problems, check the YARN UI (```https://<cluster_name>.azurehdinsight.net/yarnui/hn/cluster```) and make sure that no unwanted applications are hogging the resources.

## Getting the data


- Log in using ssh to your cluster:  ```ssh <ssh_user_name>@<cluster_name>-ssh.azurehdinsight.net```
- Download the data: ```wget https://bigdata2020exassets.blob.core.windows.net/ex09/orders.jsonl```
- Upload the data to HDFS: ```hdfs dfs -put orders.jsonl /tmp/```


After you have uploaded the dataset to the cluster's storage (Azure Blob), upload this notebook to the Spark Jupyter server (`https://<cluster-name>.azurehdinsight.net/jupyter`).

## Spark Session
When you execute the first cell in this notebook, a Spark session is created automatically. You can print information about the session with the ```%%info``` magic.

In [None]:
print("Hello")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1620476381790_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.

In [None]:
%%info

By default the session is created with 3 Spark executors. We can change the configuration with the ```%%configure``` magic; the ```-f``` flag forces the current session to be dropped and recreated with the new settings. Make sure you have enough vCores and memory. The total amount of available resources is shown in the YARN UI.

In [None]:
%%configure -f
{"executorMemory": "2G", "numExecutors": 6, "driverMemory": "4G"}
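As a rough sanity check before changing the configuration, the memory request can be estimated by hand. The sketch below is plain Python (no Spark needed). It assumes Spark's documented default memory overhead on YARN of 10% of the heap with a 384 MiB minimum, and it assumes that the driver also runs in a YARN container. The helper name `container_mib` is made up for illustration. YARN may additionally round each container up to its allocation increment, so treat the result as a lower bound.

```python
# Rough estimate of the YARN memory requested by the %%configure cell above.
# Assumption: default overhead = max(384 MiB, 10% of the heap size).

def container_mib(heap_mib: int, overhead_factor: float = 0.10,
                  min_overhead_mib: int = 384) -> int:
    """Heap plus the assumed default memory overhead, in MiB."""
    overhead = max(min_overhead_mib, int(heap_mib * overhead_factor))
    return heap_mib + overhead

executor_mib = container_mib(2 * 1024)   # "executorMemory": "2G"
driver_mib = container_mib(4 * 1024)     # "driverMemory": "4G"
num_executors = 6                        # "numExecutors": 6

total_mib = num_executors * executor_mib + driver_mib
print(f"per executor: {executor_mib} MiB, driver: {driver_mib} MiB, total: {total_mib} MiB")
```

If the total exceeds what the YARN UI reports as available, the session may stay in the accepted state without ever starting.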

## 1. Spark Dataframes

Spark Dataframes allow the user to perform simple and efficient operations on data, as long as the data is structured and has a schema. Dataframes are similar to relational tables in relational databases: conceptually a dataframe is a specialization of a Spark RDD with schema information attached. You can find more information in Karau, H. et al. (2015). Learning Spark, Chapter 9 (optional reading).

### 1.1. Data preprocessing

In [None]:
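path = "/tmp/orders.jsonl"
# spark.read.json expects one JSON object per line (JSON Lines);
# cache() keeps the Dataframe in memory for the queries below
orders_df = spark.read.json(path).cache()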

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1620476381790_0008,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
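
The `.jsonl` extension stands for JSON Lines: every line of the file is one self-contained JSON object, which is the layout `spark.read.json` expects by default. The sketch below parses two hand-written sample lines with Python's standard `json` module. The records are made up to mirror the shape of the dataset and are not taken from `orders.jsonl`.

```python
import json

# Two made-up records in JSON Lines form: one JSON object per line.
sample = """\
{"order_id": 0, "date": "2018-2-4", "customer": {"first_name": "A", "last_name": "B"}, "items": [{"product": "fan", "price": 1.5, "quantity": 2}]}
{"order_id": 1, "date": "2016-1-1", "customer": {"first_name": "C", "last_name": "D"}, "items": []}
"""

# Parse each non-empty line independently.
orders = [json.loads(line) for line in sample.splitlines() if line.strip()]

print(len(orders))
print(orders[0]["customer"]["last_name"])
print(orders[0]["items"][0]["product"])
```

Nested objects become nested dictionaries and arrays become lists, which corresponds to the `struct` and `array` types Spark infers for the schema.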


The type of our dataset object is DataFrame

In [None]:
type(orders_df)

pyspark.sql.dataframe.DataFrame

Print the schema

In [None]:
orders_df.printSchema()

root
 |-- customer: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- product: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- order_id: long (nullable = true)



Print one row

In [None]:
orders_df.limit(1).collect()

[Row(customer=Row(first_name='Preston', last_name='Landry'), date='2018-2-4', items=[Row(price=1.53, product='fan', quantity=5), Row(price=1.33, product='computer screen', quantity=6), Row(price=1.06, product='kettle', quantity=6), Row(price=1.96, product='stuffed animal', quantity=3), Row(price=1.09, product='the book', quantity=7), Row(price=1.42, product='headphones', quantity=9), Row(price=1.67, product='whiskey bottle', quantity=3)], order_id=0)]

You can access the underlying RDD object and use any functions you learned for Spark RDDs.

In [None]:
orders_df.rdd.filter(lambda ordr: ordr.customer.last_name == "Landry").count()

1960

### 1.2. Dataframe Operations
We now perform some queries using operations on Dataframes. [Here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations) is a guide on Dataframe operations, with a link to the [API Documentation](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html).

We can select columns and show the result

In [None]:
orders_df.select("customer.first_name", "customer.last_name").limit(5).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|   Preston|   Landry|
|    Jamari|Dominguez|
|   Brendon|  Sicilia|
|    Armani|   Ardeni|
|    Jamari|     Miao|
+----------+---------+



As you can see we can navigate to the nested items with the dot

In [None]:
orders_df.filter(orders_df["customer.last_name"] == "Landry").count()

1960

How about nested arrays?

In [None]:
orders_df.select("order_id", "items").orderBy("order_id").limit(5).show()

+--------+--------------------+
|order_id|               items|
+--------+--------------------+
|       0|[[1.53, fan, 5], ...|
|       1|[[1.61, fan, 7], ...|
|       2|[[1.41, the book,...|
|       3|[[1.05, computer ...|
|       4|[[1.92, headphone...|
+--------+--------------------+



Let us try to find orders of a fan.

In [None]:
orders_df.filter(orders_df["items.product"] == "fan").count()

AnalysisException: cannot resolve '(`items`.`product` = 'fan')' due to data type mismatch: differing types in '(`items`.`product` = 'fan')' (array<string> and string).;;
'Filter (items#9.product = fan)
+- Relation[customer#7,date#8,items#9,order_id#10L] json


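The above code doesn't work! As the error message says, ```items.product``` is an array of strings (one entry per item), so it cannot be compared to a single string. Use ```array_contains``` instead.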

In [None]:
from pyspark.sql.functions import array_contains

orders_df.filter(array_contains("items.product", "fan")).count()

32778
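
A plain-Python model of the same situation may help to see why equality cannot match and why a membership test is the right tool. No Spark is involved here, the toy data is made up, and the helper `products` is hypothetical. Note that Python silently evaluates the mismatched comparison to `False`, whereas Spark rejects it with the analysis error shown above.

```python
# Toy orders mirroring the dataset's shape (made-up values).
orders = [
    {"order_id": 0, "items": [{"product": "fan"}, {"product": "kettle"}]},
    {"order_id": 1, "items": [{"product": "toaster"}]},
    {"order_id": 2, "items": [{"product": "fan"}, {"product": "fan"}]},
]

def products(order):
    """Model of the projection items.product: a list with one entry per item."""
    return [item["product"] for item in order["items"]]

# A list never equals a string, so an equality filter keeps nothing.
equal_matches = [o for o in orders if products(o) == "fan"]

# Membership corresponds to what array_contains checks.
contains_matches = [o for o in orders if "fan" in products(o)]

print(len(equal_matches), len(contains_matches))
```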

Let us try to unnest the data.

Unnest the products with explode.

Explode will generate as many rows as there are elements in the array and match them to other attributes.

In [None]:
from pyspark.sql.functions import explode

orders_df.select(explode("items").alias("i"), "i.product", "order_id").orderBy("order_id").limit(5).show()

+--------------------+---------------+--------+
|                   i|        product|order_id|
+--------------------+---------------+--------+
|      [1.53, fan, 5]|            fan|       0|
|[1.33, computer s...|computer screen|       0|
|   [1.06, kettle, 6]|         kettle|       0|
|[1.96, stuffed an...| stuffed animal|       0|
| [1.09, the book, 7]|       the book|       0|
+--------------------+---------------+--------+
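
The row multiplication performed by `explode` can be modelled in plain Python as a generator that emits one output row per array element and copies the remaining attributes. This is only an illustration with made-up data and a hypothetical helper `explode_items`; it is not how Spark implements the operation.

```python
orders = [
    {"order_id": 0, "items": [{"product": "fan", "quantity": 5},
                              {"product": "kettle", "quantity": 6}]},
    {"order_id": 1, "items": [{"product": "fan", "quantity": 1},
                              {"product": "fan", "quantity": 2}]},
    {"order_id": 2, "items": []},
]

def explode_items(rows):
    """Yield one (item, order_id) row per element of each order's items array."""
    for row in rows:
        for item in row["items"]:
            yield {"i": item, "order_id": row["order_id"]}

exploded = list(explode_items(orders))

# Rows about fans versus distinct orders that contain a fan.
fan_rows = [r for r in exploded if r["i"]["product"] == "fan"]
fan_orders = {r["order_id"] for r in fan_rows}

print(len(exploded), len(fan_rows), len(fan_orders))
```

Order 2 has an empty array and therefore produces no rows at all, which matches the behaviour of `explode` in Spark.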



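Now we can use this table to filter. Note that the result counts exploded item rows rather than orders: an order that lists a fan more than once contributes one row per occurrence, which is why the count is higher than the ```array_contains``` count above.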

In [None]:
exploded_df = orders_df.select(explode("items").alias("i"), "i.product", "order_id")
exploded_df.filter(exploded_df["product"] == "fan").count()

39922

You might have tried to filter on the ```i.product``` column with a ```.filter``` right after the ```.select```. That, however, does not work: a clause like ```(orders_df["i.product"] == "fan")``` is resolved against ```orders_df```, which does not contain the exploded column. A possible workaround with Dataframe operations is to pass a string clause to ```.filter```, so that the product column is resolved only after the ```.select``` has added it.

In [None]:
orders_df.select(explode("items").alias("i"), "i.product", "order_id").filter("product = 'fan'").count()

39922

Project the nested columns

In [None]:
orders_df.select(explode("items").alias("i"), "*").select(
    "order_id", "customer.*", "date", "i.*").limit(3).show()

+--------+----------+---------+--------+-----+---------------+--------+
|order_id|first_name|last_name|    date|price|        product|quantity|
+--------+----------+---------+--------+-----+---------------+--------+
|       0|   Preston|   Landry|2018-2-4| 1.53|            fan|       5|
|       0|   Preston|   Landry|2018-2-4| 1.33|computer screen|       6|
|       0|   Preston|   Landry|2018-2-4| 1.06|         kettle|       6|
+--------+----------+---------+--------+-----+---------------+--------+



### 1.3 Exercises

1) Find the average quantity at which each product is purchased. Only show the top 10 products by quantity. (Hint: you may need to import the function ```desc``` from ```pyspark.sql.functions``` to define descending order)

In [None]:
from pyspark.sql.functions import desc

orders_df.select(explode("items").alias("i"), "*").select(
    "i.product", "i.quantity"
).groupBy("product").avg("quantity").orderBy(desc("avg(quantity)")).limit(10).show()

+---------------+-----------------+
|        product|    avg(quantity)|
+---------------+-----------------+
|        toaster|5.515549016184942|
|       the book|5.514178678641427|
|         kettle|5.512053325314489|
|computer screen|5.504839685420448|
|     mouse trap|5.503895651308093|
|            fan|5.496342868593758|
|     headphones|5.485920795060985|
|       notebook|5.483182341458532|
| whiskey bottle|5.475555222463714|
| stuffed animal|5.470854598218753|
+---------------+-----------------+
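
For intuition, the same aggregation can be written out by hand: flatten the items, group them by product, and average the quantities per group. The sketch uses plain Python on made-up data and is meant as a model of the query's logic rather than a replacement for the Dataframe code.

```python
from collections import defaultdict

# Made-up exploded rows: (product, quantity).
rows = [("fan", 5), ("kettle", 6), ("fan", 7), ("toaster", 9), ("kettle", 2)]

# groupBy("product"): collect the quantities of each product.
quantities = defaultdict(list)
for product, quantity in rows:
    quantities[product].append(quantity)

# avg("quantity") per group, then descending order and a limit of 10.
averages = {p: sum(q) / len(q) for p, q in quantities.items()}
top = sorted(averages.items(), key=lambda kv: kv[1], reverse=True)[:10]

for product, avg_quantity in top:
    print(product, avg_quantity)
```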



2) Find the most expensive order

In [None]:
exploded_df = orders_df.select(explode("items").alias("i"), "*")
exploded_df.select(
    "order_id", (exploded_df["i.quantity"] * exploded_df["i.price"]).alias("total")
).groupBy("order_id").sum("total").orderBy(desc("sum(total)")).limit(1).show()

+--------+------------------+
|order_id|        sum(total)|
+--------+------------------+
|   99636|104.95999999999998|
+--------+------------------+
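
The trailing digits in `104.95999999999998` are an artifact of binary floating-point arithmetic: the prices are stored as doubles, so sums of products pick up tiny rounding errors. The plain-Python sketch below reproduces the effect on made-up numbers and shows two common remedies, rounding for display and exact decimal arithmetic. Which one is appropriate depends on the use case.

```python
from decimal import Decimal

# Made-up line items of one order: (quantity, price).
items = [(1, 0.1), (2, 0.1)]

# Same computation as summing quantity * price on doubles.
total_float = sum(q * p for q, p in items)

# Remedy 1: round only when presenting the result.
total_rounded = round(total_float, 2)

# Remedy 2: exact decimal arithmetic, built from the decimal text of each price.
total_exact = sum(q * Decimal(str(p)) for q, p in items)

print(total_float, total_rounded, total_exact)
```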



## 2. Spark SQL

Spark SQL allows users to formulate their queries in SQL. It requires Dataframes, which, as said before, are similar to relational tables. In addition to offering a familiar interface, SQL queries may perform better than hand-written RDD code: they inherit the efficiency of Dataframe operations and are optimized automatically.

In order to use sql we need to create a temporary table.

This table only exists for the current session.

In [None]:
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
orders_df.createOrReplaceTempView("orders")

### 2.1 Queries

Finally, run SQL queries on the registered tables. We will run the same queries as during the previous section, but with SQL.

As you can see we can navigate to the nested items with the dot.

In [None]:
%%sql
-- Finally, run SQL queries on the registered tables
-- As you can see we can navigate to the nested items with the dot
SELECT count(*)
FROM orders
WHERE orders.customer.last_name == "Landry"

How about nested arrays?

In [None]:
%%sql
-- How about nested arrays?
SELECT order_id, items
FROM orders AS o
ORDER BY order_id
LIMIT 5

Let us try to find orders of a fan.

In [None]:
%%sql 
SELECT count(*)
FROM orders
WHERE items.product = "fan"

The above code doesn't work! ```items.product``` is an array of strings, so it cannot be compared to a single string. Use ```array_contains``` instead.

In [None]:
%%sql

SELECT count(*)
FROM orders
WHERE array_contains(items.product, "fan")

Let us try to unnest the data.

Unnest the products with explode.

Explode will generate as many rows as there are elements in the array and match them to other attributes.

In [None]:
%%sql
SELECT explode(items) as i, i.product, order_id
FROM orders
ORDER BY order_id
limit 5

Now we can use this table to filter.

In [None]:
%%sql
-- Filter on product (no ORDER BY in the subquery: it does not affect the count)
SELECT count(*)
FROM (
    SELECT explode(items) as i, i.product, order_id
    FROM orders
)
WHERE product = "fan"

You might have tried to filter on ```i.product``` in the ```WHERE``` clause of the same query that creates it. That, however, does not work, because columns built in the ```SELECT``` clause are not available to the ```WHERE``` clause. In order to access the built columns directly, we need to unnest the data and make it part of our ```FROM``` clause. ```LATERAL VIEW``` lets us do just that, matching each non-array attribute to an unnested row from the array.

In [None]:
%%sql
SELECT *
FROM orders lateral view explode(items) item_table as flat_items
WHERE flat_items.product = "fan"
ORDER BY order_id
LIMIT 3

Project the nested columns

In [None]:
%%sql
SELECT order_id, customer.first_name, customer.last_name, date, flat_items.*
FROM orders lateral view explode(items) item_table as flat_items
WHERE flat_items.product = "fan"
ORDER BY order_id
LIMIT 3

Having built an unnested table, we can now easily aggregate over the previously nested columns.

### 2.2 Exercises

1) Find the average quantity at which each product is purchased. Only show the top 10 products by quantity. 

In [None]:
%%sql
SELECT flat_items.product, AVG(flat_items.quantity) as av_quantity
FROM orders lateral view explode(items) flat_table flat_items
GROUP BY flat_items.product
ORDER BY av_quantity DESC
LIMIT 10

2) Find the most expensive order

In [None]:
%%sql
SELECT order_id, SUM(flat_items.quantity * flat_items.price) as total
FROM orders lateral view explode(items) flat_table flat_items
GROUP BY order_id
ORDER BY total desc
LIMIT 1

## 3. Create Nestedness

We have already seen how Dataframes and Spark SQL unnest arrays with the <font face="courier">explode</font> method. For the other way round, Spark Dataframes and Spark SQL also provide ways to nest our data by creating arrays, especially after clauses like <font face="courier">group by</font>.

In traditional SQL, for example in PostgreSQL, we have to use one of the aggregation functions (<font face="courier">max, sum, count,</font> ...) to process the result of a <font face="courier">group by</font> operation. For example, for each customer (assume that no two customers share both the same first name and the same last name), we want to find out on how many distinct dates they placed an order. The query looks like this:

In [None]:
from pyspark.sql.functions import countDistinct
orders_df.groupBy("customer.first_name", "customer.last_name").agg(countDistinct("date")).show()

+----------+----------+--------------------+
|first_name| last_name|count(DISTINCT date)|
+----------+----------+--------------------+
|      Zane|      Dahl|                   3|
|    Dorian|      Dahl|                   2|
|     Annie|     Dower|                   2|
|    Landen| Galatioto|                   2|
|     Allan|        Po|                   4|
|  Clarissa|   Sicilia|                   2|
|     Ariel|   Coulson|                   3|
|   Xiomara| Christofi|                   2|
|     Rylie|      Dahl|                   1|
|    Azaria|Berenguier|                   2|
|     Allie|Berenguier|                   4|
|    Daniel|   Schuler|                   1|
|     Chana|   Balster|                   2|
|     Steve|    Badash|                   3|
|     Jaida|    Corfas|                   2|
|   Gabriel|   Suchoff|                   3|
|   Presley|   Coulson|                   2|
|   Janelle|     Mayer|                   1|
|   Dashawn|  Gottardo|                   2|
|     Terr

In [None]:
%%sql
select customer.first_name, customer.last_name, count(distinct date) from orders group by customer.first_name, customer.last_name

But what if we are interested not only in the number of dates, but also in the actual dates themselves? Luckily, Spark Dataframes and Spark SQL provide methods that preserve the original values of the date list. To find out, for each customer, on which dates they placed an order, we can use the <font face="courier">collect_set</font> method:

In [None]:
from pyspark.sql.functions import collect_set
orders_df.groupBy("customer.first_name", "customer.last_name").agg(collect_set("date")).show()

+----------+--------------------+--------------------+
|first_name|           last_name|   collect_set(date)|
+----------+--------------------+--------------------+
|     Abbie|                Egan|[2018-4-8, 2016-3...|
|  Abigayle|           Mc namara|[2016-4-2, 2017-6-9]|
|   Adalynn|              Ardeni|[2018-2-2, 2018-6...|
|      Aden|          Rosenbloom|         [2016-3-10]|
|    Adonis|              Badash|          [2017-6-8]|
|   Agustin|          Srivastava|         [2018-4-10]|
|     Aiden|             Suchoff|          [2018-2-8]|
|    Aiyana|              Landry|          [2018-2-3]|
|    Alaina|              Gruber|[2016-3-1, 2018-5-1]|
|    Alayna|               Mayer|         [2016-3-10]|
|Alexandria|         Butterfield|[2018-5-3, 2018-3...|
|Alexzander|              Landry|[2017-1-3, 2017-6-3]|
|     Alice|             Balster|[2018-4-4, 2017-4-7]|
|     Allan|                  Po|[2016-1-9, 2016-5...|
|     Allie|          Berenguier|[2016-2-1, 2018-4...|
|    Alyvi

In [None]:
%%sql
select customer.first_name, customer.last_name, collect_set(date) from orders group by customer.first_name, customer.last_name

For cases where we want to preserve all the entries rather than the de-duplicated ones, we can use the <font face="courier">collect_list</font> method. For example, for each date we want to record the last names of the customers who placed an order. Since two customers might share the same last name, we need to keep all of them, duplicates included. The query looks like this:

In [None]:
from pyspark.sql.functions import collect_list
orders_df.groupBy("date").agg(collect_list("customer.last_name")).show()

+---------+--------------------------------+
|     date|collect_list(customer.last_name)|
+---------+--------------------------------+
| 2017-4-8|            [Miao, Decaro, Da...|
| 2016-2-5|            [Drago, Mignani, ...|
| 2016-1-1|            [Suchoff, Lowe, D...|
| 2018-2-6|            [Gottardo, Po, Go...|
| 2018-1-7|            [Galatioto, Migna...|
| 2016-6-7|            [Ardeni, Kerman, ...|
|2018-5-10|            [Ho-stuart, Galat...|
| 2017-5-4|            [Srivastava, Ho-s...|
| 2018-4-7|            [Drago, Mayer, La...|
|2018-6-10|            [Lowe, Dominguez,...|
| 2017-3-3|            [Dahlstedt, Lemay...|
| 2017-3-5|            [Balster, Horah, ...|
| 2017-6-4|            [Findley, Marinko...|
| 2018-6-3|            [Cerda, Gruber, S...|
| 2018-1-6|            [Zapata, Miao, Ne...|
| 2016-4-9|            [Badash, Dahlsted...|
| 2016-1-5|            [Gottardo, Ho-stu...|
|2018-1-10|            [Landry, Badash, ...|
| 2017-1-5|            [Dower, Zapata, M...|
| 2018-1-4

In [None]:
%%sql
select date, collect_list(customer.last_name) from orders group by date
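
The difference between `countDistinct`, `collect_set` and `collect_list` can be modelled in plain Python with one `set` and one `list` per group. The data below is made up and the code only illustrates the semantics; it does not run on Spark.

```python
from collections import defaultdict

# Made-up (customer, date) pairs; customer "Po" ordered twice on the same date.
orders = [("Po", "2016-1-9"), ("Po", "2016-1-9"), ("Po", "2016-5-2"), ("Dahl", "2018-2-6")]

dates_set = defaultdict(set)    # model of collect_set: duplicates removed
dates_list = defaultdict(list)  # model of collect_list: duplicates kept

for customer, date in orders:
    dates_set[customer].add(date)
    dates_list[customer].append(date)

# countDistinct corresponds to the size of the collected set.
distinct_counts = {customer: len(dates) for customer, dates in dates_set.items()}

print(distinct_counts)
print(sorted(dates_set["Po"]), dates_list["Po"])
```

Unlike the Python list here, the element order produced by `collect_list` in Spark is not guaranteed after a shuffle, so the result should be treated as an unordered bag.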

Now try it yourself.

For every order placed on 2016-1-1, return the list of products that appeared in that order:

In [None]:
from pyspark.sql.functions import explode
exploded_df = orders_df.select(explode("items").alias("i"), "*")
exploded_df.filter(exploded_df["date"] == "2016-1-1").groupBy("order_id").agg(collect_list("i.product")).show()

+--------+-----------------------+
|order_id|collect_list(i.product)|
+--------+-----------------------+
|    3120|   [whiskey bottle, ...|
+--------+-----------------------+

For every product, return the set of dates on which it was purchased:

In [None]:
from pyspark.sql.functions import collect_set
exploded_df.orderBy("date").groupBy("i.product").agg(collect_set("date")).show()

+---------------+--------------------+
|        product|   collect_set(date)|
+---------------+--------------------+
|       the book|[2018-2-1, 2018-4...|
|     mouse trap|[2018-3-9, 2018-4...|
|computer screen|[2018-2-1, 2018-4...|
| whiskey bottle|[2018-3-9, 2018-4...|
|        toaster|[2018-2-1, 2018-4...|
| stuffed animal|[2018-2-1, 2018-4...|
|         kettle|[2018-3-9, 2018-4...|
|            fan|[2018-2-1, 2018-4...|
|     headphones|[2018-3-9, 2018-4...|
|       notebook|[2018-2-1, 2018-4...|
+---------------+--------------------+

One of the drawbacks of the <font face="courier">collect_set/collect_list</font> methods is that they accept only one column as argument. Later we will see how we can create nestedness on pretty much everything once we get the hang of the mighty JSONiq.
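
A common way around the single-column restriction in PySpark is to pack several columns into one value first, for example with `struct` from `pyspark.sql.functions`, and to collect that packed value. The plain-Python sketch below models the idea with tuples on made-up data. It illustrates the shape of the result and does not run on Spark.

```python
from collections import defaultdict

# Made-up rows: (date, first_name, last_name).
rows = [
    ("2016-1-1", "Zane", "Dahl"),
    ("2016-1-1", "Rylie", "Dahl"),
    ("2017-4-8", "Allan", "Po"),
]

# Pack both name columns into a single tuple (the role a struct would play),
# then collect one list of tuples per date.
customers_by_date = defaultdict(list)
for date, first_name, last_name in rows:
    customers_by_date[date].append((first_name, last_name))

print(dict(customers_by_date))
```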