In [0]:
# --- imports
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# =========================
# 0) Datos base (df_top)
# =========================
df = spark.table("fifa_21_male_2")

top_clubs = [
    "Manchester City","Liverpool","Chelsea","Arsenal","Manchester United","Tottenham Hotspur",
    "Real Madrid","FC Barcelona","Atlético Madrid","Sevilla FC","Villarreal CF",
    "Juventus","Inter","AC Milan","Napoli","AS Roma",
    "Bayern Munich","Borussia Dortmund","RB Leipzig","Bayer 04 Leverkusen",
    "Paris Saint-Germain","Olympique Lyonnais","AS Monaco","Marseille"
]
df_top = df.filter(col("Club").isin(top_clubs))

# =========================
# 1) Seleccionar SOLO columnas del rename_map y renombrar
# =========================
rename_map = {
    "Crossing": "Crossing",
    "Finishing": "Finishing",
    "Heading Accuracy": "Heading_Accuracy",
    "Short Passing": "Short_Passing",
    "Volleys": "Volleys",
    "Dribbling": "Dribbling",
    "Curve": "Curve",
    "FK Accuracy": "FK_Accuracy",
    "Long Passing": "Long_Passing",
    "Ball Control": "Ball_Control",
    "Acceleration": "Acceleration",
    "Sprint Speed": "Sprint_Speed",
    "Agility": "Agility",
    "Reactions": "Reactions",
    "Balance": "Balance",
    "Shot Power": "Shot_Power",
    "Jumping": "Jumping",
    "Stamina": "Stamina",
    "Strength": "Strength",
    "Long Shots": "Long_Shots",
    "Aggression": "Aggression",
    "Interceptions": "Interceptions",
    "Positioning": "Positioning",
    "Vision": "Vision",
    "Penalties": "Penalties",
    "Composure": "Composure",
    "Marking": "Marking",
    "Standing Tackle": "Standing_Tackle",
    "Sliding Tackle": "Sliding_Tackle",
    "GK Diving": "GK_Diving",
    "GK Handling": "GK_Handling",
    "GK Kicking": "GK_Kicking",
    "GK Positioning": "GK_Positioning",
    "GK Reflexes": "GK_Reflexes",
    "BP": "BP"   # etiqueta POS detallada
}

# Tomar solo las columnas del mapping que realmente existen en df_top
present_cols = [c for c in rename_map.keys() if c in df_top.columns]
if "BP" not in present_cols:
    raise ValueError("La columna 'BP' no está en el dataset. Revisa el nombre de la columna objetivo.")

df_w = df_top.select(*present_cols)
for old, new in rename_map.items():
    if old in df_w.columns and old != new:
        df_w = df_w.withColumnRenamed(old, new)

# =========================
# 2) Mapear BP (posiciones FIFA finas) → RoleGroup {GK, DEF, MID, OFF}
# =========================
def_positions = ["CB","LB","RB","LWB","RWB"]
mid_positions = ["CDM","CM","LM","RM"]
off_positions = ["ST","CF","CAM","LW","RW","LAM","RAM","LF","RF"]

df_w = df_w.withColumn(
    "RoleGroup",
    F.when(F.col("BP")=="GK", "GK")
     .when(F.col("BP").isin(def_positions), "DEF")
     .when(F.col("BP").isin(mid_positions), "MID")
     .when(F.col("BP").isin(off_positions), "OFF")
     .otherwise(F.lit(None))
)

# Filtrar filas válidas (descarta posiciones raras o NaN)
df_w = df_w.filter(F.col("RoleGroup").isNotNull())

# =========================
# 3) Features vector + label numérica
# =========================
# Features = TODAS las columnas del mapping excepto BP (ya renombradas)
feature_cols = [new for (old,new) in rename_map.items() if new in df_w.columns and new != "BP"]

# UDF para vectorizar (podrías usar VectorAssembler; acá dejo tu estilo original)
to_vector = F.udf(lambda xs: Vectors.dense([float(x) if x is not None else 0.0 for x in xs]), VectorUDT())
df_vec = df_w.withColumn(
    "features",
    to_vector(F.array(*[F.col(c).cast("double") for c in feature_cols]))
)

# Label mapping en el orden: GK=0, DEF=1, MID=2, OFF=3
label_map = F.when(F.col("RoleGroup")=="GK", F.lit(0.0)) \
             .when(F.col("RoleGroup")=="DEF", F.lit(1.0)) \
             .when(F.col("RoleGroup")=="MID", F.lit(2.0)) \
             .when(F.col("RoleGroup")=="OFF", F.lit(3.0))
df_vec = df_vec.withColumn("label", label_map)

# Split
train, test = df_vec.select("features","label").randomSplit([0.8, 0.2], seed=42)

# =========================
# 4) OVR con Lasso (LinearRegression L1)
# =========================
class_names = ["GK","DEF","MID","OFF"]   # debe corresponder al label mapping 0..3

def fit_ovr_lasso(df_train, reg_param):
    models = {}
    for idx, name in enumerate(class_names):
        y_col = F.when(F.col("label")==float(idx), F.lit(1.0)).otherwise(F.lit(0.0)).alias("y")
        train_y = df_train.select("features", y_col)
        lr = LinearRegression(
            featuresCol="features",
            labelCol="y",
            elasticNetParam=1.0,     # L1 puro
            regParam=reg_param,
            maxIter=200,
            standardization=True
        )
        models[name] = lr.fit(train_y)
    return models

