In [1]:
import pandas as pd
import numpy as np

import math
import ast
from tqdm import tqdm
import warnings

warnings.filterwarnings("ignore")

In [2]:
###########################
# This notebook loads StatsBomb's open data and calculates the more advanced features that we'll use as heuristic inputs to our models. It covers the four groups of features we're looking to add: location-based, goalkeeper, and defensive-pressure statistics, as well as features based on our various hypotheses.
###########################

In [3]:
# Read the raw shot events and discard every column that holds no values at all.
all_shots = pd.read_csv("./data/all_shots.csv").dropna(axis=1, how="all")

FileNotFoundError: [Errno 2] No such file or directory: './data/all_shots.csv'

In [None]:
# Sanity check: (rows, columns) of the shots table after dropping all-empty columns.
all_shots.shape

In [None]:
# Competitions to leave out of the analysis.
# NOTE(review): these are competition names compared against `competition_id`;
# confirm that the column really holds names, otherwise this filter removes nothing.
excluded_competitions = ["FIFA U20 World Cup", "North American League", "Liga Profesional", "Indian Super League"]

# Columns without a single value. This is evaluated on the unfiltered frame.
# NOTE(review): all-empty columns were already dropped on load, so this selection is
# presumably empty here - confirm whether it was meant to run after the filter.
empty_columns = all_shots.columns[all_shots.nunique() == 0]

all_shots = (
    all_shots[~all_shots["competition_id"].isin(excluded_competitions)]
    .drop(columns=empty_columns)
    .sort_values(by=["match_id", "period", "minute", "second"])
)

# Number of distinct values per remaining column.
all_shots.nunique()

In [None]:
# Display the identifying and timing columns of every shot in chronological order
# (match, period, minute, second). Display only: the result is not assigned.
all_shots[["season_id", "match_id", "id", "index", "location", "period", "minute", "second", "player", "player_id", "position", "possession", "possession_team", "possession_team_id", "team", "team_id", "timestamp", "competition_id", "shot_outcome"]].sort_values(by=["match_id", "period", "minute", "second"])

In [None]:
# The shots table is first enriched with statistics about the pass that led to each shot.
# Read the raw pass events and discard every column that holds no values at all.
all_passes = pd.read_csv("./data/all_passes.csv").dropna(axis=1, how="all")

In [None]:
# Passes referenced by a shot's `shot_key_pass_id`. An explicit copy is taken because
# a later cell adds a column to `key_passes`; without it that write targets a slice of
# `all_passes` (SettingWithCopyWarning, and a silent no-op under copy-on-write).
key_passes = all_passes[all_passes["id"].isin(all_shots['shot_key_pass_id'])].copy()

In [None]:
# Display the key-pass columns that are merged onto the shots in the next cell.
key_passes[["duration", "pass_angle", "pass_type", "pass_height", "pass_length", "pass_assisted_shot_id"]]

In [None]:
# Store the pass duration under its own name so it cannot collide with the shot's `duration`.
key_passes["pass_duration"] = key_passes["duration"]

# Attach the key-pass attributes to the shot they assisted; shots without a key pass
# keep NaN in these columns (left join). The join key is dropped once it has served.
key_pass_columns = ["pass_duration", "pass_angle", "pass_type", "pass_height", "pass_length", "pass_assisted_shot_id"]
all_shots = all_shots.merge(
    key_passes[key_pass_columns],
    how='left',
    left_on='id',
    right_on='pass_assisted_shot_id',
    suffixes=("", ""),
).drop(columns="pass_assisted_shot_id")

In [None]:
# Number of passes (rows with a non-null `duration`) in each possession of each match.
possession_passes = (
    all_passes.groupby(["match_id", "possession"])["duration"]
    .count()
    .rename("num_passes")
    .reset_index()
)

# Shots from possessions without any recorded pass keep NaN in `num_passes` (left join).
all_shots = all_shots.merge(possession_passes, how='left', on=["match_id", "possession"])

In [None]:
# Display the shots table with the pass features and `num_passes` merged in.
all_shots

In [None]:
# Build the feature table `data` from the relevant attributes of all_shots.

