In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('window_function_on-df').getOrCreate()

In [4]:
spark

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType,IntegerType

races = (
    spark.read.option("header", True).option("inferSchema", True)
    .csv("F1_Complete_Dataset/races.csv",nullValue ="\\N")
    .select("raceId", "year","name")
)

results = (
    spark.read.option("header", True).option("inferSchema", True)
    .csv("F1_Complete_Dataset/results.csv", nullValue = "\\N")
    .select("raceId","driverId","constructorId","grid","position","points","laps","milliseconds","statusId")

)

drivers = (
    spark.read.option("header",True).option("inferSchema", True)
    .csv("F1_Complete_Dataset/drivers.csv",nullValue = "\\N")
    .select("driverId",concat_ws(" ", "forename", "surname").alias("driver_name"))
)

fact = (

    results.join(races, "raceId","left")
           .join(drivers, "driverId","left")

)

fact.show()

+--------+------+-------------+----+--------+------+----+------------+--------+----+--------------------+------------------+
|driverId|raceId|constructorId|grid|position|points|laps|milliseconds|statusId|year|                name|       driver_name|
+--------+------+-------------+----+--------+------+----+------------+--------+----+--------------------+------------------+
|       1|    18|            1|   1|       1|  10.0|  58|     5690616|       1|2008|Australian Grand ...|    Lewis Hamilton|
|       2|    18|            2|   5|       2|   8.0|  58|     5696094|       1|2008|Australian Grand ...|     Nick Heidfeld|
|       3|    18|            3|   7|       3|   6.0|  58|     5698779|       1|2008|Australian Grand ...|      Nico Rosberg|
|       4|    18|            4|  11|       4|   5.0|  58|     5707797|       1|2008|Australian Grand ...|   Fernando Alonso|
|       5|    18|            1|   3|       5|   4.0|  58|     5708630|       1|2008|Australian Grand ...| Heikki Kovalainen|


In [8]:
driver_year_points = (
    fact.groupBy("year","driver_name")
        .agg(sum("points").alias("points"))
        .orderBy("year", desc("points"))
)
driver_year_points.show(20, truncate = False)

+----+------------------+------+
|year|driver_name       |points|
+----+------------------+------+
|1950|Nino Farina       |30.0  |
|1950|Luigi Fagioli     |28.0  |
|1950|Juan Fangio       |27.0  |
|1950|Louis Rosier      |13.0  |
|1950|Alberto Ascari    |11.0  |
|1950|Johnnie Parsons   |9.0   |
|1950|Bill Holland      |6.0   |
|1950|Prince Bira       |5.0   |
|1950|Peter Whitehead   |4.0   |
|1950|Louis Chiron      |4.0   |
|1950|Reg Parnell       |4.0   |
|1950|Mauri Rose        |4.0   |
|1950|Philippe Étancelin|3.0   |
|1950|Dorino Serafini   |3.0   |
|1950|Robert Manzon     |3.0   |
|1950|Raymond Sommer    |3.0   |
|1950|Cecil Green       |3.0   |
|1950|Yves Cabantous    |3.0   |
|1950|Felice Bonetto    |2.0   |
|1950|Tony Bettenhausen |1.0   |
+----+------------------+------+
only showing top 20 rows



In [9]:
from pyspark.sql.window import Window

In [11]:
fact.withColumn("total_race_per_year",count("raceId").over(Window.partitionBy("year"))).show()

+--------+------+-------------+----+--------+------+----+------------+--------+----+------------------+--------------------+-------------------+
|driverId|raceId|constructorId|grid|position|points|laps|milliseconds|statusId|year|              name|         driver_name|total_race_per_year|
+--------+------+-------------+----+--------+------+----+------------+--------+----+------------------+--------------------+-------------------+
|     642|   833|           51|   1|       1|   9.0|  70|     8003600|       1|1950|British Grand Prix|         Nino Farina|                160|
|     786|   833|           51|   2|       2|   6.0|  70|     8006200|       1|1950|British Grand Prix|       Luigi Fagioli|                160|
|     686|   833|           51|   4|       3|   4.0|  70|     8055600|       1|1950|British Grand Prix|         Reg Parnell|                160|
|     704|   833|          154|   6|       4|   3.0|  68|        NULL|      12|1950|British Grand Prix|      Yves Cabantous|      

