In [None]:
#| default_exp  pitch_control

In [None]:
#| hide

%load_ext autoreload
%autoreload 2
from IPython.core.debugger import set_trace

In [None]:
%%javascript

MathJax.Hub.Config({
    TeX: { equationNumbers: { autoNumber: "AMS" } }
});
MathJax.Hub.Queue(
  ["resetEquationNumbers", MathJax.InputJax.TeX],
  ["PreProcess", MathJax.Hub],
  ["Reprocess", MathJax.Hub]
);

<IPython.core.display.Javascript object>

<center text-align=\"center;center\"><h1>Pitch Control</h1></center>

In this module, we will compute the pitch control feature as described in [Spearman's paper](https://www.researchgate.net/publication/327139841_Beyond_Expected_Goals).

We will recycle some code from [Laurie Tutorial 3](https://github.com/Friends-of-Tracking-Data-FoTD/LaurieOnTracking/blob/master/Tutorial3_PitchControl.py) to produce more optimized code. You can also check [this presentation by Spearman](https://www.youtube.com/watch?v=5X1cSehLg6s&feature=youtu.be&ab_channel=FriendsofTracking) for more information.

In [None]:
#| export

from pathlib import Path
from typing import Any, Callable, Optional, Tuple

import numpy as np
import pandas as pd
from fastcore.basics import *
from fastcore.foundation import L

In [None]:
#| export

PITCH_SIZE = (105, 68)

As usual, we start by reading some data from local disk. We will pick a particular event from the `tracking/event` mapping and select the associated frame:

In [None]:
data_path = Path("../data")

tracking_event_mapping = pd.read_csv(
    data_path / "tracking_event_mapping.csv", low_memory=False
)
tracking_df = pd.read_csv(
    data_path / "tracking_vel_df.csv", low_memory=False
).set_index("frameId")

## pick the frame-id associated with a randomly picked event
frame_id = tracking_event_mapping.sample(n=1).frameId.values[0]

## select the frame
frame = tracking_df.loc[frame_id]
lineup = pd.read_csv(data_path / "lineup.csv")

## Model Assumptions

The model discussed <a href=https://www.researchgate.net/publication/327139841_Beyond_Expected_Goals.>Spearman's paper</a> is based on a few central assumptions:
+ Players take a constant time to make a controlled touch on the ball. The associated rate $\lambda_i$ (inverse of the mean time it would take the player to control the ball) can be constant, player specific or vary across attacking, defending and goal-keeper players.
+ At every time step $t$ (within some period of time $T$) and for every pitch location $\vec{r}$ is denoted by ${f}_{j}(t,\vec{r},T|s)$. It is centred around the *expected* intercept time $\tau_{exp}(t, \vec{r})$ i.e the time it will take the player to reach the ball location starting from his initial position but vary greatly around it. The authors suggest a logistic model that leads to:

\begin{equation}
 {f}_{j}(t,\vec{r},T|s) = [1 + {e^{-{\pi \frac{T-{t}_{exp}(t , \vec{r})}{\sqrt 3 s}}} } ]^{-1}   \label{eq:time-intercept}
\end{equation}
where $s$ is the variance parameter.
+ The model suggests discretising the time interval into smaller intervals between [$t$, $t + dt$], $0 \leq t \leq T$ and defines each unit probability as:

\begin{equation}
 dPPCF_{j}(t,\vec{r},T|s, \lambda_j) =  \Big ( 1- \sum_k PPCF_{k}(t,\vec{r},T|s, \lambda_k) \Big ) {f}_{j}(t,\vec{r},T|s) \lambda_j \label{eq:unit-probability}
\end{equation}
which is the mixture of 3 terms:
1. The probability of not controlling the ball by any player up to time $t$: $1- \sum_k PPCF_{k}(t,\vec{r},T|s, \lambda_k)$
2. the probability of intercepting by player $j$ between $t$ and $dt$: ${f}_{j}(t,\vec{r},T|s)$
3. the probability of taking a controlled touch by player $j$ between $t$ and $dt$: $\lambda_j$

In the rest of this notebook, we will detail how each component is implemented and how it can be generalised further. We will also provide a vectorised implementation to compute all players over all pitch locations in one shot.

In [None]:
#| export


class PitchControl:
    def __init__(
        self,
        max_player_accel: float = 7.0,  # maximum player acceleration m/s/s, not used in this implementation
        max_player_speed: float = 5.0,  # maximum player speed m/s
        reaction_time: float = 0.7,  # seconds, time taken for player to react and change trajectory. Roughly determined as vmax/amax
        tti_sigma: float = 0.45,  # Standard deviation of sigmoid function in Spearman 2018 ('s') that determines uncertainty in player arrival time
        lambda_att: float = 4.3,  # ball control parameter for attacking team
        kappa_def: float = 1.72,  # kappa parameter in Spearman 2018 that gives the advantage defending players to control ball
        kappa_gk: float = 3.0,  # advantage in ball control given to GK compared to defenders
        average_ball_speed: float = 15.0,  # average ball travel speed in m/s
        int_dt: float = 0.04,  # integration timestep (dt)
        max_int_time: float = 10.0,  # upper limit on integral time
        model_converge_tol: float = 0.01,  # assume convergence when PPCF>0.99 at a given location.
        time_to_control_veto: float = 3.0,  # If the probability that another team or player can get to the ball and control it is less than `10^-time_to_control_veto`, ignore that player
        pitch_size: Tuple[int, int] = PITCH_SIZE,  # standard pitch size
    ):
        store_attr()
        self.lambda_def = self.lambda_att * self.kappa_def
        self.lambda_gk = self.lambda_def * self.kappa_gk

        # pitch grid
        self.pitch_cells = np.array(
            [
                [i, j]
                for i in range(self.pitch_size[0])
                for j in range(self.pitch_size[1])
            ]
        )

### Data preparation

We need to change the data format to help us in vectorisation. For each frame, we create a `numpy` array from the raw frame information in the following way:
+ the raw represent the players, the columns the features
+ we will extract the players' jerseys, positions ($x$, $y$), velocities ($v_x$, $v_y$) and a marker to tell whether a player is i) attacking (1), ii) defending (0), iii) a goalkeeper for the attacking team (3) or iv) goalkeeper for the away team (4).

We will a `prepare_data()` method and attach it to the `PitchControl` class using the [`fastcore` `@Patch` decorator](https://fastcore.fast.ai/basics.html#patching):

In [None]:
#| export


def _extract_features(
    frame: pd.DataFrame, side: str, attacking_side: str, gk_jersey: int
) -> Tuple:
    "Extract relevant features from a tracking frame"
    assert side in ("home", "away"), "side can either be home or away"
    assert attacking_side in (
        "home",
        "away",
    ), "attacking_side can either be home or away"

    _str = L(
        x.removesuffix("_x")
        for x in frame.filter(regex=f"{side}_player_[0-9]+_x$").columns.tolist()
    )
    _jerseys = np.array(L(int(x.removeprefix(f"{side}_player_")) for x in _str))
    _x = frame[L(x + "_x" for x in _str)].values.astype(float)
    _y = frame[L(x + "_y" for x in _str)].values.astype(float)

    _vx_cols, _vy_cols = L(x + "_vx" for x in _str), L(x + "_vy" for x in _str)

    _vx = frame[_vx_cols].values.astype(float)
    _vy = frame[_vy_cols].values.astype(float)
    if attacking_side == side:
        _val, _gk_val = 1, 3
    else:
        _val, _gk_val = 0, 4
    _idx = np.array([_val] * len(_str))
    _idx[np.where(_jerseys == gk_jersey)] = _gk_val

    return _jerseys, _x, _y, _vx, _vy, _idx


@patch
def prepare_data(
    self: PitchControl,
    raw_frame: pd.Series,  # frame raw tracking information
    lineup: pd.DataFrame,  # lineup information to identify players' positions
    events_to_frame: pd.DataFrame,  # mapping between tracking and events
) -> np.ndarray:  # features with dimension agents (usually 23) x features (6: jersey, x, y, vx, vy, index)
    "Convert tracking data to numpy frienly format"
    # reshape
    if isinstance(raw_frame, pd.Series):
        clean_frame = raw_frame.copy().to_frame().T

    # remove nans
    clean_frame.fillna(
        {
            x: 0.0
            for x in (
                clean_frame.filter(like="_vx").columns.tolist()
                + clean_frame.filter(like="_vy").columns.tolist()
                + L("ball_vx", "ball_vy")
            )
        },
        inplace=True,
    )

    # drop na
    clean_frame.dropna(axis=1, inplace=True)

    frame_id = clean_frame.index.values[0]

    # identify goalkeeper
    home_gk_jersey = lineup[
        (lineup.position == "GK") & (lineup.side == "home")
    ].jerseyNumber.values[0]
    away_gk_jersey = lineup[
        (lineup.position == "GK") & (lineup.side == "away")
    ].jerseyNumber.values[0]

    # identify sides
    possession_team_id = events_to_frame.loc[
        events_to_frame.frameId == frame_id, "teamId"
    ].values[0]
    attacking_side = lineup.loc[lineup.teamId == possession_team_id, "side"].values[0]

    # home players
    home_jerseys, home_x, home_y, home_vx, home_vy, home_idx = _extract_features(
        clean_frame, "home", attacking_side, home_gk_jersey
    )

    # away players
    away_jerseys, away_x, away_y, away_vx, away_vy, away_idx = _extract_features(
        clean_frame, "away", attacking_side, home_gk_jersey
    )

    # ball values
    ball_jersey, ball_x, ball_y, ball_vx, ball_vy, ball_idx = (
        np.array([0]),
        clean_frame.ball_x.values.astype(float),
        clean_frame.ball_y.values.astype(float),
        clean_frame.ball_vx.values.astype(float),
        clean_frame.ball_vy.values.astype(float),
        np.array([-1]),
    )

    # collect results
    return np.column_stack(
        (
            np.concatenate((home_jerseys, away_jerseys, ball_jersey), axis=None),
            np.concatenate((home_x, away_x, ball_x), axis=None),
            np.concatenate((home_y, away_y, ball_y), axis=None),
            np.concatenate((home_vx, away_vx, ball_vx), axis=None),
            np.concatenate((home_vy, away_vy, ball_vy), axis=None),
            np.concatenate((home_idx, away_idx, ball_idx), axis=None),
        )
    )

In [None]:
pc = PitchControl()
np_frame = pc.prepare_data(
    raw_frame=frame.copy(), lineup=lineup, events_to_frame=tracking_event_mapping
)

assert np_frame.shape[0] <= 23, "We can't have more than 23 agents"
assert np_frame.shape[1] == 6, "We expect 6 features"
assert not np.any(np.isnan(np_frame)), "NaN not accepted in feature data"

### Time to intercept

As mentioned above, the probability to intercept depends on the expected time to reach the ball location. The model used here assumes that the player runs at current speed for `reaction_time` seconds and then runs at full speed to the target ball position. We will allow the computation to compute the time to intercept for *all players* at all cells location on the pitch:

In [None]:
#| export


@patch
def simple_time_to_intercept(
    self: PitchControl,
    np_frame: np.ndarray,  # frame data produced by `prepare_data()` method
    r_final: np.ndarray,  # picth location where to compute time to intercept
    v_max: np.ndarray,  # scalar if vmax is the same for all players or individual max speed (same order as players in `np_frame`)
    reaction_time: np.ndarray,  # scalar if `reaction_time` is the same for all players or individual `reaction_time` (same order as players in `np_frame`)
) -> np.ndarray:  # time to intercept for all players (rows) for all locations (columns) passed in `r_final`
    "Time to reach `r_final` localtions based on a simple physics based model"
    # n_players
    n_players = np_frame.shape[0] - 1

    # convert values to numpy array
    if np.isscalar(r_final):
        r_final = np.array(listify(r_final))

    if np.isscalar(v_max):
        v_max = np.array(listify(v_max) * n_players)

    if np.isscalar(reaction_time):
        reaction_time = np.array(listify(reaction_time) * n_players)

    # where the players will be after running at current speed during reaction time
    # remember ball elements are stored in final row
    # Expected shape: n_players x 2
    r_reaction = (
        np_frame[:-1, [1, 2]] + np_frame[:-1, [3, 4]] * reaction_time[:, np.newaxis]
    )

    # distance of each player (rows) to each cell
    # expected shape: n_cells x n_players
    delta_dist = np.linalg.norm(
        r_final[:, np.newaxis, :] - r_reaction[np.newaxis, :], axis=2
    )

    # time to intercept; expected shape n_cells x 22
    time_to_intercept = reaction_time[np.newaxis, :] + delta_dist / v_max[np.newaxis, :]

    return time_to_intercept


@patch
def ball_travel_time(
    self: PitchControl,
    np_frame: np.ndarray,  # frame data produced by `prepare_data()` method
    r_final: np.ndarray,  # picth location where to compute time to intercept
    ball_speed: np.ndarray = None,  # ball speed, if not provided, current speed will be used
) -> np.ndarray:  # shape should be the samed as `r_final`
    "Ball travel time to reach `r_final' location"
    ball_location = np_frame[-1, [1, 2]]

    # define ball speed
    if ball_speed is None:
        ball_speed = np.sqrt(np.sum(np_frame[-1, [3, 4]] ** 2))
    if ball_speed == 0.0:
        ball_speed = self.max_player_speed

    return np.linalg.norm(r_final - ball_location[np.newaxis, :], axis=1) / ball_speed

In [None]:
pc = PitchControl()

# parameters
player_speed, ball_speed, player_reaction_time = (
    pc.max_player_speed,
    pc.average_ball_speed,
    pc.reaction_time,
)
target_cells = pc.pitch_cells

# player time to intercept at all locations
time_intercp = pc.simple_time_to_intercept(
    np_frame=np_frame,
    r_final=target_cells,
    v_max=player_speed,
    reaction_time=player_reaction_time,
)
assert time_intercp.shape == (
    pc.pitch_cells.shape[0],
    np_frame.shape[0] - 1,
), f"got shape {time_intercp.shape}, expecting ({pc.pitch_cells.shape[0]}, {np_frame.shape[0]-1})"

In [None]:
# ball travel time
ball_travel_tm = pc.ball_travel_time(np_frame=np_frame, r_final=target_cells)
assert (
    ball_travel_tm.shape[0] == target_cells.shape[0]
), f"expcted shape {ball_travel_tm.shape[0]}, got shape {target_cells.shape[0]} !"

### Integration interval

Equation $\eqref{eq:unit-probability}$ will need to be integrated over an interval of time $[t_0, t_0 + T]$ where $t_0$ is the ball arrival time at the target location and $T$ is the max integration time. In order to carry on computation in a vectorised way and given that the ball travel time is different for every location, we can define a time steps grid that is as permissive as possible and artificially shut down the time steps we don't need for every location.

Technically, this means that the time grid can be defined between the minimum ball arrival time and max arrival time + max integration time to start with. This guarantees that all interval of type $[t_0, t_0 + T]$ are within the time intervals defined for every location. Then, we just need to replace the cells we don't care about with `np.nan` to guarantee that no computation is done on these cells. Note that the time steps matrix defined will have $(m \times n)$ shape where $n$ is the total number of locations and $m$ the maximum number of time steps.

In [None]:
#| export


@patch
def time_grid(
    self: PitchControl,
    np_frame: np.ndarray,  # frame data produced by `prepare_data()` method
    r_final: np.ndarray,  # picth location where to compute time to intercept
    ball_speed: np.ndarray = None,  # ball speed, if not provided, current speed will be used
) -> np.ndarray:  # time grid with shape max_time_steps x r_final
    "Define valid integration time for every location in `r_final`"

    # ball travel time; shape r_final
    ball_travel_tm = self.ball_travel_time(np_frame, r_final, ball_speed)

    # max time grid; shape number of time steps
    time_steps = np.arange(
        ball_travel_tm.min(),
        ball_travel_tm.max() + self.max_int_time,
        self.int_dt,
    )

    # identify cells where travel time < time_step
    cancel_cells = np.where(
        time_steps[:, np.newaxis] - ball_travel_tm[np.newaxis, :] < 0
    )

    time_steps = np.broadcast_to(
        time_steps[:, np.newaxis], (time_steps.shape[0], ball_travel_tm.shape[0])
    )
    time_steps_cp = time_steps.copy()
    time_steps_cp[cancel_cells] = np.nan

    return time_steps_cp


@patch
def probability_intercept_ball(
    self: PitchControl,
    np_frame: np.ndarray,  # frame data produced by `prepare_data()` method
    r_final: np.ndarray,  # picth location where to compute time to intercept
    v_max: np.ndarray,  # scalar if vmax is the same for all players or individual max speed (same order as players in `np_frame`)
    reaction_time: np.ndarray,  # scalar if `reaction_time` is the same for all players or individual `reaction_time` (same order as players in `np_frame`)
    ball_speed: np.ndarray = None,  # ball speed, if not provided, current speed will be used
) -> np.ndarray:  # Implementation of (time-intercept) Equation; shape max_time x r_final x n_players
    "Implementation of Spearman Eq 4 (time-intercept) for all times, all locations and all players"
    # expected time to reach 'r_final' for all players
    # shape: r_final.shape x n_players
    exp_time = self.simple_time_to_intercept(np_frame, r_final, v_max, reaction_time)

    # time grid; shape max_time_steps x r_final
    time_grid = self.time_grid(np_frame, r_final, ball_speed)

    # probability to intercept has shape:
    # shape max_time_steps x  r_final x n_players
    return 1.0 / (
        1.0
        + np.exp(
            -np.pi
            * (time_grid[..., np.newaxis] - exp_time[np.newaxis, ...])
            / (np.sqrt(3.0) * self.tti_sigma)
        )
    )

In [None]:
pc = PitchControl()

# time grid
times = pc.time_grid(np_frame=np_frame, r_final=target_cells, ball_speed=ball_speed)

assert (
    times.shape[1] == target_cells.shape[0]
), f"expected {target_cells.shape[0]} columns, got {times.shape[1]} !"
assert (
    np.sum(~np.isnan(times[0])) == 1
), f"Expecting only one non missing value at first time step, got {np.sum(~np.isnan(times[0]))}"
assert (
    np.sum(np.isnan(times[-1])) == 0
), f"Expecting no missing value at last time step, got {np.sum(~np.isnan(times[-1]))}"

Now that we have a grid of times for all locations, we can compute equation $\eqref{eq:time-intercept}$ to approximate probability that the player will be able to intercept the ball at all times in the grid for every cell on the pitch:

In [None]:
ball_intercept_proba = pc.probability_intercept_ball(
    np_frame=np_frame,
    r_final=target_cells,
    v_max=player_speed,
    reaction_time=player_reaction_time,
    ball_speed=ball_speed,
)

## Probability to control a pitch area

Now we have all components we need to compute the unit probability defined in equation $\eqref{eq:unit-probability}$: 

In [None]:
#| export


@patch
def pitch_control_probability(
    self: PitchControl,
    np_frame: np.ndarray,  # frame data produced by `prepare_data()` method
    r_final: np.ndarray,  # picth location where to compute time to intercept
    v_max: np.ndarray,  # scalar if vmax is the same for all players or individual max speed (same order as players in `np_frame`)
    reaction_time: np.ndarray,  # scalar if `reaction_time` is the same for all players or individual `reaction_time` (same order as players in `np_frame`)
    ball_speed: np.ndarray = None,  # ball speed, if not provided, current speed will be used
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Compute pitch control probability on all pitch locations defined in `r_final`

    Returns
    -------
    Tupel[np.ndarray, np.ndarray, np.ndarray]
        + players: n_locations x n_players: probability for every location, for the player to control the space
        + attacking: n_times, n_locations: the probability at each time step for the attacking team to control the location
        + defending: n_times, n_locations: the probability at each time step for the defending team to control the location
        + unit: n_times x n_locations x n_players: unit ptobability to control the ball at all times, locations and for all players

    """
    # compute lambda vector for all players
    players_lambdas = np.ones_like(np_frame[:-1, -1])

    # indices
    att_players_idx, def_players_idx, att_gk_idx, def_gk_idx = (
        np.where(np_frame[:-1, -1] == 1)[0],
        np.where(np_frame[:-1, -1] == 0)[0],
        np.where(np_frame[:-1, -1] == 3)[0],
        np.where(np_frame[:-1, -1] == 4)[0],
    )

    # attacking players
    players_lambdas[att_players_idx] *= pc.lambda_att
    # defending players
    players_lambdas[def_players_idx] *= pc.lambda_def
    # gks
    players_lambdas[np.concatenate((att_gk_idx, def_gk_idx))] *= pc.lambda_gk

    # compute all unit probabilities to intercept; shape times x locations x players
    # each cell contains the probability to intercept and control ball between [t, t+dt]
    # for all players and every location
    unit_intercept_probs = np.nan_to_num(
        self.probability_intercept_ball(
            np_frame=np_frame,
            r_final=r_final,
            v_max=v_max,
            reaction_time=reaction_time,
            ball_speed=ball_speed,
        )
        * players_lambdas[np.newaxis, np.newaxis, :]
        * self.int_dt,
        copy=False,
        nan=0.0,
    )

    # prepare place holders for intermediate computation
    n_times, n_locations, n_players = unit_intercept_probs.shape
    # contribution of every player at every location
    players_ppcf = np.zeros((n_locations, n_players))
    # team contribution at every location over time
    ppcf_att, ppcf_def = np.zeros((n_times, n_locations)), np.zeros(
        (n_times, n_locations)
    )
    # convergence criteria at every location
    ptot = np.zeros((n_locations,))

    # time iterator
    time_iter = 1

    # locations to update
    locations_arr = np.arange(n_locations)

    # TODO: implement shortcuts discussed in Laurie to avoid computations at some locations
    # -> remove cells in locations_arr, assign some values in players_ppcf, ppcf_att and ppcf_def

    while locations_arr.size > 0 and time_iter < n_times:
        players_ppcf[locations_arr] += (
            1 - ptot[locations_arr, np.newaxis]
        ) * unit_intercept_probs[time_iter, locations_arr, :]
        
        ppcf_att[time_iter, locations_arr], ppcf_def[time_iter, locations_arr] = (
            players_ppcf[locations_arr][:,np.concatenate((att_players_idx, att_gk_idx))].sum(axis=1),
            players_ppcf[locations_arr][:,np.concatenate((def_players_idx, def_gk_idx))].sum(axis=1),
        )

        # update total probs for all locations
        ptot[locations_arr] = (
            ppcf_att[time_iter, locations_arr] + ppcf_def[time_iter, locations_arr]
        )

        # find locations where we did not converge
        idx_remove = np.where(ptot[locations_arr] > (1.0 - self.model_converge_tol))[0]
        if idx_remove.size:
            locations_arr = np.delete(locations_arr, idx_remove)

        # update iteration index
        time_iter += 1
    
    return players_ppcf, ppcf_att, ppcf_def, unit_intercept_probs

In [None]:
pc = PitchControl()

pc_players, pc_att, pc_def, details = pc.pitch_control_probability(
    np_frame=np_frame,
    r_final=target_cells,
    v_max=player_speed,
    reaction_time=player_reaction_time,
    ball_speed=ball_speed,
)

In [None]:
#| hide

import nbdev

nbdev.nbdev_export()