# Line-Breaking Passes

In [None]:
import torch
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster
import numpy as np
from Metrica_IO import (
    tracking_data,
    to_metric_coordinates,
    read_event_data,
)
from helper import intersects

DEVICE = "cpu"; DTYPE = torch.float32
XDIM, YDIM = 105, 68

## Data Loading

In [4]:
DATADIR = "."
GAMEID = 1
PERIOD = 1

tracking_home = tracking_data(DATADIR=DATADIR, game_id=GAMEID, teamname="Home")
tracking_away = tracking_data(DATADIR=DATADIR, game_id=GAMEID, teamname="Away")
events_df = read_event_data(DATADIR, GAMEID)
events_df = to_metric_coordinates(events_df)
tracking_home = to_metric_coordinates(tracking_home)
tracking_away = to_metric_coordinates(tracking_away)
tracking_home = tracking_home.query("Period==1")
tracking_away = tracking_away.query("Period==1")
team_attacking_left_to_right={
    "Home":True,
    "Away":False
}

Reading team: home


Reading team: away


### Line-Breaking passes parameters

In [5]:
MAX_DISTANCE_DEFENDERS_X = 9
MAX_DISTANCE_DEFENDERS_Y = 20
MIN_DISTANCE_BEFORE_LINE = 1
MIN_DISTANCE_AFTER_LINE = 1

max_distance_defenders = torch.tensor(
    [MAX_DISTANCE_DEFENDERS_X, MAX_DISTANCE_DEFENDERS_Y]
).to(device=DEVICE, dtype=DTYPE)

NB_CLUSTERS = 3
nb_points_line = 150

### Load ball tracking data

In [6]:
num_frames = len(tracking_home)
x_col = f"ball_x"
y_col = f"ball_y"
xy_ball = torch.full((num_frames, 2), float("nan"))
x_values = torch.tensor(tracking_home[x_col].values, dtype=torch.float32) + XDIM/2
y_values = torch.tensor(tracking_home[y_col].values, dtype=torch.float32) + YDIM/2
xy_ball[:, 0] = x_values
xy_ball[:, 1] = y_values

## Line-Breaking passes detection