In [12]:
driver_year_points.show()

+----+------------------+------+
|year|       driver_name|points|
+----+------------------+------+
|1950|       Nino Farina|  30.0|
|1950|     Luigi Fagioli|  28.0|
|1950|       Juan Fangio|  27.0|
|1950|      Louis Rosier|  13.0|
|1950|    Alberto Ascari|  11.0|
|1950|   Johnnie Parsons|   9.0|
|1950|      Bill Holland|   6.0|
|1950|       Prince Bira|   5.0|
|1950|   Peter Whitehead|   4.0|
|1950|      Louis Chiron|   4.0|
|1950|       Reg Parnell|   4.0|
|1950|        Mauri Rose|   4.0|
|1950|Philippe Étancelin|   3.0|
|1950|   Dorino Serafini|   3.0|
|1950|     Robert Manzon|   3.0|
|1950|    Raymond Sommer|   3.0|
|1950|       Cecil Green|   3.0|
|1950|    Yves Cabantous|   3.0|
|1950|    Felice Bonetto|   2.0|
|1950| Tony Bettenhausen|   1.0|
+----+------------------+------+
only showing top 20 rows



In [14]:
driver_year_points.withColumn("Rank", rank().over(Window.partitionBy("year").orderBy(desc('points')))).show()

+----+------------------+------+----+
|year|       driver_name|points|Rank|
+----+------------------+------+----+
|1950|       Nino Farina|  30.0|   1|
|1950|     Luigi Fagioli|  28.0|   2|
|1950|       Juan Fangio|  27.0|   3|
|1950|      Louis Rosier|  13.0|   4|
|1950|    Alberto Ascari|  11.0|   5|
|1950|   Johnnie Parsons|   9.0|   6|
|1950|      Bill Holland|   6.0|   7|
|1950|       Prince Bira|   5.0|   8|
|1950|   Peter Whitehead|   4.0|   9|
|1950|      Louis Chiron|   4.0|   9|
|1950|       Reg Parnell|   4.0|   9|
|1950|        Mauri Rose|   4.0|   9|
|1950|Philippe Étancelin|   3.0|  13|
|1950|   Dorino Serafini|   3.0|  13|
|1950|     Robert Manzon|   3.0|  13|
|1950|    Raymond Sommer|   3.0|  13|
|1950|       Cecil Green|   3.0|  13|
|1950|    Yves Cabantous|   3.0|  13|
|1950|    Felice Bonetto|   2.0|  19|
|1950| Tony Bettenhausen|   1.0|  20|
+----+------------------+------+----+
only showing top 20 rows



In [15]:
window_spec = Window.partitionBy("year").orderBy(desc('points'))

driver_year_points.withColumn("Rank", rank().over(window_spec)).show()

+----+------------------+------+----+
|year|       driver_name|points|Rank|
+----+------------------+------+----+
|1950|       Nino Farina|  30.0|   1|
|1950|     Luigi Fagioli|  28.0|   2|
|1950|       Juan Fangio|  27.0|   3|
|1950|      Louis Rosier|  13.0|   4|
|1950|    Alberto Ascari|  11.0|   5|
|1950|   Johnnie Parsons|   9.0|   6|
|1950|      Bill Holland|   6.0|   7|
|1950|       Prince Bira|   5.0|   8|
|1950|   Peter Whitehead|   4.0|   9|
|1950|      Louis Chiron|   4.0|   9|
|1950|       Reg Parnell|   4.0|   9|
|1950|        Mauri Rose|   4.0|   9|
|1950|Philippe Étancelin|   3.0|  13|
|1950|   Dorino Serafini|   3.0|  13|
|1950|     Robert Manzon|   3.0|  13|
|1950|    Raymond Sommer|   3.0|  13|
|1950|       Cecil Green|   3.0|  13|
|1950|    Yves Cabantous|   3.0|  13|
|1950|    Felice Bonetto|   2.0|  19|
|1950| Tony Bettenhausen|   1.0|  20|
+----+------------------+------+----+
only showing top 20 rows



In [16]:
driver_year_points.withColumn("Rank", rank().over(window_spec))\
    .withColumn("Dense_Rank",dense_rank().over(window_spec)) \
    .withColumn("Row_Number", row_number().over(window_spec)) \
    .show()

