In [None]:
from statsbombpy import sb
import mplsoccer as mpl
from kloppy import metrica
import numpy as np
import matplotlib.pyplot as plt

# Colors

In [None]:
from matplotlib.colors import ListedColormap
def bulid_cmap(x, y):
    """Build a 256-entry diverging colormap running x -> white -> y.

    Parameters
    ----------
    x, y : sequence of three floats
        RGB triples (each channel unpacked as r, g, b) for the two ends.

    Returns
    -------
    ListedColormap
        First 128 entries fade from ``x`` to white, last 128 from white to ``y``.
    """
    n_steps = 256

    def _fade_to_white(rgb):
        # One opaque ramp per end colour: each channel goes linearly from its
        # starting value to 1 (white); the alpha column stays at 1.
        start_r, start_g, start_b = rgb
        ramp = np.ones((n_steps, 4))
        for column, start in enumerate((start_r, start_g, start_b)):
            ramp[:, column] = np.linspace(start, 1, n_steps)
        return ListedColormap(ramp)

    first_half = _fade_to_white(x)
    second_half = _fade_to_white(y)

    # Sample the first ramp forwards (x -> white) and the second backwards
    # (white -> y), then stack them into a single lookup table.
    stacked = np.vstack((first_half(np.linspace(0, 1, 128)),
                         second_half(np.linspace(1, 0, 128))))
    return ListedColormap(stacked)
# Anchor colours as 8-bit RGB triples.
blue, red = (44,123,182), (215,25,28)
# Matplotlib expects channels in [0, 1]. 8-bit values top out at 255, so the
# divisor is 255 (dividing by 256 would never reach full intensity).
blue = [channel / 255 for channel in blue]
red = [channel / 255 for channel in red]
# Diverging colormaps: blue -> white -> red, and the reverse.
diverging = bulid_cmap(blue, red)
diverging_r = bulid_cmap(red, blue)

# Default figure size passed to the pitch plots.
figsize = (9, 6)

Let's find the 2018 World Cup.


In [None]:
# Load the competitions listing, then display the male international
# competitions ordered by season name, descending.
comps = sb.competitions()
(
    comps
    .loc[lambda df: df['competition_international'].eq(True) & df['competition_gender'].eq("male")]
    .sort_values(by='season_name', ascending=False)
)

In [None]:
# Display the competitions whose country_name is Germany.
comps.loc[comps['country_name'].eq('Germany')]

Let's have a look at France's matches.

In [None]:
# competition_id=43 / season_id=3 -- presumably the 2018 World Cup row from the
# competitions table; verify against `comps`. Most recent matches come first.
matches = sb.matches(competition_id=43, season_id=3).sort_values('match_date', ascending=False)
# Keep the fixtures in which France is either the home or the away team.
france_matches = matches.loc[matches[['home_team', 'away_team']].eq('France').any(axis=1)]
france_matches

4-3 in the Round of 16, that was a good game.

In [None]:
# .item() raises unless exactly one France match has stage 'Round of 16'.
match_id = france_matches.loc[
    france_matches['competition_stage'].eq('Round of 16'), 'match_id'
].item()

# Event stream for that single match.
events = sb.events(match_id=match_id)

In [None]:
# Passes made by player_id 5484 in this match.
passes = events.loc[events['type'].eq("Pass") & events['player_id'].eq(5484)]

# Shots taken by player_id 5476 in this match.
shots = events.loc[events['type'].eq("Shot") & events['player_id'].eq(5476)]

In [None]:
# Display the xG value recorded for each selected shot.
shots.loc[:, 'shot_statsbomb_xg']

In [None]:
# Keep only the start/end location columns needed for plotting.
pass_coordinates = passes.loc[:, ['location', 'pass_end_location']]
shot_coordinates = shots.loc[:, ['location', 'shot_end_location']]

In [None]:
# Turn the per-row coordinate lists into one array per axis (the two-name
# unpack requires exactly two coordinates per row).
x1_pass, y1_pass = np.transpose(pass_coordinates['location'].to_list())
x2_pass, y2_pass = np.transpose(pass_coordinates['pass_end_location'].to_list())

In [None]:
# Shot start coordinates, one array per axis.
x1_shot, y1_shot = np.array(shot_coordinates['location'].tolist()).T

# StatsBomb shot end locations hold either [x, y] or [x, y, z]. Stacking
# mixed-length rows (or unpacking three values from two-element rows) fails,
# so take the first two coordinates explicitly.
shot_ends = shot_coordinates['shot_end_location'].tolist()
x2_shot, y2_shot = np.array([end[:2] for end in shot_ends]).T
# Third coordinate where present (NaN otherwise). The name `outcome` is kept
# for compatibility, but the value is the end height, not the shot outcome.
outcome = np.array([end[2] if len(end) > 2 else np.nan for end in shot_ends])

