# Windows Partitioning

## Prerrequisites

Install Spark and Java in VM

In [1]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark 3.5.3
!wget -q !wget /q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz

In [2]:
ls -l # check the .tgz is there

total 391476
drwxr-xr-x 1 root root      4096 Nov 25 19:13 [0m[01;34msample_data[0m/
-rw-r--r-- 1 root root 400864419 Sep  9 05:35 spark-3.5.3-bin-hadoop3.tgz


In [3]:
# unzip it
!tar xf spark-3.5.3-bin-hadoop3.tgz

In [4]:
!pip install -q findspark

Defining the environment

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

Start Spark Session

---

In [8]:
import findspark
findspark.init("spark-3.5.3-bin-hadoop3")# SPARK_HOME

from pyspark.sql import SparkSession

# create the session
spark = SparkSession \
        .builder \
        .appName("Window Partitioning") \
        .master("local[*]") \
        .getOrCreate()

spark.version

'3.5.3'

In [9]:
spark

In [10]:
# For Pandas conversion optimization
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [11]:
# Import sql functions
from pyspark.sql.functions import *

Download datasets

In [12]:
!mkdir -p dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/characters.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/employees.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/salaries.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/deptmanagers.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/titles.csv -P /dataset
!ls /dataset

characters.csv	deptmanagers.csv  employees.csv  salaries.csv  titles.csv


## Examples

### Window Partitioning

In [13]:
employeesDF = spark.read.option("header", "true").csv("/dataset/employees.csv")
salariesDF = spark.read.option("header", "true").csv("/dataset/salaries.csv")
deptManagersDF = spark.read.option("header", "true").csv("/dataset/deptmanagers.csv")
titlesDF = spark.read.option("header", "true").csv("/dataset/titles.csv")
charactersDF = spark.read.option("inferSchema", "true").option("header", "true").csv("/dataset/characters.csv")

In [14]:
titlesDF.show(2)

+------+--------+----------+----------+
|emp_no|   title| from_date|   to_date|
+------+--------+----------+----------+
| 10010|Engineer|1996-11-24|9999-01-01|
| 10020|Engineer|1997-12-30|9999-01-01|
+------+--------+----------+----------+
only showing top 2 rows



Get the last title for the employees

In [15]:
titlesDF.filter(col("emp_no") == 10040).show(4)

+------+---------------+----------+----------+
|emp_no|          title| from_date|   to_date|
+------+---------------+----------+----------+
| 10040|       Engineer|1993-02-14|1999-02-14|
| 10040|Senior Engineer|1999-02-14|9999-01-01|
+------+---------------+----------+----------+



In [16]:
# import library
from pyspark.sql.window import Window

In [17]:
# get last title by date for each employee (similar to previous exercise in Joins)
byEmployee = Window.partitionBy("emp_no").orderBy(col("to_date").desc())
mostRecentJobTitlesDF = titlesDF.withColumn("datesOrder", row_number().over(byEmployee)) \
    .filter(col("datesOrder") == 1) \
    .select("emp_no", "title", "to_date")

mostRecentJobTitlesDF.filter(col("emp_no") == 10040).show(4)

+------+---------------+----------+
|emp_no|          title|   to_date|
+------+---------------+----------+
| 10040|Senior Engineer|9999-01-01|
+------+---------------+----------+



In [None]:
# for the previous example we saw in joins, if we use windows partitioning we do not need to do the previous step (filtering max date and then do the join over the same table)

Get the max salaries (three for example) for job title

In [18]:
# we need salary data with title data joined first
bestPaidPerTitlerawDF = salariesDF.withColumn("salary", col("salary").cast("long")) \
    .join(mostRecentJobTitlesDF, (salariesDF.emp_no == mostRecentJobTitlesDF.emp_no) & (salariesDF.to_date == mostRecentJobTitlesDF.to_date)).drop("emp_no", "from_date", "to_date")

bestPaidPerTitlerawDF.filter(salariesDF.emp_no == 10040).show()
bestPaidPerTitlerawDF.show(3)

+------+---------------+
|salary|          title|
+------+---------------+
| 72668|Senior Engineer|
+------+---------------+

+------+---------------+
|salary|          title|
+------+---------------+
| 80324|       Engineer|
| 47017|       Engineer|
| 88806|Senior Engineer|
+------+---------------+
only showing top 3 rows



In [20]:
# now we apply the window partitioning
byTitle = Window.partitionBy("title").orderBy(col("salary").desc())

bestPaidPerTitleDF = bestPaidPerTitlerawDF.withColumn("rank_salary", row_number().over(byTitle)).filter(col("rank_salary") <= 3)

bestPaidPerTitleDF.show(6)

+------+------------------+-----------+
|salary|             title|rank_salary|
+------+------------------+-----------+
|101622|Assistant Engineer|          1|
| 92674|Assistant Engineer|          2|
| 92034|Assistant Engineer|          3|
|130939|          Engineer|          1|
|121819|          Engineer|          2|
|120417|          Engineer|          3|
+------+------------------+-----------+
only showing top 6 rows



### UDFs

In [21]:
charactersDF.show(3)

+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+
|          name|height|mass|hair_color| skin_color|eye_color|birth_year|gender|homeworld|species|
+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+
|Luke Skywalker|   172|  77|     blond|       fair|     blue|     19BBY|  male| Tatooine|  Human|
|         C-3PO|   167|  75|        NA|       gold|   yellow|    112BBY|    NA| Tatooine|  Droid|
|         R2-D2|    96|  32|        NA|white, blue|      red|     33BBY|    NA|    Naboo|  Droid|
+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+
only showing top 3 rows



In [22]:
# we define a function to remove the BBY from the birth_year
year = udf(lambda s: s[:-3])

charactersDF.withColumn("year", year(col("birth_year"))).show(3)

+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+----+
|          name|height|mass|hair_color| skin_color|eye_color|birth_year|gender|homeworld|species|year|
+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+----+
|Luke Skywalker|   172|  77|     blond|       fair|     blue|     19BBY|  male| Tatooine|  Human|  19|
|         C-3PO|   167|  75|        NA|       gold|   yellow|    112BBY|    NA| Tatooine|  Droid| 112|
|         R2-D2|    96|  32|        NA|white, blue|      red|     33BBY|    NA|    Naboo|  Droid|  33|
+--------------+------+----+----------+-----------+---------+----------+------+---------+-------+----+
only showing top 3 rows



## Exercises Window Partitioning
1) Load characters.csv to a DataFrame. Then, get the (two) tallest characters per species and per homeworld planet. Select the name, height, and species/homeworld in their case.
2) Get the height difference for each character with respect to the smallest one in the same homeworld.

Exercise 1

Exercise 2

## Exercises UDFs
1. Choose one of the DFs we have, define two UDFs of your own, and apply them (with a withColumn) on the DF. Show the results