In [1]:
from bs4 import BeautifulSoup
import requests
import pandas as pd
import numpy as np
import wikipedia

# Constants

In [2]:
# Score adjustment looked up by map_outcome_to_score using the non-digit
# characters of a race outcome string (e.g. the "P" in "4P"). Codes that are
# not listed here contribute 0 there.
# NOTE(review): the keys look like the status/annotation codes used in
# Wikipedia's F1 results tables (presumably P = pole, F = fastest lap) --
# confirm against the scraped pages.
MODIFIER_SCORE_MAPPINGS = {
    'DNS': 0,
    'NC': 0,
    'Ret': 0,
    'DNQ': -5,
    'DNPQ': -5,
    'DSQ': -10,
    'C': 0,
    'DNP': 0,
    'EX': 0,
    'DNA': 0,
    'WD': 0,
    'P': 10,
    'F': 5,
    'PF': 15,
}

# Points looked up by map_outcome_to_score using the digit characters of a
# race outcome string (keys are strings, not ints). Positions that are not
# listed here contribute 0 there.
POSITION_POINT_MAPPING = {
    '1': 25,
    '2': 18,
    '3': 15,
    '4': 12,
    '5': 10,
    '6': 8,
    '7': 6,
    '8': 4,
    '9': 2,
    '10': 1
}

In [3]:
def parse_table(raw_table, scores_or_teams="scores"):
    """
    Parses an HTML table into a pandas dataframe of raw cell text.

    INPUT
        raw_table: table element exposing ``find_all`` (e.g. a BeautifulSoup
            ``<table>`` tag)
        scores_or_teams: "scores" takes the column names from the ``<th>``
            cells of the first row; any other value uses the fixed
            entrant-table column names
    OUTPUT
        parsed_df: one row per ``<tr>`` and one column per ``<td>``; the
            cell text is not cleaned here
    """
    table_rows = raw_table.find_all('tr')

    rows = [[cell.text for cell in tr.find_all('td')] for tr in table_rows]

    if scores_or_teams == "scores":
        # header's first col is treated as index, so we skip that
        header = [th.text.rstrip() for th in table_rows[0].find_all('th')][1:]
    else:
        header = ['Entrant', 'Chassis', 'Power unit',
                  'No.', 'Driver name', 'Rounds']

    try:
        parsed_df = pd.DataFrame(rows, columns=header)
    except (ValueError, AssertionError):
        # pandas reports a header/data width mismatch with ValueError
        # (AssertionError in older releases). Only that case is retried;
        # anything else is a real error and must propagate.
        if scores_or_teams != 'scores':
            # Some entrant tables carry an extra 'Constructor' column.
            header = ['Entrant', 'Constructor', 'Chassis', 'Power unit',
                      'No.', 'Driver name', 'Rounds']
        else:
            # The header row had one more trailing cell than the data rows.
            header = header[:-1]
        parsed_df = pd.DataFrame(rows, columns=header)

    return parsed_df


def map_outcome_to_score(race_outcome: str) -> int:
    '''
    Converts one race outcome into Fantasy points.
    INPUT
        race_outcome: representation of race outcome (e.g. "4P")
    OUTPUT
        race_score: position points plus modifier points for the race
    '''
    # The digits spell the finishing position; every other character
    # belongs to the modifier code.
    digits = "".join(ch for ch in race_outcome if ch.isdigit())
    modifier_code = "".join(ch for ch in race_outcome if not ch.isdigit())

    # Unknown modifier codes and positions without points both count as 0.
    return (
        MODIFIER_SCORE_MAPPINGS.get(modifier_code, 0)
        + POSITION_POINT_MAPPING.get(digits, 0)
    )


def map_outcome_to_position(race_outcome: str) -> int:
    '''
    Extracts the finishing position from a race outcome.
    INPUT
        race_outcome: representation of race outcome (e.g. "4P")
    OUTPUT
        race_position: the digits of the outcome as an int, or 10000
            when the outcome contains no digits
    '''
    digits = "".join(filter(str.isdigit, race_outcome))
    # 10000 is a sentinel for outcomes without a numeric position.
    return int(digits) if digits else 10000


def score_dataframe_cleanup(dirty_df):
    '''
    Cleans up the text in the dataframe and removes rows with missing
    values, then derives two numeric views of it.
    INPUT
        dirty_df: dataframe of raw cell text with a 'Driver' column and one
            column per race (an optional 'Points' column is discarded)
    OUTPUT
        score_df: race columns mapped through map_outcome_to_score
        position_df: race columns mapped through map_outcome_to_position
        (the 'Driver' column is kept as cleaned text in both)
    '''
    # errors='ignore' makes the drop a no-op when there is no 'Points'
    # column, instead of relying on a catch-all except to detect that.
    cleaned = dirty_df.dropna().drop('Points', axis=1, errors='ignore')

    # Strip the newlines once; both outputs start from the same text.
    for col in cleaned.columns:
        cleaned[col] = cleaned[col].map(lambda text: text.replace("\n", ""))

    score_df = cleaned.copy()
    position_df = cleaned.copy()
    for col in cleaned.columns:
        if col != 'Driver':
            score_df[col] = cleaned[col].map(map_outcome_to_score)
            position_df[col] = cleaned[col].map(map_outcome_to_position)

    return score_df, position_df


