In [96]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Olympic Games").getOrCreate()

In [97]:
# Definir el esquema
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", StringType(), True), 
    StructField("Height", StringType(), True),
    StructField("Weight", StringType(), True), 
    StructField("Team", StringType(), True),
    StructField("NOC", StringType(), True),
    StructField("Games", StringType(), True),
    StructField("Year", StringType(), True),
    StructField("Season", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Sport", StringType(), True),
    StructField("Event", StringType(), True),
    StructField("Medal", StringType(), True)
])

# Carga del dataset

df = spark.read.csv("C:/Users/Usuario/ETL olympic games/raw/athlete_events.csv", header=True, schema=schema)

# Ver schema y tipos de datos
df.printSchema()
df.show()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)

+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
| ID|                Name|Sex|Age|Height|Weight|          Team|NOC|      Games|Year|Season|       City|               Sport|               Event|Medal|
+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+-----------+--------------------+--------------------

In [126]:
# Renombrar columnas, cambiar tipos de datos y manejo de nulos/NA

from pyspark.sql.functions import col, coalesce, when, lit, mean, sum, round, regexp_extract
from pyspark.sql.types import IntegerType, DoubleType

# Renombrar columnas en castellano y minúscula
df_modificado = df.withColumnRenamed("Name", "nombre_atleta") \
                  .withColumnRenamed("Sex", "genero") \
                  .withColumnRenamed("Age", "edad") \
                  .withColumnRenamed("Height", "altura") \
                  .withColumnRenamed("Weight", "peso") \
                  .withColumnRenamed("Team", "equipo") \
                  .withColumnRenamed("NOC", "pais") \
                  .withColumnRenamed("Games", "juegos") \
                  .withColumnRenamed("Year", "anio") \
                  .withColumnRenamed("Season", "temporada") \
                  .withColumnRenamed("City", "ciudad") \
                  .withColumnRenamed("Sport", "deporte") \
                  .withColumnRenamed("Event", "evento") \
                  .withColumnRenamed("Medal", "medalla")

# Reemplazar 'NA' con 'Sin Medalla' en la columna medalla
df_modificado = df_modificado.withColumn("medalla", when(col("medalla") == "NA", "Sin Medalla").otherwise(col("medalla")))

# Extraer el año numérico de la columna 'anio'

df_modificado = df_modificado.withColumn("anio", regexp_extract(col("anio"), "\\d{4}", 0))
df_modificado = df_modificado.withColumn("anio", when(col("anio") == "", lit(None)).otherwise(col("anio")).cast(IntegerType()))

# Limpiar y convertir otras columnas
df_modificado = df_modificado.withColumn("edad", when(col("edad").rlike("^[0-9]+$"), col("edad")).otherwise(lit(None)).cast(IntegerType())) \
                             .withColumn("altura", when(col("altura").rlike("^[0-9.]+$"), col("altura")).otherwise(lit(None)).cast(DoubleType())) \
                             .withColumn("peso", when(col("peso").rlike("^[0-9.]+$"), col("peso")).otherwise(lit(None)).cast(DoubleType()))

# Calcular las medias para imputación
medias = df_modificado.agg(round(mean(col("altura")), 2).alias("media_altura"),
                                       round(mean(col("peso")), 2).alias("media_peso")).first().asDict()

mean_altura = medias["media_altura"]
mean_peso = medias["media_peso"]

# Rellenar los valores nulos con las medias
imputacion = {
    "altura": mean_altura,
    "peso": mean_peso
}

df_modificado = df_modificado.fillna(imputacion)

# Mostrar el esquema final para verificar los tipos de datos
print("Esquema después de las transformaciones e imputación:")
df_modificado.printSchema()

# Mostrar una muestra del DataFrame final
print("Muestra del DataFrame final:")
df_modificado.show(5)

Esquema después de las transformaciones e imputación:
root
 |-- ID: integer (nullable = true)
 |-- nombre_atleta: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- altura: double (nullable = false)
 |-- peso: double (nullable = false)
 |-- equipo: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- juegos: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- deporte: string (nullable = true)
 |-- evento: string (nullable = true)
 |-- medalla: string (nullable = true)

Muestra del DataFrame final:
+---+--------------------+------+----+------+-----+--------------+----+-----------+----+---------+---------+-------------+--------------------+-----------+
| ID|       nombre_atleta|genero|edad|altura| peso|        equipo|pais|     juegos|anio|temporada|   ciudad|      deporte|              evento|    medalla|
+---+-------------------

In [127]:
# Query para ver si la base de datos está OK

df_modificado.createOrReplaceTempView("datos_finales")

sql_query = """
SELECT
    nombre_atleta,
    deporte,
    COUNT(*) AS total_medallas_oro
FROM
    datos_finales
WHERE
    medalla = 'Gold'
GROUP BY
    nombre_atleta, deporte
ORDER BY
    total_medallas_oro DESC
"""

spark.sql(sql_query).show(truncate=False)

+---------------------------------------------------+----------+------------------+
|nombre_atleta                                      |deporte   |total_medallas_oro|
+---------------------------------------------------+----------+------------------+
|Michael Fred Phelps, II                            |Swimming  |23                |
|"Raymond Clarence ""Ray"" Ewry"                    |Athletics |10                |
|Larysa Semenivna Latynina (Diriy-)                 |Gymnastics|9                 |
|"Frederick Carlton ""Carl"" Lewis"                 |Athletics |9                 |
|Paavo Johannes Nurmi                               |Athletics |9                 |
|Mark Andrew Spitz                                  |Swimming  |9                 |
|Usain St. Leo Bolt                                 |Athletics |8                 |
|"Matthew Nicholas ""Matt"" Biondi"                 |Swimming  |8                 |
|Ole Einar Bjrndalen                                |Biathlon  |8           

In [130]:
# Guardar dataframe modificado en formato parquet

ruta = "../processed/olympic_data_cleaned.parquet"
df_modificado.write.mode("overwrite").parquet(ruta)

print(f"Datos procesados cargados con éxito en la carpeta '{ruta}'.")



Datos procesados cargados con éxito en la carpeta '../processed/olympic_data_cleaned.parquet'.


In [132]:
# Finalizar la sesión de Spark
spark.stop()

print("Sesión de Spark finalizada.")

Sesión de Spark finalizada.
