# An√°lisis de el Retraso de las l√≠neas a√©reas de los Estados Unidos
- Se hace el an√°lisis de grandes vol√∫menes de informaci√≥n usando herramientas especialmente dise√±adas para el manejo de bases de datos de este calibre, se usar√° Spark mediante PySpark con Python para lograr este fin.

## Se prepara el entorno con PySpark para Google Colab

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
!tar -xzf spark-3.5.7-bin-hadoop3.tgz
!pip install -q findspark

Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:5 https://cli.github.com/packages stable InRelease
Get:6 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Hit:7 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:10 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:11 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:12 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [83.6 kB]
Get:13 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.7-bin-hadoop3"

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark") \
    .master("local[*]") \
    .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/02 04:44:13 WARN Utils: Your hostname, nixos-desktop, resolves to a loopback address: 127.0.0.2; using 192.168.100.38 instead (on interface enp9s0)
25/12/02 04:44:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 04:44:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Se monta la unidad de Google Drive para poder obtener los CSVs sobre los que PySpark actuar√° para el an√°lisis de datos que se va a aplicar.

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
!ln -s /content/drive/MyDrive/ColabShared/airline_delay_analysis/ /content/csvs
!ls /content/csvs/

2009.csv  2011.csv  2013.csv  2015.csv	2017.csv  2019.csv_  processed
2010.csv  2012.csv  2014.csv  2016.csv	2018.csv  20.csv_


In [6]:
# Secci√≥n de rutas a usar, se mantienen como variables a conveniencia
ruta_general_csvs = "/content/csvs/"
ruta_general_processed = "/content/csvs/processed/"

## Se prepara el entorno con PySpark para instalaci√≥n local

In [1]:
# Usar para una instalaci√≥n local de Spark en Nix
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkLocal") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.debug.maxToStringFields", "100")\
    .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/04 01:22:26 WARN Utils: Your hostname, nixos-desktop, resolves to a loopback address: 127.0.0.2; using 192.168.100.38 instead (on interface enp9s0)
25/12/04 01:22:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/04 01:22:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [31]:
# Rutas para versi√≥n local
ruta_general_csvs_local = "csv/"
ruta_salida_csvs_local = "csvs_processed/"
ruta_salida_csvs_procesados = "csvs_cleaned/"
ruta_salida_muestreo = "csv_muestreo/"
ruta_salida_encoding = "csv_encoded/"
ruta_train = "csv_train/"
ruta_test = "csv_test/"

## Secci√≥n de funciones a usar en el c√≥digo

In [4]:
# Funciones para guardado (volcado en almacenamiento) de CSVs
def guardar_csvs_en_almacenamiento(df_a_guardar, ruta_guardado:str):
    df_a_guardar.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(ruta_guardado)

# Funci√≥n para cargar datos del almacenamiento a la RAM
def cargar_csvs(ruta_a_leer):
    df = spark.read.option("header", "true").csv(ruta_a_leer)
    return df

## Preprocesamiento de datos

Como los CSVs que se tienen poseen estructuras un tanto diferentes a partir del 2018, se buscar√° formar un CSV nuevo a partir de todos para poder estandarizar su estructura.

Se tomar√° en cuenta √∫nicamente lo de 10 a√±os que abarca de 2009 a 2018

In [5]:
# Importaci√≥n de funciones/utilidades de pyspark.sql a usar
from pyspark.sql.functions import col, when, count, isnan, date_format, lit, concat

In [14]:
import glob
import os
from pyspark.sql.types import *
data_dir = "csv/"
 
# Filtrar a√±os 2009-2018
years_to_load = ['2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018']
all_csv_files = sorted(glob.glob(f"{data_dir}/*.csv"))
csv_files = [f for f in all_csv_files if any(year in os.path.basename(f) for year in years_to_load)]
 
if len(csv_files) == 0:
    print("‚ùå No se encontraron archivos CSV")
else:
    # Definir schema manualmente basado en las columnas reales del dataset
    schema = StructType([
        StructField("FL_DATE", StringType(), True),
        StructField("OP_CARRIER", StringType(), True),
        StructField("OP_CARRIER_FL_NUM", IntegerType(), True),
        StructField("ORIGIN", StringType(), True),
        StructField("DEST", StringType(), True),
        StructField("CRS_DEP_TIME", IntegerType(), True),
        StructField("DEP_TIME", DoubleType(), True),
        StructField("DEP_DELAY", DoubleType(), True),
        StructField("TAXI_OUT", DoubleType(), True),
        StructField("WHEELS_OFF", DoubleType(), True),
        StructField("WHEELS_ON", DoubleType(), True),
        StructField("TAXI_IN", DoubleType(), True),
        StructField("CRS_ARR_TIME", IntegerType(), True),
        StructField("ARR_TIME", DoubleType(), True),
        StructField("ARR_DELAY", DoubleType(), True),
        StructField("CANCELLED", DoubleType(), True),
        StructField("CANCELLATION_CODE", StringType(), True),
        StructField("DIVERTED", DoubleType(), True),
        StructField("CRS_ELAPSED_TIME", DoubleType(), True),
        StructField("ACTUAL_ELAPSED_TIME", DoubleType(), True),
        StructField("AIR_TIME", DoubleType(), True),
        StructField("DISTANCE", DoubleType(), True),
        StructField("CARRIER_DELAY", DoubleType(), True),
        StructField("WEATHER_DELAY", DoubleType(), True),
        StructField("NAS_DELAY", DoubleType(), True),
        StructField("SECURITY_DELAY", DoubleType(), True),
        StructField("LATE_AIRCRAFT_DELAY", DoubleType(), True),
        StructField("Unnamed_27", StringType(), True)
    ])
    print(f"üìä Cargando {len(csv_files)} archivos (modo R√ÅPIDO)...")
    # Cargar SIN inferSchema
    df = spark.read.csv(
        csv_files,
        header=True,
        schema=schema  # ‚Üê Esto evita el scan completo
    )
    print(f"‚úÖ Cargado! Registros: {df.count():,}")
    df.printSchema()