# general and time attributes (the freeze frame is stored under a shorter name)
general_columns = ["period", "minute", "second", "possession", "duration", "competition_id", "season_id", "match_id", "timestamp", "team", "player"]
data = all_shots[general_columns + ["shot_freeze_frame"]].rename(columns={"shot_freeze_frame": "freeze_frame"})
data = data.sort_values(by=["match_id", "period", "minute", "second"])

In [None]:
# qualitative attributes
def _player_type(position):
    """Map a position name to an ordinal code, from most to least attacking.

    4 = forward/striker, 3 = wing, 2 = midfield, 1 = back/defender, 0 = anything else.
    The checks run in this order, so a name containing both "Wing" and "Back" is coded 3.
    """
    if "Forward" in position or "Striker" in position:
        return 4
    if "Wing" in position:
        return 3
    if "Mid" in position:
        return 2
    if "Back" in position or "Defen" in position:
        return 1
    return 0


data["play_pattern"] = all_shots["play_pattern"]
data["position"] = all_shots["position"]
data["player_type"] = all_shots["position"].apply(_player_type)

In [None]:
# shot attributes
# `location` is stored as the text of a list; parse it once and reuse the result.
shot_xy = all_shots["location"].apply(ast.literal_eval)

data["location_x"] = shot_xy.apply(lambda xy: xy[0])
data["location_x_distance"] = 120 - data["location_x"]      # distance to the x = 120 line
data["location_y"] = shot_xy.apply(lambda xy: xy[1])
data["location_y_distance"] = (data["location_y"] - 40).abs()  # distance from the y = 40 line

# Durations that are missing or not below 100 are replaced by 0.
data["duration"] = all_shots["duration"].where(all_shots["duration"] < 100, 0)

data["technique"] = all_shots["shot_technique"]
data["body_part"] = all_shots["shot_body_part"]
data["type"] = all_shots["shot_type"]
data["is_penalty"] = all_shots["shot_type"].eq("Penalty")
data["is_header"] = all_shots["shot_body_part"].eq("Head")

In [None]:
# shot modifiers: a missing value is treated as False
modifier_sources = {
    "first_time": "shot_first_time",
    "open_goal": "shot_open_goal",
    "one_on_one": "shot_one_on_one",
    "aerial_won": "shot_aerial_won",
    "follows_dribble": "shot_follows_dribble",
    "under_pressure": "under_pressure",
}
for modifier, source_column in modifier_sources.items():
    data[modifier] = all_shots[source_column].fillna(False)

In [None]:
# preceding pass attributes
# Shots without a key pass get the column mean for numeric pass features, the label
# "Missing" for categorical ones, and 0 for the number of passes in the possession.
pass_fill_values = {
    "pass_duration": all_shots["pass_duration"].mean(),
    "pass_angle": all_shots["pass_angle"].mean(),
    "pass_type": "Missing",
    "pass_height": "Missing",
    "pass_length": all_shots["pass_length"].mean(),
    "num_passes": 0,
}
for pass_column, fill_value in pass_fill_values.items():
    data[pass_column] = all_shots[pass_column].fillna(fill_value)

In [None]:
# locations for defensive/goalkeeper attributes
def _opponent_positions(frame):
    """Parse a freeze-frame string into [location, position name] pairs of the non-teammates."""
    return [
        [player["location"], player["position"]["name"]]
        for player in ast.literal_eval(frame)
        if not player["teammate"]
    ]


def _goalkeeper_location(opponents):
    """Return the location of the first opponent listed as "Goalkeeper", or [-1, -1] if none."""
    for location, role in opponents:
        if role == "Goalkeeper":
            return location
    return [-1, -1]


locations = pd.DataFrame()

# Shots without a freeze frame get a single placeholder opponent, i.e. [[[], '']].
locations["opponents"] = all_shots["shot_freeze_frame"].fillna('[{"location": [], "position": {"name": ""}, "teammate": False}]').apply(_opponent_positions)

# Shot position as an (x, y) tuple, with x == 120.0 nudged to 119.9.
shot_x = data["location_x"].mask(data["location_x"] == 120.0, 119.9)
locations["shot"] = tuple(zip(shot_x, data["location_y"]))