In [None]:
# Draw an empty pitch, then overlay the passes (blue) and the shots (red) as arrows.
pitch = mpl.Pitch()
fig, ax = pitch.draw(figsize=figsize)

# Arrow styling shared by both layers.
arrow_style = dict(alpha=0.4, headaxislength=3, headlength=3, headwidth=4, width=2, ax=ax)
p = pitch.arrows(x1_pass, y1_pass, x2_pass, y2_pass, color=blue, **arrow_style)
p = pitch.arrows(x1_shot, y1_shot, x2_shot, y2_shot, color=red, **arrow_style)

# Implementing the expected threat (xT) algorithm for a specific match

In [None]:
import pandas as pd

In [None]:
# Reload the competitions listing and display the Spanish competitions.
comps = sb.competitions()
comps.loc[comps['country_name'].eq('Spain')]

In [None]:
# competition_id=11 / season_id=90 -- presumably a La Liga season picked from the
# table above; verify against `comps`. Keep only Real Madrid fixtures (home or
# away), most recent first.
matches = (
    sb.matches(competition_id=11, season_id=90)
    .sort_values('match_date', ascending=False)
    .loc[lambda df: df[['home_team', 'away_team']].eq('Real Madrid').any(axis=1)]
)
matches

In [None]:
# Plain Python list of the selected match ids.
all_match_ids = matches['match_id'].tolist()
all_match_ids

In [None]:
# Download the event stream of every selected match. Frames are collected in a
# list and concatenated once: growing a DataFrame with pd.concat inside the
# loop re-copies all previous rows on every iteration.
match_frames = []
for match_id in all_match_ids:
    match_events = sb.events(match_id=match_id)
    match_frames.append(match_events)

# pd.concat raises on an empty list, so fall back to an empty frame.
all_events = pd.concat(match_frames, ignore_index=True) if match_frames else pd.DataFrame()

all_events

In [None]:
# Grid resolution: GRID_X cells along the pitch length, GRID_Y across its width.
GRID_X, GRID_Y = 16, 12
# StatsBomb event coordinates span 120 x 80 units (not 105 x 68 metres). With
# the smaller extent, every event with x > 105 or y > 68 lands outside the
# grid and is discarded downstream. The xT heatmap is also drawn on a
# 'statsbomb' pitch, so the bins must use the same coordinate system.
pitch_x, pitch_y = 120, 80  # https://mplsoccer.readthedocs.io/en/latest/gallery/pitch_setup/plot_pitches.html
# Cell edges: GRID + 1 evenly spaced edges per axis.
x_bins = np.linspace(0, pitch_x, GRID_X + 1)
y_bins = np.linspace(0, pitch_y, GRID_Y + 1)


In [None]:
def get_grid_cell(x, y):
    """Map pitch coordinates to (x_idx, y_idx) indices of the grid.

    Indices are looked up in the module-level ``x_bins`` / ``y_bins`` edges.
    A coordinate lying exactly on the last edge is assigned to the last cell.
    Coordinates outside the edges still yield -1 or the number of cells, so
    callers must bounds-check the result before indexing.

    Parameters
    ----------
    x, y : float or array-like
        Coordinates along the pitch length and width.

    Returns
    -------
    tuple
        ``(x_idx, y_idx)`` zero-based cell indices.
    """
    def _bin_index(value, edges):
        idx = np.digitize(value, edges) - 1
        # np.digitize uses half-open bins [a, b), so a value equal to the final
        # edge would overflow by one; fold it back into the last cell.
        folded = np.where(np.equal(value, edges[-1]), len(edges) - 2, idx)
        return folded[()]  # 0-d result -> scalar; real arrays pass through

    return _bin_index(x, x_bins), _bin_index(y, y_bins)

In [None]:
# Zero-filled (start_x, start_y, end_x, end_y) array. The cells visible in this
# notebook accumulate into `T` instead and never read this name.
transition_matrix = np.zeros(shape=(GRID_X, GRID_Y, GRID_X, GRID_Y))

In [None]:
# Extract relevant passes and carries
passes = all_events[all_events['type'] == 'Pass']
carries = all_events[all_events['type'] == 'Carry']
shots = all_events[all_events['type'] == 'Shot']

