In [None]:
import pandas as pd
import os
import numpy as np
from math import sqrt

In [None]:
# Widen pandas' text display to 220 characters so wide frames wrap less when printed.
pd.set_option("display.width", 220)

In [None]:
# Directory layout of the input data; every path is relative to the working directory.
PATH_ROOT = "./data"
PATH_SPADL = f"{PATH_ROOT}/spadl_format"  # one "<league>.csv" action log per league
PATH_WYSCOUT = f"{PATH_ROOT}/wyscout"  # Wyscout JSON files (teams, players, playeranks)
PATH_PROCESSED = f"{PATH_ROOT}/processed"  # created when the data is loaded; no visible cell writes to it

In [None]:
# Leagues to process; each name must match a "<name>.csv" file under PATH_SPADL.
LEAGUES: list[str] = ["England", "Spain", "France", "Italy", "Germany"]

In [None]:
# Wyscout reference tables: teams, players and player rankings.
# NOTE(review): none of these three frames is referenced again in the visible cells.
teams_df = pd.read_json(f"{PATH_WYSCOUT}/teams/teams.json")
players_df = pd.read_json(f"{PATH_WYSCOUT}/players/players.json")
ranking_df = pd.read_json(f"{PATH_WYSCOUT}/playeranks/playeranks.json")

In [None]:
# Load the action log of every league and stack them into a single frame.

# exist_ok avoids the check-then-create race of `exists()` + `mkdir()` and
# makedirs also creates PATH_ROOT when it is missing.
os.makedirs(PATH_PROCESSED, exist_ok=True)

df_dict = {}
for league in LEAGUES:
    # The first CSV column is used as the index; the dropped columns are not
    # read by the shot-sequence extraction further down.
    df_dict[league] = pd.read_csv(f"{PATH_SPADL}/{league}.csv", index_col=0).drop(
        columns=["original_event_id", "result_name", "bodypart_id", "type_id"]
    )

# Each league keeps its own index labels (no ignore_index).
all_df = pd.concat(list(df_dict.values()))
all_df

In [None]:
# List the distinct action types present in the loaded data.
print(all_df["type_name"].unique())

### Pre-Process

In [None]:
# Reference points of the attacked goal, in the same coordinate system as the
# start_x/start_y columns (presumably metres on a 105 x 68 pitch — TODO confirm).
GOAL_CENTER_X: int = 105
GOAL_CENTER_Y: int = 34

# NOTE(review): despite the CROSSBAR names, the two points below differ only in
# y, so they mark the two ends of the goal mouth (the posts) used by get_shot_angle.
UPPER_CROSSBAR_X: int = 105
UPPER_CROSSBAR_Y: int = 38

LOWER_CROSSBAR_X: int = 105
LOWER_CROSSBAR_Y: int = 30

In [None]:
def get_shot_angle(shot_pos_x: float, shot_pos_y: float) -> float:
    """Return the angle, in radians, that the goal mouth subtends at the shot location.

    The angle is measured between the vectors running from the shot position
    to (UPPER_CROSSBAR_X, UPPER_CROSSBAR_Y) and to
    (LOWER_CROSSBAR_X, LOWER_CROSSBAR_Y).

    Args:
        shot_pos_x: x coordinate of the shot, in the coordinate system of the
            goal constants.
        shot_pos_y: y coordinate of the shot.

    Returns:
        The angle in [0, pi]. When the shot position coincides with one of the
        two reference points the angle is undefined and 0.0 is returned
        instead of the NaN a division by a zero norm would produce.
    """
    v1 = np.array([UPPER_CROSSBAR_X - shot_pos_x, UPPER_CROSSBAR_Y - shot_pos_y])
    v2 = np.array([LOWER_CROSSBAR_X - shot_pos_x, LOWER_CROSSBAR_Y - shot_pos_y])

    norm1 = np.linalg.norm(v1)
    norm2 = np.linalg.norm(v2)
    if norm1 == 0 or norm2 == 0:
        return 0.0

    cosine = np.dot(v1 / norm1, v2 / norm2)
    # Rounding can push the cosine a hair outside [-1, 1], where arccos is NaN.
    return np.arccos(np.clip(cosine, -1.0, 1.0))

In [None]:
def get_shot_distance(shot_pos_x: float, shot_pos_y: float) -> float:
    """Return the straight-line distance from the shot location to the goal centre.

    Args:
        shot_pos_x: x coordinate of the shot.
        shot_pos_y: y coordinate of the shot.

    Returns:
        Euclidean distance to (GOAL_CENTER_X, GOAL_CENTER_Y).
    """
    offset_x = shot_pos_x - GOAL_CENTER_X
    offset_y = shot_pos_y - GOAL_CENTER_Y
    squared_distance = offset_x ** 2 + offset_y ** 2
    return sqrt(squared_distance)

