Importaciones necesarias

In [None]:
import polars as pl
import os
import zipfile
import orjson as json
import tqdm as tqdm

# Folder that holds the raw match dumps, relative to the working directory.
CARPETA_DATA = r"./Data/"

# Region code -> location of that region's ranked-match dump.
DATA_PATHS = {
    "EUW": f"{CARPETA_DATA}matches_raw_euw_ranked.zip",
    "KR": f"{CARPETA_DATA}matches_raw_kr_ranked.zip",
    "NA": f"{CARPETA_DATA}matches_raw_na_ranked.zip",
}
# Parquet file that receives the consolidated dataset.
OUTPUT_FILE = "draft_oracle_master_data.parquet"

print(f"Directorio de trabajo actual: {os.getcwd()}")
# Report which inputs are reachable so a wrong path is noticed before ingestion.
for region, path in DATA_PATHS.items():
    exists = os.path.exists(path)
    if exists:
        status = "Encontrada"
    else:
        status = "NO ENCONTRADA (Revisa la ruta)"
    print(f"Región {region} ({path}): {status}")

Definición de la Función de Extracción

In [None]:
def _extract_rune_ids(perks):
    """Return ``(primary_style, keystone)`` read from a participant's perks.

    Both values fall back to -1 together when the structure is empty or
    malformed, so a partially readable rune page never yields a mixed result.
    """
    try:
        styles = perks.get('styles', [])
        if not styles:
            return -1, -1
        first_style = styles[0]
        primary = first_style.get('style', -1)
        selections = first_style.get('selections')
        keystone = selections[0].get('perk', -1) if selections else -1
        return primary, keystone
    except (AttributeError, IndexError, KeyError, TypeError):
        return -1, -1


def extract_features_complete(data, region, filename="unknown"):
    """
    Combine the Base (v1) and Pro (v2) metrics into one record per participant.

    Args:
        data: Parsed match JSON; the ``info`` and ``metadata`` objects are read.
        region: Region label copied into every row.
        filename: Fallback used as ``game_id`` when the metadata has no
            ``matchId``.

    Returns:
        A list with one dict per participant. The list is empty when the match
        is not queue 420, lasted less than 900 (``gameDuration`` units), or
        the payload is malformed. A malformed participant discards the whole
        match, so a game is never stored with only part of its players.
    """
    try:
        # `or {}` also covers keys that are present but hold an explicit JSON
        # null, which `.get(key, {})` would pass through as None.
        info = data.get('info') or {}
        meta = data.get('metadata') or {}

        if info.get('queueId', 0) != 420 or info.get('gameDuration', 0) < 900:
            return []

        # Match-level values are identical for every participant: read once.
        match_id = meta.get('matchId', filename)
        patch = info.get('gameVersion', "0.0.0")
        duration = info.get('gameDuration', 0)
        queue = info.get('queueId', 0)

        extracted_rows = []
        for p in info.get('participants') or []:
            challenges = p.get('challenges') or {}
            primary, keystone = _extract_rune_ids(p.get('perks') or {})

            dmg = p.get('totalDamageDealtToChampions', 0)
            # NOTE(review): with `goldEarned` missing the default of 1 makes
            # the ratio equal the raw damage — confirm this is intended.
            gold = p.get('goldEarned', 1)
            dcr = dmg / gold if gold > 0 else 0

            extracted_rows.append({
                "game_id": match_id, "region": region, "patch": patch,
                "champ_id": p.get('championId'),
                "position": p.get('teamPosition', 'UTILITY'),
                "side": p.get('teamId'),
                "target": 1 if p.get('win') else 0,
                "duration": duration,
                "queue": queue,

                "stat_phys_dmg": p.get('physicalDamageDealtToChampions', 0),
                "stat_magic_dmg": p.get('magicDamageDealtToChampions', 0),
                "stat_true_dmg": p.get('trueDamageDealtToChampions', 0),
                "stat_dmg_taken": p.get('totalDamageTaken', 0),
                "stat_mitigated": p.get('damageSelfMitigated', 0),
                "stat_heal": p.get('totalHealsOnTeammates', 0),

                "stat_cc_duration": p.get('timeCCingOthers', 0),
                "stat_hard_cc": challenges.get('enemyChampionImmobilizations', 0),
                "stat_vision": challenges.get('visionScorePerMinute', 0),

                "stat_dcr": dcr,
                "stat_dpm": challenges.get('damagePerMinute', 0),
                "stat_gpm": challenges.get('goldPerMinute', 0),
                "stat_turret_plates": challenges.get('turretPlatesTaken', 0),
                "stat_obj_control": challenges.get('dragonTakedowns', 0),

                "stat_lane_cs_10": challenges.get('laneMinionsFirst10Minutes', 0),
                "stat_solo_kills": challenges.get('soloKills', 0),
                "stat_lane_diff": challenges.get('laningPhaseGoldExpAdvantage', 0),
                "stat_roam_kills": challenges.get('killsOnOtherLanesEarlyJungleAsLaner', 0),
                "stat_dodge": challenges.get('skillshotsDodged', 0),

                "rune_primary": primary, "rune_keystone": keystone,
            })
        return extracted_rows
    except Exception:
        # Best-effort per match: a malformed payload yields no rows.
        # `Exception` (not a bare except) keeps KeyboardInterrupt and
        # SystemExit propagating so a long ingestion can still be stopped.
        return []

