# PROYECTO RA1 - FASE 2: Procesamiento con PySpark


In [None]:
# Importar librerías necesarias
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

# Build (or reuse) the SparkSession shared by every later cell.
# The application name and the SQL warehouse directory are set on the builder.
spark = (
    SparkSession.builder
    .appName("GymLiftersETL")
    .config("spark.sql.warehouse.dir", "warehouse")
    .getOrCreate()
)

# Restrict Spark logging to warnings and errors to keep the output readable.
spark.sparkContext.setLogLevel("WARN")

print("‚úÖ SparkSession creada correctamente")
print(f"Spark Version: {spark.version}")


## 1. EXTRACCIÓN (E) - Carga del Dataset

Cargamos el dataset limpio generado en el flujo de Pandas.


In [None]:
# Path of the cleaned dataset to load.
data_path = "../data/gym_lifters_clean.csv"

# Read the CSV with Spark: the first line is the header and column types
# are inferred from the data.
df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(data_path)
)

# Report the size and structure of what was loaded, then preview a few rows.
print("‚úÖ Dataset cargado correctamente")
print(f"Filas: {df.count()} | Columnas: {len(df.columns)}")
print("\nEsquema del DataFrame:")
df.printSchema()
print("\nPrimeras 5 filas:")
df.show(5, truncate=False)


## 2. TRANSFORMACIONES CON SPARK

Aplicamos al menos 3 transformaciones diferentes:
1. Selección y filtrado de columnas
2. Agregaciones y cálculos
3. Creación de nuevas columnas derivadas
4. Joins (si es necesario)


In [None]:
# TRANSFORMATION 1: selection and filtering
# A row is kept only when total_kg is present and positive, and year is
# present and falls within 2010-2025 (both bounds inclusive).
total_is_valid = (col("total_kg").isNotNull()) & (col("total_kg") > 0)
year_is_valid = (
    (col("year").isNotNull())
    & (col("year") >= 2010)
    & (col("year") <= 2025)
)

df_filtered = df.filter(total_is_valid & year_is_valid)

print("‚úÖ Transformaci√≥n 1: Filtrado aplicado")
print(f"Filas despu√©s del filtrado: {df_filtered.count()}")

# Preview a handful of identifying columns from the filtered rows.
preview_columns = ["name", "country", "year", "total_kg", "competition"]
df_filtered.select(*preview_columns).show(5)


In [None]:
# TRANSFORMATION 2: derived columns
# efficiency_ratio: total_kg divided by body_weight_kg; null unless the
# body weight is greater than zero.
efficiency_expr = when(
    col("body_weight_kg") > 0,
    col("total_kg") / col("body_weight_kg"),
).otherwise(None)

# lift_difference: clean_and_jerk_kg minus snatch_kg.
lift_difference_expr = col("clean_and_jerk_kg") - col("snatch_kg")

# performance_category: label chosen by total_kg thresholds, checked from
# the highest threshold down.
performance_expr = (
    when(col("total_kg") >= 350, "Elite")
    .when(col("total_kg") >= 300, "Advanced")
    .when(col("total_kg") >= 250, "Intermediate")
    .otherwise("Beginner")
)

df_with_metrics = (
    df_filtered
    .withColumn("efficiency_ratio", efficiency_expr)
    .withColumn("lift_difference", lift_difference_expr)
    .withColumn("performance_category", performance_expr)
)

print("‚úÖ Transformaci√≥n 2: Nuevas columnas creadas")

# Show the inputs next to the columns derived from them.
metric_preview = [
    "name", "total_kg", "body_weight_kg",
    "efficiency_ratio", "lift_difference", "performance_category",
]
df_with_metrics.select(*metric_preview).show(10)