In [None]:
# Action vocabulary. The position of a name in this list is the integer token
# that map_action_to_number emits for it (the trailing comments spell out that
# index), so entries must not be reordered or removed.
actions = [
    "pass",  # 0
    "interception",  # 1
    "dribble",  # 2
    "take_on",  # 3
    "tackle",  # 4
    "foul",  # 5
    "freekick_short",  # 6
    "cross",  # 7
    "shot",  # 8
    "clearance",  # 9
    "throw_in",  # 10
    "goalkick",  # 11
    "corner_short",  # 12
    "corner_crossed",  # 13
    "keeper_save",  # 14
    "freekick_crossed",  # 15
    "shot_freekick",  # 16
    "bad_touch",  # 17
    "shot_penalty",  # 18
]

In [None]:
def map_action_to_number(action: str) -> str:
    """Translate an action type name into its token: the name's index in ``actions``, as text.

    Raises:
        ValueError: if ``action`` is not listed in ``actions``.
    """
    position = actions.index(action)
    return f"{position}"

In [None]:
def generate_shots_with_counts_events(df: pd.DataFrame):
    """Extract every shot together with the same-team action sequence that preceded it.

    Actions are processed per (game_id, period_id) in chronological order. For
    each action whose ``type_name`` is exactly ``"shot"`` the function walks
    backwards until it meets an action by another team or the previous shot of
    the period; the action types in between are encoded with
    ``map_action_to_number`` and joined with single spaces.

    Args:
        df: Action log with at least the columns game_id, period_id,
            time_seconds, team_id, player_id, type_name, result_id,
            bodypart_name, start_x, start_y, end_x and end_y.

    Returns:
        DataFrame with one row per shot and the columns game_id, player_id,
        actions, start_x, start_y, end_x, end_y (coordinates rounded to two
        decimals), bodypart_name, shot_distance_from_goal,
        shot_angle_from_goal and result_id. game_id and player_id are carried
        along because the VAEP section joins on them. When ``df`` contains no
        shot an empty frame with these columns is returned.

    Raises:
        ValueError: if a preceding action has a type that is not in ``actions``.
    """
    output_columns = [
        "game_id",
        "player_id",
        "actions",
        "start_x",
        "start_y",
        "end_x",
        "end_y",
        "bodypart_name",
        "shot_distance_from_goal",
        "shot_angle_from_goal",
        "result_id",
    ]
    shot_data = []
    result_ids = []

    for _, period_events in df.groupby(["game_id", "period_id"]):
        # mergesort is stable: actions sharing a timestamp keep their incoming
        # order instead of being shuffled by the default quicksort.
        period_events = period_events.sort_values(
            by="time_seconds", kind="mergesort"
        ).reset_index(drop=True)
        type_names = period_events["type_name"].to_list()
        team_ids = period_events["team_id"].to_list()

        # Earliest position a sequence may start at: right after the previous shot.
        window_start = 0
        for shot_index, type_name in enumerate(type_names):
            if type_name != "shot":
                continue
            shot_row = period_events.iloc[shot_index]

            # Find where the shooting team took possession: step back while the
            # previous action still belongs to the shooting team.
            sequence_start = shot_index
            while (
                sequence_start > window_start
                and team_ids[sequence_start - 1] == team_ids[shot_index]
            ):
                sequence_start -= 1

            # The slice stops before shot_index, so the shot itself is excluded.
            preceding_actions = type_names[sequence_start:shot_index]

            shot_data.append(
                {
                    "game_id": shot_row["game_id"],
                    "player_id": shot_row["player_id"],
                    "actions": " ".join(
                        map_action_to_number(name) for name in preceding_actions
                    ),
                    "start_x": round(shot_row["start_x"], 2),
                    "start_y": round(shot_row["start_y"], 2),
                    "end_x": round(shot_row["end_x"], 2),
                    "end_y": round(shot_row["end_y"], 2),
                    "bodypart_name": shot_row["bodypart_name"],
                }
            )
            result_ids.append(shot_row["result_id"])

            # The next sequence starts after this shot.
            window_start = shot_index + 1

    if not shot_data:
        return pd.DataFrame(columns=output_columns)

    shots_df = pd.DataFrame(shot_data)
    shot_positions = shots_df[["start_x", "start_y"]]
    shots_df["shot_distance_from_goal"] = shot_positions.apply(
        lambda pos: get_shot_distance(pos["start_x"], pos["start_y"]), axis=1
    )
    shots_df["shot_angle_from_goal"] = shot_positions.apply(
        lambda pos: get_shot_angle(pos["start_x"], pos["start_y"]), axis=1
    )
    shots_df["result_id"] = result_ids
    return shots_df

