# F1 Elo EDA

This notebook creates Elo rankings for drivers over races from 1985-2023 (Brazil), and uses the following approaches to identify the F1 greatest of all time (GOAT):

1. Days as highest rated driver
2. Peak score

We also plot a greatest of their time graph given limitations of the initial approach (e.g. older generation drivers raced less and cars were more prone to retiring).

## Environment setup

In [1]:
import datetime as dt
import itertools

import dvclive
import pandas as pd
import plotly.express as px
import tqdm

### Helper functions

In [2]:
points_map = {
    1: 25,
    2: 18,
    3: 15,
    4: 12,
    5: 10,
    6: 8,
    7: 6,
    8: 4,
    9: 2,
    10: 1
}

def get_points(race_pos: int, fast_pos: int, points_map: dict = points_map) -> int:
    '''Returns points scored with 2023 F1 scoring system'''
    
    points = points_map[race_pos] if race_pos >= 1 and race_pos <= 10 else 0
    points += 1 if fast_pos == 1 else 0

    return points

## Data importing

In [3]:
DATA_DIR = "../data/raw"

results_df = pd.read_csv(f"{DATA_DIR}/results.csv")[["raceId", "driverId", "constructorId", "grid", "position", "statusId"]]
races_df = pd.read_csv(f"{DATA_DIR}/races.csv")[["raceId", "year", "round", "date"]]
drivers_df = pd.read_csv(f"{DATA_DIR}/drivers.csv")[["driverId", "driverRef"]]
constructors_df = pd.read_csv(f"{DATA_DIR}/constructors.csv")[["constructorId", "constructorRef"]]
status_df = pd.read_csv(f"{DATA_DIR}/status.csv")

raw_df = results_df.merge(races_df, on="raceId", how="left")
raw_df = raw_df.merge(drivers_df, on="driverId", how="left")
raw_df = raw_df.merge(constructors_df, on="constructorId", how="left")
raw_df = raw_df.merge(status_df, on="statusId", how="left")

raw_df = raw_df[["year", "round", "date", "constructorRef", "driverRef", "grid", "position", "status"]]

## Data cleaning

In [4]:
res_df = raw_df.copy().sort_values(["year", "round", "position", "grid"])

driver_reasons = [
    "collision", "accident", "disqualified", "spun off", "107% rule", "did not qualify", "stalled", 
    "did not prequalify", "damage"
    ]
non_driver_reasons = [
    'engine', 'transmission', 'clutch', 'electrical', 'hydraulics', 'gearbox', 'radiator', 
    'suspension', 'brakes', 'overheating', 'mechanical', 'tyre', 'driver seat', 'puncture', 
    'driveshaft', 'retired', 'fuel pressure', 'front wing', 'water pressure', 'refuelling', 'wheel', 
    'throttle', 'steering', 'technical', 'electronics', 'broken wing', 'heat shield fire', 'exhaust', 
    'oil leak', 'wheel rim', 'water leak', 'fuel pump', 'track rod', 'oil pressure', 'pneumatics', 
    'withdrew', 'engine fire', 'tyre puncture', 'out of fuel', 'wheel nut', 'not classified',
    'handling', 'rear wing', 'fire', 'fuel system', 'oil line', 'fuel rig', 'launch control', 
    'injured', 'fuel', 'power loss', 'safety', 'drivetrain', 'ignition', 'injury', 'chassis', 
    'battery', 'halfshaft', 'crankshaft', 'safety concerns', 'not restarted', 'alternator', 
    'differential', 'wheel bearing', 'physical', 'vibrations', 'underweight', 'safety belt', 
    'oil pump', 'fuel leak', 'excluded', 'injection', 'distributor', 'driver unwell', 'turbo', 
    'cv joint', 'water pump', 'fatal accident', 'spark plugs', 'fuel pipe', 'eye injury', 'oil pipe', 
    'axle', 'water pipe', 'magneto', 'supercharger', 'engine misfire', 'collision damage', 'ers', 
    'power unit', 'brake duct', 'seat', 'debris', 'illness', 'cooling system', 'undertray'
]

def map_status(status: str, driver_reasons: list = driver_reasons, non_driver_reasons: list = non_driver_reasons) -> str:

    if status == "finished" or "lap" in status:
        return "finished"
    
    elif status in driver_reasons:
        return "driver retirement"
    
    elif status in non_driver_reasons:
        return "other retirement"

    else:
        return status

# clean and map statuses to simplify identifying finishers and driver caused retirements
res_df["status"] = res_df["status"].apply(str.lower)
res_df["status"] = res_df["status"].apply(map_status)

assert res_df["status"].unique().size  <= 3 # check in case new statuses are added