In [None]:
# TRANSFORMATION 3: aggregations per country and category
# NOTE: count, avg and max below are the pyspark.sql.functions column
# aggregates made available by the notebook's star import, not Python builtins.
summary_metrics = [
    count("*").alias("total_lifters"),
    avg("total_kg").alias("avg_total_kg"),
    max("total_kg").alias("max_total_kg"),
    avg("efficiency_ratio").alias("avg_efficiency"),
]

# One output row per (country, category), highest average total first.
agg_by_country = (
    df_with_metrics
    .groupBy("country", "category")
    .agg(*summary_metrics)
    .orderBy(desc("avg_total_kg"))
)

print("‚úÖ Transformaci√≥n 3: Agregaciones por pa√≠s y categor√≠a")
agg_by_country.show(10)


## 3. PROCESO ETL COMPLETO

### 3.1 Preparación de Tablas Dimensionales

Creamos las tablas de dimensiones:
- **dim_athlete**: Información de los atletas
- **dim_competition**: Información de las competencias
- **dim_team**: Información de equipos y coaches


In [None]:
# DIMENSION 1: dim_athlete
# Builds the athlete dimension with a numeric surrogate key (id_athlete).
from pyspark.sql.window import Window

# One row per distinct (athlete_id, name, gender, age, country) combination;
# rows without a usable name are discarded.
# NOTE(review): uniqueness is over all five columns, not just name/country,
# so the same athlete recorded with two ages yields two rows - confirm this
# is the intended grain.
dim_athlete = df_with_metrics.select(
    "athlete_id",
    "name",
    "gender",
    "age",
    "country"
).distinct().filter(
    col("name").isNotNull() & (col("name") != "")
)

# Normalise athlete_id: an empty string is treated the same as a missing id.
dim_athlete = dim_athlete.withColumn(
    "athlete_id_clean",
    when((col("athlete_id").isNull()) | (col("athlete_id") == ""), None)
    .otherwise(col("athlete_id"))
)

# Numeric surrogate key (primary key of the dimension).
# The ordering lists every column that can distinguish two rows, so there
# are no ties. With ties, row_number() numbers the tied rows in an
# unspecified order, and because this lazy plan is re-evaluated by each
# action, the keys seen by a later join could differ from the keys exported
# from this DataFrame.
# NOTE: a window without partitionBy processes all rows in a single partition.
dim_athlete = dim_athlete.withColumn(
    "id_athlete",
    row_number().over(
        Window.orderBy(
            "athlete_id_clean", "name", "country",
            "gender", "age", "athlete_id"
        )
    )
)

# When the source athlete_id is missing, synthesise one from the surrogate key.
dim_athlete = dim_athlete.withColumn(
    "final_athlete_id",
    coalesce(
        col("athlete_id_clean"),
        concat(lit("ath_"), col("id_athlete"))
    )
).select(
    "id_athlete",
    col("final_athlete_id").alias("athlete_id"),
    "name",
    "gender",
    "age",
    "country"
).distinct()

print("‚úÖ Dimensi√≥n dim_athlete creada")
print(f"Total de atletas √∫nicos: {dim_athlete.count()}")
dim_athlete.show(10)


In [None]:
# DIMENSION 2: dim_competition
# One row per distinct (competition, year, category) where both competition
# and year are present, numbered with a surrogate key.

# The ordering covers all three distinct columns, so the numbering has no ties.
competition_order = Window.orderBy("year", "competition", "category")

dim_competition = (
    df_with_metrics
    .select("competition", "year", "category")
    .distinct()
    .filter(col("competition").isNotNull() & col("year").isNotNull())
    .withColumn("id_competition", row_number().over(competition_order))
    .select("id_competition", "competition", "year", "category")
)

print("‚úÖ Dimensi√≥n dim_competition creada")
print(f"Total de competencias √∫nicas: {dim_competition.count()}")
dim_competition.show(10)


In [None]:
# DIMENSION 3: dim_team
# One row per distinct (team, coach) pair with a non-null team, numbered
# with a surrogate key.

# The ordering covers both distinct columns, so the numbering has no ties.
team_order = Window.orderBy("team", "coach")