In [None]:
# Build the shot table (one row per shot plus its preceding action sequence) for all leagues.
shots_df = generate_shots_with_counts_events(all_df)

In [None]:
# Preview the first shots.
shots_df.head()

## XG

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

In [None]:
# Seed shared by the classifier and the train/test split, and the share of shots held out for testing.
RANDOM_STATE = 123
TEST_SIZE = 0.3

In [None]:
# Work on a copy so the "xg" column added in this section does not end up in shots_df.
shots_df_cp = shots_df.copy()

In [None]:
# Random forest that predicts the shot's result_id from body part, distance and angle.
rfc = RandomForestClassifier(random_state=RANDOM_STATE)

# Encode the body part as an integer: 0 = right foot, 1 = left foot, 2 = anything else.
# .assign() returns a new frame, so nothing is written into a column selection
# of shots_df_cp (the earlier `X[...] = ...` raised SettingWithCopyWarning).
X = shots_df_cp[
    ["bodypart_name", "shot_distance_from_goal", "shot_angle_from_goal"]
].assign(
    bodypart_name=lambda frame: frame["bodypart_name"].apply(
        lambda val: 0 if val == "foot_right" else 1 if val == "foot_left" else 2
    )
)
y = shots_df_cp["result_id"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
)
rfc.fit(X=X_train, y=y_train)
y_pred = rfc.predict(X=X_test)

# Per-class precision/recall/F1 on the held-out shots, shown as the cell output.
classification_report(y_test, y_pred, output_dict=True)

In [None]:
# Store the model's predicted class for every shot under "xg".
# NOTE(review): rfc.predict returns hard class labels rather than probabilities,
# and X includes the rows the model was fitted on — confirm this is intended.
shots_df_cp["xg"] = rfc.predict(X=X)

In [None]:
# Label / sequence pairs for the mining input: the label is "1" where the
# predicted class equals 1 and "-1" for every other value.
ssp_cols_df = pd.DataFrame(
    {
        "xg": shots_df_cp["xg"].map(
            lambda label: str(label) if label == 1 else "-1"
        ),
        "actions": shots_df_cp["actions"],
    }
)
ssp_cols_df.head()

In [None]:
# One text line per shot: "<label> <space-separated action codes>".
labelled_sequences = ssp_cols_df["xg"] + " " + ssp_cols_df["actions"]
input_df = labelled_sequences.to_frame(name="input")
input_df

In [None]:
# Write one labelled sequence per line, without header row or index column.
# header=False is the documented way to omit the header (header=None only
# worked because None is falsy).
input_df.to_csv("mining_input_full.txt", header=False, index=False)

## Binary

In [None]:
# Separate copy of the shot table for the variant labelled with the observed result.
shots_df_cp2 = shots_df.copy()

In [None]:
# Inspect the observed result of each shot.
shots_df_cp2["result_id"]

In [None]:
# Label / sequence pairs using the observed outcome: "1" where result_id
# equals 1 and "-1" for every other value.
ssp_cols_df = pd.DataFrame(
    {
        "result_id": shots_df_cp2["result_id"].map(
            lambda outcome: str(outcome) if outcome == 1 else "-1"
        ),
        "actions": shots_df_cp2["actions"],
    }
)
ssp_cols_df.head()

In [None]:
# One text line per shot: "<label> <space-separated action codes>".
labelled_sequences = ssp_cols_df["result_id"] + " " + ssp_cols_df["actions"]
input_df = labelled_sequences.to_frame(name="input")
input_df

In [None]:
# Write one labelled sequence per line, without header row or index column.
# header=False is the documented way to omit the header (header=None only
# worked because None is falsy).
input_df.to_csv("mining_input_full_bin.txt", header=False, index=False)

## VAEP

In [None]:
from tqdm import tqdm
from socceraction.vaep import features as ft
import socceraction.vaep.labels as lab
import socceraction.vaep.formula as fm
import xgboost as xgb
import sklearn.metrics as mt