+----+------------------+------+----+----------+----------+
|year|       driver_name|points|Rank|Dense_Rank|Row_Number|
+----+------------------+------+----+----------+----------+
|1950|       Nino Farina|  30.0|   1|         1|         1|
|1950|     Luigi Fagioli|  28.0|   2|         2|         2|
|1950|       Juan Fangio|  27.0|   3|         3|         3|
|1950|      Louis Rosier|  13.0|   4|         4|         4|
|1950|    Alberto Ascari|  11.0|   5|         5|         5|
|1950|   Johnnie Parsons|   9.0|   6|         6|         6|
|1950|      Bill Holland|   6.0|   7|         7|         7|
|1950|       Prince Bira|   5.0|   8|         8|         8|
|1950|   Peter Whitehead|   4.0|   9|         9|         9|
|1950|      Louis Chiron|   4.0|   9|         9|        10|
|1950|       Reg Parnell|   4.0|   9|         9|        11|
|1950|        Mauri Rose|   4.0|   9|         9|        12|
|1950|Philippe Étancelin|   3.0|  13|        10|        13|
|1950|   Dorino Serafini|   3.0|  13|   

In [18]:
driver_year_points.withColumn("Rank", rank().over(window_spec)).where(col('Rank') <= 3).show()

+----+--------------------+------+----+
|year|         driver_name|points|Rank|
+----+--------------------+------+----+
|1950|         Nino Farina|  30.0|   1|
|1950|       Luigi Fagioli|  28.0|   2|
|1950|         Juan Fangio|  27.0|   3|
|1951|         Juan Fangio|  37.0|   1|
|1951|      Alberto Ascari|  28.0|   2|
|1951|José Froilán Gonz...|  27.0|   3|
|1952|      Alberto Ascari|  53.5|   1|
|1952|         Nino Farina|  27.0|   2|
|1952|       Piero Taruffi|  22.0|   3|
|1953|      Alberto Ascari|  46.5|   1|
|1953|         Nino Farina|  32.0|   2|
|1953|         Juan Fangio|  29.5|   3|
|1954|         Juan Fangio| 57.14|   1|
|1954|José Froilán Gonz...| 26.64|   2|
|1954|       Mike Hawthorn| 24.64|   3|
|1955|         Juan Fangio|  41.0|   1|
|1955|       Stirling Moss|  23.0|   2|
|1955| Eugenio Castellotti|  12.0|   3|
|1956|         Juan Fangio|  34.5|   1|
|1956|       Stirling Moss|  28.0|   2|
+----+--------------------+------+----+
only showing top 20 rows



In [19]:
driver_year_points.withColumn("Rank",rank().over(Window.partitionBy("year") \
                                                .orderBy(desc('points'))))  \
                                                .where(col('Rank') <= 3)  \
                                                .orderBy(desc("year")).show()

+----+----------------+------+----+
|year|     driver_name|points|Rank|
+----+----------------+------+----+
|2024|  Max Verstappen| 399.0|   1|
|2024|    Lando Norris| 344.0|   2|
|2024| Charles Leclerc| 327.0|   3|
|2023|  Max Verstappen| 530.0|   1|
|2023|    Sergio Pérez| 260.0|   2|
|2023|  Lewis Hamilton| 217.0|   3|
|2022|  Max Verstappen| 433.0|   1|
|2022| Charles Leclerc| 291.0|   2|
|2022|    Sergio Pérez| 291.0|   2|
|2021|  Max Verstappen| 388.5|   1|
|2021|  Lewis Hamilton| 385.5|   2|
|2021| Valtteri Bottas| 219.0|   3|
|2020|  Lewis Hamilton| 347.0|   1|
|2020| Valtteri Bottas| 223.0|   2|
|2020|  Max Verstappen| 214.0|   3|
|2019|  Lewis Hamilton| 413.0|   1|
|2019| Valtteri Bottas| 326.0|   2|
|2019|  Max Verstappen| 278.0|   3|
|2018|  Lewis Hamilton| 408.0|   1|
|2018|Sebastian Vettel| 320.0|   2|
+----+----------------+------+----+
only showing top 20 rows