# NOTE(review): the goalkeeper location is stored as found; unlike the shot, its x is
# not nudged off 120.0, and a missing goalkeeper is encoded as [-1, -1]. Confirm both
# are intended before relying on goalkeeper distances.
locations["goalkeeper"] = locations["opponents"].apply(_goalkeeper_location)
locations

In [None]:
# Point-in-triangle test based on the signs of three cross products
def point_in_triangle(point, a, b, c):
    """Return True if `point` lies strictly inside triangle a-b-c, else False.

    All arguments are indexable (x, y) pairs. The vertices may be given in either
    orientation.
    """
    px, py = point[0], point[1]
    rel_x = px - a[0]
    rel_y = py - a[1]

    # Side of edge a->b on which the point lies.
    side_ab = ((b[0] - a[0]) * rel_y) - ((b[1] - a[1]) * rel_x) > 0

    # The point must be on the opposite side of a->c ...
    side_ac = ((c[0] - a[0]) * rel_y) - ((c[1] - a[1]) * rel_x) > 0
    if side_ac == side_ab:
        return False

    # ... and on the same side of b->c as of a->b.
    side_bc = ((c[0] - b[0]) * (py - b[1])) - ((c[1] - b[1]) * (px - b[0])) > 0
    return bool(side_bc == side_ab)

In [None]:
# Area of a triangle from its three vertices
def area_of_triangle(a, b, c):
    """Return the non-negative area of the triangle with (x, y) vertices a, b and c."""
    term_a = a[0] * (b[1] - c[1])
    term_b = b[0] * (c[1] - a[1])
    term_c = c[0] * (a[1] - b[1])
    signed_area = 0.5 * (term_a + term_b + term_c)
    return abs(signed_area)

In [None]:
# Calculate the angle between two sides of the triangle
def angle_between_sides(a, b, c):
    """Return, in degrees, the angle between the sides of length `a` and `b`.

    `c` is the length of the side opposite that angle (law of cosines).

    For (nearly) degenerate triangles floating-point rounding can push the cosine a
    hair outside [-1, 1], which would make `math.acos` raise; such values are snapped
    back onto the boundary. Side lengths that genuinely cannot form a triangle still
    raise ValueError, and a zero-length `a` or `b` still raises ZeroDivisionError.
    """
    cos_theta = (a**2 + b**2 - c**2) / (2 * a * b)

    # Tolerate rounding noise only; NaN fails both comparisons and propagates unchanged.
    tolerance = 1e-9
    if 1.0 < cos_theta <= 1.0 + tolerance:
        cos_theta = 1.0
    elif -1.0 - tolerance <= cos_theta < -1.0:
        cos_theta = -1.0

    return math.degrees(math.acos(cos_theta))

In [None]:
# Shortest straight-line distance from the shooter to the goal mouth
def best_distance(point):
    """Return the distance from `point` to the segment x = 120, 36 <= y <= 44.

    A point whose y lies within [36, 44] is measured straight along x; a point beyond
    either end of the segment is measured to the nearer end point.
    """
    px, py = point[0], point[1]
    dx = 120 - px

    if py > 44:
        # Beyond the y = 44 end: distance to (120, 44).
        dy = 44 - py
        return math.sqrt(dx * dx + dy * dy)

    if py < 36:
        # Beyond the y = 36 end: distance to (120, 36).
        dy = 36 - py
        return math.sqrt(dx * dx + dy * dy)

    return dx

In [None]:
# Calculate the perpendicular line 1.5m on either side of a line between two points
def calculate_perpendicular(a, b):
    """Return the two points 1.5 units from `a`, perpendicular to the line a -> b.

    The offset is derived from the segment's normal vector instead of its slope, so
    vertical segments (b[0] == a[0]) no longer divide by zero and horizontal segments
    get an exact rather than approximate offset.

    Returns:
        (point1, point2): `point1` is the point with the larger x coordinate; for a
        horizontal segment, where both share a's x, it is the one with the smaller y.

    Raises:
        ZeroDivisionError: if `a` and `b` coincide, so the direction is undefined.
    """
    dx = b[0] - a[0]
    dy = b[1] - a[1]
    length = math.hypot(dx, dy)
    if length == 0:
        raise ZeroDivisionError("a and b coincide; the perpendicular is undefined")

    if dy == 0:
        # Horizontal segment: the perpendicular is vertical.
        normal_x, normal_y = 0.0, -1.0
    else:
        # Of the two unit normals, take the one pointing towards larger x.
        normal_x = abs(dy) / length
        normal_y = -dx / length if dy > 0 else dx / length

    offset_x = 1.5 * normal_x
    offset_y = 1.5 * normal_y
    point1 = (a[0] + offset_x, a[1] + offset_y)
    point2 = (a[0] - offset_x, a[1] - offset_y)

    return point1, point2