In [None]:
def features_transform(spadl):
    """Compute the VAEP feature matrix for every action in ``spadl``.

    Side effect: ``spadl`` is modified in place before the features are built.
    result_id values 2 and 3 are set to 0, and the result_name values
    "offside" and "owngoal" are set to "fail". Callers that reuse the frame
    afterwards see these changes.

    Games are handled one at a time, in the sorted order returned by
    ``np.unique`` over game_id.

    Args:
        spadl: Action log with at least game_id, result_id and result_name,
            plus whatever columns the feature functions read.

    Returns:
        The per-game feature frames concatenated in that game order, with a
        fresh 0..n-1 index.
    """
    # Collapse the special outcomes into a plain failure (in place, see docstring).
    special_result_ids = spadl.result_id.isin([2, 3])
    spadl.loc[special_result_ids, ["result_id"]] = 0
    special_result_names = spadl.result_name.isin(["offside", "owngoal"])
    spadl.loc[special_result_names, ["result_name"]] = "fail"

    feature_builders = [
        ft.actiontype_onehot,
        ft.bodypart_onehot,
        ft.result_onehot,
        ft.goalscore,
        ft.startlocation,
        ft.endlocation,
        ft.team,
        ft.time,
        ft.time_delta,
    ]

    def build_game_features(game_id):
        # Game states are derived from one game's actions, re-indexed from 0.
        game_actions = spadl.loc[spadl.game_id == game_id].reset_index(drop=True)
        game_states = ft.gamestates(actions=game_actions)
        return pd.concat([build(game_states) for build in feature_builders], axis=1)

    game_ids = np.unique(spadl.game_id).tolist()
    per_game_features = [build_game_features(game_id) for game_id in tqdm(game_ids)]
    return pd.concat(per_game_features).reset_index(drop=True)


def labels_transform(spadl):
    """Compute the VAEP labels ("scores" and "concedes") for every action in ``spadl``.

    Games are handled one at a time, in the sorted order returned by
    ``np.unique`` over game_id.

    Args:
        spadl: Action log with at least a game_id column, plus whatever
            columns the label functions read.

    Returns:
        The per-game label frames concatenated in that game order, with a
        fresh 0..n-1 index.
    """
    label_builders = [lab.scores, lab.concedes]

    def build_game_labels(game_id):
        game_actions = spadl.loc[spadl.game_id == game_id].reset_index(drop=True)
        return pd.concat(
            [build(actions=game_actions) for build in label_builders], axis=1
        )

    game_ids = np.unique(spadl.game_id).tolist()
    per_game_labels = [build_game_labels(game_id) for game_id in tqdm(game_ids)]
    return pd.concat(per_game_labels).reset_index(drop=True)


def _normalized_brier_score(y_true, y_prob):
    """Return the Brier score of ``y_prob`` relative to a base-rate predictor.

    The baseline always predicts the share of positive labels in ``y_true``.
    A value below 1 means the model beats that baseline. NaN is returned when
    the baseline score is zero, where the ratio is undefined.
    """
    base_rate = np.mean(y_true)
    baseline_score = mt.brier_score_loss(y_true, np.full(len(y_true), base_rate))
    if baseline_score == 0:
        return float("nan")
    return mt.brier_score_loss(y_true, y_prob) / baseline_score


def train_vaep(X_train, y_train, X_test, y_test):
    """Fit one XGBoost classifier per VAEP label and print its normalized Brier scores.

    Args:
        X_train: Feature matrix used for fitting.
        y_train: Labels for ``X_train``; indexed by "scores" and "concedes".
        X_test: Feature matrix used only for evaluation.
        y_test: Labels for ``X_test``; indexed by "scores" and "concedes".

    Returns:
        Dict mapping "scores" and "concedes" to the fitted classifier.
    """
    models = {}
    for m in ["scores", "concedes"]:
        models[m] = xgb.XGBClassifier(random_state=0, n_estimators=50, max_depth=3)

        print("training " + m + " model")
        models[m].fit(X_train, y_train[m])

        # Column 1 of predict_proba is the probability of the positive class.
        y_train_pred = models[m].predict_proba(X_train)[:, 1]
        train_brier = _normalized_brier_score(y_train[m], y_train_pred)
        print(m + " Train NBS: " + str(train_brier))
        print()

        y_test_pred = models[m].predict_proba(X_test)[:, 1]
        test_brier = _normalized_brier_score(y_test[m], y_test_pred)
        print(m + " Test NBS: " + str(test_brier))
        print()

        print("----------------------------------------")

    return models


def generate_predictions(features, models):
    """Return the positive-class probability of both VAEP models for every feature row.

    Args:
        features: Feature matrix accepted by the models' ``predict_proba``.
        models: Mapping with the keys "scores" and "concedes", each holding a
            fitted classifier.

    Returns:
        DataFrame with the columns "scores" and "concedes", one row per
        feature row.
    """
    target_names = ("scores", "concedes")
    positive_class_probs = {
        target: models[target].predict_proba(features)[:, 1]
        for target in target_names
    }
    return pd.DataFrame(positive_class_probs)