In [20]:
driver_year_points.withColumn("Rank", rank().over(window_spec)) \
        .where(col('Rank') <= 3) \
        .orderBy(desc("year")).show()

+----+----------------+------+----+
|year|     driver_name|points|Rank|
+----+----------------+------+----+
|2024|  Max Verstappen| 399.0|   1|
|2024|    Lando Norris| 344.0|   2|
|2024| Charles Leclerc| 327.0|   3|
|2023|  Max Verstappen| 530.0|   1|
|2023|    Sergio Pérez| 260.0|   2|
|2023|  Lewis Hamilton| 217.0|   3|
|2022|  Max Verstappen| 433.0|   1|
|2022| Charles Leclerc| 291.0|   2|
|2022|    Sergio Pérez| 291.0|   2|
|2021|  Max Verstappen| 388.5|   1|
|2021|  Lewis Hamilton| 385.5|   2|
|2021| Valtteri Bottas| 219.0|   3|
|2020|  Lewis Hamilton| 347.0|   1|
|2020| Valtteri Bottas| 223.0|   2|
|2020|  Max Verstappen| 214.0|   3|
|2019|  Lewis Hamilton| 413.0|   1|
|2019| Valtteri Bottas| 326.0|   2|
|2019|  Max Verstappen| 278.0|   3|
|2018|  Lewis Hamilton| 408.0|   1|
|2018|Sebastian Vettel| 320.0|   2|
+----+----------------+------+----+
only showing top 20 rows



In [21]:
driver_year_points.withColumn("Rank", rank().over(Window.partitionBy("year") \
                                        .orderBy(desc('points')))) \
                                        .where((col('Rank') <= 3) & (col("driver_name") == 'Michael Schumacher'))  \
                                        .orderBy(desc("year")).show()

+----+------------------+------+----+
|year|       driver_name|points|Rank|
+----+------------------+------+----+
|2006|Michael Schumacher| 121.0|   2|
|2005|Michael Schumacher|  62.0|   3|
|2004|Michael Schumacher| 148.0|   1|
|2003|Michael Schumacher|  93.0|   1|
|2002|Michael Schumacher| 144.0|   1|
|2001|Michael Schumacher| 123.0|   1|
|2000|Michael Schumacher| 108.0|   1|
|1998|Michael Schumacher|  86.0|   2|
|1997|Michael Schumacher|  78.0|   2|
|1996|Michael Schumacher|  59.0|   3|
|1995|Michael Schumacher| 102.0|   1|
|1994|Michael Schumacher|  92.0|   1|
|1992|Michael Schumacher|  53.0|   3|
+----+------------------+------+----+



In [23]:
df6 =spark.read.option("header",True).option("inferschema", True).csv("F1_Complete_Dataset/races.csv")

In [25]:
df6.show(2)

+------+----+-----+---------+--------------------+----------+--------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|fp1_date|fp1_time|fp2_date|fp2_time|fp3_date|fp3_time|quali_date|quali_time|sprint_date|sprint_time|
+------+----+-----+---------+--------------------+----------+--------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|      \N|      \N|      \N|      \N|      \N|      \N|        \N|        \N|         \N|         \N|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|      \N|      \N|      \N|      \N|      \N|      \N|        \N|        \N|         \N|         \N|
+------+----+-----+---------+-----------

In [26]:
driver_year_points.withColumn("Rank", rank().over(Window.partitionBy("year") \
                                                 .orderBy(desc('points')))) \
                                                .where((col('Rank') <= 3) & (col("driver_name") == 'Lewis Hamilton')) \
                                                .orderBy(desc("year")).show()

+----+--------------+------+----+
|year|   driver_name|points|Rank|
+----+--------------+------+----+
|2023|Lewis Hamilton| 217.0|   3|
|2021|Lewis Hamilton| 385.5|   2|
|2020|Lewis Hamilton| 347.0|   1|
|2019|Lewis Hamilton| 413.0|   1|
|2018|Lewis Hamilton| 408.0|   1|
|2017|Lewis Hamilton| 363.0|   1|
|2016|Lewis Hamilton| 380.0|   2|
|2015|Lewis Hamilton| 381.0|   1|
|2014|Lewis Hamilton| 384.0|   1|
|2008|Lewis Hamilton|  98.0|   1|
|2007|Lewis Hamilton| 109.0|   2|
+----+--------------+------+----+