In [None]:
# Isotropic unit-variance Gaussian bump, not normalised (peak value 1)
def gaussian(x, y, a, b):
    """Return exp(-((x - a)^2 + (y - b)^2) / 2).

    The value is 1 at the centre (a, b). `x` and `y` may be scalars or numpy arrays;
    arrays are evaluated element-wise.
    """
    dx = x - a
    dy = y - b
    squared_distance = dx**2 + dy**2
    return np.exp(-squared_distance / 2)

In [None]:
# defensive pressure/goalkeeper attributes
def _count_opponents(shot, is_relevant):
    """Count the opponents in one `locations` row for which `is_relevant` holds.

    `is_relevant(opponent_location, shot_location)` decides whether an opponent counts.
    Returns -1 when the row only holds the no-freeze-frame placeholder.
    """
    if shot["opponents"] == [[[], '']]:
        return -1
    return sum(1 for defender in shot["opponents"] if is_relevant(defender[0], shot["shot"]))


# Opponents closer than 3 units to the shot location.
data["defenders_3m_radius"] = locations.apply(
    lambda shot: _count_opponents(
        shot,
        lambda spot, origin: ((spot[0] - origin[0])**2 + (spot[1] - origin[1])**2) < 3**2,
    ),
    axis=1,
)

# Opponents inside the triangle spanned by the shot location and (120, 32), (120, 48).
data["defenders_triangle"] = locations.apply(
    lambda shot: _count_opponents(
        shot,
        lambda spot, origin: point_in_triangle(spot, origin, [120, 32], [120, 48]),
    ),
    axis=1,
)

goalkeeper_xy = locations["goalkeeper"]
data["goalkeeper_x"] = goalkeeper_xy.apply(lambda spot: spot[0])
data["goalkeeper_y"] = goalkeeper_xy.apply(lambda spot: spot[1])

# NOTE(review): when no goalkeeper was found the location is [-1, -1], so this
# distance is measured to that placeholder - confirm this is acceptable downstream.
data["distance_to_goalie"] = data.apply(
    lambda shot: math.sqrt(
        (shot["goalkeeper_x"] - shot["location_x"])**2
        + (shot["goalkeeper_y"] - shot["location_y"])**2
    ),
    axis=1,
)

In [None]:
# angle/location-based attributes
post_low_y = (120, 36)
post_high_y = (120, 44)
goal_centre = (120, 40)

# Angle (degrees) subtended at the shot location by the two posts, which are 8 apart.
data["shooting_range"] = locations["shot"].apply(
    lambda point: angle_between_sides(math.dist(point, post_low_y), math.dist(post_high_y, point), 8)
)
# Straight-line distance to the centre of the goal.
data["goal_distance"] = locations["shot"].apply(lambda point: math.dist(point, goal_centre))
# Shortest straight-line distance to the goal mouth.
data["best_distance"] = locations["shot"].apply(best_distance)

In [None]:
# target variables
data["statsbomb_xg"] = all_shots["shot_statsbomb_xg"].apply(float)

# `shot_end_location` is stored as the text of a list; parse it once for both coordinates.
shot_end = all_shots["shot_end_location"].apply(ast.literal_eval)
data["end_location_x"] = shot_end.apply(lambda end: end[0])
data["end_location_y"] = shot_end.apply(lambda end: end[1])

data["is_goal"] = all_shots["shot_outcome"].eq("Goal")

In [None]:
# Direction in which the ball travelled (radians, from shot location to end location),
# and whether the shot was taken from the favoured side of the pitch for the foot used.
data["shot_angle"] = [
    math.atan2(end_y - start_y, end_x - start_x)
    for start_x, start_y, end_x, end_y in zip(
        data["location_x"], data["location_y"], data["end_location_x"], data["end_location_y"]
    )
]

