1. Extract

In [68]:
# activate .venv environment
# windows: .\venv\Scripts\activate
# mac: source .venv/bin/activate
# then pip install kaggle, pip install pyspark and pip install findspark inside .venv
# move kaggle.json file into ~/.kaggle
# then you can run data

import kaggle
from pyspark.sql import SparkSession
import findspark
findspark.init()
findspark.find()

dataset = "rohanrao/formula-1-world-championship-1950-2020"

kaggle.api.dataset_download_files(dataset, path='./', unzip=True)

print(f"Dataset {dataset} downloaded successfully!")

 # create spark session
spark = SparkSession.builder.master("local").appName("Driver_Performance_Prediction").getOrCreate()

Dataset URL: https://www.kaggle.com/datasets/rohanrao/formula-1-world-championship-1950-2020
Dataset rohanrao/formula-1-world-championship-1950-2020 downloaded successfully!


In [69]:
results = "results.csv"
qualifying = "qualifying.csv"
lap_times = "lap_times.csv"
pit_stops = "pit_stops.csv"
driver_standings = "driver_standings.csv"
races = "races.csv"
constructors = "constructors.csv"
circuits = "circuits.csv"

2. Transform

In [131]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
result_df = spark.read.csv(results, header=True, inferSchema=True)

# need to remove \N
result_df = result_df.dropna()
result_df = result_df.drop('number', 'positionText', 'time', 'rank','statusId', 'fastestLap', 'fastestLapSpeed', 'milliseconds', 'laps', 'points')
result_df = df.withColumn('position', col('position').cast(IntegerType()))
result_df = result_df.dropna()
result_df = result_df.withColumnRenamed('position', 'final_position')

result_df.show()

+--------+------+--------+-------------+------+----+--------------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|final_position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|             1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|        218.300|       1|
|       2|    18|       2|            2|     3|   5|             2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|        

In [102]:
qualifying_df = spark.read.csv(qualifying, header=True, inferSchema=True)
qualifying_df = qualifying_df.drop('number', 'q1', 'q2', 'q3')
qualifying_df = qualifying_df.withColumnRenamed('position', 'qualifying_position')
qualifying_df.show()

+---------+------+--------+-------------+-------------------+
|qualifyId|raceId|driverId|constructorId|qualifying_position|
+---------+------+--------+-------------+-------------------+
|        1|    18|       1|            1|                  1|
|        2|    18|       9|            2|                  2|
|        3|    18|       5|            1|                  3|
|        4|    18|      13|            6|                  4|
|        5|    18|       2|            2|                  5|
|        6|    18|      15|            7|                  6|
|        7|    18|       3|            3|                  7|
|        8|    18|      14|            9|                  8|
|        9|    18|      10|            7|                  9|
|       10|    18|      20|            5|                 10|
|       11|    18|      22|           11|                 11|
|       12|    18|       4|            4|                 12|
|       13|    18|      18|           11|                 13|
|       

In [107]:
lap_times_df = spark.read.csv(lap_times, header=True, inferSchema=True)
lap_times_df = lap_times_df.drop('milliseconds')
lap_times_df = lap_times_df.withColumnRenamed('time', 'lap_time').withColumnRenamed('position', 'lap_position').show()

+------+--------+---+------------+--------+
|raceId|driverId|lap|lap_position|lap_time|
+------+--------+---+------------+--------+
|   841|      20|  1|           1|1:38.109|
|   841|      20|  2|           1|1:33.006|
|   841|      20|  3|           1|1:32.713|
|   841|      20|  4|           1|1:32.803|
|   841|      20|  5|           1|1:32.342|
|   841|      20|  6|           1|1:32.605|
|   841|      20|  7|           1|1:32.502|
|   841|      20|  8|           1|1:32.537|
|   841|      20|  9|           1|1:33.240|
|   841|      20| 10|           1|1:32.572|
|   841|      20| 11|           1|1:32.669|
|   841|      20| 12|           1|1:32.902|
|   841|      20| 13|           1|1:33.698|
|   841|      20| 14|           3|1:52.075|
|   841|      20| 15|           4|1:38.385|
|   841|      20| 16|           2|1:31.548|
|   841|      20| 17|           1|1:30.800|
|   841|      20| 18|           1|1:31.810|
|   841|      20| 19|           1|1:31.018|
|   841|      20| 20|           

In [113]:
pit_stops_df = spark.read.csv(pit_stops, header=True, inferSchema=True)
pit_stops_df = pit_stops_df.drop('milliseconds', 'time')
pit_stops_df = pit_stops_df.withColumnRenamed('duration', 'stop_duration')
pit_stops_df = pit_stops_df.withColumnRenamed('stop', 'pit_stop')
pit_stops_df.show()

+------+--------+--------+---+-------------+
|raceId|driverId|pit_stop|lap|stop_duration|
+------+--------+--------+---+-------------+
|   841|     153|       1|  1|       26.898|
|   841|      30|       1|  1|       25.021|
|   841|      17|       1| 11|       23.426|
|   841|       4|       1| 12|       23.251|
|   841|      13|       1| 13|       23.842|
|   841|      22|       1| 13|       23.643|
|   841|      20|       1| 14|       22.603|
|   841|     814|       1| 14|       24.863|
|   841|     816|       1| 14|       25.259|
|   841|      67|       1| 15|       25.342|
|   841|       2|       1| 15|       22.994|
|   841|       1|       1| 16|       23.227|
|   841|     808|       1| 16|       24.535|
|   841|       3|       1| 16|       23.716|
|   841|     155|       1| 16|       24.064|
|   841|      16|       1| 16|       25.978|
|   841|      15|       1| 16|       24.899|
|   841|      18|       1| 17|       16.867|
|   841|     153|       2| 17|       24.463|
|   841|  

