# PySpark Window Function Problems

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
# Note: importing sum and round like this shadows Python's built-in sum() and round()
from pyspark.sql.functions import sum, col, round

spark = SparkSession\
    .builder\
    .config("spark.jars", "/Users/sahiltest/Downloads/mysql-connector-java-8.0.22.jar")\
    .config("spark.driver.host","127.0.0.1")\
    .config("spark.driver.bindAddress","127.0.0.1")\
    .master("local[*]")\
    .appName("cats data set")\
    .getOrCreate()

## Reading Cats Dataset

In [1]:
cats_data = spark\
    .read\
    .format("csv")\
    .option("header",True)\
    .option("delimiter",",")\
    .option("inferSchema",True)\
    .load("/Users/sahiltest/PycharmProjects/spark-fundamentals/learn.Input/cats_data.txt")



### Dataset schema

In [11]:
cats_data.printSchema()

root
 |-- name: string (nullable = true)
 |-- breed: string (nullable = true)
 |-- weight: double (nullable = true)
 |-- color: string (nullable = true)
 |-- age: integer (nullable = true)



### We would like to find the total weight of cats grouped by age, returning only the groups whose total weight is greater than 12.

In [8]:
cats_data\
    .select("age","weight")\
    .groupby(col("age"))\
    .agg(sum(col("weight")).alias("total_weight"))\
    .filter(col("total_weight")>12)\
    .orderBy(col("total_weight").desc())\
    .show()

+---+------------+
|age|total_weight|
+---+------------+
|  2|        19.6|
|  4|        15.8|
|  5|        15.4|
+---+------------+
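This cell is a plain aggregation rather than a window. `groupby` collapses each age into a single row, and the filter on the aggregated column plays the role of SQL's HAVING clause. Below is a minimal plain-Python sketch of the same logic (no Spark). It assumes the (name, weight, age) values reconstructed from the outputs shown in this notebook:

```python
from collections import defaultdict

# (name, weight, age) rows, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", 5.5, 5), ("Ashes", 4.5, 5), ("Charlie", 4.8, 4),
    ("Felix", 5.0, 2), ("Millie", 5.4, 5), ("Misty", 5.7, 2),
    ("Molly", 4.2, 1), ("Oscar", 6.1, 1), ("Puss", 5.1, 2),
    ("Smokey", 6.1, 4), ("Smudge", 4.9, 4), ("Tigger", 3.8, 2),
]


def heavy_age_groups(rows, threshold):
    """Sum weight per age, keep groups above threshold, sort by total descending."""
    totals = defaultdict(float)
    for _name, weight, age in rows:
        totals[age] += weight  # groupby("age").agg(sum("weight"))
    # filter(total_weight > threshold), applied after aggregation like HAVING
    kept = [(age, round(total, 1)) for age, total in totals.items() if total > threshold]
    return sorted(kept, key=lambda pair: pair[1], reverse=True)  # orderBy(total_weight desc)


print(heavy_age_groups(CATS, 12))
```

Age 1 (Molly and Oscar, 10.3 in total) is the only group the threshold removes.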



### The cats must be ordered by name and will enter an elevator one by one. We would like to know what the running total weight is.

In [9]:
cats_data\
    .select("name","weight")\
    .withColumn("running_total_weight",sum(col("weight")).over(Window.orderBy(col("name"))))\
    .select("name",round("running_total_weight"))\
    .show()

22/02/02 17:45:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+------------------------------+
|   name|round(running_total_weight, 0)|
+-------+------------------------------+
|  Alfie|                           6.0|
|  Ashes|                          10.0|
|Charlie|                          15.0|
|  Felix|                          20.0|
| Millie|                          25.0|
|  Misty|                          31.0|
|  Molly|                          35.0|
|  Oscar|                          41.0|
|   Puss|                          46.0|
| Smokey|                          52.0|
| Smudge|                          57.0|
| Tigger|                          61.0|
+-------+------------------------------+
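This cell does not set a frame explicitly. When a window has an `orderBy` but no frame, the Spark documentation says the default is a growing RANGE frame from `unboundedPreceding` to `currentRow`, and that is what turns `sum` into a running total. The plain-Python sketch below (no Spark) shows the same computation. It assumes the (name, weight) values reconstructed from this notebook's outputs and rounds to one decimal instead of zero, so the totals are easier to read:

```python
# (name, weight) pairs, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", 5.5), ("Ashes", 4.5), ("Charlie", 4.8), ("Felix", 5.0),
    ("Millie", 5.4), ("Misty", 5.7), ("Molly", 4.2), ("Oscar", 6.1),
    ("Puss", 5.1), ("Smokey", 6.1), ("Smudge", 4.9), ("Tigger", 3.8),
]


def running_total_by_name(rows):
    """Cumulative sum of weight in name order; one output row per input row."""
    out = []
    total = 0.0
    for name, weight in sorted(rows):  # tuples sort by name first; names are unique
        total += weight  # frame: every row from the first one up to the current row
        out.append((name, round(total, 1)))
    return out


for name, total in running_total_by_name(CATS):
    print(f"{name:>8} {total:5.1f}")
```

The names are unique, so the RANGE default behaves exactly like a ROWS frame here. The difference only matters when the ordering column has ties.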



### The cats must be ordered first by breed and second by name. They are about to enter an elevator one by one. When all the cats of the same breed have entered, they leave.

In [10]:
cats_data\
    .select("name","weight","breed")\
    .withColumn("running_total_weight",sum(col("weight")).over(Window.partitionBy(col("breed")).orderBy(col("name"))))\
    .select("name","breed",round("running_total_weight").alias("running_total_weight"))\
    .show()

+-------+-----------------+--------------------+
|   name|            breed|running_total_weight|
+-------+-----------------+--------------------+
|Charlie|British Shorthair|                 5.0|
| Smudge|British Shorthair|                10.0|
| Tigger|British Shorthair|                14.0|
| Millie|       Maine Coon|                 5.0|
|  Misty|       Maine Coon|                11.0|
|   Puss|       Maine Coon|                16.0|
| Smokey|       Maine Coon|                22.0|
|  Ashes|          Persian|                 5.0|
|  Felix|          Persian|                10.0|
|  Molly|          Persian|                14.0|
|  Alfie|          Siamese|                 6.0|
|  Oscar|          Siamese|                12.0|
+-------+-----------------+--------------------+
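Adding `partitionBy(col("breed"))` makes the running total restart for each breed, which models the elevator emptying between breeds. Here is a plain-Python sketch of that reset (no Spark). It assumes the (name, breed, weight) values reconstructed from this notebook's outputs and rounds to one decimal:

```python
from itertools import groupby

# (name, breed, weight) rows, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", "Siamese", 5.5), ("Ashes", "Persian", 4.5),
    ("Charlie", "British Shorthair", 4.8), ("Felix", "Persian", 5.0),
    ("Millie", "Maine Coon", 5.4), ("Misty", "Maine Coon", 5.7),
    ("Molly", "Persian", 4.2), ("Oscar", "Siamese", 6.1),
    ("Puss", "Maine Coon", 5.1), ("Smokey", "Maine Coon", 6.1),
    ("Smudge", "British Shorthair", 4.9), ("Tigger", "British Shorthair", 3.8),
]


def running_total_per_breed(rows):
    """Running weight total in name order that restarts for every breed."""
    out = []
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))  # partition key, then order key
    for breed, group in groupby(ordered, key=lambda r: r[1]):
        total = 0.0  # a new partition starts with an empty frame
        for name, _breed, weight in group:
            total += weight
            out.append((name, breed, round(total, 1)))
    return out


for row in running_total_per_breed(CATS):
    print(row)
```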



### The cats would like to see the average weight of themselves, the cat just before them and the cat just after them (ordered by weight). The first and last cats are content with an average of 2 cats instead of 3.

In [15]:
from pyspark.sql.functions import avg

# rowsBetween(-1, 1) limits the frame to the previous, current and next row;
# with orderBy alone the frame keeps growing, which gives a running average instead
cats_data\
    .select("name","weight")\
    .withColumn("average_weight",avg(col("weight")).over(Window.orderBy(col("weight")).rowsBetween(-1,1)))\
    .select("name","weight",round("average_weight",1).alias("average_weight"))\
    .show()

+-------+------+--------------+
|   name|weight|average_weight|
+-------+------+--------------+
| Tigger|   3.8|           4.0|
|  Molly|   4.2|           4.2|
|  Ashes|   4.5|           4.5|
|Charlie|   4.8|           4.7|
| Smudge|   4.9|           4.9|
|  Felix|   5.0|           5.0|
|   Puss|   5.1|           5.2|
| Millie|   5.4|           5.3|
|  Alfie|   5.5|           5.5|
|  Misty|   5.7|           5.8|
|  Oscar|   6.1|           6.0|
| Smokey|   6.1|           6.1|
+-------+------+--------------+



22/02/02 17:53:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
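The heading asks for a centered average over neighboring cats. In Spark, that corresponds to a frame of `rowsBetween(-1, 1)` on the weight ordering. With only `orderBy`, the default growing frame yields a running average instead. The plain-Python sketch below (no Spark) computes the centered version. It assumes the (name, weight) values reconstructed from this notebook's outputs and clips the frame at both ends, so the first and last cats average only two weights:

```python
# (name, weight) pairs, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", 5.5), ("Ashes", 4.5), ("Charlie", 4.8), ("Felix", 5.0),
    ("Millie", 5.4), ("Misty", 5.7), ("Molly", 4.2), ("Oscar", 6.1),
    ("Puss", 5.1), ("Smokey", 6.1), ("Smudge", 4.9), ("Tigger", 3.8),
]


def centered_average(rows):
    """Average of each row and its immediate neighbors in weight order (rowsBetween(-1, 1))."""
    ordered = sorted(rows, key=lambda r: r[1])  # stable sort: Oscar stays ahead of Smokey
    out = []
    for i, (name, weight) in enumerate(ordered):
        frame = ordered[max(i - 1, 0): i + 2]  # previous, current, next; clipped at both ends
        avg = sum(w for _, w in frame) / len(frame)
        out.append((name, weight, round(avg, 1)))
    return out


for row in centered_average(CATS):
    print(row)
```

Oscar and Smokey have the same weight, so their relative order is not guaranteed in Spark. If they swap, their two averages swap with them.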


### The cats must be ordered by weight descending and will enter an elevator one by one. We would like to know what the running total weight is. If two cats have the same weight, they must enter separately.

In [18]:
cats_data\
    .select("name","weight")\
    .withColumn("running_total_weight",sum(col("weight")).over(Window.orderBy(col("weight").desc()).rowsBetween(Window.unboundedPreceding,0)))\
    .select("name","weight",round("running_total_weight",1).alias("running_total_weight"))\
    .show()

+-------+------+--------------------+
|   name|weight|running_total_weight|
+-------+------+--------------------+
|  Oscar|   6.1|                 6.1|
| Smokey|   6.1|                12.2|
|  Misty|   5.7|                17.9|
|  Alfie|   5.5|                23.4|
| Millie|   5.4|                28.8|
|   Puss|   5.1|                33.9|
|  Felix|   5.0|                38.9|
| Smudge|   4.9|                43.8|
|Charlie|   4.8|                48.6|
|  Ashes|   4.5|                53.1|
|  Molly|   4.2|                57.3|
| Tigger|   3.8|                61.1|
+-------+------+--------------------+



22/02/02 18:06:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
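The explicit `rowsBetween` matters because of the tie at 6.1. Under the default RANGE frame, rows with equal ordering values are peers and share one frame, so Oscar and Smokey would both show 12.2. A ROWS frame adds them one at a time. The plain-Python sketch below (no Spark) contrasts the two frames. It assumes the (name, weight) values reconstructed from this notebook's outputs:

```python
# (name, weight) pairs, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", 5.5), ("Ashes", 4.5), ("Charlie", 4.8), ("Felix", 5.0),
    ("Millie", 5.4), ("Misty", 5.7), ("Molly", 4.2), ("Oscar", 6.1),
    ("Puss", 5.1), ("Smokey", 6.1), ("Smudge", 4.9), ("Tigger", 3.8),
]


def running_total_rows(rows):
    """ROWS frame (unboundedPreceding to current row): tied rows are added one at a time."""
    ordered = sorted(rows, key=lambda r: -r[1])  # weight descending; stable for ties
    out, total = [], 0.0
    for name, weight in ordered:
        total += weight
        out.append((name, round(total, 1)))
    return out


def running_total_range(rows):
    """RANGE frame (the default with orderBy): the frame also includes all peers of the current row."""
    ordered = sorted(rows, key=lambda r: -r[1])
    # in descending order, "up to and including the current value" means weight >= current weight
    return [(name, round(sum(w for _, w in ordered if w >= weight), 1)) for name, weight in ordered]


print(running_total_rows(CATS)[:3])
print(running_total_range(CATS)[:3])
```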


### The cats form a line grouped by color. Inside each color group the cats order themselves by name. Every cat must have a unique number for its place in the line.

In [20]:
from pyspark.sql.functions import row_number

# order by color first so cats of the same color stand together, then by name within each color
cats_data\
    .withColumn("unique_number",row_number().over(Window.orderBy(col("color"),col("name"))))\
    .select("unique_number","name","color")\
    .show()

+-------------+-------+-------------+
|unique_number|   name|        color|
+-------------+-------+-------------+
|            1|  Ashes|        Black|
|            2|Charlie|        Black|
|            3|  Molly|        Black|
|            4|  Oscar|        Black|
|            5| Smudge|        Black|
|            6|  Alfie|        Brown|
|            7|  Misty|        Brown|
|            8| Smokey|        Brown|
|            9|  Felix|Tortoiseshell|
|           10| Millie|Tortoiseshell|
|           11|   Puss|Tortoiseshell|
|           12| Tigger|Tortoiseshell|
+-------------+-------+-------------+



22/02/02 18:15:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
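The line the heading describes is sorted by color first and name second, and the numbering runs across the whole line. Partitioning by color would instead restart the numbers for each color, which would break the "unique number" requirement. The plain-Python sketch below (no Spark) shows both variants. It assumes the (name, color) values reconstructed from this notebook's outputs:

```python
from itertools import groupby

# (name, color) pairs, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", "Brown"), ("Ashes", "Black"), ("Charlie", "Black"),
    ("Felix", "Tortoiseshell"), ("Millie", "Tortoiseshell"), ("Misty", "Brown"),
    ("Molly", "Black"), ("Oscar", "Black"), ("Puss", "Tortoiseshell"),
    ("Smokey", "Brown"), ("Smudge", "Black"), ("Tigger", "Tortoiseshell"),
]


def line_positions(rows):
    """row_number() over orderBy(color, name): one unique number across the whole line."""
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))  # color first, then name
    return [(i, name, color) for i, (name, color) in enumerate(ordered, start=1)]


def positions_within_color(rows):
    """row_number() over partitionBy(color).orderBy(name): numbering restarts per color."""
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))
    out = []
    for color, group in groupby(ordered, key=lambda r: r[1]):
        out.extend((i, name, color) for i, (name, _c) in enumerate(group, start=1))
    return out


for row in line_positions(CATS):
    print(row)
```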


### We would like to find the fattest cat. Order all our cats by weight. The two heaviest cats should both be 1st. The next heaviest should be 3rd.

In [22]:
from pyspark.sql.functions import rank

cats_data\
    .withColumn("ranking",rank().over(Window.orderBy(col("weight").desc())))\
    .select("ranking","weight","name")\
    .show()

+-------+------+-------+
|ranking|weight|   name|
+-------+------+-------+
|      1|   6.1|  Oscar|
|      1|   6.1| Smokey|
|      3|   5.7|  Misty|
|      4|   5.5|  Alfie|
|      5|   5.4| Millie|
|      6|   5.1|   Puss|
|      7|   5.0|  Felix|
|      8|   4.9| Smudge|
|      9|   4.8|Charlie|
|     10|   4.5|  Ashes|
|     11|   4.2|  Molly|
|     12|   3.8| Tigger|
+-------+------+-------+



22/02/02 18:17:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
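The heading's rule ("both 1st, next one 3rd") is exactly the gap-leaving behavior of `rank`. The plain-Python sketch below (no Spark) puts `row_number`, `rank` and `dense_rank` side by side for a descending weight order. It assumes the weights reconstructed from this notebook's outputs:

```python
# weights, reconstructed from the outputs shown in this notebook
WEIGHTS = [5.5, 4.5, 4.8, 5.0, 5.4, 5.7, 4.2, 6.1, 5.1, 6.1, 4.9, 3.8]


def rank_variants(values):
    """(value, row_number, rank, dense_rank) for values ordered descending."""
    out = []
    rank = dense_rank = 0
    previous = None
    for position, value in enumerate(sorted(values, reverse=True), start=1):
        if value != previous:  # a new value ends the current group of ties
            rank = position    # rank jumps to the row position, leaving a gap after ties
            dense_rank += 1    # dense_rank only ever advances by one
            previous = value
        out.append((value, position, rank, dense_rank))
    return out


for row in rank_variants(WEIGHTS)[:4]:
    print(row)
```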


### For cats, age means seniority. We would like to rank the cats by age (oldest first); cats of the same age share a rank, and the ranks have no gaps.

In [24]:
from pyspark.sql.functions import dense_rank

cats_data\
    .withColumn("rankings",dense_rank().over(Window.orderBy(col("age").desc())))\
    .select("rankings","name","age")\
    .orderBy("name")\
    .show()

22/02/02 18:32:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------+-------+---+
|rankings|   name|age|
+--------+-------+---+
|       1|  Alfie|  5|
|       1|  Ashes|  5|
|       2|Charlie|  4|
|       3|  Felix|  2|
|       1| Millie|  5|
|       3|  Misty|  2|
|       4|  Molly|  1|
|       4|  Oscar|  1|
|       3|   Puss|  2|
|       2| Smokey|  4|
|       2| Smudge|  4|
|       3| Tigger|  2|
+--------+-------+---+
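Another way to read `dense_rank` over a descending order: each cat's rank is the position of its age among the distinct ages, sorted oldest first. The plain-Python sketch below (no Spark) uses that formulation. It assumes the (name, age) values reconstructed from this notebook's outputs and returns the rows in name order, like the cell above:

```python
# (name, age) pairs, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", 5), ("Ashes", 5), ("Charlie", 4), ("Felix", 2),
    ("Millie", 5), ("Misty", 2), ("Molly", 1), ("Oscar", 1),
    ("Puss", 2), ("Smokey", 4), ("Smudge", 4), ("Tigger", 2),
]


def seniority(rows):
    """dense_rank() over orderBy(age desc): the rank is the age's position among distinct ages."""
    distinct_ages = sorted({age for _, age in rows}, reverse=True)  # [5, 4, 2, 1]
    rank_of = {age: i for i, age in enumerate(distinct_ages, start=1)}
    return [(rank_of[age], name, age) for name, age in sorted(rows)]  # final orderBy("name")


for row in seniority(CATS):
    print(row)
```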



### Each cat would like to know what percentage of the other cats weigh less than it.

In [27]:
from pyspark.sql.functions import percent_rank

cats_data\
    .withColumn("percent",percent_rank().over(Window.orderBy(col("weight"))))\
    .select("name","weight",round(col("percent")*100,1).alias("percent"))\
    .show()

+-------+------+-------+
|   name|weight|percent|
+-------+------+-------+
| Tigger|   3.8|    0.0|
|  Molly|   4.2|    9.1|
|  Ashes|   4.5|   18.2|
|Charlie|   4.8|   27.3|
| Smudge|   4.9|   36.4|
|  Felix|   5.0|   45.5|
|   Puss|   5.1|   54.5|
| Millie|   5.4|   63.6|
|  Alfie|   5.5|   72.7|
|  Misty|   5.7|   81.8|
|  Oscar|   6.1|   90.9|
| Smokey|   6.1|   90.9|
+-------+------+-------+



22/02/02 18:46:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
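`percent_rank` is defined as (rank - 1) / (n - 1). With an ascending order, rank - 1 is the number of cats strictly lighter than the current one, and n - 1 is the number of other cats, so the ratio answers the heading's question directly. With 12 cats the denominator is 11, which is why the percentages step by about 9.1. A plain-Python sketch (no Spark), assuming the weights reconstructed from this notebook's outputs:

```python
# weights, reconstructed from the outputs shown in this notebook
WEIGHTS = [5.5, 4.5, 4.8, 5.0, 5.4, 5.7, 4.2, 6.1, 5.1, 6.1, 4.9, 3.8]


def percent_rank(values):
    """(rank - 1) / (n - 1) for each value in ascending order; 0.0 when there is a single row."""
    ordered = sorted(values)
    n = len(ordered)
    out = []
    for value in ordered:
        rank = ordered.index(value) + 1  # first position of the value, so ties share a rank
        out.append((value, 0.0 if n == 1 else (rank - 1) / (n - 1)))
    return out


print([round(p * 100, 1) for _, p in percent_rank(WEIGHTS)])
```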


### We are worried our cats are too fat and need to diet. We would like to group the cats into quartiles by their weight. Here we have to use the ntile window function.



In [28]:
from pyspark.sql.functions import ntile

cats_data\
    .withColumn("weight_quartile",ntile(4).over(Window.orderBy(col("weight"))))\
    .select("name","weight","weight_quartile")\
    .show()

22/02/02 20:01:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+------+---------------+
|   name|weight|weight_quartile|
+-------+------+---------------+
| Tigger|   3.8|              1|
|  Molly|   4.2|              1|
|  Ashes|   4.5|              1|
|Charlie|   4.8|              2|
| Smudge|   4.9|              2|
|  Felix|   5.0|              2|
|   Puss|   5.1|              3|
| Millie|   5.4|              3|
|  Alfie|   5.5|              3|
|  Misty|   5.7|              4|
|  Oscar|   6.1|              4|
| Smokey|   6.1|              4|
+-------+------+---------------+
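With 12 cats and 4 buckets, `ntile(4)` gives exactly three cats per quartile. When the row count does not divide evenly, the usual SQL NTILE rule is that bucket sizes differ by at most one and the first (n mod k) buckets get the extra row. The sketch below assumes Spark follows that rule. It is plain Python (no Spark), and the 10-row case is hypothetical:

```python
def ntile(n_rows, buckets):
    """Bucket number for each of n_rows ordered rows, assuming the usual SQL NTILE rule:
    bucket sizes differ by at most one, and the first n_rows % buckets buckets get the extra row."""
    base, extra = divmod(n_rows, buckets)
    out = []
    for bucket in range(1, buckets + 1):
        size = base + (1 if bucket <= extra else 0)
        out.extend([bucket] * size)
    return out


print(ntile(12, 4))  # the 12 cats in this notebook
print(ntile(10, 4))  # a hypothetical 10-row input
```

Because `ntile` assigns buckets by row position, cats with equal weight can land in different quartiles when a bucket boundary falls between them. Oscar and Smokey happen to share bucket 4 here.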



### Cats are fickle. Each cat would like to lose weight to be the equivalent weight of the cat weighing just less than it. Print a list of cats, their weights and the weight difference between them and the nearest lighter cat ordered by weight.

In [44]:
from pyspark.sql.functions import lag

cats_data\
    .withColumn("previous_values",lag(col("weight")).over(Window.orderBy(col("weight"))))\
    .withColumn("weight_to_lose",col("weight")-col("previous_values"))\
    .fillna(value=0,subset=["weight_to_lose"])\
    .select("name","weight",round("weight_to_lose",1).alias("weight_to_lose"))\
    .show()


22/02/03 21:48:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+------+--------------+
|   name|weight|weight_to_lose|
+-------+------+--------------+
| Tigger|   3.8|           0.0|
|  Molly|   4.2|           0.4|
|  Ashes|   4.5|           0.3|
|Charlie|   4.8|           0.3|
| Smudge|   4.9|           0.1|
|  Felix|   5.0|           0.1|
|   Puss|   5.1|           0.1|
| Millie|   5.4|           0.3|
|  Alfie|   5.5|           0.1|
|  Misty|   5.7|           0.2|
|  Oscar|   6.1|           0.4|
| Smokey|   6.1|           0.0|
+-------+------+--------------+
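`lag` returns the value from the previous row in the window order, and null for the first row. Subtracting from null gives null, which is why the cell fills `weight_to_lose` with 0 afterwards. The plain-Python sketch below (no Spark) mirrors those steps. It assumes the weights reconstructed from this notebook's outputs, and the `lag` helper is a hypothetical stand-in written for illustration, not Spark's function:

```python
# weights, reconstructed from the outputs shown in this notebook
WEIGHTS = [5.5, 4.5, 4.8, 5.0, 5.4, 5.7, 4.2, 6.1, 5.1, 6.1, 4.9, 3.8]


def lag(values, offset=1, default=None):
    """Hypothetical helper: value `offset` rows earlier in the list; `default` where no such row exists."""
    return [values[i - offset] if i >= offset else default for i in range(len(values))]


ordered = sorted(WEIGHTS)  # Window.orderBy("weight")
previous = lag(ordered)    # first entry is None, like the null that lag() returns
# weight - previous is null for the first row; fillna(0) turns that into 0.0
weight_to_lose = [0.0 if p is None else round(w - p, 1) for w, p in zip(ordered, previous)]
print(weight_to_lose)
```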



In [43]:
# partitionBy("breed") writes one breed=<value> subdirectory per breed under the output path
print("Partitioning Started")
cats_data\
    .write\
    .format("csv")\
    .option("header",True)\
    .partitionBy("breed")\
    .mode("overwrite")\
    .save("/Users/sahiltest/PycharmProjects/spark-fundamentals/learn.Input/breedwiseData/")
print("Partitioning Done")

Partitioning Started
Partitioning Done


### The cats now want to lose weight according to their breed. Each cat would like to lose weight to be the equivalent weight of the cat in the same breed weighing just less than it. Print a list of cats, their breeds, weights and the weight difference between them and the nearest lighter cat of the same breed.

In [51]:
cats_data\
    .select("name","breed","weight")\
    .withColumn("previous_weight",lag("weight").over(Window.partitionBy(col("breed")).orderBy(col("weight"))))\
    .withColumn("weight_to_lose",col("weight")-col("previous_weight"))\
    .fillna(value=0.0,subset=["previous_weight","weight_to_lose"])\
    .select("name","breed","weight",round("weight_to_lose",1).alias("weight_to_lose"))\
    .orderBy("weight")\
    .show()

+-------+-----------------+------+--------------+
|   name|            breed|weight|weight_to_lose|
+-------+-----------------+------+--------------+
| Tigger|British Shorthair|   3.8|           0.0|
|  Molly|          Persian|   4.2|           0.0|
|  Ashes|          Persian|   4.5|           0.3|
|Charlie|British Shorthair|   4.8|           1.0|
| Smudge|British Shorthair|   4.9|           0.1|
|  Felix|          Persian|   5.0|           0.5|
|   Puss|       Maine Coon|   5.1|           0.0|
| Millie|       Maine Coon|   5.4|           0.3|
|  Alfie|          Siamese|   5.5|           0.0|
|  Misty|       Maine Coon|   5.7|           0.3|
| Smokey|       Maine Coon|   6.1|           0.4|
|  Oscar|          Siamese|   6.1|           0.6|
+-------+-----------------+------+--------------+
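With `partitionBy(col("breed"))`, `lag` starts over in every breed. The lightest cat of each breed therefore has no previous row and ends up with 0.0 after `fillna`. The plain-Python sketch below (no Spark) reproduces the per-breed differences. It assumes the (name, breed, weight) values reconstructed from this notebook's outputs:

```python
from itertools import groupby

# (name, breed, weight) rows, reconstructed from the outputs shown in this notebook
CATS = [
    ("Alfie", "Siamese", 5.5), ("Ashes", "Persian", 4.5),
    ("Charlie", "British Shorthair", 4.8), ("Felix", "Persian", 5.0),
    ("Millie", "Maine Coon", 5.4), ("Misty", "Maine Coon", 5.7),
    ("Molly", "Persian", 4.2), ("Oscar", "Siamese", 6.1),
    ("Puss", "Maine Coon", 5.1), ("Smokey", "Maine Coon", 6.1),
    ("Smudge", "British Shorthair", 4.9), ("Tigger", "British Shorthair", 3.8),
]


def weight_to_lose_by_breed(rows):
    """weight - lag(weight) over partitionBy(breed).orderBy(weight), with nulls filled as 0.0."""
    out = []
    ordered = sorted(rows, key=lambda r: (r[1], r[2]))  # partition by breed, order by weight
    for _breed, group in groupby(ordered, key=lambda r: r[1]):
        previous = None  # lag() has no earlier row at the start of each partition
        for name, breed, weight in group:
            diff = 0.0 if previous is None else round(weight - previous, 1)
            out.append((name, breed, weight, diff))
            previous = weight
    return sorted(out, key=lambda r: r[2])  # final orderBy("weight")


for row in weight_to_lose_by_breed(CATS):
    print(row)
```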