def add_scores(df_in, models):
    out = df_in
    for name, m in models.items():
        out = m.transform(out).withColumnRenamed("prediction", f"score_{name}")
        # limpieza de columnas temporales
        for col_drop in ["y", "prediction"]:
            if col_drop in out.columns:
                out = out.drop(col_drop)
    return out

def predict_class(df_in):
    score_cols = [f"score_{n}" for n in class_names]
    pred_idx = F.array_max(F.array(*[F.col(c) for c in score_cols]))
    return (
        df_in
        .withColumn("pred_idx", pred_idx)
        .withColumn(
            "prediction",
            F.when(F.col("pred_idx")==F.col("score_GK"),  F.lit(0.0))
             .when(F.col("pred_idx")==F.col("score_DEF"), F.lit(1.0))
             .when(F.col("pred_idx")==F.col("score_MID"), F.lit(2.0))
             .otherwise(F.lit(3.0)) # OFF
        )
        .drop("pred_idx")
    )

# =========================
# 5) Búsqueda simple de regParam
# =========================
reg_grid = [1e-4, 1e-3, 1e-2, 1e-1, 3e-1]
best_r, best_f1 = None, -1.0
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

for r in reg_grid:
    models_r = fit_ovr_lasso(train, r)
    val_scores = add_scores(train, models_r)  # si querés, separá un val split
    val_pred = predict_class(val_scores)
    f1 = evaluator.evaluate(val_pred)
    if f1 > best_f1:
        best_f1, best_r = f1, r

print(f"Mejor regParam: {best_r} | F1 (train-val): {best_f1:.3f}")

# =========================
# 6) Entrenar con mejor regParam y evaluar en test
# =========================
best_models = fit_ovr_lasso(train, best_r)

test_scored = add_scores(test, best_models)
test_pred   = predict_class(test_scored)

acc = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label").evaluate(test_pred)
f1  = MulticlassClassificationEvaluator(metricName="f1", labelCol="label").evaluate(test_pred)
wp  = MulticlassClassificationEvaluator(metricName="weightedPrecision", labelCol="label").evaluate(test_pred)
wr  = MulticlassClassificationEvaluator(metricName="weightedRecall", labelCol="label").evaluate(test_pred)

print(f"[TEST] Accuracy: {acc:.3f} | F1: {f1:.3f} | wPrec: {wp:.3f} | wRec: {wr:.3f}")

# =========================
# 7) Importancia (suma de |coef| en las 4 clases)
# =========================
import numpy as np
coef_sum_abs = np.zeros(len(feature_cols), dtype=float)
for name in class_names:
    coef_sum_abs += np.abs(best_models[name].coefficients.toArray())

ranking = sorted([(f, float(w)) for f, w in zip(feature_cols, coef_sum_abs)],
                 key=lambda x: x[1], reverse=True)

print("Top 15 variables por importancia (suma |coef| OVR):")
for feat, w in ranking[:15]:
    print(f"{feat}: {w:.4f}")

# =========================
# 8) Predicción de ejemplo (usando una fila real del dataset para asegurar dimensiones)
# =========================
example = df_w.limit(1)
example_vec = example.select(
    to_vector(F.array(*[F.col(c).cast("double") for c in feature_cols])).alias("features")
)

ex_scored = add_scores(example_vec, best_models)
ex_pred = predict_class(ex_scored)
ex_pred.select("score_GK","score_DEF","score_MID","score_OFF","prediction").show(truncate=False)


Mejor regParam: 0.001 | F1 (train-val): 0.815
[TEST] Accuracy: 0.894 | F1: 0.890 | wPrec: 0.894 | wRec: 0.894
Top 15 variables por importancia (suma |coef| OVR):
Crossing: 0.0226
Stamina: 0.0172
Heading_Accuracy: 0.0170
Reactions: 0.0165
Sliding_Tackle: 0.0156
Dribbling: 0.0118
Long_Passing: 0.0117
Vision: 0.0113
Marking: 0.0105
Finishing: 0.0104
Sprint_Speed: 0.0101
GK_Handling: 0.0085
Agility: 0.0082
GK_Diving: 0.0080
GK_Reflexes: 0.0072
+--------------------+-------------------+------------------+------------------+----------+
|score_GK            |score_DEF          |score_MID         |score_OFF         |prediction|
+--------------------+-------------------+------------------+------------------+----------+
|-0.02538900248718938|-0.1252865572817458|0.3996523369762138|0.7201159743118317|3.0       |
+--------------------+-------------------+------------------+------------------+----------+



In [0]:
#from pyspark.sql import functions as F

# Contar cada combinación real vs predicho
cm = (test_pred
      .groupBy("label","prediction")
      .count()
      .orderBy("label","prediction"))

import pandas as pd

# Pasar a pandas y pivotear
cm_pdf = cm.toPandas().pivot(index="label", columns="prediction", values="count").fillna(0)

# Reordenar columnas por si falta alguna
cm_pdf = cm_pdf.reindex(index=[0,1,2,3], columns=[0,1,2,3], fill_value=0)

# Renombrar índices a nombres legibles
cm_pdf.index = ["GK (real)", "DEF (real)", "MID (real)", "OFF (real)"]
cm_pdf.columns = ["GK (pred)", "DEF (pred)", "MID (pred)", "OFF (pred)"]

print("\n=== MATRIZ DE CONFUSIÓN ===\n")
print(cm_pdf.to_string())




=== MATRIZ DE CONFUSIÓN ===

            GK (pred)  DEF (pred)  MID (pred)  OFF (pred)
GK (real)        17.0         0.0         0.0         0.0
DEF (real)        0.0        37.0         0.0         0.0
MID (real)        0.0         4.0        20.0         5.0
OFF (real)        0.0         1.0         3.0        36.0