In [None]:
grid_shape = (GRID_X, GRID_Y)
# T[sx, sy, ex, ey]: number of moves from start cell (sx, sy) to end cell (ex, ey).
T = np.zeros(grid_shape + grid_shape)
# S[sx, sy]: accumulated shot_statsbomb_xg of shots taken from the cell
# (filled by the shot loop; this is an xG sum, not a raw shot count).
S = np.zeros(grid_shape)
# M[sx, sy]: number of moves that start in the cell.
M = np.zeros(grid_shape)

In [None]:
# Process passes and carries
for _, row in pd.concat([passes, carries]).iterrows():
    try:
        start_x, start_y = row['location']
        end = row.get('pass_end_location') if isinstance(row.get('pass_end_location'), list) else row.get('carry_end_location')
        if not isinstance(end, list):
            continue
        end_x, end_y = end
        sx, sy = get_grid_cell(start_x, start_y)
        ex, ey = get_grid_cell(end_x, end_y)
        if 0 <= sx < GRID_X and 0 <= sy < GRID_Y and 0 <= ex < GRID_X and 0 <= ey < GRID_Y:
            T[sx, sy, ex, ey] += 1
            M[sx, sy] += 1
    except:
        continue

In [None]:
# Process shots
for _, row in shots.iterrows():
    try:
        x, y = row['location']
        sx, sy = get_grid_cell(x, y)
        S[sx, sy] += row['shot_statsbomb_xg']
    except:
        continue

In [None]:
# Normalize transition probabilities
P_move = np.divide(M, M + S, out=np.zeros_like(M), where=(M + S) != 0)
P_shot = np.divide(S, M + S, out=np.zeros_like(S), where=(M + S) != 0)
P_trans = np.divide(T, T.sum(axis=(2,3), keepdims=True), out=np.zeros_like(T), where=T.sum(axis=(2,3), keepdims=True)!=0)


In [None]:
# --- Per-cell action probabilities -----------------------------------------
# S holds summed xG per cell, so it cannot double as the shot count needed for
# the shoot/move split. Count the shots here, applying the same skip
# conditions used when S is accumulated.
shot_counts = np.zeros((GRID_X, GRID_Y))
for shot_location, shot_xg in zip(shots['location'], shots['shot_statsbomb_xg']):
    if not isinstance(shot_location, list) or len(shot_location) < 2 or pd.isna(shot_xg):
        continue
    cell_x, cell_y = get_grid_cell(shot_location[0], shot_location[1])
    if 0 <= cell_x < GRID_X and 0 <= cell_y < GRID_Y:
        shot_counts[cell_x, cell_y] += 1

actions = M + shot_counts
has_actions = actions != 0
shot_prob = np.divide(shot_counts, actions, out=np.zeros_like(S), where=has_actions)
move_prob = np.divide(M, actions, out=np.zeros_like(M), where=has_actions)
# Probability of scoring given a shot from the cell: mean xG of its shots.
goal_prob = np.divide(S, shot_counts, out=np.zeros_like(S), where=shot_counts != 0)
# Immediate payoff of each cell: P(shoot) * P(goal | shoot).
shot_value = shot_prob * goal_prob

# --- Value iteration -------------------------------------------------------
# xT must start from a defined grid; all zeros means "no threat known yet".
xT = np.zeros((GRID_X, GRID_Y))

for iteration in range(50):
    # Expected xT of the destination cell for a move out of each start cell:
    # P_trans is (sx, sy, ex, ey) and xT broadcasts over the trailing axes.
    future_threat = (P_trans * xT).sum(axis=(2, 3))
    new_xT = shot_value + move_prob * future_threat

    # Plot xT grid using mplsoccer
    pitch = mpl.Pitch(pitch_type='statsbomb', pitch_color='white', line_color='black')
    fig, ax = pitch.draw(figsize=(10, 7))
    # Feed one point per cell (at the cell centre) so the binned mean
    # reproduces the grid values.
    bin_statistic = pitch.bin_statistic(
        np.repeat(x_bins[:-1], GRID_Y) + (pitch_x / GRID_X / 2),
        np.tile(y_bins[:-1], GRID_X) + (pitch_y / GRID_Y / 2),
        values=new_xT.flatten(),
        statistic='mean',
        bins=(GRID_X, GRID_Y)
    )
    pitch.heatmap(bin_statistic, ax=ax, cmap='Greens', edgecolors='grey', alpha=0.75)
    ax.set_title(f"xT - Iteration {iteration + 1}", fontsize=16)
    plt.show()
    plt.close(fig)  # up to 50 figures are created; release each once shown

    # Store the update before testing convergence so xT always holds the
    # latest grid, including on the converged iteration.
    total_change = np.sum(np.abs(new_xT - xT))
    xT = new_xT
    if total_change < 1e-6:
        print(f"Converged at iteration {iteration+1}")
        break