dim_team = (
    df_with_metrics
    .select("team", "coach")
    .distinct()
    .filter(col("team").isNotNull())
    .withColumn("id_team", row_number().over(team_order))
    .select("id_team", "team", "coach")
)

print("‚úÖ Dimensi√≥n dim_team creada")
print(f"Total de equipos √∫nicos: {dim_team.count()}")
dim_team.show(10)


### 3.2 Creación de Tabla de Hechos

Creamos la tabla de hechos que relaciona todas las dimensiones con las métricas de levantamiento.


In [None]:
# FACT TABLE: fact_lifting
# Builds the fact table by joining the lift records to the three dimensions
# (dim_athlete, dim_competition, dim_team) and keeping only the surrogate
# keys plus the measures and descriptive attributes of each lift.

# Base DataFrame: the natural-key columns needed for the joins
# (athlete_id, name, country, competition, year, category, team, coach)
# followed by the measures and derived metrics.
fact_base = df_with_metrics.select(
    "athlete_id",
    "name",
    "country",
    "competition",
    "year",
    "category",
    "team",
    "coach",
    "snatch_kg",
    "clean_and_jerk_kg",
    "total_kg",
    "body_weight_kg",
    "event_rank",
    "medal",
    "record_status",
    "lifting_style",
    "efficiency_ratio",
    "lift_difference",
    "performance_category"
)

# Join with dim_athlete: a dimension row matches when the (non-empty)
# athlete_id is equal, OR when both name and country are equal.
# NOTE(review): because the two alternatives are OR-ed, one fact row can
# match more than one dim_athlete row (e.g. several dimension rows sharing
# the same name and country), which would duplicate that fact row - compare
# the row counts before and after this join to confirm.
# NOTE(review): the condition is not a plain equi-join; presumably Spark has
# to fall back to a more expensive join strategy - verify on larger inputs.
fact_with_athlete = fact_base.join(
    dim_athlete,
    ((fact_base.athlete_id == dim_athlete.athlete_id) & 
     fact_base.athlete_id.isNotNull() & 
     (fact_base.athlete_id != "")) |
    ((fact_base.name == dim_athlete.name) & 
     (fact_base.country == dim_athlete.country)),
    "inner"  # Inner join so that every remaining row has an id_athlete
).select(
    fact_base["competition"],
    fact_base["year"],
    fact_base["category"],
    fact_base["team"],
    fact_base["coach"],
    fact_base["snatch_kg"],
    fact_base["clean_and_jerk_kg"],
    fact_base["total_kg"],
    fact_base["body_weight_kg"],
    fact_base["event_rank"],
    fact_base["medal"],
    fact_base["record_status"],
    fact_base["lifting_style"],
    fact_base["efficiency_ratio"],
    fact_base["lift_difference"],
    fact_base["performance_category"],
    dim_athlete["id_athlete"]
)

# Join with dim_competition on competition, year and category.
# category is compared after replacing nulls with "" on both sides, so a
# null category matches a null (or empty-string) category.
# Rows whose competition or year is null cannot satisfy the equality and are
# dropped by the inner join.
fact_with_competition = fact_with_athlete.join(
    dim_competition,
    (fact_with_athlete.competition == dim_competition.competition) &
    (fact_with_athlete.year == dim_competition.year) &
    (coalesce(fact_with_athlete.category, lit("")) == coalesce(dim_competition.category, lit(""))),
    "inner"
).select(
    fact_with_athlete["id_athlete"],
    fact_with_athlete["team"],
    fact_with_athlete["coach"],
    fact_with_athlete["snatch_kg"],
    fact_with_athlete["clean_and_jerk_kg"],
    fact_with_athlete["total_kg"],
    fact_with_athlete["body_weight_kg"],
    fact_with_athlete["event_rank"],
    fact_with_athlete["medal"],
    fact_with_athlete["record_status"],
    fact_with_athlete["lifting_style"],
    fact_with_athlete["efficiency_ratio"],
    fact_with_athlete["lift_difference"],
    fact_with_athlete["performance_category"],
    dim_competition["id_competition"]
)

