## An치lisis de partidas de ajedrez con Stockfish

En este proyecto se han probado distintas estrategias para analizar partidas de ajedrez en paralelo usando Apache Spark. Sin embargo, algunas de ellas no ofrecieron mejoras en rendimiento cuando se ejecutaron en un entorno local compuesto por contenedores Docker. A continuaci칩n se explica por qu칠.


###  Estrategia 1: An치lisis con Spark por partida (flatMap + mapPartitions)

- Cada partida individual se trat칩 como una unidad de trabajo en Spark.
- El motor `Stockfish` se ejecutaba dentro de cada partici칩n.
- El resultado se recolectaba en el driver como un 칰nico DataFrame.

#### Problemas en entorno local (Docker):
- Los contenedores Docker comparten CPU y disco del host, no son nodos f칤sicos reales.
- Cada partici칩n lanza su propia instancia de `Stockfish`, saturando los recursos del sistema.
- No se aprovechan m칰ltiples nodos f칤sicos ni almacenamiento distribuido.
- Resultado: rendimiento peor que el an치lisis secuencial local.

####  칔til en entornos reales si:
- Se dispone de un cl칰ster Spark distribuido con nodos f칤sicos separados.
- `Stockfish` est치 disponible en todos los nodos.
- Se procesan decenas de miles de partidas.

---

### Estrategia 2: Un nodo procesa cada archivo con Spark (map por archivo)

- Cada archivo `.pgn` completo se procesaba por un nodo.
- El an치lisis se hac칤a con `Stockfish` y se guardaban los resultados en `.parquet`.

####  Problemas en entorno local (Docker):
- Los contenedores comparten recursos del host, lo que limita la paralelizaci칩n efectiva.
- Varias instancias de `Stockfish` compitiendo generan cuellos de botella.
- Los `print()` y barras de progreso (`tqdm`) no se muestran en la notebook, ya que Spark no reenv칤a la salida est치ndar desde los ejecutores al driver.

#### 칔til en entornos reales cuando:
- Existen nodos f칤sicos con CPUs independientes.
- Hay muchos archivos `.pgn` bien distribuidos.

---

 Por tanto, aunque Spark es potente para procesamiento distribuido, no ofrece beneficios reales en un entorno local con contenedores Docker.  Para trabajos medianos o pruebas desde notebooks, el an치lisis secuencial (uno a uno) es **m치s r치pido, controlable y f치cil de depurar**.



## C칩digo efectivo a realizar en un cluster real (estrategia 2)

In [1]:
from pyspark.sql import SparkSession
from pathlib import Path

# Crear sesi칩n de Spark distribuida (usar치 nodos del cl칰ster)
spark = SparkSession.builder \
    .appName("Analisis_PGN_Distribuido_HDFS") \
    .master("yarn") \
    .getOrCreate()

# Ruta de entrada con archivos .pgn en HDFS (cada uno con ~50 partidas)
ruta_entrada_hdfs = "hdfs:///user/ajedrez/raw/seleccionados/"

