
# Exercise 5 (Spark in Scala) [4 points]
---

For this exercise, you will work on this JupyterLab notebook, and solve the tasks listed herein. These tasks, in addition to writing Spark code, require you to analyse various query plans and to reason about them.

To familiarise yourself with Spark and the Scala language, we also provide you with two JupyterLab notebooks, namely Notebook 1 and Notebook 2, which you can upload to JupyterLab and run yourself. To get a deeper understanding, and to look up the types and definitions of various functions, we recommend that you visit the Spark and Spark SQL documentation.

## a) From SQL to Dataframe (and back again)

#### Find for each of the Spark SQL queries an equivalent one that only uses the Dataframe API (or vice versa)


In [1]:
val articlesDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/hm/articles.csv")
val customersDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/hm/customers.csv")
val transactionsDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/hm/transactions.csv")


// Creating the views for SparkSQL
articlesDF.createOrReplaceTempView("articles")
customersDF.createOrReplaceTempView("customers")
transactionsDF.createOrReplaceTempView("transactions")



Intitializing Scala interpreter ...

Spark Web UI available at http://captain01.os.hpc.tuwien.ac.at:9999/proxy/application_1683091297697_2635
SparkContext available as 'sc' (version = 3.2.3, master = yarn, app id = application_1683091297697_2635)
SparkSession available as 'spark'


articlesDF: org.apache.spark.sql.DataFrame = [article_id: string, product_code: string ... 23 more fields]
customersDF: org.apache.spark.sql.DataFrame = [customer_id: string, FN: string ... 5 more fields]
transactionsDF: org.apache.spark.sql.DataFrame = [t_dat: string, customer_id: string ... 3 more fields]


In [2]:
articlesDF.printSchema()
customersDF.printSchema()
transactionsDF.printSchema()

root
 |-- article_id: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- prod_name: string (nullable = true)
 |-- product_type_no: string (nullable = true)
 |-- product_type_name: string (nullable = true)
 |-- product_group_name: string (nullable = true)
 |-- graphical_appearance_no: string (nullable = true)
 |-- graphical_appearance_name: string (nullable = true)
 |-- colour_group_code: string (nullable = true)
 |-- colour_group_name: string (nullable = true)
 |-- perceived_colour_value_id: string (nullable = true)
 |-- perceived_colour_value_name: string (nullable = true)
 |-- perceived_colour_master_id: string (nullable = true)
 |-- perceived_colour_master_name: string (nullable = true)
 |-- department_no: string (nullable = true)
 |-- department_name: string (nullable = true)
 |-- index_code: string (nullable = true)
 |-- index_name: string (nullable = true)
 |-- index_group_no: string (nullable = true)
 |-- index_group_name: string (nullable = true)
 |-- sec

#### Query 1: Transform the given Spark SQL query into the Dataframe API

In [3]:
val query1 = spark.sql("SELECT COUNT(*) FROM articles WHERE department_name = 'Limited Edition' AND section_name = 'Special Collections' ")
query1.show()
query1.explain()

+--------+
|count(1)|
+--------+
|     138|
+--------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=125]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- Filter (((isnotnull(department_name#31) AND isnotnull(section_name#37)) AND (department_name#31 = Limited Edition)) AND (section_name#37 = Special Collections))
               +- FileScan csv [department_name#31,section_name#37] Batched: false, DataFilters: [isnotnull(department_name#31), isnotnull(section_name#37), (department_name#31 = Limited Edition..., Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://captain01.os.hpc.tuwien.ac.at:9000/user/adbs23_shared/hm/articl..., PartitionFilters: [], PushedFilters: [IsNotNull(department_name), IsNotNull(section_name), EqualTo(department_name,Limited Edition), E..., ReadSchema: struct<department_name:string,section_na

query1: org.apache.spark.sql.DataFrame = [count(1): bigint]


In [5]:
val query1DF = articlesDF.where($"department_name" === "Limited Edition" && $"section_name" === "Special Collections").select(count("*"))

// ... your solution goes here ...
query1DF.show()
query1DF.explain()

+--------+
|count(1)|
+--------+
|     138|
+--------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=249]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- Filter (((isnotnull(department_name#31) AND isnotnull(section_name#37)) AND (department_name#31 = Limited Edition)) AND (section_name#37 = Special Collections))
               +- FileScan csv [department_name#31,section_name#37] Batched: false, DataFilters: [isnotnull(department_name#31), isnotnull(section_name#37), (department_name#31 = Limited Edition..., Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://captain01.os.hpc.tuwien.ac.at:9000/user/adbs23_shared/hm/articl..., PartitionFilters: [], PushedFilters: [IsNotNull(department_name), IsNotNull(section_name), EqualTo(department_name,Limited Edition), E..., ReadSchema: struct<department_name:string,section_na

query1DF: org.apache.spark.sql.Dataset[Long] = [count(1): bigint]


#### Query 2: Transform the given Dataframe API query into Spark SQL

In [4]:
val query2 = customersDF.groupBy("age", "postal_code").agg(countDistinct("*"))
query2.show(false) // 'false' turns off truncation of row entries
query2.explain()

+---+----------------------------------------------------------------+-----------------------+
|age|postal_code                                                     |count(unresolvedstar())|
+---+----------------------------------------------------------------+-----------------------+
|18 |7dd2332aac02b1b4aa1b5754dd3ff302e398049482a1837a833925857f22f9f9|0                      |
|29 |3d503ddbe4714788f214cd9e7cfe6d26ffdf95f907ff79d2efe3b0cbb2a1c802|1                      |
|48 |6bf64e9b563ace70c7f185a9483d001d6b78d94f462de95d634ee10a7a0d0044|1                      |
|25 |ac8f1b7c0dce04c73ee65b906a257504ca2d3eccb04cc7e21119c44f94cce080|0                      |
|22 |7f400ca33459e114e8eaa175e625bc602a9e588f7f0876e1b1004d4c4cb8eb92|1                      |
|20 |0f8ff152a3a406106876d111d067367240a2de164ae5a857ec02a55923efd6ed|0                      |
|49 |2b91daa9471d959f5ea58d4d3a50bf116b6033ea14f42d0c26ea0288778ef54a|1                      |
|45 |7c72296077cff43341ddfbeb0dbfd9d0c85d9ee0c7fec

query2: org.apache.spark.sql.DataFrame = [age: string, postal_code: string ... 1 more field]


In [11]:
val query2DF = spark.sql("SELECT age, postal_code, COUNT(DISTINCT *) as count FROM customers GROUP BY age, postal_code;")
query2DF.show(false) // 'false' turns off truncation of row entries
query2DF.explain()

+---+----------------------------------------------------------------+-----+
|age|postal_code                                                     |count|
+---+----------------------------------------------------------------+-----+
|24 |9bad9727c1e78acd230510e14a12510d7a56c5cd6d0a9c8c88496626985b329a|1    |
|55 |7d658f624b9e01f8161d4a977d0dc854c0a0e0665eabd8f0ebd9588f83475ac3|0    |
|40 |9689774f511926fd680f14994c18a5f1b89f4faa852f4ef557c96acfd024a896|0    |
|25 |abc5a4fab86ec7d69f7f9aaa05be3b08cd26a8656363fc0401af1eca0f73a240|0    |
|20 |423c8f32db84cdc328d12be02c71002800868f8fd7b16e67d2e3a4656e018646|0    |
|22 |4e4708014a3763783b6f932e00b0e8fe72470a407390c2ca8e194773081e55c7|1    |
|29 |a9c6e6510c9d0f46d119eecf1301fed74d2f455fa1e251639edcc52284c4c11b|1    |
|36 |1f6def8101e8458d17021f7b2a06e5c1b2cf2b920afe28f42453769e7a8ce3f1|1    |
|23 |f3aeed11914f14cc4903d0d666c9421c058260d331c202dfde088f32b4e42a69|1    |
|24 |6d708d51997a821b7178097b45b9b159822dfe7e8cb23d790d6c58288acabe2a|0    |

query2DF: org.apache.spark.sql.DataFrame = [age: string, postal_code: string ... 1 more field]
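
The `0` entries in both result tables are consistent with how the Spark SQL documentation describes `count(DISTINCT expr[, expr...])`: it counts the rows for which the expressions are unique and non-null, so a row with a NULL in any of the counted columns does not contribute. The following is a plain-Scala model of that rule, not Spark code. It assumes that `*` expands to all columns of the row; the `Row` type and the function name are hypothetical and exist only for illustration.

```scala
object CountDistinctSketch {
  // A row is a sequence of optional column values; None stands for SQL NULL.
  type Row = Seq[Option[String]]

  // Model of COUNT(DISTINCT *) within one group:
  // rows containing any NULL are skipped, the rest are de-duplicated and counted.
  def countDistinctNonNull(rows: Seq[Row]): Int =
    rows.filter(_.forall(_.isDefined)).distinct.size
}
```

Under this model, a group whose only customer has a NULL in some column yields 0, while a group with one fully populated customer yields 1.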


#### Query 3: Transform the given Spark SQL query into the Dataframe API

In [5]:
// row_number() over (partition by garment_group_no order by count(prod_name) desc) partitions the rows by garment_group_no,
// orders each partition by count(prod_name) in descending order, and then assigns a sequential number to each row

val subquery = 
        "( select garment_group_no , prod_name, " +
        "row_number() over (partition by garment_group_no order by count(prod_name) desc) as seqnum " +
        "from articles a1 " +
        "group by garment_group_no, prod_name )"

// You can find more details on the row_number function here: https://sparkbyexamples.com/spark/spark-sql-add-row-number-dataframe/
// To define windows for row number, you will need to import the Window object via "import org.apache.spark.sql.expressions.Window"

val query = "SELECT garment_group_no, prod_name " +
            "FROM " + subquery +
            " WHERE seqnum > 1 "


val query3 = spark.sql(query)
query3.show(false) // 'false' turns off truncation of row entries
query3.explain()



+----------------+--------------------------+
|garment_group_no|prod_name                 |
+----------------+--------------------------+
|1001            |SULIMA jkt                |
|1001            |NORA shorts               |
|1001            |WC T-shirts               |
|1001            |EQ TERRIER TEE            |
|1001            |DIV Zebra sweater         |
|1001            |W DAVID TRS EQ            |
|1001            |BB 2-pack Carter shorts   |
|1001            |SULIMA  jkt               |
|1001            |DIV Maja cardigan         |
|1001            |RC MARTINI TEE            |
|1001            |EQ TIKKA TEE              |
|1001            |NORA RW shorts innerbriefs|
|1001            |Supreme tights            |
|1001            |BB Bolt T-shirt           |
|1001            |SUPREME Fancy Tights      |
|1001            |BB 2-pack T-shirts        |
|1001            |KATE TEE                  |
|1001            |WC Shorts                 |
|1001            |W GENOVA TRS EQ 

subquery: String = ( select garment_group_no , prod_name, row_number() over (partition by garment_group_no order by count(prod_name) desc) as seqnum from articles a1 group by garment_group_no, prod_name )
query: String = "SELECT garment_group_no, prod_name FROM ( select garment_group_no , prod_name, row_number() over (partition by garment_group_no order by count(prod_name) desc) as seqnum from articles a1 group by garment_group_no, prod_name ) WHERE seqnum > 1 "
query3: org.apache.spark.sql.DataFrame = [garment_group_no: string, prod_name: string]


In [15]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

val windowSpec = Window.partitionBy("garment_group_no").orderBy(col("prod_count").desc)
val subquery = articlesDF
  .groupBy("garment_group_no", "prod_name")
  .agg(count("prod_name").alias("prod_count")) // count(prod_name), as in the SQL query, ignores NULL names
  .withColumn("seqnum", row_number().over(windowSpec))

val query3 = subquery.filter(col("seqnum") > 1)
  .select("garment_group_no", "prod_name")

query3.show(false) // 'false' turns off truncation of row entries
query3.explain()

+----------------+--------------------------+
|garment_group_no|prod_name                 |
+----------------+--------------------------+
|1001            |SULIMA jkt                |
|1001            |NORA shorts               |
|1001            |WC T-shirts               |
|1001            |EQ TERRIER TEE            |
|1001            |DIV Zebra sweater         |
|1001            |W DAVID TRS EQ            |
|1001            |BB 2-pack Carter shorts   |
|1001            |SULIMA  jkt               |
|1001            |DIV Maja cardigan         |
|1001            |RC MARTINI TEE            |
|1001            |EQ TIKKA TEE              |
|1001            |NORA RW shorts innerbriefs|
|1001            |Supreme tights            |
|1001            |BB Bolt T-shirt           |
|1001            |SUPREME Fancy Tights      |
|1001            |BB 2-pack T-shirts        |
|1001            |KATE TEE                  |
|1001            |WC Shorts                 |
|1001            |W GENOVA TRS EQ 

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@151de53b
subquery: org.apache.spark.sql.DataFrame = [garment_group_no: string, prod_name: string ... 2 more fields]
query3: org.apache.spark.sql.DataFrame = [garment_group_no: string, prod_name: string]
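
To make the semantics of `row_number() over (partition by ... order by ... desc)` concrete, here is a minimal sketch in plain Scala collections rather than Spark. The case class `Agg`, the object name and the toy data are hypothetical. The sketch breaks ties by name so that the result is reproducible; Spark makes no such promise, and rows with equal counts may be numbered in any order.

```scala
object RowNumberSketch {
  // One aggregated row: (garment_group_no, prod_name, count). Toy data only.
  final case class Agg(group: String, name: String, cnt: Long)

  // Assign 1, 2, 3, ... within each group, highest count first.
  def withRowNumber(rows: Seq[Agg]): Seq[(Agg, Int)] =
    rows
      .groupBy(_.group)                       // "partition by garment_group_no"
      .toSeq
      .sortBy(_._1)                           // fixed group order, for readable output only
      .flatMap { case (_, groupRows) =>
        groupRows
          .sortBy(r => (-r.cnt, r.name))      // "order by count desc"; name is a model-only tie-breaker
          .zipWithIndex
          .map { case (r, i) => (r, i + 1) }  // row numbers start at 1
      }

  // Equivalent of the outer "WHERE seqnum > 1": drop the top row of every group.
  def dropTopPerGroup(rows: Seq[Agg]): Seq[Agg] =
    withRowNumber(rows).collect { case (r, n) if n > 1 => r }
}
```

For example, with the rows `("1001", "A", 3)`, `("1001", "B", 1)` and `("1002", "C", 2)`, only `("1001", "B", 1)` survives the filter, because `A` and `C` are each the first row of their group.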


#### Query 4: Transform the given Dataframe API query into Spark SQL

In [6]:
val query4 = customersDF
      .join(transactionsDF, customersDF("customer_id") === transactionsDF("customer_id"))
      .join(articlesDF, articlesDF("article_id") === transactionsDF("article_id"))
      .filter("age > 40 AND (department_name = 'Jersey Basic' OR department_name = 'Shirt') ")
      .select(articlesDF("article_id"), articlesDF("department_no")).distinct
      .alias("df1")
      .join(articlesDF.alias("df2"), $"df1.department_no" === $"df2.department_no")
      .groupBy(articlesDF.as("df1")("article_id"))
      .agg(count("*").as("count_same_dep"))
      .orderBy(desc("count_same_dep"))
query4.show(false) // 'false' turns off truncation of row entries
query4.explain()

+----------+--------------+
|article_id|count_same_dep|
+----------+--------------+
|0630313008|1359          |
|0735131001|1359          |
|0554598018|1359          |
|0778064012|1359          |
|0525335018|1359          |
|0760834001|1359          |
|0753061006|1359          |
|0856667005|1359          |
|0610776072|1359          |
|0740824002|1359          |
|0717063001|1359          |
|0819520001|1359          |
|0904584008|1359          |
|0456163009|1359          |
|0610776106|1359          |
|0820484001|1359          |
|0880001002|1359          |
|0506110001|1359          |
|0565379022|1359          |
|0786304002|1359          |
+----------+--------------+
only showing top 20 rows

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count_same_dep#410L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count_same_dep#410L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1115]
      +- HashAggregate(keys=[article_id#16], functions=[count(1)])
         +- 

query4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [article_id: string, count_same_dep: bigint]


In [16]:
customersDF.createOrReplaceTempView("customers")
transactionsDF.createOrReplaceTempView("transactions")
articlesDF.createOrReplaceTempView("articles")

val query = """
  SELECT df1.article_id, COUNT(*) AS count_same_dep
  FROM (
    SELECT DISTINCT articles.article_id, articles.department_no
    FROM customers
    JOIN transactions ON customers.customer_id = transactions.customer_id
    JOIN articles ON articles.article_id = transactions.article_id
    WHERE customers.age > 40 AND (articles.department_name = 'Jersey Basic' OR articles.department_name = 'Shirt')
  ) AS df1
  JOIN articles AS df2 ON df1.department_no = df2.department_no
  GROUP BY df1.article_id
  ORDER BY count_same_dep DESC
"""

val query4 = spark.sql(query)
query4.show(false) // 'false' turns off truncation of row entries
query4.explain()


+----------+--------------+
|article_id|count_same_dep|
+----------+--------------+
|0735131001|1359          |
|0554598018|1359          |
|0372008012|1359          |
|0778064012|1359          |
|0753061006|1359          |
|0760834001|1359          |
|0610776072|1359          |
|0856667005|1359          |
|0717063001|1359          |
|0740824002|1359          |
|0904584008|1359          |
|0819520001|1359          |
|0610776106|1359          |
|0629420020|1359          |
|0880001002|1359          |
|0630313008|1359          |
|0608458002|1359          |
|0506110001|1359          |
|0565379022|1359          |
|0786304002|1359          |
+----------+--------------+
only showing top 20 rows

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count_same_dep#403L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count_same_dep#403L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1649]
      +- HashAggregate(keys=[article_id#16], functions=[count(1)])
         +- 

query: String =
"
  SELECT df1.article_id, COUNT(*) AS count_same_dep
  FROM (
    SELECT DISTINCT articles.article_id, articles.department_no
    FROM customers
    JOIN transactions ON customers.customer_id = transactions.customer_id
    JOIN articles ON articles.article_id = transactions.article_id
    WHERE customers.age > 40 AND (articles.department_name = 'Jersey Basic' OR articles.department_name = 'Shirt')
  ) AS df1
  JOIN articles AS df2 ON df1.department_no = df2.department_no
  GROUP BY df1.article_id
  ORDER BY count_same_dep DESC
"
query4: org.apache.spark.sql.DataFrame = [article_id: string, count_same_dep: bigint]


---

## b) Wide and Narrow Dependencies

You are given a DAG visualisation of how Spark executed a query in several stages, with arrows linking the parts of the query plan.

Here is the final query plan that Spark produces for query4 (exported from the internal Web UI).

<img src="adbs1.png" width="1200"  align="left" />



Analyse the dependencies and stages of the query, and try to determine which operations of the query plan are executed as wide dependencies and which as narrow dependencies.

# Solution

## Example Operations above with narrow dependencies: 

* Any Project operation
* Any Filter operation

## Operations above with wide dependencies

* SortMergeJoin: rows with matching join keys from both input tables need to be brought into the same partition, which requires shuffling data between nodes.
* Sort: presumably because the rows of neither input were previously in a single location. This sorting happens in a separate node from the one where the SortMergeJoin is performed, perhaps to reduce communication costs.

The "Exchange" operation is always a wide dependency, in the sense of linking data from one stage to the next. 
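
The difference between the two kinds of dependency can be sketched with plain Scala collections, where a dataset is modelled as a vector of partitions. This is an illustrative model under simplifying assumptions, not Spark's implementation; the object and function names are hypothetical.

```scala
object DependencySketch {
  type Partition[A] = Vector[A]

  // Narrow dependency (e.g. Filter, Project): each output partition is
  // computed from exactly one input partition, so no data moves between them.
  def narrowFilter[A](parts: Vector[Partition[A]])(p: A => Boolean): Vector[Partition[A]] =
    parts.map(_.filter(p))

  // Wide dependency (e.g. an Exchange before a join or a sort): every output
  // partition may need rows from every input partition, i.e. a shuffle.
  def shuffleByKey[A, K](parts: Vector[Partition[A]], numOut: Int)(key: A => K): Vector[Partition[A]] = {
    val all = parts.flatten // reads all input partitions
    Vector.tabulate(numOut) { i =>
      // a row goes to the output partition selected by the hash of its key
      all.filter(a => Math.floorMod(key(a).hashCode, numOut) == i)
    }
  }
}
```

In the model, `narrowFilter` can process each partition independently, whereas `shuffleByKey` has to see all input partitions before it can produce any output partition. This corresponds to the stage boundary that an `Exchange` introduces in the plan.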