# Join with dim_team on team and coach (coach compared with nulls replaced
# by "" on both sides). A null team never satisfies the equality, so such
# rows are dropped by the inner join.
# The final select keeps the three surrogate keys followed by the measures.
fact_lifting = fact_with_competition.join(
    dim_team,
    (fact_with_competition.team == dim_team.team) &
    (coalesce(fact_with_competition.coach, lit("")) == coalesce(dim_team.coach, lit(""))),
    "inner"
).select(
    col("id_athlete").alias("id_athlete"),
    col("id_competition").alias("id_competition"),
    col("id_team").alias("id_team"),
    "snatch_kg",
    "clean_and_jerk_kg",
    "total_kg",
    "body_weight_kg",
    "event_rank",
    "medal",
    "record_status",
    "lifting_style",
    "efficiency_ratio",
    "lift_difference",
    "performance_category"
)

print("‚úÖ Tabla de hechos fact_lifting creada")
print(f"Total de registros en fact_lifting: {fact_lifting.count()}")
fact_lifting.show(10)


In [None]:
# Libraries needed for the SQLite load
import sqlite3
import pandas as pd
import os


def _load_table(table_name, spark_df, connection):
    """Copy a Spark DataFrame into a SQLite table and return the pandas copy.

    The DataFrame is collected with ``toPandas()`` and written with
    ``to_sql(..., if_exists="replace", index=False)``, then the number of
    rows written is printed.
    """
    pandas_df = spark_df.toPandas()
    pandas_df.to_sql(table_name, connection, if_exists="replace", index=False)
    print(f"‚úÖ {table_name}: {len(pandas_df)} registros cargados")
    return pandas_df


# Make sure the warehouse directory exists
os.makedirs("../warehouse", exist_ok=True)

# Location of the SQLite database file
db_path = "../warehouse/warehouse_pyspark.db"

# Remove any previous database so it is rebuilt from scratch
if os.path.exists(db_path):
    os.remove(db_path)
    print("‚ö†Ô∏è Base de datos existente eliminada")

# Open the SQLite connection. The try/finally guarantees it is closed even
# when a conversion or write fails part-way through.
conn = sqlite3.connect(db_path)
try:
    print(f"‚úÖ Conexi√≥n a SQLite establecida: {db_path}")

    # Convert each Spark DataFrame to pandas and write it to SQLite
    print("\nüìä Cargando tablas en SQLite...")

    dim_athlete_pd = _load_table("dim_athlete", dim_athlete, conn)
    dim_competition_pd = _load_table("dim_competition", dim_competition, conn)
    dim_team_pd = _load_table("dim_team", dim_team, conn)
    fact_lifting_pd = _load_table("fact_lifting", fact_lifting, conn)
finally:
    conn.close()

# Only reached when every table was written successfully
print(f"\n‚úÖ Proceso ETL completado. Base de datos guardada en: {db_path}")


### 3.4 Verificación de Datos Cargados

Verificamos que los datos se hayan cargado correctamente en SQLite.


In [None]:
# Verify the data that was loaded into SQLite: print the row count of each
# table, then a five-row sample of each.
conn = sqlite3.connect(db_path)
try:
    # Table names come from this fixed list, so interpolating them into the
    # SQL text below does not expose the queries to untrusted input.
    tables = ["dim_athlete", "dim_competition", "dim_team", "fact_lifting"]
    print("üìä Resumen de tablas en warehouse_pyspark.db:\n")

    for table in tables:
        # Named row_count (not count) so the pyspark `count` function brought
        # in by the notebook's star import is not overwritten.
        row_count = pd.read_sql_query(f"SELECT COUNT(*) as count FROM {table}", conn)
        print(f"{table}: {row_count['count'].iloc[0]} registros")

    # Show a sample of each table. The result is printed explicitly: a bare
    # expression in the middle of a cell is evaluated and then discarded.
    for table in tables:
        print("\n" + "="*50)
        print(f"MUESTRA DE {table}:")
        print("="*50)
        print(pd.read_sql_query(f"SELECT * FROM {table} LIMIT 5", conn))