# Ruta donde se guardar치n los resultados individuales en HDFS
ruta_salida_hdfs = "hdfs:///user/ajedrez/analizados_grupo/"

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/18 17:32:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/18 17:32:26 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
def analizar_pgn(pgn_path: str, stockfish_path: str = "stockfish.exe"):
    log_path = "errores_pgn.log"
    central_squares = {"d4", "e4", "d5", "e5"}
    
    piece_value = {"p": 1, "n": 3, "b": 3, "r": 5, "q": 9, "k": 0}
    stockfish = Stockfish(path=stockfish_path)

    players = defaultdict(lambda: {
        "fide_id": None,
        "elo": [],
        "resultados": [],
        "partidas": 0,
        "movimientos_total": [],
        "enroque": [],
        "intercambio_piezas": [],
        "sacrificios": [],
        "mov_peones": [],
        "mov_centrales": [],
        "cod_eco": [],
        "evaluacion_cp": [],
        "evaluacion_cp_apertura": [],
        "evaluacion_cp_mediojuego": [],
        "evaluacion_cp_final": [],
        "errores_graves": [],
        "errores": [],
        "errores_leves": [],
        "errores_graves_apertura": [],
        "errores_graves_mediojuego": [],
        "errores_graves_final": [],
        "errores_apertura": [],
        "errores_mediojuego": [],
        "errores_final": [],
        "errores_leves_apertura": [],
        "errores_leves_mediojuego": [],
        "errores_leves_final": [],
        "evento": [],
        "lugar": [],
        "pgn": [],
        "san": [],
        "fechas": []
    })

    jugadores_objetivo = set()
    with open(pgn_path, encoding="utf-8", errors="ignore") as f:
        while True:
            game = chess.pgn.read_game(f)
            if game is None:
                break
            jugadores_objetivo.add(game.headers.get("White", "Unknown"))
            jugadores_objetivo.add(game.headers.get("Black", "Unknown"))
    jugadores_objetivo = {j for j in jugadores_objetivo if j not in {"?", "Unknown", ""}}

    with open(pgn_path, encoding="utf-8", errors="ignore") as f:
        total_games = sum(1 for _ in iter(lambda: chess.pgn.read_game(f), None))

    with open(log_path, "w", encoding="utf-8") as log_file:
        log_file.write(f"Registro de errores - {datetime.datetime.now()}\n\n")

    with open(pgn_path, encoding="utf-8", errors="ignore") as f:
        for i in tqdm(range(total_games), desc="Analizando partidas"):
            try:
                game = chess.pgn.read_game(f)
                if game is None:
                    break

                white = game.headers.get("White", "Unknown")
                black = game.headers.get("Black", "Unknown")
                white_fide = game.headers.get("WhiteFideId", "None")
                black_fide = game.headers.get("BlackFideId", "None")
                white_elo = int(game.headers.get("WhiteElo", "0"))
                black_elo = int(game.headers.get("BlackElo", "0"))
                resultado_raw = game.headers.get("Result", "*")

                exporter = chess.pgn.StringExporter(headers=True, variations=False, comments=False)
                pgn_str = game.accept(exporter)

                board = game.board()
                jugadas_san = []
                for move in game.mainline_moves():
                    jugadas_san.append(board.san(move))
                    board.push(move)

                resultado_white, resultado_black = {
                    "1-0": (1.0, 0.0),
                    "0-1": (0.0, 1.0),
                    "1/2-1/2": (0.5, 0.5)
                }.get(resultado_raw, (None, None))

                if white not in jugadores_objetivo and black not in jugadores_objetivo:
                    continue

                eco = game.headers.get("ECO", "None")
                fecha_str = game.headers.get("Date", None)
                try:
                    fecha = datetime.datetime.strptime(fecha_str, "%Y.%m.%d").date()
                except:
                    fecha = None

                game_counted = {"W": False, "B": False}
                board = game.board()
                move_number = 0
                moves_list = []
                jugador_stats_temporales = {"W": defaultdict(int), "B": defaultdict(int)}
                contador_jugadas_por_jugador = {"W": 0, "B": 0}
                evaluaciones_por_fase = {"apertura": [], "mediojuego": [], "final": [], "total": []}

                for move in game.mainline_moves():
                    move_number += 1
                    current_player = white if board.turn else black
                    color_tag = "W" if board.turn else "B"

                    if current_player not in jugadores_objetivo:
                        board.push(move)
                        continue

                    contador_jugadas_por_jugador[color_tag] += 1

                    key = (current_player, color_tag)
                    stats = players[key]
                    stats_temp = jugador_stats_temporales[color_tag]

                    if not game_counted[color_tag]:
                        stats["partidas"] += 1
                        stats["cod_eco"].append(eco)
                        stats["fide_id"] = white_fide if color_tag == "W" else black_fide
                        stats["elo"].append(white_elo if color_tag == "W" else black_elo)
                        stats["resultados"].append(resultado_white if color_tag == "W" else resultado_black)
                        if fecha:
                            stats["fechas"].append(fecha)
                        stats["evento"].append(game.headers.get("Event", ""))
                        stats["lugar"].append(game.headers.get("Site", ""))
                        stats["pgn"].append(pgn_str)
                        stats["san"].append(jugadas_san)

                    game_counted[color_tag] = True
                    uci = move.uci()
                    moves_list.append(uci)

                    try:
                        stockfish.set_position(moves_list[:-1])
                        eval_before = stockfish.get_evaluation()["value"] or 0
                        stockfish.set_position(moves_list)
                        eval_after = stockfish.get_evaluation()["value"] or 0
                    except Exception as sf_error:
                        raise RuntimeError(f"Stockfish error en jugada {uci}: {sf_error}")

                    loss = abs(eval_before - eval_after)
                    fase = "apertura" if move_number <= 20 else "mediojuego" if move_number <= 60 else "final"
                    eval_abs = abs(eval_after)
                    evaluaciones_por_fase[fase].append(eval_abs)
                    evaluaciones_por_fase["total"].append(eval_abs)

                    piece = board.piece_at(move.from_square)
                    if not board.is_legal(move):
                        raise ValueError(f"Movimiento ilegal: {uci} en {board.fen()}")

                    san = board.san(move)
                    board.push(move)

                    stats_temp["movimientos_total"] += 1

                    if san in ["O-O", "O-O-O"] and "enroque" not in stats_temp:
                        stats_temp["enroque"] = contador_jugadas_por_jugador[color_tag]

                    if "x" in san:
                        stats_temp["intercambio_piezas"] += 1
                        capturada = board.piece_at(move.to_square)
                        if capturada:
                            atacante = piece_value[piece.symbol().lower()]
                            capturada_valor = piece_value[capturada.symbol().lower()]
                            if atacante > capturada_valor and loss < 150:
                                stats_temp["sacrificios"] += 1

                    if piece and piece.symbol().lower() == "p":
                        stats_temp["mov_peones"] += 1

                    if chess.square_name(move.to_square) in central_squares:
                        stats_temp["mov_centrales"] += 1

                    if loss > 300:
                        stats_temp["errores_graves"] += 1
                        stats_temp[f"blunders_{fase}"] += 1
                    elif loss > 100:
                        stats_temp["errores"] += 1
                        stats_temp[f"errores_{fase}"] += 1
                    elif loss > 50:
                        stats_temp["errores_leves"] += 1
                        stats_temp[f"imprecisiones_{fase}"] += 1

                for color_tag, jugador in [("W", white), ("B", black)]:
                    if jugador in jugadores_objetivo:
                        stats = players[(jugador, color_tag)]
                        temp = jugador_stats_temporales[color_tag]
                        stats["movimientos_total"].append(temp["movimientos_total"])
                        stats["mov_peones"].append(temp["mov_peones"])
                        stats["mov_centrales"].append(temp["mov_centrales"])
                        stats["intercambio_piezas"].append(temp["intercambio_piezas"])
                        stats["sacrificios"].append(temp["sacrificios"])
                        stats["enroque"].append(temp["enroque"] if "enroque" in temp else 0)

                        for fase_eval in ["total", "apertura", "mediojuego", "final"]:
                            lista = evaluaciones_por_fase[fase_eval]
                            campo = f"evaluacion_cp" if fase_eval == "total" else f"evaluacion_cp_{fase_eval}"
                            stats[campo].append(statistics.mean(lista) if lista else None)

                        errores_map = {
                            "errores_graves": "errores_graves",
                            "errores": "errores",
                            "errores_leves": "errores_leves",
                            "errores_graves_apertura": "blunders_apertura",
                            "errores_graves_mediojuego": "blunders_mediojuego",
                            "errores_graves_final": "blunders_final",
                            "errores_apertura": "errores_apertura",
                            "errores_mediojuego": "errores_mediojuego",
                            "errores_final": "errores_final",
                            "errores_leves_apertura": "imprecisiones_apertura",
                            "errores_leves_mediojuego": "imprecisiones_mediojuego",
                            "errores_leves_final": "imprecisiones_final"
                        }

                        for campo_destino, campo_temp in errores_map.items():
                            valor = temp.get(campo_temp, 0)
                            stats[campo_destino].append(valor)

            except Exception as e:
                msg = f" Error en partida #{i+1} ({white} vs {black}): {e}"
                print("\n" + msg)
                with open(log_path, "a", encoding="utf-8") as log_file:
                    log_file.write(msg + "\n")

    filas = []
    for (nombre, color), datos in players.items():
        for i in range(datos["partidas"]):
            fila = {"jugador": nombre, "color": color, "fide_id": datos.get("fide_id", "")}
            for campo, valores in datos.items():
                if isinstance(valores, list) and len(valores) > i:
                    fila[campo] = valores[i]
            filas.append(fila)

    df_partidas = pd.DataFrame(filas)
    df_partidas["fechas"] = pd.to_datetime(df_partidas["fechas"], errors="coerce")
    df_partidas = df_partidas.fillna(np.nan)

    nombre_archivo = f"analisis_{os.path.splitext(os.path.basename(pgn_path))[0]}.parquet"
    df_partidas.to_parquet(nombre_archivo, engine="pyarrow", coerce_timestamps="ms")
    return nombre_archivo