üìä Cargando 10 archivos (modo R√ÅPIDO)...




‚úÖ Cargado! Registros: 61,556,964
root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CARRIER_DELAY: double (null

                                                                                

In [6]:
# Juntar todos los DataFrames (del 2009 al 2018) en uno solo para su an√°lisis
df_csvs = spark.read.option("header", "True").option("inferSchema", "True").csv(ruta_general_csvs_local)
total_instancias = df_csvs.count()
print(f"Total de instancias: {total_instancias}")
df_csvs

                                                                                

Total de instancias: 61556964


FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,Unnamed: 27
2009-01-01,XE,1204,DCA,EWR,1100.0,1058.0,-2.0,18.0,1116.0,1158.0,8.0,1202.0,1206.0,4.0,0.0,,0.0,62.0,68.0,42.0,199.0,,,,,,
2009-01-01,XE,1206,EWR,IAD,1510.0,1509.0,-1.0,28.0,1537.0,1620.0,4.0,1632.0,1624.0,-8.0,0.0,,0.0,82.0,75.0,43.0,213.0,,,,,,
2009-01-01,XE,1207,EWR,DCA,1100.0,1059.0,-1.0,20.0,1119.0,1155.0,6.0,1210.0,1201.0,-9.0,0.0,,0.0,70.0,62.0,36.0,199.0,,,,,,
2009-01-01,XE,1208,DCA,EWR,1240.0,1249.0,9.0,10.0,1259.0,1336.0,9.0,1357.0,1345.0,-12.0,0.0,,0.0,77.0,56.0,37.0,199.0,,,,,,
2009-01-01,XE,1209,IAD,EWR,1715.0,1705.0,-10.0,24.0,1729.0,1809.0,13.0,1900.0,1822.0,-38.0,0.0,,0.0,105.0,77.0,40.0,213.0,,,,,,
2009-01-01,XE,1212,ATL,EWR,1915.0,1913.0,-2.0,19.0,1932.0,2108.0,15.0,2142.0,2123.0,-19.0,0.0,,0.0,147.0,130.0,96.0,745.0,,,,,,
2009-01-01,XE,1212,CLE,ATL,1645.0,1637.0,-8.0,12.0,1649.0,1820.0,5.0,1842.0,1825.0,-17.0,0.0,,0.0,117.0,108.0,91.0,554.0,,,,,,
2009-01-01,XE,1214,DCA,EWR,1915.0,1908.0,-7.0,9.0,1917.0,1953.0,34.0,2035.0,2027.0,-8.0,0.0,,0.0,80.0,79.0,36.0,199.0,,,,,,
2009-01-01,XE,1215,EWR,DCA,1715.0,1710.0,-5.0,28.0,1738.0,1819.0,4.0,1838.0,1823.0,-15.0,0.0,,0.0,83.0,73.0,41.0,199.0,,,,,,
2009-01-01,XE,1217,EWR,DCA,1300.0,1255.0,-5.0,15.0,1310.0,1349.0,7.0,1408.0,1356.0,-12.0,0.0,,0.0,68.0,61.0,39.0,199.0,,,,,,


In [16]:
df_dropped = df.drop('Unnamed_27')
df_dropped

FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
2009-01-01,XE,1204,DCA,EWR,1100,1058.0,-2.0,18.0,1116.0,1158.0,8.0,1202,1206.0,4.0,0.0,,0.0,62.0,68.0,42.0,199.0,,,,,
2009-01-01,XE,1206,EWR,IAD,1510,1509.0,-1.0,28.0,1537.0,1620.0,4.0,1632,1624.0,-8.0,0.0,,0.0,82.0,75.0,43.0,213.0,,,,,
2009-01-01,XE,1207,EWR,DCA,1100,1059.0,-1.0,20.0,1119.0,1155.0,6.0,1210,1201.0,-9.0,0.0,,0.0,70.0,62.0,36.0,199.0,,,,,
2009-01-01,XE,1208,DCA,EWR,1240,1249.0,9.0,10.0,1259.0,1336.0,9.0,1357,1345.0,-12.0,0.0,,0.0,77.0,56.0,37.0,199.0,,,,,
2009-01-01,XE,1209,IAD,EWR,1715,1705.0,-10.0,24.0,1729.0,1809.0,13.0,1900,1822.0,-38.0,0.0,,0.0,105.0,77.0,40.0,213.0,,,,,
2009-01-01,XE,1212,ATL,EWR,1915,1913.0,-2.0,19.0,1932.0,2108.0,15.0,2142,2123.0,-19.0,0.0,,0.0,147.0,130.0,96.0,745.0,,,,,
2009-01-01,XE,1212,CLE,ATL,1645,1637.0,-8.0,12.0,1649.0,1820.0,5.0,1842,1825.0,-17.0,0.0,,0.0,117.0,108.0,91.0,554.0,,,,,
2009-01-01,XE,1214,DCA,EWR,1915,1908.0,-7.0,9.0,1917.0,1953.0,34.0,2035,2027.0,-8.0,0.0,,0.0,80.0,79.0,36.0,199.0,,,,,
2009-01-01,XE,1215,EWR,DCA,1715,1710.0,-5.0,28.0,1738.0,1819.0,4.0,1838,1823.0,-15.0,0.0,,0.0,83.0,73.0,41.0,199.0,,,,,
2009-01-01,XE,1217,EWR,DCA,1300,1255.0,-5.0,15.0,1310.0,1349.0,7.0,1408,1356.0,-12.0,0.0,,0.0,68.0,61.0,39.0,199.0,,,,,


In [7]:
# Eliminar columna innecesaria que viene por defecto en cada CSV
df_original = df_csvs.drop('Unnamed: 27')
# Guardar los CSV al almacenamiento sin la columna innecesaria
guardar_csvs_en_almacenamiento(df_original, ruta_salida_csvs_local)
df_original

                                                                                

FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
2009-01-01,XE,1204,DCA,EWR,1100.0,1058.0,-2.0,18.0,1116.0,1158.0,8.0,1202.0,1206.0,4.0,0.0,,0.0,62.0,68.0,42.0,199.0,,,,,
2009-01-01,XE,1206,EWR,IAD,1510.0,1509.0,-1.0,28.0,1537.0,1620.0,4.0,1632.0,1624.0,-8.0,0.0,,0.0,82.0,75.0,43.0,213.0,,,,,
2009-01-01,XE,1207,EWR,DCA,1100.0,1059.0,-1.0,20.0,1119.0,1155.0,6.0,1210.0,1201.0,-9.0,0.0,,0.0,70.0,62.0,36.0,199.0,,,,,
2009-01-01,XE,1208,DCA,EWR,1240.0,1249.0,9.0,10.0,1259.0,1336.0,9.0,1357.0,1345.0,-12.0,0.0,,0.0,77.0,56.0,37.0,199.0,,,,,
2009-01-01,XE,1209,IAD,EWR,1715.0,1705.0,-10.0,24.0,1729.0,1809.0,13.0,1900.0,1822.0,-38.0,0.0,,0.0,105.0,77.0,40.0,213.0,,,,,
2009-01-01,XE,1212,ATL,EWR,1915.0,1913.0,-2.0,19.0,1932.0,2108.0,15.0,2142.0,2123.0,-19.0,0.0,,0.0,147.0,130.0,96.0,745.0,,,,,
2009-01-01,XE,1212,CLE,ATL,1645.0,1637.0,-8.0,12.0,1649.0,1820.0,5.0,1842.0,1825.0,-17.0,0.0,,0.0,117.0,108.0,91.0,554.0,,,,,
2009-01-01,XE,1214,DCA,EWR,1915.0,1908.0,-7.0,9.0,1917.0,1953.0,34.0,2035.0,2027.0,-8.0,0.0,,0.0,80.0,79.0,36.0,199.0,,,,,
2009-01-01,XE,1215,EWR,DCA,1715.0,1710.0,-5.0,28.0,1738.0,1819.0,4.0,1838.0,1823.0,-15.0,0.0,,0.0,83.0,73.0,41.0,199.0,,,,,
2009-01-01,XE,1217,EWR,DCA,1300.0,1255.0,-5.0,15.0,1310.0,1349.0,7.0,1408.0,1356.0,-12.0,0.0,,0.0,68.0,61.0,39.0,199.0,,,,,


In [8]:
# Determinar la cantidad de instancias que no fueron canceladas
df_canceladas_filtered = df_original.filter(col("CANCELLED") == 1).count()
print(f"Total de vuelos cancelados: {df_canceladas_filtered}")



Total de vuelos cancelados: 973209


                                                                                

### Se preparan los datos dejando las columnas a usar para formar los conjuntos que se usar√°n en los modelos

In [9]:
# Lista de atributos a usar para el objetivo
atributos_utiles = [
    'FL_DATE', 'OP_CARRIER', 'ORIGIN', 'DEST', 'CRS_DEP_TIME', 'CRS_ARR_TIME', 'CRS_ELAPSED_TIME', 'DISTANCE'
]

In [10]:
# Obtener las instancias √∫tiles (no canceladas)
df_sin_canceladas = df_original.filter(col("CANCELLED") == 0)
# Como la variable importante para el objetivo es 'DEP_DELAY' y con eso se determina si un vuelo est√° a tiempo o no se crea nueva columna
# Si es mayor a 15 minutos se considerar√° como retraso en caso contrario como 'a tiempo', convirti√©ndola en clasificaci√≥n binaria
df_late = df_sin_canceladas.withColumn("LATE", when(col("DEP_DELAY") > 15, 1.0).otherwise(0.0))
df_late

FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,LATE
2009-01-01,XE,1204,DCA,EWR,1100.0,1058.0,-2.0,18.0,1116.0,1158.0,8.0,1202.0,1206.0,4.0,0.0,,0.0,62.0,68.0,42.0,199.0,,,,,,0.0
2009-01-01,XE,1206,EWR,IAD,1510.0,1509.0,-1.0,28.0,1537.0,1620.0,4.0,1632.0,1624.0,-8.0,0.0,,0.0,82.0,75.0,43.0,213.0,,,,,,0.0
2009-01-01,XE,1207,EWR,DCA,1100.0,1059.0,-1.0,20.0,1119.0,1155.0,6.0,1210.0,1201.0,-9.0,0.0,,0.0,70.0,62.0,36.0,199.0,,,,,,0.0
2009-01-01,XE,1208,DCA,EWR,1240.0,1249.0,9.0,10.0,1259.0,1336.0,9.0,1357.0,1345.0,-12.0,0.0,,0.0,77.0,56.0,37.0,199.0,,,,,,0.0
2009-01-01,XE,1209,IAD,EWR,1715.0,1705.0,-10.0,24.0,1729.0,1809.0,13.0,1900.0,1822.0,-38.0,0.0,,0.0,105.0,77.0,40.0,213.0,,,,,,0.0
2009-01-01,XE,1212,ATL,EWR,1915.0,1913.0,-2.0,19.0,1932.0,2108.0,15.0,2142.0,2123.0,-19.0,0.0,,0.0,147.0,130.0,96.0,745.0,,,,,,0.0
2009-01-01,XE,1212,CLE,ATL,1645.0,1637.0,-8.0,12.0,1649.0,1820.0,5.0,1842.0,1825.0,-17.0,0.0,,0.0,117.0,108.0,91.0,554.0,,,,,,0.0
2009-01-01,XE,1214,DCA,EWR,1915.0,1908.0,-7.0,9.0,1917.0,1953.0,34.0,2035.0,2027.0,-8.0,0.0,,0.0,80.0,79.0,36.0,199.0,,,,,,0.0
2009-01-01,XE,1215,EWR,DCA,1715.0,1710.0,-5.0,28.0,1738.0,1819.0,4.0,1838.0,1823.0,-15.0,0.0,,0.0,83.0,73.0,41.0,199.0,,,,,,0.0
2009-01-01,XE,1217,EWR,DCA,1300.0,1255.0,-5.0,15.0,1310.0,1349.0,7.0,1408.0,1356.0,-12.0,0.0,,0.0,68.0,61.0,39.0,199.0,,,,,,0.0


In [11]:
# Formar el DataFrame con las columnas necesarias m√°s variable objetivo
df_ajustado = df_late.select(*atributos_utiles, "LATE")
df_ajustado

FL_DATE,OP_CARRIER,ORIGIN,DEST,CRS_DEP_TIME,CRS_ARR_TIME,CRS_ELAPSED_TIME,DISTANCE,LATE
2009-01-01,XE,DCA,EWR,1100.0,1202.0,62.0,199.0,0.0
2009-01-01,XE,EWR,IAD,1510.0,1632.0,82.0,213.0,0.0
2009-01-01,XE,EWR,DCA,1100.0,1210.0,70.0,199.0,0.0
2009-01-01,XE,DCA,EWR,1240.0,1357.0,77.0,199.0,0.0
2009-01-01,XE,IAD,EWR,1715.0,1900.0,105.0,213.0,0.0
2009-01-01,XE,ATL,EWR,1915.0,2142.0,147.0,745.0,0.0
2009-01-01,XE,CLE,ATL,1645.0,1842.0,117.0,554.0,0.0
2009-01-01,XE,DCA,EWR,1915.0,2035.0,80.0,199.0,0.0
2009-01-01,XE,EWR,DCA,1715.0,1838.0,83.0,199.0,0.0
2009-01-01,XE,EWR,DCA,1300.0,1408.0,68.0,199.0,0.0


In [12]:
print("Total de instacias: ", df_ajustado.count())



Total de instacias:  60583755


                                                                                

In [13]:
# 1. Definimos las columnas que quieres revisar (tus atributos importantes)
# Si ya tienes el DF filtrado con select, puedes usar 'df.columns'
columnas_importantes = df_ajustado.columns

# 2. Construimos la "Query Maestra" usando una List Comprehension
# L√≥gica: "Para cada columna 'c', cuenta 1 si es Nulo, de lo contrario nada."
expresiones_nulos = [count(when(col(c).isNull(), c)).alias(c) for c in columnas_importantes]

# 3. Ejecutamos el c√°lculo (Spark lo hace en paralelo)
resultado_nulos = df_ajustado.select(*expresiones_nulos)

# 4. Mostramos el resultado
# vertical=True es vital aqu√≠ para leerlo bien lista hacia abajo
resultado_nulos.show(vertical=True)



-RECORD 0---------------
 FL_DATE          | 0   
 OP_CARRIER       | 0   
 ORIGIN           | 0   
 DEST             | 0   
 CRS_DEP_TIME     | 1   
 CRS_ARR_TIME     | 2   
 CRS_ELAPSED_TIME | 23  
 DISTANCE         | 0   
 LATE             | 0   



                                                                                

In [20]:
# Eliminar registros nulos
df_para_modelo = df_ajustado.dropna()
print("Total de instacias: ", df_para_modelo.count())



Total de instacias:  60583731


                                                                                

In [29]:
# Guardar el DF con menos atributos y menos instancias
guardar_csvs_en_almacenamiento(df_para_modelo, ruta_salida_csvs_procesados)

NameError: name 'df_para_modelo' is not defined

In [17]:
# Cargar los CSV a un DataFrame de Spark
df_cargado = cargar_csvs(ruta_salida_csvs_procesados)
df_cargado.show(5)

+----------+----------+------+----+------------+------------+----------------+--------+----+
|   FL_DATE|OP_CARRIER|ORIGIN|DEST|CRS_DEP_TIME|CRS_ARR_TIME|CRS_ELAPSED_TIME|DISTANCE|LATE|
+----------+----------+------+----+------------+------------+----------------+--------+----+
|2009-08-31|        9E|   ORF| ATL|      1159.0|      1400.0|           121.0|   516.0| 0.0|
|2009-08-31|        9E|   ATL| PIT|       725.0|       910.0|           105.0|   526.0| 0.0|
|2009-08-31|        9E|   PIT| ATL|       941.0|      1129.0|           108.0|   526.0| 0.0|
|2009-08-31|        9E|   ATL| MCI|       915.0|      1037.0|           142.0|   692.0| 0.0|
|2009-08-31|        9E|   TYS| ATL|       540.0|       638.0|            58.0|   152.0| 0.0|
+----------+----------+------+----+------------+------------+----------------+--------+----+
only showing top 5 rows


El proceso de caracterizaci√≥n de la poblaci√≥n se realizar√° en dos fases, de forma general se har√° tomando en cuenta el tiempo (las estaciones del a√±o para ser m√°s espec√≠fico), con los ya conocidos espacios temporales de 3 meses cada uno.

La segunda fase ser√° de acuerdo al porcentaje de distribuci√≥n de los pasajeros de acuerdo al listado de los 30 aeropuertos m√°s usados de los Estados Unidos tambi√©n conocido como el 'Core 30' que acumulan aproximadamente el 77% de los viajes aerona√∫ticos.

Por lo que se har√°n dos grupos grandes, el mayoritario de 77% corresponder√° al grupo del Core30, mientras que los dem√°s (23%) se clasificar√°n como 'NoCore', as√≠ no se excluir√°n aeropuertos que dispongan de menos viajes.

In [20]:
# Porcentajes de vuelo por estaci√≥n del a√±o
porcentajes_estacion = {
    'Primavera': 0.275,
    'Verano': 0.258,
    'Otono': 0.249,
    'Invierno': 0.218
}

# Meses del a√±o por estaci√≥n
meses_primavera = ['03', '04', '05']
meses_verano = ['06', '07', '08']
meses_otono = ['09', '10', '11']
meses_invierno = ['12', '01', '02']

# Listado de los aeropuertos m√°s usados de los Estados Unidos, los que no se encuentren en esta lista
# se considerar√°n parte de un grupo complementario.
core_30 = [
    'ATL', 'DFW', 'DEN', 'LAX', 'ORD',
    'JFK', 'MCO', 'LAS', 'CLT', 'MIA',
    'SEA', 'EWR', 'SFO', 'PHX', 'IAH',
    'BOS', 'FLL', 'MSP', 'LGA', 'DTW',
    'PHL', 'SLC', 'BWI', 'DCA', 'SAN',
    'IAD', 'TPA', 'MDW', 'HNL', 'MEM'
]
pct_core30 = 0.77
pct_otros = 0.23

In [21]:
df_final = df_cargado

# Crear columna del mes (texto 'MM')
df_prep = df_final.withColumn("Month_Str", date_format(col("FL_DATE"), "MM"))

# Crear Etiqueta de Estaci√≥n (Stage 1)
df_stage1 = df_prep.withColumn("SEASON",
    when(col("Month_Str").isin(meses_invierno), "Invierno")
    .when(col("Month_Str").isin(meses_primavera), "Primavera")
    .when(col("Month_Str").isin(meses_verano), "Verano")
    .when(col("Month_Str").isin(meses_otono), "Otono")
)

# Crear Etiqueta de Aeropuerto (Stage 2)
# Si est√° en la lista -> 'Core30', si no -> 'Others'
df_stage2 = df_stage1.withColumn("AIRPORT_TYPE",
    when(col("ORIGIN").isin(core_30), "Core30")
    .otherwise("Others")
)

# CREAR LA SUPER-LLAVE (Concatenaci√≥n)
# Ejemplo de resultado: "Primavera_Core30", "Invierno_Others"
df_strata = df_stage2.withColumn("STRATA", 
    concat(col("SEASON"), lit("_"), col("AIRPORT_TYPE"))
)

df_strata.show(5)

+----------+----------+------+----+------------+------------+----------------+--------+----+---------+------+------------+-------------+
|   FL_DATE|OP_CARRIER|ORIGIN|DEST|CRS_DEP_TIME|CRS_ARR_TIME|CRS_ELAPSED_TIME|DISTANCE|LATE|Month_Str|SEASON|AIRPORT_TYPE|       STRATA|
+----------+----------+------+----+------------+------------+----------------+--------+----+---------+------+------------+-------------+
|2009-08-31|        9E|   ORF| ATL|      1159.0|      1400.0|           121.0|   516.0| 0.0|       08|Verano|      Others|Verano_Others|
|2009-08-31|        9E|   ATL| PIT|       725.0|       910.0|           105.0|   526.0| 0.0|       08|Verano|      Core30|Verano_Core30|
|2009-08-31|        9E|   PIT| ATL|       941.0|      1129.0|           108.0|   526.0| 0.0|       08|Verano|      Others|Verano_Others|
|2009-08-31|        9E|   ATL| MCI|       915.0|      1037.0|           142.0|   692.0| 0.0|       08|Verano|      Core30|Verano_Core30|
|2009-08-31|        9E|   TYS| ATL|      

In [22]:
total_instancias_para_muestreo = df_strata.count()
print("Total instancias: ", total_instancias_para_muestreo)



Total instancias:  60583731


                                                                                

In [23]:
from pyspark.sql.functions import rand
import builtins  # Para usar min() de Python, no de PySpark
 
print("üîÑ Balanceando clases...")
 
# Contar instancias de cada clase
count_0 = df_strata.filter(col("LATE") == 0.0).count()
count_1 = df_strata.filter(col("LATE") == 1.0).count()
 
print(f"Antes del balanceo:")
print(f"  LATE=0: {count_0:,}")
print(f"  LATE=1: {count_1:,}")
 
# Determinar la clase minoritaria usando min() de Python
min_count = builtins.min(count_0, count_1)
print(f"\nUsando {min_count:,} instancias por clase (undersampling)")
 
# Separar las clases
df_late_0 = df_strata.filter(col("LATE") == 0.0)
df_late_1 = df_strata.filter(col("LATE") == 1.0)
 
# Submuestrear la clase mayoritaria (LATE=0)
df_late_0_sampled = df_late_0.orderBy(rand(seed=42)).limit(min_count)
 
# Combinar ambas clases balanceadas
df_balanced = df_late_0_sampled.union(df_late_1)
 
# Cachear para evitar recomputaci√≥n
df_balanced.cache()
 
# Verificar el balance (sin count() adicional, usar el conocido)
print("\n‚úÖ Despu√©s del balanceo:")
total_balanced = 2 * min_count  # Sabemos que son exactamente min_count por clase
print(f"+----+----------+------------+")
print(f"|LATE|   count  | porcentaje |")
print(f"+----+----------+------------+")
print(f"| 0.0|{min_count:>10}|      50.00%|")
print(f"| 1.0|{min_count:>10}|      50.00%|")
print(f"+----+----------+------------+")
 
print(f"\nTotal instancias balanceadas: {total_balanced:,}")
print("Dataset balanceado cacheado y listo para entrenamiento ‚úì")

üîÑ Balanceando clases...


                                                                                

Antes del balanceo:
  LATE=0: 50,058,622
  LATE=1: 10,525,109

Usando 10,525,109 instancias por clase (undersampling)

‚úÖ Despu√©s del balanceo:
+----+----------+------------+
|LATE|   count  | porcentaje |
+----+----------+------------+
| 0.0|  10525109|      50.00%|
| 1.0|  10525109|      50.00%|
+----+----------+------------+

Total instancias balanceadas: 21,050,218
Dataset balanceado cacheado y listo para entrenamiento ‚úì


In [25]:
# Calcular cu√°ntas filas NECESITAMOS de cada grupo (Target)
# Matriz de probabilidad combinada: P(Estaci√≥n) * P(Tipo)
target_counts = {}

for estacion, pct_est in porcentajes_estacion.items():
    # Grupo Core30
    key_core = f"{estacion}_Core30"
    target_counts[key_core] = int(total_instancias_para_muestreo * pct_est * pct_core30)
    
    # Grupo Others
    key_others = f"{estacion}_Others"
    target_counts[key_others] = int(total_instancias_para_muestreo * pct_est * pct_otros)

# Esto es necesario para saber qu√© fracci√≥n pedirle a Spark
actual_counts_row = df_balanced.groupBy("STRATA").count().collect()
actual_counts = {row['STRATA']: row['count'] for row in actual_counts_row}

# 4. Generar el diccionario de 'fractions' para sampleBy
fractions_dict = {}

print("--- Plan de Muestreo ---")
for stratum, target_n in target_counts.items():
    available_n = actual_counts.get(stratum, 0)
    
    if available_n > 0:
        # La fracci√≥n es: Lo que quiero / Lo que tengo
        frac = target_n / available_n
        
        # Si queremos m√°s de lo que existe, tomamos todo (1.0)
        frac = min(frac, 1.0) 
        fractions_dict[stratum] = frac
        print(f"Estrato: {stratum} | Objetivo: {target_n} | Disponible: {available_n} | Fracci√≥n: {frac:.5f}")
    else:
        fractions_dict[stratum] = 0.0

# Calcular la probabilidad te√≥rica combinada de cada estrato
# y determinar cu√°l es el l√≠mite m√°ximo que permite ese estrato.
limites_totales = []

print("--- Buscando el Cuello de Botella ---")

for estacion, pct_est in porcentajes_estacion.items():
    # --- Analizando Core30 ---
    key_core = f"{estacion}_Core30"
    prob_core = pct_est * pct_core30
    disponible_core = actual_counts.get(key_core, 0)
    
    # ¬øDe qu√© tama√±o total podr√≠a ser la muestra si este estrato fuera el l√≠mite?
    # Total = Disponible / %Te√≥rico
    if prob_core > 0:
        max_total_core = disponible_core / prob_core
        limites_totales.append(max_total_core)
        print(f"{key_core}: Permite un dataset m√°x de {int(max_total_core):,} filas")

    # --- Analizando Others ---
    key_others = f"{estacion}_Others"
    prob_others = pct_est * pct_otros
    disponible_others = actual_counts.get(key_others, 0)
    
    if prob_others > 0:
        max_total_others = disponible_others / prob_others
        limites_totales.append(max_total_others)
        print(f"{key_others}: Permite un dataset m√°x de {int(max_total_others):,} filas")

# Seleccionar el NUEVO tama√±o total basado en el m√≠nimo (el m√°s restrictivo)
# Esto asegura que ninguna fracci√≥n supere 1.0
NUEVO_TOTAL_AJUSTADO = int(min(limites_totales))

print(f"\n>>> El tama√±o m√°ximo posible para mantener proporciones perfectas es: {NUEVO_TOTAL_AJUSTADO:,} filas")

# ---------------------------------------------------------
# RECALCULAR LAS FRACCIONES CON EL NUEVO TOTAL
# ---------------------------------------------------------
fractions_dict = {}
print("\n--- Plan de Muestreo Ajustado ---")

for estacion, pct_est in porcentajes_estacion.items():
    # --- Core 30 ---
    key_core = f"{estacion}_Core30"
    target_core = int(NUEVO_TOTAL_AJUSTADO * pct_est * pct_core30)
    avail_core = actual_counts.get(key_core, 0)
    
    frac_core = target_core / avail_core
    fractions_dict[key_core] = frac_core
    print(f"{key_core} | Obj: {target_core} | Disp: {avail_core} | Frac: {frac_core:.5f}")
    
    # --- Others ---
    key_others = f"{estacion}_Others"
    target_others = int(NUEVO_TOTAL_AJUSTADO * pct_est * pct_otros)
    avail_others = actual_counts.get(key_others, 0)
    
    frac_others = target_others / avail_others
    fractions_dict[key_others] = frac_others
    print(f"{key_others} | Obj: {target_others} | Disp: {avail_others} | Frac: {frac_others:.5f}")

# 4. Ejecutar el Sample con el plan ajustado
df_muestra_final = df_balanced.sampleBy("STRATA", fractions=fractions_dict, seed=42)

# Limpieza final: Eliminar columnas auxiliares si ya no las necesitas
# (Aunque recomiendo dejarlas para an√°lisis posteriores)
# df_muestra_final = df_muestra_final.drop("Month_Str", "STRATA")

print(f"Total registros obtenidos: {df_muestra_final.count()}")
df_muestra_final.groupBy("SEASON", "AIRPORT_TYPE").count().show()

                                                                                

--- Plan de Muestreo ---
Estrato: Primavera_Core30 | Objetivo: 12828605 | Disponible: 3541408 | Fracci√≥n: 1.00000
Estrato: Primavera_Others | Objetivo: 3831920 | Disponible: 1771436 | Fracci√≥n: 1.00000
Estrato: Verano_Core30 | Objetivo: 12035564 | Disponible: 3998897 | Fracci√≥n: 1.00000
Estrato: Verano_Others | Objetivo: 3595038 | Disponible: 1941735 | Fracci√≥n: 1.00000
Estrato: Otono_Core30 | Objetivo: 11615718 | Disponible: 3145832 | Fracci√≥n: 1.00000
Estrato: Otono_Others | Objetivo: 3469630 | Disponible: 1588239 | Fracci√≥n: 1.00000
Estrato: Invierno_Core30 | Objetivo: 10169585 | Disponible: 3362878 | Fracci√≥n: 1.00000
Estrato: Invierno_Others | Objetivo: 3037668 | Disponible: 1699793 | Fracci√≥n: 1.00000
--- Buscando el Cuello de Botella ---
Primavera_Core30: Permite un dataset m√°x de 16,724,476 filas
Primavera_Others: Permite un dataset m√°x de 28,006,893 filas
Verano_Core30: Permite un dataset m√°x de 20,129,351 filas
Verano_Others: Permite un dataset m√°x de 32,722,194 f

                                                                                

Total registros obtenidos: 16408518




+---------+------------+-------+
|   SEASON|AIRPORT_TYPE|  count|
+---------+------------+-------+
| Invierno|      Core30|2754783|
|   Verano|      Core30|3259311|
|Primavera|      Core30|3474070|
|    Otono|      Others| 939805|
|Primavera|      Others|1038193|
| Invierno|      Others| 822292|
|   Verano|      Others| 974232|
|    Otono|      Core30|3145832|
+---------+------------+-------+



                                                                                

In [26]:
# Guardar los CSVs tras la aplicaci√≥n del muestreo estratificado
guardar_csvs_en_almacenamiento(df_muestra_final, ruta_salida_muestreo)

                                                                                

In [27]:
df_muestra_final.show(5)

+----------+----------+------+----+------------+------------+----------------+--------+----+---------+--------+------------+---------------+
|   FL_DATE|OP_CARRIER|ORIGIN|DEST|CRS_DEP_TIME|CRS_ARR_TIME|CRS_ELAPSED_TIME|DISTANCE|LATE|Month_Str|  SEASON|AIRPORT_TYPE|         STRATA|
+----------+----------+------+----+------------+------------+----------------+--------+----+---------+--------+------------+---------------+
|2013-10-10|        EV|   IAD| BUF|       815.0|       925.0|            70.0|   283.0| 0.0|       10|   Otono|      Core30|   Otono_Core30|
|2013-07-15|        AA|   DEN| ORD|       955.0|      1315.0|           140.0|   888.0| 0.0|       07|  Verano|      Core30|  Verano_Core30|
|2018-06-22|        G4|   EWR| SAV|      1000.0|      1220.0|           140.0|   708.0| 0.0|       06|  Verano|      Core30|  Verano_Core30|
|2013-12-02|        DL|   LAX| CMH|      1025.0|      1737.0|           252.0|  1995.0| 0.0|       12|Invierno|      Core30|Invierno_Core30|
|2011-06-07| 

### Lectura de datos del CSV ya balanceado y muestreadp

In [None]:
# Cargar el CSV del conjunto balanceado y estratificado
df_muestra_final = cargar_csvs(ruta_salida_muestreo)

In [28]:
# Divisi√≥n Train / Test (80% / 20%) <para probar>
# Usamos una semilla (seed) para reproducibilidad
train_data, test_data = df_muestra_final.randomSplit([0.8, 0.2], seed=42)

print(f"Instancias de Entrenamiento: {train_data.count()}")
print(f"Instancias de Prueba: {test_data.count()}")

train_data.show(5, truncate=False)
test_data.show(5, truncate=False)

                                                                                

Instancias de Entrenamiento: 13127526


                                                                                

Instancias de Prueba: 3280992


                                                                                

+----------+----------+------+----+------------+------------+----------------+--------+----+---------+--------+------------+---------------+
|FL_DATE   |OP_CARRIER|ORIGIN|DEST|CRS_DEP_TIME|CRS_ARR_TIME|CRS_ELAPSED_TIME|DISTANCE|LATE|Month_Str|SEASON  |AIRPORT_TYPE|STRATA         |
+----------+----------+------+----+------------+------------+----------------+--------+----+---------+--------+------------+---------------+
|2009-01-01|9E        |ATL   |AUS |2123.0      |2300.0      |157.0           |813.0   |0.0 |01       |Invierno|Core30      |Invierno_Core30|
|2009-01-01|9E        |ATL   |EWR |1100.0      |1320.0      |140.0           |745.0   |0.0 |01       |Invierno|Core30      |Invierno_Core30|
|2009-01-01|9E        |ATL   |JAN |2231.0      |2251.0      |80.0            |341.0   |0.0 |01       |Invierno|Core30      |Invierno_Core30|
|2009-01-01|9E        |ATW   |DTW |1331.0      |1552.0      |81.0            |297.0   |0.0 |01       |Invierno|Others      |Invierno_Others|
|2009-01-01|9

[Stage 93:>                                                         (0 + 1) / 1]

+----------+----------+------+----+------------+------------+----------------+--------+----+---------+--------+------------+---------------+
|FL_DATE   |OP_CARRIER|ORIGIN|DEST|CRS_DEP_TIME|CRS_ARR_TIME|CRS_ELAPSED_TIME|DISTANCE|LATE|Month_Str|SEASON  |AIRPORT_TYPE|STRATA         |
+----------+----------+------+----+------------+------------+----------------+--------+----+---------+--------+------------+---------------+
|2009-01-01|9E        |ATL   |HOU |1604.0      |1710.0      |126.0           |696.0   |0.0 |01       |Invierno|Core30      |Invierno_Core30|
|2009-01-01|9E        |AUS   |ATL |530.0       |838.0       |128.0           |813.0   |0.0 |01       |Invierno|Others      |Invierno_Others|
|2009-01-01|9E        |BHM   |MEM |1620.0      |1727.0      |67.0            |211.0   |0.0 |01       |Invierno|Others      |Invierno_Others|
|2009-01-01|9E        |CLT   |MEM |1700.0      |1759.0      |119.0           |512.0   |0.0 |01       |Invierno|Core30      |Invierno_Core30|
|2009-01-01|9

                                                                                

In [32]:
# Guardar en el disco
guardar_csvs_en_almacenamiento(train_data, ruta_train)
guardar_csvs_en_almacenamiento(test_data, ruta_test)

                                                                                

### Aplicar t√©cnicas de discretizaci√≥n para poder generar un modelo

In [35]:
from pyspark.sql.types import DoubleType

# Lista de columnas a convertir en tipo Double para evitar errores de tipo de formato
cols_numericas = ['CRS_DEP_TIME', 'CRS_ELAPSED_TIME', 'DISTANCE', 'LATE']

# Bucle para convertir cada una a DoubleType
for col_name in cols_numericas:
    df_muestra_final = df_muestra_final.withColumn(
        col_name, 
        col(col_name).cast(DoubleType())
    )

# Verificaci√≥n r√°pida: Imprime el esquema para asegurar que ahora digan 'double'
print("--- Esquema Corregido ---")
df_muestra_final.select(cols_numericas).printSchema()

--- Esquema Corregido ---
root
 |-- CRS_DEP_TIME: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- LATE: double (nullable = true)



In [34]:
# Importar herramientas para la discretizaci√≥n
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Separamos las columnas por tipo
cols_categoricas = ['OP_CARRIER', 'ORIGIN', 'DEST', 'SEASON', 'AIRPORT_TYPE']

# Construimos las etapas del Pipeline din√°micamente
stages = []

input_cols_OHE = [] # Aqu√≠ guardaremos los nombres de las columnas ya vectorizadas

for col_name in cols_categoricas:
    # Indexar (Texto -> N√∫mero)
    indexer = StringIndexer(
        inputCol=col_name, 
        outputCol=f"{col_name}_Index", 
        handleInvalid="keep" # Si aparece una categor√≠a nueva, crea un bucket extra
    )
    
    # Encodear (N√∫mero -> Vector Binario)
    encoder = OneHotEncoder(
        inputCol=f"{col_name}_Index", 
        outputCol=f"{col_name}_Vec"
    )
    
    stages += [indexer, encoder]
    input_cols_OHE.append(f"{col_name}_Vec")

# Ensamblar todo en un solo vector de caracter√≠sticas ('features')
# Juntamos: [Vectores de OneHot] + [Columnas Num√©ricas Originales]
assembler = VectorAssembler(
    inputCols=input_cols_OHE + cols_numericas,
    outputCol="features"
)

stages.append(assembler)

# 4. Crear el Pipeline Oficial
pipeline = Pipeline(stages=stages)

print("Pipeline construido exitosamente. Listo para ajustar a los datos.")

Pipeline construido exitosamente. Listo para ajustar a los datos.


In [36]:
# Ajustar el Pipeline a tus datos (Aprender qu√© categor√≠as existen)
# Esto puede tardar un poco porque recorre los 47 Millones de filas
model_pipeline = pipeline.fit(df_muestra_final)

# Transformar los datos (Crear la columna 'features')
df_transformado = model_pipeline.transform(df_muestra_final)

# Seleccionar solo lo necesario para ahorrar memoria
# Nos quedamos con la etiqueta 'LATE' y el vector 'features'
data_final = df_transformado.select("features", "LATE")

                                                                                

### Generar conjuntos para entrenamiento y prueba (solo para probar y generar modelos)

In [37]:
# Divisi√≥n Train / Test (80% / 20%) <para probar>
# Usamos una semilla (seed) para reproducibilidad
train_data, test_data = data_final.randomSplit([0.8, 0.2], seed=42)

print(f"Instancias de Entrenamiento: {train_data.count()}")
print(f"Instancias de Prueba: {test_data.count()}")

# Ver c√≥mo qued√≥ el vector (features)
train_data.show(5, truncate=False)

                                                                                

Instancias de Entrenamiento: 13127526


                                                                                

Instancias de Prueba: 3280992


[Stage 154:>                                                        (0 + 1) / 1]

+-----------------------------------------------------------------------------+----+
|features                                                                     |LATE|
+-----------------------------------------------------------------------------+----+
|(781,[0,23,401,771,775,777,778,779],[1.0,1.0,1.0,1.0,1.0,830.0,300.0,1947.0])|0.0 |
|(781,[0,23,401,771,775,777,778,779],[1.0,1.0,1.0,1.0,1.0,830.0,300.0,1947.0])|0.0 |
|(781,[0,23,401,771,775,777,778,779],[1.0,1.0,1.0,1.0,1.0,830.0,300.0,1947.0])|0.0 |
|(781,[0,23,401,771,775,777,778,779],[1.0,1.0,1.0,1.0,1.0,830.0,315.0,1947.0])|0.0 |
|(781,[0,23,401,771,775,777,778,779],[1.0,1.0,1.0,1.0,1.0,830.0,315.0,1947.0])|0.0 |
+-----------------------------------------------------------------------------+----+
only showing top 5 rows


                                                                                

In [39]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Usaremos m√©tricas ponderadas (weighted) por el desbalance de clases
evaluator_acc = MulticlassClassificationEvaluator(labelCol="LATE", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="LATE", predictionCol="prediction", metricName="f1")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="LATE", predictionCol="prediction", metricName="weightedRecall")

def imprimir_metricas(nombre_modelo, predictions):
    acc = evaluator_acc.evaluate(predictions)
    f1 = evaluator_f1.evaluate(predictions)
    rec = evaluator_recall.evaluate(predictions)
    
    print(f"--- Resultados para {nombre_modelo} ---")
    print(f"Accuracy: {acc:.4f}")
    print(f"F1-Score: {f1:.4f}")
    print(f"Recall:   {rec:.4f}")
    print("-" * 30)

# ENTRENAMIENTO: REGRESI√ìN LOG√çSTICA
print("Entrenando Regresi√≥n Log√≠stica con datos balanceados...")
lr = LogisticRegression(featuresCol="features", labelCol="LATE", maxIter=10)
lr_model = lr.fit(train_data)

# Predecir
lr_predictions = lr_model.transform(test_data)
imprimir_metricas("Regresi√≥n Log√≠stica", lr_predictions)


Entrenando Regresi√≥n Log√≠stica con datos balanceados...


25/12/04 03:01:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS

--- Resultados para Regresi√≥n Log√≠stica ---
Accuracy: 1.0000
F1-Score: 1.0000
Recall:   1.0000
------------------------------


                                                                                