In [None]:
# Base batch unit; the ingestion loop flushes its row buffer once it holds
# ten times this many rows.
BATCH_SIZE = 5000
# current_batch buffers extracted row dicts; chunks_list collects the typed
# DataFrames built from each flushed buffer.
current_batch, chunks_list = [], []

print("Iniciando Ingesta (Soporte para Archivos ZIP Directos)...")

def flush_batch():
    """Turn the buffered rows into a compactly typed DataFrame chunk.

    Reads the module-level ``current_batch`` list, appends the resulting
    frame to ``chunks_list`` and empties the buffer afterwards. This is
    best-effort: if the frame cannot be built or cast, the error is printed
    and the buffered rows are discarded.
    """
    if not current_batch:
        return
    try:
        # Scan every row when inferring the schema. Missing stats default to
        # the integer 0 while real values can be floats, and by default
        # polars only samples the first rows, so a float appearing later
        # could otherwise fail the whole batch.
        df_chunk = pl.DataFrame(current_batch, infer_schema_length=None)
        stat_cols = [c for c in df_chunk.columns if c.startswith("stat_")]

        # Narrow dtypes to keep the accumulated chunks small in memory.
        optimizations = [
            pl.col("game_id").cast(pl.Utf8),
            pl.col("region").cast(pl.Categorical),
            pl.col("position").cast(pl.Categorical),
            pl.col("patch").cast(pl.Utf8),
            pl.col("target").cast(pl.Int8),
            pl.col("side").cast(pl.Int16),
            pl.col("champ_id").cast(pl.Int16),
            pl.col("rune_primary").cast(pl.Int16),
            pl.col("rune_keystone").cast(pl.Int16),
            pl.col("duration").cast(pl.Int16),
            pl.col("queue").cast(pl.Int16),
        ] + [pl.col(c).cast(pl.Float32) for c in stat_cols]

        chunks_list.append(df_chunk.with_columns(optimizations))
    except Exception as e:
        print(f"Error chunk: {e}")
    # Cleared on success and on failure so a bad batch is not retried.
    current_batch.clear()

def _ingest_json_bytes(raw, region, source_name):
    """Parse one raw match payload, buffer its rows and flush a full buffer."""
    data = json.loads(raw)
    current_batch.extend(
        extract_features_complete(data, region, filename=source_name)
    )
    if len(current_batch) >= BATCH_SIZE * 10:
        flush_batch()


# Walk every configured region. Each path may be a .zip archive of JSON files
# or a plain folder of JSON files; anything else is ignored.
for region, path in DATA_PATHS.items():
    if not os.path.exists(path):
        print(f"Ruta no encontrada: {path}")
        continue

    print(f"Procesando Región {region} en: {path}")
    skipped = 0  # files that could not be read or parsed

    if os.path.isfile(path) and path.endswith(".zip"):
        try:
            with zipfile.ZipFile(path, "r") as z:
                all_json_names = [f for f in z.namelist() if f.endswith(".json")]

                for json_name in tqdm.tqdm(all_json_names, desc=f"Extrayendo {region}", unit="json"):
                    # Opening the entry is inside the per-file guard so one
                    # corrupt member does not abort the rest of the archive.
                    try:
                        _ingest_json_bytes(z.read(json_name), region, json_name)
                    except Exception:
                        skipped += 1
        except Exception as e:
            print(f"Error crítico leyendo el ZIP {path}: {e}")

    elif os.path.isdir(path):
        files_in_folder = os.listdir(path)
        for filename in tqdm.tqdm(files_in_folder, desc=f"Carpeta {region}"):
            if not filename.endswith(".json"):
                continue
            # `Exception` rather than a bare except: interrupting the run
            # must stop the loop instead of skipping to the next file.
            try:
                with open(os.path.join(path, filename), "rb") as f:
                    _ingest_json_bytes(f.read(), region, filename)
            except Exception:
                skipped += 1

    if skipped:
        print(f"Región {region}: {skipped} archivos omitidos por errores de lectura o formato.")

# Persist whatever is left in the buffer after the last region.
flush_batch()

print(f"Ingesta finalizada. Se generaron {len(chunks_list)} chunks optimizados.")

In [None]:
# Merge every typed chunk into one frame and persist it as Parquet.
# Concatenation and writing are guarded separately so that a disk or
# permission failure is not reported as a column-mismatch problem.
if chunks_list:
    print("Unificando chunks en un solo DataFrame Maestro...")

    try:
        # NOTE(review): chunks carry Categorical columns built independently;
        # some polars versions need a global string cache to concatenate
        # them — confirm against the installed version.
        df_master = pl.concat(chunks_list)
    except Exception as e:
        print(f"Error al concatenar: {e}. Verifica que todos los chunks tengan las mismas columnas.")
    else:
        try:
            print(f"Guardando {len(df_master)} registros en {OUTPUT_FILE}...")
            df_master.write_parquet(OUTPUT_FILE, compression="snappy")
        except Exception as e:
            print(f"Error al guardar {OUTPUT_FILE}: {e}")
        else:
            print(f"ETL Completado! Dataset Final: {df_master.shape}")

            # Drop the per-chunk references only once the data is on disk.
            del chunks_list
else:
    print("No se extrajeron datos.")