def calculate_action_values(spadl, predictions):
    """Combine action identifiers, model probabilities and VAEP values in one frame.

    Args:
        spadl: Action log containing the identifying columns listed below.
        predictions: Frame with the columns "scores" and "concedes".

    Returns:
        Column-wise concatenation of the identifying action columns, the
        probabilities renamed to "Pscores" / "Pconcedes", and the output of
        ``fm.value``.

    NOTE(review): the three parts are aligned on their index, so ``spadl`` must
    be ordered like ``predictions``. features_transform builds features game by
    game in sorted game_id order — confirm the action log has that order.
    """
    identifying_columns = [
        "original_event_id",
        "player_id",
        "action_id",
        "game_id",
        "start_x",
        "start_y",
        "end_x",
        "end_y",
        "type_name",
        "result_name",
    ]

    vaep_values = fm.value(
        actions=spadl, Pscores=predictions["scores"], Pconcedes=predictions["concedes"]
    )
    renamed_predictions = predictions.rename(
        columns={"scores": "Pscores", "concedes": "Pconcedes"}
    )
    return pd.concat(
        [spadl[identifying_columns], renamed_predictions, vaep_values], axis=1
    )

In [None]:
# Reload the full action logs (all columns; the file's first column is read as data, not as index).
spadl = {league: pd.read_csv(f"{PATH_SPADL}/{league}.csv") for league in LEAGUES}

# Features first: features_transform rewrites result_id / result_name in place,
# so the labels below are computed on the already modified frames.
features = {league: features_transform(spadl[league]) for league in LEAGUES}
labels = {league: labels_transform(spadl[league]) for league in LEAGUES}

# Fit on England and evaluate on Spain.
models = train_vaep(
    X_train=features["England"],
    y_train=labels["England"],
    X_test=features["Spain"],
    y_test=labels["Spain"],
)

In [None]:
# Score every league with the fitted models, then value each action.
preds = {
    league: generate_predictions(features=features[league], models=models)
    for league in LEAGUES
}
action_values = {
    league: calculate_action_values(spadl=spadl[league], predictions=preds[league])
    for league in LEAGUES
}

# Stack the leagues; each keeps its own index labels.
all_action_values = pd.concat(list(action_values.values()))

In [None]:
# Attach the VAEP outputs to each shot. shots_df must carry every key column
# listed here (including game_id and player_id).
shot_merge_keys = ["game_id", "player_id", "start_x", "start_y", "end_x", "end_y"]
coordinate_columns = ["start_x", "start_y", "end_x", "end_y"]

shots_df_cp3 = shots_df.copy()

all_action_values_cp = (
    # Only shots can match a row of shots_df; other action types would merely
    # risk accidental key collisions.
    all_action_values.loc[all_action_values["type_name"] == "shot"]
    .drop(columns=["original_event_id", "result_name", "action_id", "type_name"])
    # shots_df stores coordinates rounded to two decimals, so round this side
    # the same way or the float keys may fail to compare equal.
    .round({column: 2 for column in coordinate_columns})
    # One row per key keeps the left merge from multiplying shot rows.
    .drop_duplicates(subset=shot_merge_keys)
)

shots_df_cp3 = shots_df_cp3.merge(
    all_action_values_cp,
    on=shot_merge_keys,
    how="left",
)

# Show the resulting frame
shots_df_cp3.head()

In [None]:
# Remove the join keys and the observed outcome; the cells below read only
# "Pscores" and "actions".
shots_df_cp3 = shots_df_cp3.drop(columns=["game_id", "player_id", "result_id"])

In [None]:
# Label / sequence pairs using the VAEP scoring probability: "1" when Pscores
# exceeds 0.5, "-1" otherwise (this includes shots without a matched probability).
save_df = pd.DataFrame(
    {
        "scores": shots_df_cp3["Pscores"].map(
            lambda prob: "1" if prob > 0.5 else "-1"
        ),
        "actions": shots_df_cp3["actions"],
    }
)

In [None]:
# One text line per shot: "<label> <space-separated action codes>".
labelled_sequences = save_df["scores"] + " " + save_df["actions"]
input_df = labelled_sequences.to_frame(name="input")
input_df

In [None]:
# Write one labelled sequence per line, without header row or index column.
# header=False is the documented way to omit the header (header=None only
# worked because None is falsy).
input_df.to_csv("mining_input_full_vaep.txt", header=False, index=False)