In [None]:
xy_home, xy_away = None, None
for team_passes in ["Home","Away"]:

    if team_passes == "Home":
        team = "Away"
        tracking_df = tracking_away.copy()
    else:
        team = "Home"
        tracking_df = tracking_home.copy()


    player_ids = []
    for col in tracking_df.columns:
        if col.startswith(f"{team}_") and col.endswith("_x"):
            player_id = col.replace(f"{team}_", "").replace("_x", "")
            player_ids.append(player_id)
    num_home_players = len(player_ids)
    tracking_tensor = torch.full((num_home_players, num_frames, 2), float("nan"))

    events = events_df.copy().query(f"Period == 1 and Type=='PASS' and Team=='{team_passes}'").reset_index()
    events["last_line_broken"] = None
    events["last_line_broken_through_or_around"] = None
    events["nb_lines_broken"] = 0
    events["Start X"] = events["Start X"] + XDIM/2
    events["End X"] = events["End X"] + XDIM/2
    events["Start Y"] = events["Start Y"] + YDIM/2
    events["End Y"] = events["End Y"] + YDIM/2

    for i, player_id in enumerate(player_ids):
        x_col = f"{team}_{player_id}_x"
        y_col = f"{team}_{player_id}_y"

        x_values = torch.tensor(tracking_df[x_col].values, dtype=torch.float32) + XDIM/2
        y_values = torch.tensor(tracking_df[y_col].values, dtype=torch.float32) + YDIM/2
        tracking_tensor[i, :, 0] = x_values
        tracking_tensor[i, :, 1] = y_values

    if team == "Home":
        xy_home = tracking_tensor
    else:
        xy_away = tracking_tensor

    mask_lines_broke = torch.zeros(len(events), NB_CLUSTERS, device=DEVICE, dtype=bool)

    # Initialize the mask of broken lines around
    mask_lines_broke_around = torch.zeros(
        len(events), NB_CLUSTERS, device=DEVICE, dtype=bool
    )
    mean_positions = np.nanmean(tracking_tensor, axis=1)
    x_positions = mean_positions[:, 0]
    if team_attacking_left_to_right[team_passes]:
        x_positions_safe = np.nan_to_num(
            x_positions, nan=-1
        )
        index_gk = np.argmax(x_positions_safe)
    else:
        x_positions_safe = np.nan_to_num(
            x_positions, nan=10000
        )
        index_gk = np.argmin(x_positions_safe)

    for idx, frame_index in enumerate(events.index):
        e = events.loc[frame_index]
        start_frame = e["Start Frame"]
        end_frame = e["End Frame"]
        xy_pass_start = torch.tensor(e[["Start X", "Start Y"]])
        xy_pass_end = torch.tensor(e[["End X", "End Y"]])
        end_xy = tracking_tensor[:, end_frame]
        # remove nan value and gk index
        start_xy = tracking_tensor[:, start_frame]
        valid_mask = ~torch.isnan(start_xy).any(dim=1)
        valid_mask[index_gk] = False
        start_xy = start_xy[valid_mask]

        end_xy = tracking_tensor[:, end_frame]
        valid_mask = ~torch.isnan(end_xy).any(dim=1)
        valid_mask[index_gk] = False
        end_xy = end_xy[valid_mask]

        linkage_matrix_d = linkage(
            start_xy[:, 0].to(device=DEVICE).reshape(-1, 1), method="ward"
        )

        # Compute the hierarchical clustering, constraining the max number of clusters to be 3
        clusters_d = fcluster(linkage_matrix_d, NB_CLUSTERS, criterion="maxclust")

        # Sort clusters by the x-axis mean : 1 corresponds to the attackers, 2 to midfielders and 3 to defenders
        sorted_clusters_d = sorted(
            np.arange(1, NB_CLUSTERS + 1),
            key=lambda i: start_xy[clusters_d == i, 0].mean().item(),
        )

        # Use the clusters sorting to update the clusters assigned to each player
        clusters_d = np.vectorize(
            dict(zip(sorted_clusters_d, np.arange(1, NB_CLUSTERS + 1))).get
        )(clusters_d)

        # If there are only 2 clusters, add 1 to all clusters (moving atts and mids to mids and defs)
        # and assign the most forward player to the attacker cluster (1)
        if clusters_d.max() == NB_CLUSTERS - 1:
            clusters_d += 1
            clusters_d[start_xy[:, 0].argmin()] = 1

        for cluster in range(1, NB_CLUSTERS + 1):
            # Fetch coordinates of the elements of the cluster
            xy_cluster_d_start = start_xy[clusters_d == cluster]
            xy_cluster_d_end = end_xy[clusters_d == cluster]

            # If there is only one player in the cluster or there is not the
            # same number of players at the start and at the end, continue
            if (len(xy_cluster_d_start) <= 1) or len(start_xy) != len(end_xy):
                continue

            # Sort indices of players in the cluster along y-axis
            sorted_indices_inside_cluster_d = xy_cluster_d_start[:, 1].argsort()

            # Get the positions of the players, ordered by the new sorting
            sorted_xy_cluster_d_start = xy_cluster_d_start[
                sorted_indices_inside_cluster_d
            ].to(device=DEVICE)
            sorted_xy_cluster_d_end = xy_cluster_d_end[sorted_indices_inside_cluster_d].to(
                device=DEVICE
            )

            # Augment the positions of the players by adding the positions of the sideline
            sorted_xy_cluster_d_start_augmented = torch.vstack(
                (
                    torch.tensor(
                        [sorted_xy_cluster_d_start[0, 0], 0.0],
                        dtype=DTYPE,
                        device=DEVICE,
                    ),
                    sorted_xy_cluster_d_start,
                    torch.tensor(
                        [sorted_xy_cluster_d_start[-1, 0], YDIM],
                        dtype=DTYPE,
                        device=DEVICE,
                    ),
                )
            )
            sorted_xy_cluster_d_end_augmented = torch.vstack(
                (
                    torch.tensor(
                        [sorted_xy_cluster_d_end[0, 0], 0.0], dtype=DTYPE, device=DEVICE
                    ),
                    sorted_xy_cluster_d_end,
                    torch.tensor(
                        [sorted_xy_cluster_d_end[-1, 0], YDIM],
                        dtype=DTYPE,
                        device=DEVICE,
                    ),
                )
            )

            # Compute the mask of effective connections between two neighbor players of the same line
            mask_effective_connections_d = (
                (sorted_xy_cluster_d_start_augmented.diff(1, dim=0) ** 2).sqrt()
                < max_distance_defenders
            ).all(1)

            # Get connections that intersect with the pass segment at the start of the pass
            mask_start_connections_intersecting_with_pass = intersects(
                torch.stack(
                    (
                        sorted_xy_cluster_d_start_augmented[:-1],
                        sorted_xy_cluster_d_start_augmented[1:],
                    ),
                    1,
                ),
                torch.stack((xy_pass_start, xy_pass_end)),
            )
            # Get connections that intersect with the pass segment at the end of the pass
            mask_end_connections_intersecting_with_pass = intersects(
                torch.stack(
                    (
                        sorted_xy_cluster_d_end_augmented[:-1],
                        sorted_xy_cluster_d_end_augmented[1:],
                    ),
                    1,
                ),
                torch.stack((xy_pass_start, xy_pass_end)),
            )

            # For a connection to be considered as intersecting with the pass,
            # the pass segment should intersect with the connection at the start
            # and at the end of the pass
            mask_connections_intersecting_with_pass = (
                mask_start_connections_intersecting_with_pass
                & mask_end_connections_intersecting_with_pass
            )

            # Compute distance from the start of the pass to the connection
            distance_from_start_to_connections = np.sqrt(
                (
                    (
                        np.linspace(
                            sorted_xy_cluster_d_start_augmented[:-1],
                            sorted_xy_cluster_d_start_augmented[1:],
                        ).transpose(1, 0, 2)
                        - xy_pass_start.numpy()
                    )
                    ** 2
                )
                .sum(2)
                .min(1)
            )

            # Compute distance from the start of the pass to the connection
            distance_from_end_to_connections = np.sqrt(
                (
                    (
                        np.linspace(
                            sorted_xy_cluster_d_end_augmented[:-1],
                            sorted_xy_cluster_d_end_augmented[1:],
                        ).transpose(1, 0, 2)
                        - xy_pass_end.numpy()
                    )
                    ** 2
                )
                .sum(2)
                .min(1)
            )
            mask_start_lines_intersecting_with_pass = intersects(
                torch.stack(
                    (
                        sorted_xy_cluster_d_start_augmented[:-1],
                        sorted_xy_cluster_d_start_augmented[1:],
                    ),
                    1,
                ),
                torch.stack((xy_pass_start, xy_pass_end)),
            )
            mask_end_lines_intersecting_with_pass = intersects(
                torch.stack(
                    (
                        sorted_xy_cluster_d_end_augmented[:-1],
                        sorted_xy_cluster_d_end_augmented[1:],
                    ),
                    1,
                ),
                torch.stack((xy_pass_start, xy_pass_end)),
            )

            mask_lines_intersecting_with_pass = (
                mask_start_lines_intersecting_with_pass
                & mask_end_lines_intersecting_with_pass
            )

            # Build a mask with True if the ball was far enough from the connection,
            # at the start and at the end of the pass
            mask_far_enough_from_lines = (
                distance_from_start_to_connections > MIN_DISTANCE_BEFORE_LINE
            ) & (distance_from_end_to_connections > MIN_DISTANCE_AFTER_LINE)

            # Check if the pass moves the ball forward
            if team_attacking_left_to_right[team_passes]:
                pass_moving_forward = xy_pass_end[0] > xy_pass_start[0]
            else:
                pass_moving_forward = xy_pass_end[0] < xy_pass_start[0]

            # Update the mask with the broken lines
            mask_lines_broke[idx, cluster - 1] = (
                pass_moving_forward
                & (
                    mask_connections_intersecting_with_pass
                    & mask_effective_connections_d
                    & mask_far_enough_from_lines
                ).any()
            )

            # Update the mask with lines broken around
            mask_lines_broke_around[idx, cluster - 1] = pass_moving_forward and (
                (
                    mask_connections_intersecting_with_pass[
                        0
                    ].item()  # first segment broken
                    and mask_effective_connections_d[0].item()
                    and mask_far_enough_from_lines[0].item()
                )
                or (
                    mask_connections_intersecting_with_pass[
                        -1
                    ].item()  # last segment broken
                    and mask_effective_connections_d[-1].item()
                    and mask_far_enough_from_lines[-1].item()
                )
            )

        if mask_lines_broke[idx][0]:
            events.at[frame_index, "last_line_broken"] = "attack"
        elif mask_lines_broke[idx][1]:
            events.at[frame_index, "last_line_broken"] = "midfield"
        elif mask_lines_broke[idx][2]:
            events.at[frame_index, "last_line_broken"] = "defense"
        else:
            events.at[frame_index, "last_line_broken"] = None

        events.at[frame_index, "nb_lines_broken"] = mask_lines_broke[idx].sum().item()

        # last_line_broken_through_or_around: "through" by default if 'last_line_broken' is not None
        if events.at[frame_index, "last_line_broken"] is not None:
            events.at[frame_index, "last_line_broken_through_or_around"] = "through"

        # if broken around, change value to "around"
        for i, line_name in enumerate(["attack", "midfield", "defense"]):
            if (
                mask_lines_broke_around[idx][i]
                and events.at[frame_index, "last_line_broken"] == line_name
            ):
                events.at[frame_index, "last_line_broken_through_or_around"] = "around"