# NOTE(review): both sides are split at y = 42 while other features use y = 40 as the
# centre line - confirm the threshold is intended.
right_foot_favoured = data["body_part"].eq("Right Foot") & data["location_y"].lt(42)
left_foot_favoured = data["body_part"].eq("Left Foot") & data["location_y"].gt(42)
data["good_foot"] = right_foot_favoured | left_foot_favoured

In [None]:
# Shots taken and xG accumulated by the shooting team in the match before this shot
team_shots = all_shots.groupby(["match_id", "team"])
data["shots_so_far"] = team_shots["timestamp"].cumcount()
# The running total includes the current shot, so its own xG is taken off again.
data["xg_so_far"] = team_shots["shot_statsbomb_xg"].cumsum() - data["statsbomb_xg"]

In [None]:
# We can now calculate the game state and which team was leading at the time of the shot
# Work on a copy: the helper columns below must land on `all_goals` itself rather than
# on a slice of `all_shots`.
all_goals = all_shots[all_shots["shot_outcome"] == "Goal"].copy()

# Team that scored the first goal of each match, and whether each goal belongs to it.
first_scorers = all_goals.groupby(['match_id']).first()[["team"]]
all_goals["is_first"] = all_goals[['match_id', 'team']].apply(tuple, axis=1).isin(list(zip(first_scorers.index, first_scorers["team"])))

# For every goal, the score just before it was scored:
#   first_tally  = goals so far by the team that scored first in the match
#   second_tally = goals so far by the other team
for side, scored_by_first in (("first", True), ("second", False)):
    temp_column = f"{side}_tally_temp"
    tally_column = f"{side}_tally"

    # Running count (0, 1, 2, ...) on this side's own goals; NaN on the other side's goals.
    all_goals[temp_column] = all_goals[all_goals["is_first"] == scored_by_first].groupby(["match_id", "is_first"]).cumcount()

    # The other side's goals inherit the count from this side's latest goal ...
    all_goals[tally_column] = all_goals.groupby(["match_id"])[temp_column].ffill()

    # ... plus one, because that latest goal has been scored by then.
    carried_over = all_goals[temp_column].isnull() & all_goals[tally_column].notnull()
    all_goals.loc[carried_over, tally_column] += 1

# Goals scored before the second team's first goal have nothing to inherit: tally 0.
all_goals["second_tally"] = all_goals["second_tally"].fillna(0)

all_goals[["season_id", "match_id", "period", "minute", "second", "team", "is_first", "first_tally", "second_tally"]]

In [None]:
# Copy the pre-goal scores onto the feature table. Assignment aligns on the index, so
# only rows that are goals receive a value; every other shot is NaN for now.
for tally_column in ("first_tally", "second_tally"):
    data[tally_column] = all_goals[tally_column]

data[["season_id", "match_id", "period", "minute", "second", "team", "is_goal", "shots_so_far", "xg_so_far", "first_tally", "second_tally"]]

In [None]:
# Whether the shooting team is the one that scored first in its match.
data["is_first"] = data[['match_id', 'team']].apply(tuple, axis=1).isin(list(zip(first_scorers.index, first_scorers["team"])))

# Every shot takes the score recorded on the next goal in row order, i.e. the score at
# the time of the shot. Shots after a match's last goal pick up the next match's
# opening score here; those are corrected in the loop below.
data["first_tally"] = data["first_tally"].bfill()
data["second_tally"] = data["second_tally"].bfill()

# Row label of the last goal and of the last shot per match, restricted to matches
# with at least one goal. Both are ordered by match_id, so they pair up positionally.
data["row_index"] = data.index
last_goals = data[data["is_goal"] == True].groupby(["match_id"]).last()["row_index"]
last_shots = data.groupby(["match_id"]).last()["row_index"]
last_shots = last_shots[last_shots.index.isin(last_goals.index)]