# infer positon and points using 2023 f1 scoring system minus fastest lap
car_df = res_df[["year", "round", "grid"]].drop_duplicates()
car_df["mapPosition"] = car_df.groupby(["year", "round"]).cumcount() + 1
res_df = res_df.merge(car_df, on=["year", "round", "grid"], how="left").drop(columns="position")
res_df["mapPoints"] = res_df["mapPosition"].map(points_map).fillna(0)


In [10]:
elo_scores = {id: 1500 for id in set(res_df["driverRef"])}
yr_rounds = res_df.groupby("year")["round"].nunique()
res_df["score"] = None
k = 320
c = 300
l = 40
sse = 0

yrc_df = res_df[["year", "round", "constructorRef"]].drop_duplicates()
for _, (yr, rnd, ctr) in tqdm.tqdm(yrc_df.iterrows(), total=yrc_df.shape[0]):
    valid_ix = (res_df["year"] == yr) & (res_df["round"] == rnd) & (res_df["constructorRef"] == ctr)
    sub_df = res_df[valid_ix]

    round_scores = {dvr: {"diff": 0, "n": 0} for dvr in sub_df["driverRef"]}
    for ix_1, ix_2 in itertools.combinations(sub_df.index, 2):
        dvr_a = res_df.loc[ix_1, "driverRef"]
        elo_a = elo_scores[dvr_a]
        pos_a = res_df.loc[ix_1, "mapPosition"]
        poi_a = res_df.loc[ix_1, "mapPoints"]

        dvr_b = res_df.loc[ix_2, "driverRef"]
        elo_b = elo_scores[dvr_b]
        pos_b = res_df.loc[ix_2, "mapPosition"]
        poi_b = res_df.loc[ix_2, "mapPoints"]

        # continue if drivers in same car
        if pos_a == pos_b:
            continue

        # calculate points influence
        if poi_a + poi_b == 0:
            s_a = 0.5
            s_b = 0.5
        else:
            s_a = poi_a / (poi_a + poi_b)
            s_b = poi_b / (poi_a + poi_b)

        # calculate position influence
        q_a = 10 ** (elo_a / c)
        q_b = 10 ** (elo_b / c)
   
        e_a = q_a / (q_a + q_b)        
        e_b = q_b / (q_a + q_b)

        # score outcome
        if pos_a < pos_b:
            o_a = 1
            o_b = 0
        else:
            o_a = 0
            o_b = 1
            
        # calculate score change and update round_scores
        diff_a = ((k / yr_rounds[yr]) * (o_a - e_a)) + ((l / yr_rounds[yr]) * s_a)
        diff_b = ((k / yr_rounds[yr]) * (o_b - e_b)) + ((l / yr_rounds[yr]) * s_b)

        round_scores[dvr_a]["diff"] += diff_a
        round_scores[dvr_a]["n"] += 1

        round_scores[dvr_b]["diff"] += diff_b
        round_scores[dvr_b]["n"] += 1

        # update sse (sum of squared errors)
        sse += ((o_a - e_a) ** 2) + ((o_b - e_b) ** 2)
    
    # insert score for end of round
    for dvr in round_scores.keys():
        if round_scores[dvr]["n"] != 0: # more than 1 car on grid
            elo_scores[dvr] += (round_scores[dvr]["diff"] / round_scores[dvr]["n"])
        
        res_df.loc[valid_ix & (res_df["driverRef"] == dvr), "score"] = elo_scores[dvr]

100%|██████████| 12768/12768 [00:34<00:00, 368.65it/s]


In [11]:
# log experiment outcome
with dvclive.Live() as live:
    live.log_param("k", k)
    live.log_param("c", c)
    live.log_param("l", l)
    live.log_metric("SSE", sse)

print(f"SSE = {sse}")

	.dvcignore, .dvc/config, .dvc/.gitignore


SSE = 9891.87595816547


## Data exploration

In [80]:
min_races = 2 * res_df.groupby("year")["round"].nunique().sort_values().iloc[0] # 2x shortest season

gott_df = res_df.sort_values(["year", "round", "score"], ascending=[True, True, False]).drop_duplicates(["year", "round"])
gott_days = gott_df["driverRef"].value_counts()
gott_drivers = set(gott_days[gott_days > min_races].index)
gott_df = res_df[res_df["driverRef"].isin(gott_drivers)].sort_values(["year", "round"])

px.line(gott_df, x="date", y="score", color="driverRef")

In [81]:
gott_days.iloc[:10]

driverRef
hamilton              160
michael_schumacher    136
senna                  83
button                 68
prost                  55
mclaren                47
berger                 43
alonso                 33
emerson_fittipaldi     32
alesi                  32
Name: count, dtype: int64

## Data export

In [82]:
OUT_DIR = "../data/processed"

gott_df.to_csv(f"{OUT_DIR}/gott.csv", index=False)
gott_days[gott_days > min_races].reset_index().to_csv(f"{OUT_DIR}/gott_days.csv", index=False)