def team_dataframe_cleanup(dirty_df):
    '''
    Reduces the entrant table to its "Driver name" and "Entrant" columns.
    Rows with any missing value are dropped first, and newlines are
    removed from the text of both kept columns.
    '''
    kept = dirty_df.dropna(axis=0).loc[:, ["Driver name", "Entrant"]]
    without_newlines = {
        name: kept[name].map(lambda text: text.replace("\n", ""))
        for name in kept.columns
    }
    # Both columns already exist, so assign() replaces them in place
    # and keeps the column order.
    return kept.assign(**without_newlines)


def get_data_by_year(year):
    """
    Builds the per-driver score table for one season from Wikipedia.

    Fetches the "<year> Formula One World Championship" page, parses its
    results and entrant tables, applies the teammate comparison bonus and
    returns the resulting scores indexed by 'Driver'.
    """
    page = wikipedia.WikipediaPage(f"{year} Formula One World Championship")
    wikitables = BeautifulSoup(page.html()).find_all('table', {'class': 'wikitable'})

    # The tables are picked by position on the page: the fourth wikitable
    # from the end is parsed as the results, the first one as the entrants.
    raw_score_table, raw_team_table = wikitables[-4], wikitables[0]

    score_df, position_df = score_dataframe_cleanup(
        parse_table(raw_score_table, "scores")
    )
    team_df = team_dataframe_cleanup(parse_table(raw_team_table, "teams"))

    with_teammates = get_teammate_mappings(position_df, team_df)
    adjusted_scores = update_scores_by_comparison(with_teammates, score_df)

    return adjusted_scores.set_index('Driver')


def split_drivers(team_members, drivers):
    """
    Breaks a run-together string of driver names into a list of names.

    INPUT
        team_members: string containing zero or more driver names
        drivers: iterable of known driver names to look for
    OUTPUT
        results: the names from `drivers` that occur in `team_members`,
            in the order they appear in `drivers`
    """
    results = []
    for d in drivers:
        if d in team_members:
            results.append(d)
            # Consume the match so the same text cannot be matched again
            # by a later entry of `drivers`.
            team_members = team_members.replace(d, "")
    # Plain return: the loop never breaks, so the former for/else always
    # ended up here anyway.
    return results

def get_teammate(driver, members):
    """
    Finds a teammate of `driver`.

    INPUT
        driver: driver name
        members: iterable of collections of driver names, one per entry
    OUTPUT
        The first name other than `driver` in the first entry that contains
        `driver` together with someone else, or None when there is no such
        entry.
    """
    for m in members:
        if driver in m:
            for person in m:
                if person != driver:
                    return person
            # This entry lists only `driver`; keep looking instead of
            # failing on an empty candidate list.
    return None
    
def get_teammate_mappings(score_df, team_df):
    """
    Returns a copy of `score_df` with an added "teammate" column.

    Each "Driver name" string in `team_df` is split into the individual
    names found in `score_df.Driver`, and every driver is then looked up in
    those per-entry name lists with get_teammate.
    """
    known_drivers = score_df.Driver.values

    # Break string of members into separate drivers
    rosters = team_df["Driver name"].map(
        lambda members: split_drivers(members, known_drivers)
    ).values

    score_teammate_df = score_df.copy()
    score_teammate_df["teammate"] = None
    for idx, driver in score_teammate_df["Driver"].items():
        score_teammate_df.loc[idx, "teammate"] = get_teammate(driver, rosters)

    return score_teammate_df


def update_scores_by_comparison(position_teammates_df, score_df):
    """
    Adds 3 points for every race in which a driver beat their teammate.

    INPUT
        position_teammates_df: finishing positions with 'Driver' and
            'teammate' columns plus one column per race
        score_df: scores; its rows and columns are matched to
            `position_teammates_df` by position, not by label
    OUTPUT
        final_score_df: copy of `score_df` with the bonus applied. Drivers
            whose teammate has no row in `position_teammates_df` (including
            a missing teammate) are left unchanged.
    """
    final_score_df = score_df.copy()

    # (positional index, name) of every race column; computed once.
    race_columns = [
        (col_index, column)
        for col_index, column in enumerate(position_teammates_df.columns)
        if column not in ('Driver', 'teammate')
    ]

    for row_index, (_, row) in enumerate(position_teammates_df.iterrows()):
        teammate_rows = position_teammates_df.loc[
            position_teammates_df.Driver == row.teammate
        ]
        if teammate_rows.empty:
            # No teammate to compare against, so no bonus can be earned.
            continue
        teammate_row = teammate_rows.iloc[0]

        for col_index, column in race_columns:
            # A lower position number is the better result.
            if row[column] < teammate_row[column]:
                final_score_df.iloc[row_index, col_index] += 3

    return final_score_df