In [3]:
import os
import io
import datetime
import statistics
import chess.pgn
import pandas as pd
import numpy as np
from collections import defaultdict
from stockfish import Stockfish

def procesar_y_guardar_en_hdfs(archivo):
    from pathlib import Path
    
    nombre_archivo = Path(archivo[0]).stem
    contenido_pgn = archivo[1]
    
    # Guardar temporalmente en disco local
    ruta_temp = f"/tmp/{nombre_archivo}.pgn"
    with open(ruta_temp, "w", encoding="utf-8") as f:
        f.write(contenido_pgn)

    try:
        salida = analizar_pgn(
            pgn_path=ruta_temp,
            stockfish_path="stockfish",
            log_errores=False,
            mostrar_progreso=False,
            verbose=False
        )
        
        parquet_local = Path(salida)
        parquet_hdfs = f"{ruta_salida_hdfs}analisis_{nombre_archivo}.parquet"
        
        # Subir al HDFS (sobrescribe si existe)
        os.system(f"hdfs dfs -put -f {parquet_local} {parquet_hdfs}")
        print(f"Subido a HDFS: {parquet_hdfs}")
        return parquet_hdfs
    
    except Exception as e:
        print(f"Error en archivo {nombre_archivo}: {e}")
        return None


In [4]:
# Leer archivos desde HDFS como RDD [(ruta, contenido), ...]
rdd_archivos = spark.sparkContext.wholeTextFiles(ruta_entrada_hdfs)

# Analizar y guardar resultados por archivo en paralelo en el cl칰ster
resultados = rdd_archivos.map(procesar_y_guardar_en_hdfs).collect()

# Mostrar los archivos subidos
print("\n Archivos .parquet generados en HDFS:")
for archivo in resultados:
    if archivo:
        print("   -", archivo)





游닍 Archivos .parquet generados en HDFS:


                                                                                