## Introduction to Data Science Project ##

### Data Loading ###

This first part of the notebook loads the data and constructs the features that are later used to train our models.

We are working with StatsBomb 360 open data, which can be loaded through the mplsoccer Python library. It is event stream data that, for each recorded action, also includes the positions of the other players visible on the pitch at that moment.

In [3]:
from mplsoccer import Sbopen
import pandas as pd
import numpy as np
from collections import defaultdict
from matplotlib.path import Path
from sklearn.ensemble import RandomForestRegressor

parser = Sbopen()
df_competition = parser.competition()
competitions360 = df_competition[
    (df_competition['match_available_360'].notna()) & 
    (df_competition["competition_name"].isin(["1. Bundesliga", "FIFA World Cup", "UEFA Euro"]))
]
print(competitions360[['competition_name', 'season_name', 'competition_id', 'season_id']])

   competition_name season_name  competition_id  season_id
0     1. Bundesliga   2023/2024               9        281
29   FIFA World Cup        2022              43        106
68        UEFA Euro        2024              55        282
69        UEFA Euro        2020              55         43


Now all matches from the selected competitions are concatenated into one data frame.

In [4]:
matches_360_list = []

for _, row in competitions360.iterrows():
    comp_id = row['competition_id']
    season_id = row['season_id']
    
    df_matches = parser.match(competition_id=comp_id, season_id=season_id)
    matches_360_list.append(df_matches)

all_matches_360 = pd.concat(matches_360_list, ignore_index=True)

print(len(all_matches_360))

200


And then all events from the selected matches are also concatenated into one data frame.

In [5]:
event_frames = []

for _, row in competitions360.iterrows():
    comp_id = row['competition_id']
    season_id = row['season_id']

    try:
        df_matches = parser.match(competition_id=comp_id, season_id=season_id)
    except Exception as e:
        print(f"Error retrieving matches for competition {comp_id}, season {season_id}: {e}")
        continue

    for match_id in df_matches['match_id']:
        try:
            df_event, _, _, _ = parser.event(match_id)
            df_event['match_id'] = match_id
            df_event['competition_name'] = row['competition_name']
            df_event['season_name'] = row['season_name']
            event_frames.append(df_event)
        except Exception as e:
            print(f"Skipping match {match_id}: {e}")

# Concatenate once at the end; growing a DataFrame inside the loop copies it on every iteration
all_events = pd.concat(event_frames, ignore_index=True)

We will also append the corresponding 360 data to each event.

In [6]:
parser_frames = Sbopen(dataframe=False)

all_frames = []

for _, match in all_matches_360.iterrows():
    match_id = match['match_id']
    
    try:
        frames, visible = parser_frames.frame(match_id)


        all_frames.append({
            'match_id': match_id,
            'frames': frames,
            'visible': visible
        })
        
    except Exception as e:
        print(f"Skipping frames for match {match_id}: {e}")

event_data = {}

for match_data in all_frames:
    visible_list = match_data['visible'] 
    frames_list = match_data['frames']   

    freeze_frame_map = defaultdict(list)
    for frame in frames_list:
        event_id = frame.get('event_id') or frame.get('id')
        freeze_frame_map[event_id].append(frame)

    for visible_entry in visible_list:
        event_id = visible_entry['id']
        event_data[event_id] = {
            'visible_area': visible_entry['visible_area'],
            'freeze_frame': freeze_frame_map.get(event_id, [])
        }

all_events['visible_area'] = all_events['id'].map(lambda x: event_data.get(x, {}).get('visible_area'))
all_events['freeze_frame'] = all_events['id'].map(lambda x: event_data.get(x, {}).get('freeze_frame'))

### Data Cleaning ###

For the sake of simplicity we exclude some action types from the data, either because they are off-ball actions, are not relevant to the model, are very rare, or are already captured by the model. In particular, we leave out goalkeeper actions and off-ball defensive actions, since our data cannot really capture them.

We also exclude penalty shootouts from the model; these are denoted by period = 5.

In [7]:
relevant_events = ['Pass', 'Carry', 'Shot', 'Clearance', 'Dribble', 'Duel', 'Foul Committed', 'Interception',
                   'Miscontrol', 'Ball Recovery', 'Own Goal Against']

all_events = all_events[all_events['type_name'].isin(relevant_events)]
all_events = all_events[~((all_events['type_name'] == "Duel") & (all_events['sub_type_name'] != "Tackle"))]
all_events = all_events[all_events['period'] != 5]

def parse_action_row(row):
    type_name = row.get("type_name", "")
    outcome = row.get("outcome_name", "")
    body_part_name = row.get("body_part_name", "")

    if body_part_name in ["Right Foot", "Left Foot"]:
        body_part_name = "Foot"
    elif body_part_name == "Head":
        body_part_name = "Head"
    else:
        body_part_name = "Other"
    
    # Defaults
    action_type = "non_action"
    result = "success"

    if type_name == "Pass":
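        # pass_cross is True for crosses and missing (NaN) otherwise.
        # NaN never compares equal to anything, so test for True directly.
        cross = row.get("pass_cross", False) == True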
        height = row.get("pass_height_name", "")
        subtype = row.get("sub_type_name", "")

        if subtype == "Free Kick":
            action_type = "Free Kick Pass"
        elif subtype == "Corner":
            action_type = "Corner"
        elif subtype == "Goal Kick":
            action_type = "Goal Kick"
        elif subtype == "Throw-in":
            action_type = "Throw In"
        elif cross:
            action_type = "Cross"
        else:
            action_type = "Pass"

        if outcome in ["Incomplete", "Out"]:
            result = "fail"
        elif outcome == "Pass Offside":
            result = "offside"

    elif type_name == "Shot":
        subtype = row.get("sub_type_name", "")
        if subtype == "Free Kick":
            action_type = "Free Kick Shot"
        elif subtype == "Penalty":
            action_type = "Penalty Shot"
        else:
            action_type = "Shot"

        result = "success" if outcome == "Goal" else "fail"

    elif type_name == "Dribble":
        action_type = "Carry"
        result = "fail" if outcome == "Incomplete" else "success"

    elif type_name == "Carry":
        action_type = "Carry"
        result = "success"

    elif type_name == "Foul Committed":
        action_type = "Foul"
        result = "fail"

    elif type_name == "Duel":
        action_type = "Tackle"
        result = "fail" if outcome in ["Lost In Play", "Lost Out"] else "success"

    elif type_name == "Interception":
        action_type = "Interception"
        result = "fail" if outcome in ["Lost In Play", "Lost Out"] else "success"

    elif type_name == "Own Goal Against":
        action_type = "Own Goal"
        result = "owngoal"

    elif type_name == "Clearance":
        action_type = "Clearance"
        result = "success"

    elif type_name == "Miscontrol":
        action_type = "Miscontrol"
        result = "fail"
    
    elif type_name == "Ball Recovery":
        action_type = "Ball Recovery"
        result = "fail" if row.get("ball_recovery_recovery_failure") == True else "success"

    return pd.Series([action_type, result, body_part_name])

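# Work on an explicit copy so the new columns are not written to a filtered view of all_events
all_events_cleaned = all_events.copy()
all_events_cleaned[["action_type", "result", "body_part_name"]] = all_events_cleaned.apply(parse_action_row, axis=1)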
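
Optional flags such as `pass_cross` are worth reading defensively. The following is a minimal sketch, assuming such a flag is either `True` or missing (NaN or `None`) in the parsed events; that is an assumption about the data, and `is_flag_set` is a hypothetical helper, not part of mplsoccer.

```python
import math

def is_flag_set(value):
    """Return True only when an optional boolean flag is actually set."""
    if value is None:
        return False
    # NaN marks a missing flag; it must be detected with isnan, not with ==
    if isinstance(value, float) and math.isnan(value):
        return False
    return bool(value)

missing = math.nan
print(missing == math.nan)    # False: NaN never compares equal, not even to itself
print(is_flag_set(True))      # True
print(is_flag_set(missing))   # False
```

Because NaN never compares equal to anything, an equality test against `np.nan` cannot tell a set flag from a missing one.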

### Feature Construction ###

As the initial features we add distance to goal, angle to goal and time elapsed to every game state.

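Note: The data is already normalized so that the team performing the action always attacks from x = 0 towards x = 120, so the distance to goal can be calculated the same way for both teams regardless of possession changes. The x and y dimensions of the pitch (as well as individual player positions) are also normalized to x = [0, 120] and y = [0, 80], which removes differences between real pitch sizes. The goal is centred at y = 40 and its width is set to 7.32, the regulation goal width in metres, which places the posts at y = 36.34 and y = 43.66. StatsBomb's own pitch specification places the posts at y = 36 and y = 44, so the goal used here is slightly narrower than in the provider's coordinates.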

In [8]:
GOAL_X = 120
GOAL_Y = 40
LEFT_POST = 43.66
RIGHT_POST = 36.34

def calculate_distance_to_goal(df_event):
    
    df_event['distance_to_goal'] = np.sqrt((df_event['x'] - GOAL_X)**2 + (df_event['y'] - GOAL_Y)**2)
    
    return df_event

def calculate_angle_to_goal(df_event):

    dx = GOAL_X - df_event['x']
    dy1 = LEFT_POST - df_event['y']
    dy2 = RIGHT_POST - df_event['y']

    angle = np.degrees(np.abs(np.arctan2(dy1, dx) - np.arctan2(dy2, dx)))
    df_event['angle_to_goal'] = angle

    return df_event

def calculate_time_elapsed(df_event):

    minute = df_event['minute']
    second = df_event['second']

    df_event['time_elapsed'] = minute * 60 + second

    return df_event

all_events_cleaned = (
    all_events_cleaned
    .pipe(calculate_distance_to_goal)
    .pipe(calculate_angle_to_goal)
    .pipe(calculate_time_elapsed)
)
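
To make the two geometric features concrete, here is a small worked example in plain Python. It is only a sketch that mirrors the formulas of the cell above for a single point; the helper names `distance_to_goal` and `angle_to_goal` are illustrative and not part of the pipeline.

```python
import math

GOAL_X, GOAL_Y = 120, 40
LEFT_POST, RIGHT_POST = 43.66, 36.34  # same post coordinates as in the notebook

def distance_to_goal(x, y):
    """Euclidean distance from (x, y) to the centre of the goal."""
    return math.hypot(GOAL_X - x, GOAL_Y - y)

def angle_to_goal(x, y):
    """Angle in degrees under which the goal mouth is seen from (x, y)."""
    dx = GOAL_X - x
    to_left_post = math.atan2(LEFT_POST - y, dx)
    to_right_post = math.atan2(RIGHT_POST - y, dx)
    return math.degrees(abs(to_left_post - to_right_post))

# Central position, 12 units in front of the goal line
print(distance_to_goal(108, 40))          # 12.0
print(round(angle_to_goal(108, 40), 1))   # 33.9

# Same x, but wide on the flank: the goal is seen under a smaller angle
print(round(angle_to_goal(108, 70), 1))
```

The second call illustrates why the angle complements the distance: two positions at a similar depth can offer very different views of the goal.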

Now for some game context we will also add columns to keep track of the current score of the game.

In [14]:
def add_score_columns(df_all_events):
    df_all_events = df_all_events.copy()
    df_all_events['score_possession_team'] = 0
    df_all_events['score_opponent_team'] = 0
    df_all_events['score_difference'] = 0

    for match_id, df_match in df_all_events.groupby('match_id'):
        df_match = df_match.sort_values(['period', 'time_elapsed']).copy()

        team_scores = defaultdict(int)
        team_ids = df_match['team_id'].unique()

        for idx, row in df_match.iterrows():
            possession_team = row['team_id']
            event_team = row['team_id']
            event_type = row['type_name']
            outcome = row['outcome_name']

            if event_type == "Own Goal Against":
                conceding_team = event_team
                other_teams = [t for t in team_ids if t != conceding_team]
                if other_teams:
                    scoring_team = other_teams[0]
                    team_scores[scoring_team] += 1

            elif event_type == "Shot" and outcome == "Goal":
                team_scores[event_team] += 1

            opponent_teams = [t for t in team_ids if t != possession_team]
            opponent_team = opponent_teams[0] if opponent_teams else None

            possession_score = team_scores.get(possession_team, 0)
            opponent_score = team_scores.get(opponent_team, 0) if opponent_team else 0
            score_diff = possession_score - opponent_score

            df_all_events.at[idx, 'score_possession_team'] = possession_score
            df_all_events.at[idx, 'score_opponent_team'] = opponent_score
            df_all_events.at[idx, 'score_difference'] = score_diff

    return df_all_events

all_events_cleaned = add_score_columns(all_events_cleaned)
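
The score bookkeeping can be illustrated on a toy event stream. This is a simplified sketch in plain Python, assuming exactly two teams and events that are already in match order; the tuples and the `running_scores` helper are made up for illustration.

```python
from collections import defaultdict

# Toy events: (team_id, type_name, outcome_name), already in match order
events = [
    (1, "Pass", None),
    (1, "Shot", "Goal"),
    (2, "Pass", None),
    (2, "Own Goal Against", None),  # team 2 concedes, so team 1 is credited
    (2, "Shot", "Goal"),
]
teams = {1, 2}

def running_scores(events, teams):
    """Return (team, own_score, opponent_score, difference) per event."""
    scores = defaultdict(int)
    rows = []
    for team, type_name, outcome in events:
        opponent = next(t for t in teams if t != team)
        if type_name == "Own Goal Against":
            scores[opponent] += 1
        elif type_name == "Shot" and outcome == "Goal":
            scores[team] += 1
        # Recorded after the update, so a goal event already carries the new score
        rows.append((team, scores[team], scores[opponent], scores[team] - scores[opponent]))
    return rows

for row in running_scores(events, teams):
    print(row)
# last row: (2, 1, 2, -1)
```

As in the function above, the score is written after the update, so the scoring event itself already shows the new scoreline.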

Some more advanced features that we will use and test are:

1. Defensive pressure: decision making is strongly influenced by pressure from opponents. We use the distance to the nearest opponent as a proxy.
2. Opponents behind the ball: this is used to detect offensive overload situations or situations where an attacker is through on goal.
3. Goalkeeper position: this is used to detect situations where the goalkeeper is not in goal and the goal is open to the attacker.

All of these are 360 features, and since we only know the positions of the players inside the camera frame, we have to deal with missing data. For the pressure and the opponents behind the ball we first leave missing values as NaN and fill them in a later step; for the goalkeeper position we assume that the goalkeeper is in goal if we do not have the position.

In [9]:
def calculate_closest_opponent_distance(df):
    def get_min_distance(row):
        x0, y0 = row.get('x'), row.get('y')

        if not isinstance(x0, (int, float)) or not isinstance(y0, (int, float)):
            return np.nan

        freeze_frame = row.get('freeze_frame')
        if not freeze_frame or not isinstance(freeze_frame, list):
            return np.nan

        freeze_df = pd.DataFrame(freeze_frame)

        opponents = freeze_df[freeze_df.get('teammate') == False]

        if opponents.empty or 'x' not in opponents or 'y' not in opponents:
            return np.nan

        # Compute the distances without writing a new column into a filtered slice
        distances = np.sqrt((opponents['x'] - x0) ** 2 + (opponents['y'] - y0) ** 2)

        return distances.min()

    df['closest_defender_distance'] = df.apply(get_min_distance, axis=1)
    return df

def calculate_opponents_in_front(df):
    def count_opponents_in_front(row):
        x0 = row.get('x')

        if not isinstance(x0, (int, float)):
            return np.nan

        freeze_frame = row.get('freeze_frame')
        if not freeze_frame or not isinstance(freeze_frame, list):
            return np.nan

        freeze_df = pd.DataFrame(freeze_frame)

        opponents = freeze_df[freeze_df.get('teammate') == False]

        if opponents.empty or 'x' not in opponents:
            return np.nan

        count = (opponents['x'] > x0).sum()
        return count

    df['opponents_in_front'] = df.apply(count_opponents_in_front, axis=1)
    return df

all_events_cleaned = (
    all_events_cleaned
    .pipe(calculate_closest_opponent_distance)
    .pipe(calculate_opponents_in_front)
)
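
The two features are easy to check by hand on a single toy freeze frame. The sketch below uses plain Python and assumes each visible player is a dict with `teammate`, `keeper`, `x` and `y` keys; the coordinates are invented for illustration.

```python
import math

# Toy freeze frame: one dict per visible player (made-up coordinates)
freeze_frame = [
    {"teammate": True,  "keeper": False, "x": 95.0,  "y": 40.0},
    {"teammate": False, "keeper": False, "x": 103.0, "y": 44.0},
    {"teammate": False, "keeper": False, "x": 90.0,  "y": 30.0},
    {"teammate": False, "keeper": True,  "x": 118.0, "y": 40.0},
]
ball_x, ball_y = 100.0, 40.0

opponents = [p for p in freeze_frame if not p["teammate"]]

# Distance to the nearest opponent; NaN if no opponent is visible
closest = min(
    (math.hypot(p["x"] - ball_x, p["y"] - ball_y) for p in opponents),
    default=math.nan,
)
# Opponents positioned between the ball and the goal line at x = 120
in_front = sum(p["x"] > ball_x for p in opponents)

print(closest)   # 5.0
print(in_front)  # 2
```

Note that the goalkeeper counts as an opponent in both features, which matches the filter on `teammate == False` used above.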

We still have missing values for closest_defender_distance and opponents_in_front. We cannot simply drop those rows, because the action scores would then be misconstrued, so we impute the missing values with a Random Forest Regressor based on the x and y position of the event.

In [10]:
def model_impute(df, target_column):
    df_complete = df[df[target_column].notna()]
    df_missing = df[df[target_column].isna()]

    if df_missing.empty:
        return df

    features = ['x', 'y']

    model = RandomForestRegressor()
    model.fit(df_complete[features], df_complete[target_column])

    df.loc[df[target_column].isna(), target_column] = model.predict(df_missing[features])

    return df

all_events_cleaned = model_impute(all_events_cleaned, 'closest_defender_distance')
all_events_cleaned = model_impute(all_events_cleaned, 'opponents_in_front')

Now onto the goalkeeper features.

Note that we again use imputation to account for missing values. The goalkeeper position is missing in a lot of frames, so in those cases we set it to a default of x = 120 and y = 40, the centre of the goal. With this default, gk_in_shooting_triangle and gk_in_penalty_area will be True.

In [11]:
PENALTY_AREA_DEPTH = 16.5
PENALTY_AREA_LENGTH_FROM_POST = 16.5
tolerance = 3

def calculate_goalkeeper_features(df):
    def extract_features(row):
        x0, y0 = row.get('x'), row.get('y')
        freeze_frame = row.get('freeze_frame')

        gk_x, gk_y = GOAL_X, GOAL_Y
        gk_distance_to_ball = np.nan
        gk_in_triangle = False
        gk_in_penalty_area = False

        if isinstance(freeze_frame, list):
            freeze_df = pd.DataFrame(freeze_frame)
            gk_candidates = freeze_df[
                (freeze_df.get('keeper') == True) &
                (freeze_df.get('teammate') == False)
            ]
            if not gk_candidates.empty:
                gk_x = gk_candidates.iloc[0].get('x', GOAL_X)
                gk_y = gk_candidates.iloc[0].get('y', GOAL_Y)

        if isinstance(x0, (int, float)) and isinstance(y0, (int, float)):
            gk_distance_to_ball = np.sqrt((gk_x - x0)**2 + (gk_y - y0)**2)

            triangle = Path([
                (x0, y0),
                (GOAL_X, LEFT_POST),
                (GOAL_X, RIGHT_POST)
            ])

            gk_in_triangle = triangle.contains_point((gk_x, gk_y), radius=tolerance)

            gk_in_penalty_area = (
                (GOAL_X - PENALTY_AREA_DEPTH <= gk_x <= GOAL_X + tolerance) and
                (RIGHT_POST - PENALTY_AREA_LENGTH_FROM_POST <= gk_y <= LEFT_POST + PENALTY_AREA_LENGTH_FROM_POST)
            )

        return pd.Series({
            'gk_x': gk_x,
            'gk_y': gk_y,
            'gk_distance_to_ball': gk_distance_to_ball,
            'gk_in_shooting_triangle': gk_in_triangle,
            'gk_in_penalty_area': gk_in_penalty_area
        })

    # Drop existing columns if they already exist to prevent duplicates
    cols_to_remove = [
        'gk_x', 'gk_y', 'gk_distance_to_ball',
        'gk_in_shooting_triangle', 'gk_in_penalty_area'
    ]
    df = df.drop(columns=[col for col in cols_to_remove if col in df.columns])

    gk_features = df.apply(extract_features, axis=1)
    df = pd.concat([df, gk_features], axis=1)
    return df

all_events_cleaned = (
    all_events_cleaned
    .pipe(calculate_goalkeeper_features)
)
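
The shooting triangle test can be reproduced without matplotlib. The sketch below is a dependency-free stand-in for `Path.contains_point` that uses a cross-product sign test; unlike the cell above it applies no tolerance radius, and the helper names are illustrative.

```python
GOAL_X = 120
LEFT_POST, RIGHT_POST = 43.66, 36.34

def _cross(o, a, b):
    """z-component of the cross product of vectors o->a and o->b."""
    return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])

def in_shooting_triangle(ball, keeper):
    """True if keeper lies inside or on the triangle ball / left post / right post."""
    a, b, c = ball, (GOAL_X, LEFT_POST), (GOAL_X, RIGHT_POST)
    d1 = _cross(a, b, keeper)
    d2 = _cross(b, c, keeper)
    d3 = _cross(c, a, keeper)
    has_negative = d1 < 0 or d2 < 0 or d3 < 0
    has_positive = d1 > 0 or d2 > 0 or d3 > 0
    # The point is inside (or on an edge) when all signs agree
    return not (has_negative and has_positive)

ball = (108.0, 40.0)
print(in_shooting_triangle(ball, (118.0, 40.0)))  # True: keeper between ball and goal
print(in_shooting_triangle(ball, (110.0, 60.0)))  # False: keeper far off to the side
print(in_shooting_triangle(ball, (120.0, 40.0)))  # True: default position on the goal line
```

The last call shows why the default goalkeeper position of (120, 40) yields a positive triangle flag: the centre of the goal lies on the base of the triangle.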

### Target Label Construction ###

Now we search for all goals in each match. For the last k = 10 actions up to and including the goal (within the same period), we set label_team_goal to 1 if the action was performed by the team that scored, and label_opponent_goal to 1 if it was performed by the team that conceded. All other game states keep the default value of 0.

In [16]:
def assign_goal_labels(df_all_events, lookback=10):
    df_all_events = df_all_events.copy()
    df_all_events['label_team_goal'] = 0
    df_all_events['label_opponent_goal'] = 0

    grouped = df_all_events.groupby(['match_id', 'period'])

    for (match_id, period), df_match in grouped:
        match_indices = df_match.index.to_list()

        for i, global_idx in enumerate(match_indices):
            row = df_all_events.loc[global_idx]
            outcome = row['outcome_name']
            action_type = row['type_name']
            acting_team = row['team_id']

            if outcome == 'Goal':
                scoring_team = acting_team

                for j in range(max(0, i - lookback + 1), i + 1):
                    event_idx = match_indices[j]
                    event_team = df_all_events.loc[event_idx, 'team_id']

                    if event_team == scoring_team:
                        df_all_events.at[event_idx, 'label_team_goal'] = 1
                    else:
                        df_all_events.at[event_idx, 'label_opponent_goal'] = 1

            elif action_type == 'Own Goal Against':
                conceding_team = acting_team

                past_teams = df_match.loc[:global_idx - 1, 'team_id'].unique()
                opponents = [team for team in past_teams if team != conceding_team]
                if not opponents:
                    continue
                scoring_team = opponents[0]

                for j in range(max(0, i - lookback + 1), i + 1):
                    event_idx = match_indices[j]
                    event_team = df_all_events.loc[event_idx, 'team_id']

                    if event_team == scoring_team:
                        df_all_events.at[event_idx, 'label_team_goal'] = 1
                    else:
                        df_all_events.at[event_idx, 'label_opponent_goal'] = 1

    return df_all_events

all_events_cleaned = assign_goal_labels(all_events_cleaned)
all_events_cleaned.to_pickle("data/data_cleaned.pkl")
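
The look-back window is easiest to see on a short toy sequence. The following is a simplified sketch in plain Python that ignores own goals and periods; `label_lookback` and the event tuples are hypothetical and only mirror the window logic of the function above.

```python
def label_lookback(events, lookback=10):
    """events: list of (team_id, is_goal) in match order.

    Returns two lists: label_team_goal and label_opponent_goal.
    """
    n = len(events)
    team_goal = [0] * n
    opponent_goal = [0] * n
    for i, (scoring_team, is_goal) in enumerate(events):
        if not is_goal:
            continue
        # The window ends at the goal itself, so it covers the goal
        # plus the (lookback - 1) actions before it
        for j in range(max(0, i - lookback + 1), i + 1):
            if events[j][0] == scoring_team:
                team_goal[j] = 1
            else:
                opponent_goal[j] = 1
    return team_goal, opponent_goal

events = [(2, False), (1, False), (2, False), (1, False), (1, True)]
team_goal, opponent_goal = label_lookback(events, lookback=3)
print(team_goal)      # [0, 0, 0, 1, 1]
print(opponent_goal)  # [0, 0, 1, 0, 0]
```

With `lookback=3`, only the goal and the two actions before it are labelled, and the action by team 2 inside the window receives the opponent label.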

### Player Minutes Computation ###

To later express individual action value scores per 90 minutes, we need the minutes played by every player. The totals are saved to a file so that they can be loaded in later parts of the project.
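
The per-player rule can be sketched as a small pure-Python function. This is an illustration under the assumption that each player is subbed on and off at most once; `minutes_played` as a function and its arguments are hypothetical names that mirror the four cases handled in the cell below.

```python
def minutes_played(started, on_minute, off_minute, match_end):
    """Minutes for one player in one match.

    on_minute / off_minute are None when the player was not subbed on / off.
    match_end is the last recorded minute of the match.
    """
    if started:
        # Starters play until they are subbed off, otherwise until the end
        return off_minute if off_minute is not None else match_end
    if on_minute is None:
        # Never entered the pitch
        return 0
    end = off_minute if off_minute is not None else match_end
    return end - on_minute

print(minutes_played(True, None, None, 95))  # 95: played the full match
print(minutes_played(True, None, 60, 95))    # 60: starter subbed off
print(minutes_played(False, 60, None, 95))   # 35: substitute who finished the match
print(minutes_played(False, 46, 80, 95))     # 34: subbed on and off again
```

Because the match end is taken from the last recorded minute, stoppage time is included in the totals.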

In [18]:
minutes_played_list = []

for match_id in all_matches_360['match_id']:
    events, related, freeze, tactics = parser.event(match_id)
    lineup = parser.lineup(match_id)

    match_row = all_matches_360[all_matches_360['match_id'] == match_id].iloc[0]
    competition_name = match_row['competition_name']
    season_name = match_row['season_name']

    match_max_minute = events['minute'].max()
    if pd.isna(match_max_minute):
        match_max_minute = 90

    time_off = events.loc[events.type_name == 'Substitution', ['player_id', 'minute']].rename(columns={'minute': 'off'})
    time_on = events.loc[events.type_name == 'Substitution', ['substitution_replacement_id', 'minute']].rename(columns={'substitution_replacement_id': 'player_id', 'minute': 'on'})

    lineup = lineup.merge(time_on, on='player_id', how='left')
    lineup = lineup.merge(time_off, on='player_id', how='left')

    starting_ids = events[events.type_name == 'Starting XI'].id
    starting_xi = tactics[tactics.id.isin(starting_ids)]
    starting_players = starting_xi.player_id

    mask_played = (
        lineup.on.notnull() |
        lineup.off.notnull() |
        lineup.player_id.isin(starting_players)
    )
    lineup = lineup[mask_played].copy()

    lineup['minutes_played'] = np.where(
        lineup['player_id'].isin(starting_players),
        np.where(lineup['off'].notnull(), lineup['off'], match_max_minute),
        np.where(lineup['on'].notnull(), match_max_minute - lineup['on'], 0)
    )

    lineup['minutes_played'] = np.where(
        (lineup['on'].notnull()) & (lineup['off'].notnull()),
        lineup['off'] - lineup['on'],
        lineup['minutes_played']
    )

    lineup['match_id'] = match_id
    lineup['competition_name'] = competition_name
    lineup['season_name'] = season_name

    minutes_played_list.append(
        lineup[['match_id', 'player_id', 'player_name', 'player_nickname',
                'minutes_played', 'competition_name', 'season_name']]
    )

all_minutes_played = pd.concat(minutes_played_list, ignore_index=True)

player_minutes = all_minutes_played.groupby(
    ['player_id', 'player_name', 'player_nickname'],
    as_index=False
)['minutes_played'].sum()

print(player_minutes.sort_values('minutes_played', ascending=False))
player_minutes.to_pickle("data/player_minutes.pkl")

      player_id                 player_name             player_nickname  \
75         3500                Granit Xhaka                Granit Xhaka   
524        8667              Lukáš Hrádecký              Lukáš Hrádecký   
462        8221                Jonathan Tah                Jonathan Tah   
632       10336   Alejandro Grimaldo García               Álex Grimaldo   
1263      40724               Florian Wirtz               Florian Wirtz   
...         ...                         ...                         ...   
1142      32253          Luca Jannis Kilian                 Luca Kilian   
1149      32602             Kristijan Jakić             Kristijan Jakić   
1455     132772  Daniel Alonso Chacón Salas  Daniel Alonso Chacón Salas   
872       20037       Julian Baumgartlinger       Julian Baumgartlinger   
365        6941               Jasmin Kurtič               Jasmin Kurtič   

      minutes_played  
75            4232.0  
524           3393.0  
462           3073.0  
632    