finally:
    # Close the connection even if one of the queries fails
    conn.close()


### 3.5 Consultas de Ejemplo

Ejemplos de consultas que se pueden realizar sobre el Data Warehouse.


In [None]:
# Example SQL queries against the warehouse


def _show_query(title, sql, connection, leading_newline=True):
    """Print a banner with ``title``, run ``sql`` and print the result.

    Returns the query result as a pandas DataFrame. ``leading_newline``
    controls whether a blank line is printed before the banner.
    """
    banner = "="*60
    print("\n" + banner if leading_newline else banner)
    print(title)
    print(banner)
    result = pd.read_sql_query(sql, connection)
    print(result)
    return result


# Query 1: per-athlete number of fact rows, average and maximum total_kg,
# ordered by the maximum.
query1 = """
SELECT 
    a.name,
    a.country,
    a.gender,
    COUNT(f.id_athlete) as total_competitions,
    AVG(f.total_kg) as avg_total_kg,
    MAX(f.total_kg) as max_total_kg
FROM fact_lifting f
JOIN dim_athlete a ON f.id_athlete = a.id_athlete
GROUP BY a.id_athlete, a.name, a.country, a.gender
ORDER BY max_total_kg DESC
LIMIT 10
"""

# Query 2: per-country athlete count, average and maximum total_kg.
query2 = """
SELECT 
    a.country,
    COUNT(DISTINCT a.id_athlete) as num_athletes,
    AVG(f.total_kg) as avg_total_kg,
    MAX(f.total_kg) as max_total_kg
FROM fact_lifting f
JOIN dim_athlete a ON f.id_athlete = a.id_athlete
GROUP BY a.country
ORDER BY avg_total_kg DESC
LIMIT 10
"""

# Query 3: competitions ranked by number of fact rows.
query3 = """
SELECT 
    c.competition,
    c.year,
    c.category,
    COUNT(f.id_athlete) as num_participants,
    AVG(f.total_kg) as avg_total_kg
FROM fact_lifting f
JOIN dim_competition c ON f.id_competition = c.id_competition
GROUP BY c.id_competition, c.competition, c.year, c.category
ORDER BY num_participants DESC
LIMIT 10
"""

# Query 4: teams ranked by average total_kg, with a count of non-empty medals.
query4 = """
SELECT 
    t.team,
    t.coach,
    COUNT(DISTINCT f.id_athlete) as num_athletes,
    AVG(f.total_kg) as avg_total_kg,
    COUNT(CASE WHEN f.medal IS NOT NULL AND f.medal != '' THEN 1 END) as total_medals
FROM fact_lifting f
JOIN dim_team t ON f.id_team = t.id_team
GROUP BY t.id_team, t.team, t.coach
ORDER BY avg_total_kg DESC
LIMIT 10
"""

# The try/finally closes the connection even when a query fails.
conn = sqlite3.connect(db_path)
try:
    result1 = _show_query(
        "CONSULTA 1: Top 10 atletas por total_kg", query1, conn,
        leading_newline=False,
    )
    result2 = _show_query("CONSULTA 2: Promedio de total_kg por pa√≠s", query2, conn)
    result3 = _show_query("CONSULTA 3: Competencias con m√°s participantes", query3, conn)
    result4 = _show_query("CONSULTA 4: Equipos con mejor rendimiento", query4, conn)
finally:
    conn.close()


In [None]:
# Stop the SparkSession once all processing in the notebook is finished
spark.stop()
print("‚úÖ SparkSession cerrada correctamente")