# Shots after the last goal of a match carry the final score, i.e. the last goal's
# pre-goal score plus that goal itself.
# Assumes consecutive integer row labels in row order - the label slice depends on it.
for goal_row, shot_row in zip(last_goals, last_shots):
    if goal_row == shot_row:
        continue
    goal = data.loc[goal_row]
    if goal["is_first"]:
        final_first, final_second = goal["first_tally"] + 1, goal["second_tally"]
    else:
        final_first, final_second = goal["first_tally"], goal["second_tally"] + 1
    data.loc[goal_row + 1 : shot_row, "first_tally"] = final_first
    data.loc[goal_row + 1 : shot_row, "second_tally"] = final_second

# Rows still empty have no later goal to inherit from (goalless matches at the end of
# the data): the score is 0-0. Left as NaN they would be labelled as trailing.
data["first_tally"] = data["first_tally"].fillna(0)
data["second_tally"] = data["second_tally"].fillna(0)

data[["season_id", "match_id", "period", "minute", "second", "team", "is_goal", "shots_so_far", "xg_so_far", "first_tally", "second_tally"]]

In [None]:
# Goal difference from the shooting team's point of view, and its sign.
first_minus_second = data["first_tally"] - data["second_tally"]
second_minus_first = data["second_tally"] - data["first_tally"]
data["game_state"] = first_minus_second.where(data["is_first"], second_minus_first)

# 1 = leading, 0 = level, -1 = anything else (trailing, or an undefined game state).
data["was_leading"] = data["game_state"].apply(lambda diff: 0 if diff == 0 else 1 if diff > 0 else -1)

# The helper columns are no longer needed.
data = data.drop(columns=["row_index", "is_first", "first_tally", "second_tally"])

data[["season_id", "match_id", "period", "minute", "second", "team", "is_goal", "shots_so_far", "xg_so_far", "game_state", "was_leading"]]

In [None]:
# To capture phases of play, we'll count the number of shots, and the number of shots by the same team, over the last minute and 15 minutes

# Match clock as a datetime. Each period is shifted by one hour per preceding period so
# that periods never interleave on the clock; shifting only period 2 would leave later
# periods mixed in with the first half.
# Assumes `timestamp` is the time elapsed within its period (HH:MM:SS.fff) - the
# original one-hour shift of period 2 relied on the same reading.
data["time"] = (
    pd.to_timedelta(data["timestamp"])
    + pd.to_timedelta(data["period"] - 1, unit="h")
    + pd.Timestamp("1970-01-01")
)

data.sort_values(by=["match_id", "time"], inplace=True)

# Shots in the same match within the window ending at (and including) each shot.
# Counting a series of ones avoids a rolling aggregation over the text `team` column,
# and rebuilding each result on the match's own row labels keeps every count on the
# shot it belongs to, whatever the current row order is.
for window_column, window in (("past_minute", "60s"), ("past_15", "900s")):
    data[window_column] = pd.concat(
        [
            pd.Series(
                pd.Series(1.0, index=pd.DatetimeIndex(match["time"])).rolling(window).sum().to_numpy(),
                index=match.index,
            )
            for _, match in data.groupby("match_id")
        ]
    )

data[["season_id", "match_id", "period", "minute", "second", "team", "is_goal", "past_minute", "past_15"]].sort_values(by=["match_id", "period", "minute", "second"])

In [None]:
data.sort_values(by=["match_id", "team", "time"], inplace=True)

# Shots by the same team in the same match within the window ending at (and including)
# each shot. Counting a series of ones avoids a rolling aggregation over the text
# `team` column, and rebuilding each result on the group's own row labels keeps every
# count on the shot it belongs to, whatever the current row order is.
for window_column, window in (("own_past_minute", "60s"), ("own_past_15", "900s")):
    data[window_column] = pd.concat(
        [
            pd.Series(
                pd.Series(1.0, index=pd.DatetimeIndex(team_match["time"])).rolling(window).sum().to_numpy(),
                index=team_match.index,
            )
            for _, team_match in data.groupby(["match_id", "team"])
        ]
    )

# The clock column was only needed for the rolling windows.
data.drop(["time"], axis=1, inplace=True)
data.sort_values(by=["match_id", "period", "minute", "second"], inplace=True)
data[["season_id", "match_id", "period", "minute", "second", "team", "is_goal", "own_past_minute", "own_past_15"]]