In [112]:
# driver position is driver's championship standing
driver_standings_df = spark.read.csv(driver_standings, header=True, inferSchema=True)
driver_standings_df = driver_standings_df.drop('position','positionText', 'wins')
#driver_standings_df = driver_standings_df.withColumnRenamed('position', 'driver_position')
driver_standings_df.show()
print(driver_standings_df.dtypes)

+-----------------+------+--------+------+
|driverStandingsId|raceId|driverId|points|
+-----------------+------+--------+------+
|                1|    18|       1|  10.0|
|                2|    18|       2|   8.0|
|                3|    18|       3|   6.0|
|                4|    18|       4|   5.0|
|                5|    18|       5|   4.0|
|                6|    18|       6|   3.0|
|                7|    18|       7|   2.0|
|                8|    18|       8|   1.0|
|                9|    19|       1|  14.0|
|               10|    19|       2|  11.0|
|               11|    19|       3|   6.0|
|               12|    19|       4|   6.0|
|               13|    19|       5|  10.0|
|               14|    19|       6|   3.0|
|               15|    19|       7|   2.0|
|               16|    19|       8|  11.0|
|               17|    19|       9|   8.0|
|               18|    19|      15|   5.0|
|               19|    19|      17|   2.0|
|               20|    19|      14|   0.0|
+----------

In [120]:
races_df = spark.read.csv(races, header=True, inferSchema=True)
races_df = races_df.drop('year','date','time', 'round','url', 'fp1_date', 'fp1_time', 'fp2_date', 'fp2_time', 'fp3_date', 'fp3_time', 'quali_date', 'quali_time', 'sprint_date', 'sprint_time')
races_df.show()

+------+---------+--------------------+
|raceId|circuitId|                name|
+------+---------+--------------------+
|     1|        1|Australian Grand ...|
|     2|        2|Malaysian Grand Prix|
|     3|       17|  Chinese Grand Prix|
|     4|        3|  Bahrain Grand Prix|
|     5|        4|  Spanish Grand Prix|
|     6|        6|   Monaco Grand Prix|
|     7|        5|  Turkish Grand Prix|
|     8|        9|  British Grand Prix|
|     9|       20|   German Grand Prix|
|    10|       11|Hungarian Grand Prix|
|    11|       12| European Grand Prix|
|    12|       13|  Belgian Grand Prix|
|    13|       14|  Italian Grand Prix|
|    14|       15|Singapore Grand Prix|
|    15|       22| Japanese Grand Prix|
|    16|       18|Brazilian Grand Prix|
|    17|       24|Abu Dhabi Grand Prix|
|    18|        1|Australian Grand ...|
|    19|        2|Malaysian Grand Prix|
|    20|        3|  Bahrain Grand Prix|
+------+---------+--------------------+
only showing top 20 rows



In [121]:
constructors_df = spark.read.csv(constructors, header=True, inferSchema=True)
constructors_df = constructors_df.drop('url', 'nationality', 'constructorRef')
constructors_df.show()

+-------------+-----------+
|constructorId|       name|
+-------------+-----------+
|            1|    McLaren|
|            2| BMW Sauber|
|            3|   Williams|
|            4|    Renault|
|            5| Toro Rosso|
|            6|    Ferrari|
|            7|     Toyota|
|            8|Super Aguri|
|            9|   Red Bull|
|           10|Force India|
|           11|      Honda|
|           12|     Spyker|
|           13|        MF1|
|           14| Spyker MF1|
|           15|     Sauber|
|           16|        BAR|
|           17|     Jordan|
|           18|    Minardi|
|           19|     Jaguar|
|           20|      Prost|
+-------------+-----------+
only showing top 20 rows



In [123]:
circuits_df = spark.read.csv(circuits, header=True, inferSchema=True)
circuits_df = circuits_df.drop('circuitRef', 'name', 'location', 'url', 'country')
circuits_df.show()

+---------+--------+---------+---+
|circuitId|     lat|      lng|alt|
+---------+--------+---------+---+
|        1|-37.8497|  144.968| 10|
|        2| 2.76083|  101.738| 18|
|        3| 26.0325|  50.5106|  7|
|        4|   41.57|  2.26111|109|
|        5| 40.9517|   29.405|130|
|        6| 43.7347|  7.42056|  7|
|        7|    45.5| -73.5228| 13|
|        8| 46.8642|  3.16361|228|
|        9| 52.0786| -1.01694|153|
|       10| 49.3278|  8.56583|103|
|       11| 47.5789|  19.2486|264|
|       12| 39.4589|-0.331667|  4|
|       13| 50.4372|  5.97139|401|
|       14| 45.6156|  9.28111|162|
|       15|  1.2914|  103.864| 18|
|       16| 35.3717|  138.927|583|
|       17| 31.3389|   121.22|  5|
|       18|-23.7036| -46.6997|785|
|       19|  39.795| -86.2347|223|
|       20| 50.3356|   6.9475|578|
+---------+--------+---------+---+
only showing top 20 rows



3. EDA