In [27]:
driver_year_points.withColumn("Rank", rank().over(Window.partitionBy("year") \
                                                 .orderBy(desc('points')))) \
                                                .where((col('Rank') <= 3) & (col("driver_name") == 'Max Verstappen')) \
                                                .orderBy(desc("year")).show()

+----+--------------+------+----+
|year|   driver_name|points|Rank|
+----+--------------+------+----+
|2024|Max Verstappen| 399.0|   1|
|2023|Max Verstappen| 530.0|   1|
|2022|Max Verstappen| 433.0|   1|
|2021|Max Verstappen| 388.5|   1|
|2020|Max Verstappen| 214.0|   3|
|2019|Max Verstappen| 278.0|   3|
+----+--------------+------+----+



In [29]:
driver_year_points.withColumn("prev_year_points", lag("points").over(Window.partitionBy("driver_name") \
                                                                    .orderBy('year'))).show()

+----+-------------+------+----------------+
|year|  driver_name|points|prev_year_points|
+----+-------------+------+----------------+
|1952| Adolf Brudes|   0.0|            NULL|
|1953|  Adolfo Cruz|   0.0|            NULL|
|2007| Adrian Sutil|   1.0|            NULL|
|2008| Adrian Sutil|   0.0|             1.0|
|2009| Adrian Sutil|   5.0|             0.0|
|2010| Adrian Sutil|  47.0|             5.0|
|2011| Adrian Sutil|  42.0|            47.0|
|2013| Adrian Sutil|  29.0|            42.0|
|2014| Adrian Sutil|   0.0|            29.0|
|1987|Adrián Campos|   0.0|            NULL|
|1988|Adrián Campos|   0.0|             0.0|
|1988| Aguri Suzuki|   0.0|            NULL|
|1989| Aguri Suzuki|   0.0|             0.0|
|1990| Aguri Suzuki|   6.0|             0.0|
|1991| Aguri Suzuki|   1.0|             6.0|
|1992| Aguri Suzuki|   0.0|             1.0|
|1993| Aguri Suzuki|   0.0|             0.0|
|1994| Aguri Suzuki|   0.0|             0.0|
|1995| Aguri Suzuki|   1.0|             0.0|
|1955|    

In [30]:
yoy_window = Window.partitionBy("driver_name").orderBy("year")

driver_year_points.withColumn("prev_year_points", lag("points", 1).over(yoy_window)) \
    .withColumn("improvement",col("points") - col("prev_year_points")) \
    .orderBy(desc('improvement')).show()

+----+----------------+------+----------------+-----------+
|year|     driver_name|points|prev_year_points|improvement|
+----+----------------+------+----------------+-----------+
|2022|  George Russell| 262.0|            16.0|      246.0|
|2010| Fernando Alonso| 252.0|            26.0|      226.0|
|2019| Charles Leclerc| 264.0|            39.0|      225.0|
|2017| Valtteri Bottas| 305.0|            85.0|      220.0|
|2014|Daniel Ricciardo| 238.0|            20.0|      218.0|
|2014|  Lewis Hamilton| 384.0|           189.0|      195.0|
|2010|  Lewis Hamilton| 240.0|            49.0|      191.0|
|2024|   Oscar Piastri| 265.0|            82.0|      183.0|
|2014| Valtteri Bottas| 186.0|             4.0|      182.0|
|2021|  Max Verstappen| 388.5|           214.0|      174.5|
|2010|     Mark Webber| 242.0|            69.5|      172.5|
|2010|Sebastian Vettel| 256.0|            84.0|      172.0|
|2016|Daniel Ricciardo| 256.0|            92.0|      164.0|
|2024|    Lando Norris| 344.0|          

In [31]:
driver_year_points.filter(col('driver_name') == 'George Russell').show()

+----+--------------+------+
|year|   driver_name|points|
+----+--------------+------+
|2019|George Russell|   0.0|
|2020|George Russell|   3.0|
|2021|George Russell|  16.0|
|2022|George Russell| 262.0|
|2023|George Russell| 157.0|
|2024|George Russell| 226.0|
+----+--------------+------+