In [None]:
# Flag shots taken from the 90th minute onwards.
data["is_extra_time"] = data["minute"].ge(90)

In [None]:
# One pressure accumulator per shot, starting at 0; filled in further down.
press = [0] * len(all_shots)

sff = all_shots.shot_freeze_frame

# Parsed freeze frame of every shot (a list of player dicts); 0 marks a shot without one.
list_of_dicts = [
    0 if pd.isna(sff[i]) else ast.literal_eval(sff[i])
    for i in range(len(sff))
]

In [None]:
 #to get the freeze frames of only the opponents
# Per shot: the freeze-frame entries of the opposing players; 0 where there is no frame.
opponents_list = [
    [player for player in frame if player['teammate'] == False] if frame != 0 else 0
    for frame in list_of_dicts
]

In [None]:
# Per shot: the locations of all opponents in the freeze frame; 0 where there is no frame.
list_of_locations = [
    [player['location'] for player in opponents] if opponents != 0 else 0
    for opponents in opponents_list
]

In [None]:
# Influence map per shot: the sum of one Gaussian bump per defender, evaluated on a
# grid with one node per unit of the 120 x 80 pitch. Z[row][col] is the value at
# x = col, y = row. Shots without a freeze frame get the placeholder [-1].

# The grid is the same for every shot, so it is built once.
x = np.linspace(0, 120, 121)
y = np.linspace(0, 80, 81)
X, Y = np.meshgrid(x, y)

influence = []
for loc in list_of_locations:
    if loc == 0:
        influence.append([-1])
        continue

    # Start from an empty pitch and add every defender's contribution.
    Z = np.zeros_like(X)
    for pos in loc:
        Z += gaussian(X, Y, pos[0], pos[1])
    influence.append(Z)

In [None]:
# location of all the shots (parsed from the text of a list)
locs = [ast.literal_eval(all_shots.location[i]) for i in range(len(all_shots.location))]

# press values as a gaussian function: every opponent contributes exp(-dist^2 / 2),
# where dist is the distance to the shot location. Shots without a freeze frame get -1.
# Each total is built from zero and then stored, so re-running this cell produces the
# same values instead of adding onto the previous run.
for i in range(len(press)):
    if list_of_locations[i] != 0:
        shot_pressure = 0
        for opponent in list_of_locations[i]:
            dist = math.sqrt((locs[i][0] - opponent[0])**2 + (locs[i][1] - opponent[1])**2)
            shot_pressure = shot_pressure + math.exp(-dist ** 2 / (2))
        press[i] = shot_pressure
    else:
        press[i] = -1

In [None]:
# influence in triangle will have the sum of the influence values of all the points
# in the triangle spanned by the shot location and the points (120, 36) and (120, 44)
influence_in_triangle = []
for k in range(len(influence)):
    shot_influence = influence[k]

    # Shots without a freeze frame only carry the [-1] placeholder instead of a grid;
    # they get -1, the same marker the other freeze-frame features use.
    if np.ndim(shot_influence) != 2:
        influence_in_triangle.append(-1)
        continue

    shot = locs[k]
    total_influence = 0
    # Only grid nodes inside the triangle's bounding box can lie in the triangle.
    for i in range(int(min(shot[1], 36)), int(max(shot[1], 44) + 1)):
        for j in range(int(shot[0]), 121):
            if point_in_triangle(np.array([j, i]), [120, 36], [120, 44], shot):
                # Read this shot's own grid (row = y, column = x).
                total_influence += shot_influence[i][j]
    influence_in_triangle.append(total_influence)

In [None]:
# `press` and `influence_in_triangle` were built in the row order of all_shots, while
# `data` has been re-sorted several times since. Attach them through the all_shots
# index so every value lands on its own shot; a plain list would be assigned by
# position in data's current order.
data["press"] = pd.Series(press, index=all_shots.index)
data["influence_in_triangle"] = pd.Series(influence_in_triangle, index=all_shots.index)

In [None]:
# Display the finished feature table.
data

In [None]:
# Number of distinct values per feature column.
data.nunique()

In [None]:
# Write the feature table to CSV; the row index is not written.
data.to_csv("./data/augmented_data.csv", index=False)