def aggregate_features(score_df):
    """
    Summarises each row of `score_df` across its columns.

    Returns a dataframe with the same index and three columns:
    'total_score' (row sum), 'mean_score' (row mean) and 'std_score'
    (row standard deviation as computed by DataFrame.std).
    """
    return pd.DataFrame({
        'total_score': score_df.sum(axis=1),
        'mean_score': score_df.mean(axis=1),
        'std_score': score_df.std(axis=1),
    })

In [7]:
# Scrape every season first, oldest to newest.
score_df_17, score_df_18, score_df_19, score_df_20 = (
    get_data_by_year(season) for season in (2017, 2018, 2019, 2020)
)
score_dfs = [score_df_17, score_df_18, score_df_19, score_df_20]

# Then summarise each season; agg_dfs[i] belongs to score_dfs[i].
agg_dfs = [aggregate_features(season_scores) for season_scores in score_dfs]
agg_score_17, agg_score_18, agg_score_19, agg_score_20 = agg_dfs

In [38]:
FIRST_SEASON = 2017  # season described by score_dfs[0] / agg_dfs[0]


def _lagged_score(row, prev_row, race_idx, lag):
    """
    Score `lag` races before the race in column `race_idx`.

    Looks in `row` (the driver's current season) first. When that would
    reach before the first race of the season, counts back from the end of
    `prev_row` (the driver's previous season) instead. Returns NaN when
    neither holds the race.
    """
    target = race_idx - lag
    if target >= 0:
        return row.iloc[target]
    # target is negative here: -1 is the last race of the previous season.
    # This must be an explicit branch because a negative position passed
    # to iloc silently wraps around instead of raising.
    if prev_row is not None and -target <= len(prev_row):
        return prev_row.iloc[target]
    return np.nan


data = []  # data that'll be used for the training/testing dataframes
for year_idx, score_df in enumerate(score_dfs):
    # The first season has no predecessor. Indexing the lists with
    # year_idx - 1 == -1 would silently pick up the *last* season.
    has_prev_season = year_idx > 0
    prev_score_df = score_dfs[year_idx - 1] if has_prev_season else None
    prev_agg_df = agg_dfs[year_idx - 1] if has_prev_season else None

    for driver_name, row in score_df.iterrows():
        # get last year's aggregate statistics (NaN for the first season
        # or for a racer's first year)
        if has_prev_season and driver_name in prev_agg_df.index:
            last_year_tot = prev_agg_df.loc[driver_name, "total_score"]
            last_year_avg = prev_agg_df.loc[driver_name, "mean_score"]
            last_year_std = prev_agg_df.loc[driver_name, "std_score"]
        else:
            last_year_tot, last_year_avg, last_year_std = np.nan, np.nan, np.nan

        if has_prev_season and driver_name in prev_score_df.index:
            prev_row = prev_score_df.loc[driver_name]
        else:
            prev_row = None

        for race_idx, race_name in enumerate(score_df.columns):
            # get the scores of the past 4 races
            race_1, race_2, race_3, race_4 = (
                _lagged_score(row, prev_row, race_idx, lag)
                for lag in (1, 2, 3, 4)
            )

            # get the score of the same race last year
            if prev_row is not None and race_name in prev_row.index:
                same_race_last_year = prev_row[race_name]
            else:
                same_race_last_year = np.nan

            current_race_score = row[race_name]
            year = year_idx + FIRST_SEASON

            data.append([
                driver_name, race_name, race_1, race_2, race_3, race_4,
                last_year_tot, last_year_avg, last_year_std,
                same_race_last_year, current_race_score, year
            ])

  and should_run_async(code)


In [43]:
# Column order has to match the layout of the rows collected in `data`.
dataset_columns = [
    "driver", "race",
    "race_1", "race_2", "race_3", "race_4",
    "last_year_tot", "last_year_avg", "last_year_std",
    "same_race_last_year", "score", "year",
]
final_dataset = pd.DataFrame(data, columns=dataset_columns)

  and should_run_async(code)


In [44]:
# Write the assembled dataset to disk without the dataframe index.
output_path = 'dataset.csv'
final_dataset.to_csv(output_path, index=False)