In [3]:
import pandas as pd
from socceraction.data.statsbomb import StatsBombLoader
import socceraction.spadl as spadl
import socceraction.xthreat as xthreat
import matplotsoccer as mps
import numpy as np
import matplotlib.pyplot as plt
from socceraction.xthreat import load_model, get_successful_move_actions
import json
from mplsoccer import VerticalPitch
import statsmodels.formula.api as smf
import statsmodels.api as sm
import pickle 

In [4]:
# Preparación básica: Tienen que ser valores float para poder comparar, en csv vienen con ',' y python lo quiere con '.'
def preparar_datos(df):
    df['x'] = df['x'].astype(str).str.replace(',', '.').astype(float)
    df['y'] = df['y'].astype(str).str.replace(',', '.').astype(float)
    df['endX'] = df['endX'].astype(str).str.replace(',', '.').astype(float)
    df['endX'] = df['endX'].fillna(0)
    df['endY'] = df['endY'].astype(str).str.replace(',', '.').astype(float)
    df['endY'] = df['endY'].fillna(0)
    df['time_seconds'] = pd.to_numeric(df['time_seconds'], errors='coerce')


    df['minute'] = df['minute'].astype(str).str.replace(',', '.').astype(float)
    df['second'] = df['second'].astype(str).str.replace(',', '.').astype(float)
    df['Segundos'] = (df['minute'] * 60 + df['second']).fillna(0).astype(int)
    
    df['x_real'] = df['x'] * 1.05      # 100 -> 105 m
    df['y_real'] = df['y'] * 0.68      # 100 -> 68 m
    df['endx_real'] = df['endX'] * 1.05
    df['endy_real'] = df['endY'] * 0.68

    df['ZonaX'] = (df['x_real'] // 13.125).astype(int)
    df['ZonaY'] = (df['y_real'] // 13.6).astype(int)

    df['Zona_end_X'] = (df['endx_real'] // 13.125).astype(int)
    df['Zona_end_Y'] = (df['endy_real'] // 13.6).astype(int)
    return df

# Métricas de pase
def calcular_pases(df):
    df['LongPass'] = ((df['type_displayName'] == 'Pass') & (df['value_Longball'] == '1,0')).astype(int)
    df['ShortPass'] = ((df['type_displayName'] == 'Pass') & (df['value_Longball'].isna())).astype(int)
    
    # Key passes
    id_evento_keypasses = df.loc[df['value_Assisted'] == '1,0', 'value_RelatedEventId']
    df['KeyPasses'] = df['eventId'].isin(id_evento_keypasses).astype(int)

    # Progressive passes
    df['ProgressPasses'] = (
        (df['type_displayName'] == 'Pass') &
        (df['outcomeType_value'] == 1) &
        (df['x'] < df['endX']) &
        (
            ((df.endX - df.x) * 105 / 100 > 30) & (df.endX < 50) |
            ((df.endX - df.x) * 105 / 100 > 15) & (df.endX > 50) & (df.x <= 50) |
            ((df.endX - df.x) * 105 / 100 > 10) & (df.endX > 50) & (df.x >= 50)
        )
    ).astype(int)

    return df

# Toques en área rival y propia
def calcular_toques_area(df):
    df['TouchesRivalArea'] = (
        (~((df['type_displayName'] == 'Pass') & (df['value_KeeperThrow'] == 1))) &
        (df['type_value'].isin([2, 3, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 41, 42, 50, 52, 54, 61])) &
        (df['x'] > 83) &
        (df['endX'] < 100) &
        (df['y'] > 21.1) &
        (df['endY'] < 78.9)
    ).astype(int)

    df['TouchesOwnArea'] = (
        (~((df['type_displayName'] == 'Pass') & (df['value_KeeperThrow'] == 1))) &
        (df['type_value'].isin([2, 3, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 41, 42, 50, 52, 54, 61])) &
        (df['x'] > 0) &
        (df['endX'] < 17) &
        (df['y'] > 21.1) &
        (df['endY'] < 78.9)
    ).astype(int)

    return df

# Modelo xThreat
def aplicar_xT(df):
    df_xT = df[(df['isShot'] == True) | (df['type_displayName'] == 'Pass')].copy()

    # Formato SPADL
    df_xT.rename(columns={
        "id":"game_id", "eventId":"event_id", "teamId":"team_id",
        "x": "start_x", "y": "start_y", "endX":"end_x", "endY":"end_y",
        "period_value":"value", "period_displayName":"displayName",
        "type_value": "type_id", "type_displayName": "type_name",
        "outcomeType_value": "result_id", "outcomeType_displayName": "result_name",
        "playerId": "player_id"
    }, inplace=True)

    df_xT['type_name'] = np.where(df_xT['isShot'] == 1, 'shot', 'pass')
    df_xT['type_id'] = np.where(df_xT['isShot'] == 1, 11, 0)
    df_xT['isGoal'] = df_xT.isGoal.fillna(0)
    df_xT['result_name'] = np.where(
        (df_xT['type_name'] == 'shot') & (df_xT['isGoal'] != 1),
        'fail',
        np.where((df_xT['result_id'] != 1) & (df_xT['type_name'] == 'pass'),
                 'Unsuccessful', 'Successful')
    )
    df_xT[['end_x','end_y']] = df_xT[['end_x', 'end_y']].fillna(0)
    df_xT['start_x'] *= 1.05
    df_xT['start_y'] *= 0.68
    df_xT['end_x'] *= 1.05
    df_xT['end_y'] *= 0.68

    # Aplicar modelo
    model = load_model("https://karun.in/blog/data/open_xt_12x8_v1.json")
    df_xT = get_successful_move_actions(df_xT)
    df_xT["xT_value"] = model.rate(df_xT)

    # Unir de vuelta por índice (posición en el dataframe original)
    df['Xt_value'] = 0
    df.loc[df_xT.index, 'Xt_value'] = df_xT['xT_value']
    return df

#Tiros realizados
def add_shots_and_goals(df):
    df['Shot'] = df['isShot'].fillna(0).astype(int)
    df['Goal'] = df['isGoal'].fillna(0).astype(int)
    return df

#Regates
def add_dribbles(df):
    df['Dribble_Successful'] = (
        (df['type_displayName'] == 'TakeOn') &
        (df['outcomeType_displayName'] == 'Successful')
    ).astype(int)

    df['Dribble_Unsuccessful'] = (
        (df['type_displayName'] == 'TakeOn') &
        (df['outcomeType_displayName'] == 'Unsuccessful')
    ).astype(int)
    return df

#Acciones defensivas
def add_defensive_actions(df):
    acciones_defensivas_exitosas = [
        'Interception', 'BallRecovery', 'Tackle', 'BlockedPass',
        'Clearance', 'Aerial', 'Challenge', 'ShieldBallOpp', 'OffsideProvoked'
    ]
    df['DefensiveAction_Successful'] = (
        df['type_displayName'].isin(acciones_defensivas_exitosas) &
        (df['outcomeType_displayName'] == 'Successful')
    ).astype(int)

    df['DefensiveAction_Unsuccessful'] = (
        df['type_displayName'].isin(acciones_defensivas_exitosas) &
        (df['outcomeType_displayName'] == 'Unsuccessful')
    ).astype(int)

    df['Clearence'] = (df['type_displayName'] == 'Clearance').astype(int)

    df['Entries_Successful'] = (
        (df['type_displayName'] == 'Tackle') &
        (df['outcomeType_displayName'] == 'Successful')  # CORREGIDO
    ).astype(int)

    df['Aerial_Successful'] = (
        (df['type_displayName'] == 'Aerial') &
        (df['outcomeType_displayName'] == 'Successful')
    ).astype(int)

    df['Aerial_Unsuccessful'] = (
        (df['type_displayName'] == 'Aerial') &
        (df['outcomeType_displayName'] == 'Unsuccessful')
    ).astype(int)

    return df

#Parte del cuerpo utilizada por acciones
def add_body_part(df):
    body_parts_cols = {
        'value_HeadPass': 'Head',
        'value_LeftFoot': 'LeftFoot',
        'value_RightFoot': 'RightFoot',
        'value_OtherBodyPart': 'OtherBodyPart',
        'value_Hands': 'Hands'
    }

    def obtener_parte_cuerpo(row):
        for col, label in body_parts_cols.items():
            if pd.notna(row.get(col)) and row.get(col) != 0:
                return label
        return 'Unknown'

    df['body_part'] = df.apply(obtener_parte_cuerpo, axis=1)
    return df

#Duelos
def add_duels(df):
    duels = ['TakeOn', 'Tackle', 'Aerial', 'Challenge', 'ShieldBallOpp']
    
    df['Duels_Successful'] = (
        df['type_displayName'].isin(duels) &
        (df['outcomeType_displayName'] == 'Successful')
    ).astype(int)

    df['Duels_Unsuccessful'] = (
        df['type_displayName'].isin(duels) &
        (df['outcomeType_displayName'] == 'Unsuccessful')
    ).astype(int)

    return df

#Accion hacia una transición
def add_defensive_to_transition(df):
    acciones_defensivas_exitosas = [
        'Interception', 'BallRecovery', 'Tackle', 'BlockedPass',
        'Clearance', 'Aerial', 'Challenge', 'ShieldBallOpp', 'OffsideProvoked'
    ]

    df['DefensiveToTransition'] = 0

    for i in range(len(df) - 3):
        fila = df.iloc[i]
        if (
            fila['type_displayName'] in acciones_defensivas_exitosas and
            fila['outcomeType_displayName'] == 'Successful'
        ):
            equipo = fila['teamId']
            for j in range(1, 4):  # mirar las 3 siguientes filas
                siguiente = df.iloc[i + j]
                if siguiente['teamId'] == equipo and siguiente['type_displayName'] in ['Pass', 'TakeOn', 'Shot']:
                    df.at[i, 'DefensiveToTransition'] = 1
                    break

    return df

#Eficiencia Ofensiva Tras Presión Alta
def calcular_eficiencia_ofensiva_presion_alta(df):
    df['HighTurnover'] = 0
    df['HighTurnoverShot'] = 0

    acciones_defensivas = ['Interception', 'BallRecovery']

    for i in range(len(df)):
        fila = df.iloc[i]

        if (
            fila['type_displayName'] in acciones_defensivas and
            fila['outcomeType_displayName'] == 'Successful' and
            fila['x'] > 50  # Campo rival
        ):
            equipo = fila['teamId']
            tiempo_inicio = fila['time_seconds']
            df.at[i, 'HighTurnover'] = 1

            # Buscar si en los próximos eventos hay un tiro del mismo equipo en los siguientes 20 segundos
            for j in range(i + 1, min(i + 20, len(df))):  # Limita la búsqueda a los 20 siguientes eventos (ajustable)
                siguiente = df.iloc[j]
                if (
                    siguiente['teamId'] == equipo and
                    siguiente['isShot'] == 1 and
                    siguiente['time_seconds'] - tiempo_inicio <= 20
                ):
                    df.at[i, 'HighTurnoverShot'] = 1
                    break
    return df

#Expected goals
def calcular_xG_modelo(df, ruta_entrenamiento):
    """
    -----------
    df : pd.DataFrame
        DataFrame con eventos del partido.
    xg_model.pkl : Parámetros entrenados a partir de una temporada de la premier league.
    
    Retorna:
    --------
    df : pd.DataFrame
        DataFrame original con una columna nueva "xG" para los disparos.
    """
# =======================
# 1. Carga EL MODELO
# =======================

    with open('Modelos/xg_model.pkl', 'rb') as f:
        model = pickle.load(f)
    
    b = model.params.values  # <- ESTO DEFINE b
    model_variables = model.model.exog_names[1:]  # omitir el intercepto
    # Diccionario para renombrar columnas
    rename_dict = {
        'type_value': 'eventId',
        'playerId': 'playerId',
        'matchId': 'matchId',
        'Shot': 'eventName',
        'teamId': 'teamId',
        'period_value': 'matchPeriod',
        'Segundos': 'eventSec',
        'id': 'id',
        'x': 'X',
        'y': 'Y'
    }

    # Copia y renombra columnas
    df_xG = df.rename(columns=rename_dict).copy()
    df_xG = df_xG[df_xG['eventName'] == 1]  # Filtrar solo disparos

    # Ajustar coordenadas al campo
    df_xG["X"] = (100 - df_xG['X']) * 105 / 100
    df_xG["Y"] = df_xG['Y'] * 68 / 100
    df_xG["C"] = abs(df_xG['Y'] - 50) * 68 / 100
    df_xG["Distance"] = np.sqrt(df_xG["X"]**2 + df_xG["C"]**2)

    # Calcular ángulo de disparo
    df_xG["Angle"] = np.where(
        np.arctan(7.32 * df_xG["X"] / (df_xG["X"]**2 + df_xG["C"]**2 - (7.32 / 2)**2)) > 0,
        np.arctan(7.32 * df_xG["X"] / (df_xG["X"]**2 + df_xG["C"]**2 - (7.32 / 2)**2)),
        np.arctan(7.32 * df_xG["X"] / (df_xG["X"]**2 + df_xG["C"]**2 - (7.32 / 2)**2)) + np.pi
    )

    # Variables para el modelo
    df_xG["X2"] = df_xG['X']**2
    df_xG["C2"] = df_xG['C']**2
    df_xG["AX"] = df_xG['Angle'] * df_xG['X']

    # Función de predicción de xG
    def calculate_xG(row):    
        bsum = b[0]  # Intercepto
        for i, v in enumerate(model_variables):
            bsum += b[i + 1] * row[v]
        xG = 1 / (1 + np.exp(-bsum))
        return xG

    # Aplicar función a los disparos
    df_xG["xG"] = df_xG.apply(calculate_xG, axis=1)

    # Unir columna xG al DataFrame original, asignar 0 donde no hay disparos
    df = df.copy()
    df = df.merge(df_xG[["id", "xG"]], on="id", how="left")
    df["xG"] = df["xG"].fillna(0)

    return df

In [5]:
# 1. Cargar archivo CSV
df = pd.read_csv(r'G:\Mi unidad\TFM\OPTA\1821467_eventData.csv', delimiter=';', engine='python')
ruta_entrenamiento = r'G:\Mi unidad\TFM\OPTA\Eventes_Premier_train.xlsx'
# 2. Aplicar funciones paso a paso
df = preparar_datos(df)
df = calcular_pases(df)
df = calcular_toques_area(df)
df = aplicar_xT(df)
df = add_shots_and_goals(df)
df = add_dribbles(df)
df = add_defensive_actions(df)
df = add_body_part(df)
df = add_duels(df)
df = add_defensive_to_transition(df)
df = calcular_eficiencia_ofensiva_presion_alta(df)
df = calcular_xG_modelo(df, ruta_entrenamiento)

#3. Renombrar columnas "value_" a nombres más claros
columnas_renombradas = {
    'value_Angle': 'Angle',
    'value_Length': 'Length',
    'value_Zone': 'Zone',
    'value_ThrowIn': 'ThrowIn',
    'value_Cross': 'Cross',
    'value_FirstTouch': 'FirstTouch',
    'value_LastMan': 'LastMan',
    'value_Foul': 'Foul',
    'value_PlayerPosition': 'PlayerPosition',
    'value_AerialFoul': 'AerialFoul'
}
df.rename(columns=columnas_renombradas, inplace=True)

# 4. Filtrar columnas finales

columnas_finales = [
    # Identificadores generales
    'id', 'matchId', 'teamId', 'teamName',

    # Tiempo y período
    'minute', 'second', 'Segundos', 'period_value', 'period_displayName',

    # Jugador
    'playerId', 'playerName', 'position', 'body_part',

    # Evento base
    'type_displayName', 'outcomeType_value', 'outcomeType_displayName',
    'Foul',

    # Coordenadas originales
    'x', 'y', 'endX', 'endY',

    # Coordenadas transformadas / zonas
    'ZonaX', 'ZonaY', 'Zona_end_X', 'Zona_end_Y','Length',

    # Tipos de pase
    'LongPass', 'ShortPass', 'KeyPasses', 'ProgressPasses', 'Cross',

    # Toques y zonas de influencia
    'TouchesRivalArea', 'TouchesOwnArea', 'FirstTouch', 'Entries_Successful',

    # Disparo y gol
    'Shot', 'Goal', 'xG',

    # Dribblings
    'Dribble_Successful', 'Dribble_Unsuccessful',

    # Duelos
    'Duels_Successful', 'Duels_Unsuccessful',

    # Acciones defensivas
    'DefensiveAction_Successful', 'DefensiveAction_Unsuccessful', 'Clearence',

    # Transiciones y presión
    'DefensiveToTransition', 'HighTurnover', 'HighTurnoverShot',

    # Modelos avanzados
    'Xt_value',

]

# Asegurar que solo queden las columnas deseadas (las que existan)
df = df[[col for col in columnas_finales if col in df.columns]]

# 4. (Opcional) Guardar resultados
df.to_excel("output_analizado.xlsx", index=False, engine='openpyxl')


  0.000000e+00  0.000000e+00  1.332470e-03  0.000000e+00  1.969600e-03
 -1.969600e-03  0.000000e+00 -1.144499e-02 -1.272150e-03 -1.100200e-04
  6.485330e-03 -2.585340e-03 -1.272150e-03  2.049300e-03 -2.604000e-03
 -1.599540e-03  2.154240e-03 -2.144090e-03  3.473290e-03  0.000000e+00
  0.000000e+00  6.109000e-05  1.159680e-03  2.083000e-04  2.044220e-03
 -3.473290e-03  4.792700e-04  3.491670e-03  1.665471e-02  1.184340e-03
  7.288600e-04 -4.965400e-04 -2.378680e-03  6.564610e-03  1.302700e-03
  1.071900e-04 -4.151700e-03  1.637200e-04 -2.793920e-03  2.793920e-03
 -1.847510e-03  3.114740e-03 -2.520100e-03  1.900720e-03 -4.615360e-03
 -1.272150e-03  1.808190e-03  1.900720e-03  4.771080e-03  0.000000e+00
 -6.478500e-04 -2.495360e-03  1.808190e-03  3.770690e-03  1.486080e-03
 -3.536300e-04  1.888540e-03  7.012512e-02  7.012512e-02  3.696380e-03
  1.000580e-03 -1.000580e-03 -1.362943e-02 -2.714640e-03  1.100200e-04
  1.272150e-03  3.233190e-03 -1.252870e-03 -3.159320e-03 -6.528000e-05
 -2.08