# Preprocessing Script: Glue Pipeline for Flights Dataset

## Script Overview

This script implements a data-processing pipeline with AWS Glue and PySpark that prepares a flights dataset. The data is read from an S3 bucket and transformed for downstream analysis and modeling. The processed data is then written as Parquet and registered as a table in a Glue database.

---

## Transformations Performed

1. **Data Loading:**
   - A CSV file is read from `s3://data-bucket-itba-delays/full_data_flightdelay.csv`, using its first row as the header.
   - The schema is inferred automatically (`inferSchema=True`), which costs one extra pass over the file.

2. **Season of the Year:**
   - A Python UDF, `determinar_estacion`, assigns a season from the latitude and the month, flipping the mapping south of the equator.
   - The result is stored in the `SEASON` column as an integer code: 1 = summer, 2 = autumn, 3 = winter, 4 = spring.

3. **Temperature Conversion:**
   - Maximum temperatures (`TMAX`) are converted from Fahrenheit to Kelvin with K = (°F - 32) × 5/9 + 273.15.
   - The converted values are stored in `TMAX_K` and the original column is dropped.

4. **Departure-Time Encoding:**
   - Each `DEP_TIME_BLK` label is replaced with the starting hour of its block: `0001-0559` becomes 0, `0600-0659` becomes 6, and so on up to 23 for `2300-2359`. Any unrecognized value becomes -1.

5. **Categorical Encoding:**
   - `CARRIER_NAME`, `DEPARTING_AIRPORT` and `PREVIOUS_AIRPORT` are converted to numeric indices with `StringIndexer`.
   - Only `CARRIER_NAME` is additionally one-hot encoded with `OneHotEncoder`; the two airport columns are kept as plain indices.

6. **Scaling Numeric Variables:**
   - Several numeric columns, including the derived `SEASON`, `TMAX_K` and `DEP_TIME_BLK`, are assembled into a single vector with `VectorAssembler`.
   - The vector is scaled with `StandardScaler` and stored in `scaled_features`. With the default settings (`withStd=True`, `withMean=False`), each feature is divided by its standard deviation but not centered.

7. **Transformation Pipeline:**
   - A `Pipeline` chains the following steps and is fitted on the full dataset:
     - Indexing of the categorical variables and one-hot encoding of `CARRIER_NAME`.
     - Vector assembly and scaling of the numeric features.

8. **Vector Expansion:**
   - The `scaled_features` and `CARRIER_NAME_onehot` vectors are converted to arrays.
   - Each array element is written to its own column for granular analysis.

9. **Dropping Original Columns:**
   - Intermediate and original columns are dropped, including the raw versions of every transformed variable.

10. **Writing the Processed Dataset:**
    - The final dataset is written as Parquet to `s3://data-bucket-itba-delays/glue-flights/`.
    - It is registered as the table `flights_dataset` in the `dwh` database.

---

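## Final Dataset Structure

The processed dataset contains:
- Scaled numeric variables (`scaled_features_X`), one per input column of the `VectorAssembler`.
- The expanded one-hot encoding of `CARRIER_NAME` (`carrier_name_one_hot_X`).
- The untouched columns `MONTH`, `DAY_OF_WEEK` and `DEP_DEL15` (the 15-minute departure-delay indicator), plus the airport indices `DEPARTING_AIRPORT_index` and `PREVIOUS_AIRPORT_index`. Derived variables such as `SEASON` and `DEP_TIME_BLK` are dropped as standalone columns and survive only as components of `scaled_features_X`.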
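The final layout follows directly from the two vector sizes, so it can be written down before the job runs. The sketch below assumes the column order shown by `printSchema()` further down; `final_columns` is a hypothetical helper (for example, to assert against `df_transformed.columns` before writing), not part of the script. With the 21 inputs listed in `numerical_columns_to_scale` and the 16-slot carrier vector visible in the `show()` output, it yields 5 + 16 + 21 = 42 columns.

```python
def final_columns(num_carriers, num_scaled):
    """Hypothetical helper: expected column order of the written table."""
    # Columns that pass through untouched or only get indexed
    base = [
        "MONTH", "DAY_OF_WEEK", "DEP_DEL15",
        "DEPARTING_AIRPORT_index", "PREVIOUS_AIRPORT_index",
    ]
    # One column per vector slot, named with 1-based suffixes as in the script
    carriers = [f"carrier_name_one_hot_{i + 1}" for i in range(num_carriers)]
    scaled = [f"scaled_features_{i + 1}" for i in range(num_scaled)]
    return base + carriers + scaled


cols = final_columns(16, 21)
print(len(cols), cols[5], cols[-1])
```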

---

## Usage

The script is meant to run in an AWS Glue interactive session. Its first cell sets the session parameters explicitly through magics: Glue 4.0, `G.1X` workers, 5 workers and a 2880-minute idle timeout. The processed table is then ready to be queried from services such as Athena or used to train machine-learning models.


# Glue Pipeline Flights

####  Run this cell to set up and start your interactive session.


```python
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
# The magics above configure the session: Glue 4.0, 5 G.1X workers, 48-hour idle timeout

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session  # SparkSession used by every later cell
job = Job(glueContext)
```

```text
Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 6d8d692e-d669-4ae6-8507-964941b600d2
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 6d8d692e-d669-4ae6-8507-964941b600d2 to get into ready status...
Session 6d8d692e-d669-4ae6-8507-964941b600d2 ha
```

## Preprocessing


```python
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType, ArrayType, DoubleType
from pyspark.sql.functions import udf, col, size
```




```python
# Read the raw CSV; inferSchema makes Spark scan the data once more to detect column types
df = spark.read.csv('s3://data-bucket-itba-delays/full_data_flightdelay.csv', header=True, inferSchema=True)
```




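```python
def determinar_estacion(lat, mes):
    """Map latitude and month to a season code.

    1 = summer, 2 = autumn, 3 = winter, 4 = spring; the mapping is flipped
    for the southern hemisphere. Missing inputs yield None (null in Spark).
    """
    if lat is None or mes is None:
        return None
    if lat >= 0:  # northern hemisphere
        if mes in [6, 7, 8]:
            return 1
        elif mes in [9, 10, 11]:
            return 2
        elif mes in [12, 1, 2]:
            return 3
        else:
            return 4
    else:  # southern hemisphere
        if mes in [12, 1, 2]:
            return 1
        elif mes in [3, 4, 5]:
            return 2
        elif mes in [6, 7, 8]:
            return 3
        else:
            return 4

season_udf = udf(determinar_estacion, IntegerType())

df = df.withColumn('SEASON', season_udf('LATITUDE', 'MONTH'))
```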

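Because `determinar_estacion` is plain Python, its mapping can be checked without a Spark session. The sketch below restates the same rule as lookup tables, assuming the same codes (1 = summer, 2 = autumn, 3 = winter, 4 = spring); `NORTH_SEASONS`, `SOUTH_SEASONS` and `season_code` are hypothetical names, not part of the script.

```python
# Hypothetical table-driven restatement of determinar_estacion, for tests outside Spark.
# Codes: 1 = summer, 2 = autumn, 3 = winter, 4 = spring (northern-hemisphere months).
NORTH_SEASONS = {
    6: 1, 7: 1, 8: 1,
    9: 2, 10: 2, 11: 2,
    12: 3, 1: 3, 2: 3,
    3: 4, 4: 4, 5: 4,
}
# South of the equator each month gets the code of the month six months later
SOUTH_SEASONS = {m: NORTH_SEASONS[(m + 5) % 12 + 1] for m in range(1, 13)}


def season_code(lat, month):
    if lat is None or month is None:
        return None
    table = NORTH_SEASONS if lat >= 0 else SOUTH_SEASONS
    # Like the original if/elif chain, unknown months fall through to 4
    return table.get(month, 4)


print(season_code(40.7, 7), season_code(-33.9, 7))
```

A lookup table like this could also be expressed as native Spark expressions (for example `F.when` or `F.create_map`) instead of a Python UDF, which avoids shipping every row to a Python worker; that variant is not shown here.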



```python
# Fahrenheit to Kelvin: K = (F - 32) * 5/9 + 273.15
df = df.withColumn('TMAX_K', (df['TMAX'] - 32) * 5 / 9 + 273.15)

df = df.drop('TMAX')
```

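The conversion formula can be sanity-checked on reference points: water freezes at 32 °F (273.15 K) and boils at 212 °F (373.15 K), and absolute zero is -459.67 °F (0 K). The function below applies the same arithmetic as the Spark column expression above; `fahrenheit_to_kelvin` is a hypothetical helper name.

```python
def fahrenheit_to_kelvin(temp_f):
    """Same arithmetic as the TMAX_K column: (F - 32) * 5/9 + 273.15."""
    return (temp_f - 32) * 5 / 9 + 273.15


for f in (32, 212, -459.67):
    print(f, round(fahrenheit_to_kelvin(f), 2))
```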



```python
# Replace each departure-time block with its starting hour; unknown labels (and nulls) become -1
df = df.withColumn(
    'DEP_TIME_BLK',
    F.when(df['DEP_TIME_BLK'] == '0001-0559', 0)
    .when(df['DEP_TIME_BLK'] == '0600-0659', 6)
    .when(df['DEP_TIME_BLK'] == '0700-0759', 7)
    .when(df['DEP_TIME_BLK'] == '0800-0859', 8)
    .when(df['DEP_TIME_BLK'] == '0900-0959', 9)
    .when(df['DEP_TIME_BLK'] == '1000-1059', 10)
    .when(df['DEP_TIME_BLK'] == '1100-1159', 11)
    .when(df['DEP_TIME_BLK'] == '1200-1259', 12)
    .when(df['DEP_TIME_BLK'] == '1300-1359', 13)
    .when(df['DEP_TIME_BLK'] == '1400-1459', 14)
    .when(df['DEP_TIME_BLK'] == '1500-1559', 15)
    .when(df['DEP_TIME_BLK'] == '1600-1659', 16)
    .when(df['DEP_TIME_BLK'] == '1700-1759', 17)
    .when(df['DEP_TIME_BLK'] == '1800-1859', 18)
    .when(df['DEP_TIME_BLK'] == '1900-1959', 19)
    .when(df['DEP_TIME_BLK'] == '2000-2059', 20)
    .when(df['DEP_TIME_BLK'] == '2100-2159', 21)
    .when(df['DEP_TIME_BLK'] == '2200-2259', 22)
    .when(df['DEP_TIME_BLK'] == '2300-2359', 23)
    .otherwise(-1)
)
```

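For every label the chain above lists, the code is simply the two leading digits of the block. The sketch below restates the chain as a small parser so the mapping can be unit-tested outside Spark; `dep_time_block_to_hour` is a hypothetical helper. In Spark, `F.substring('DEP_TIME_BLK', 1, 2).cast('int')` would give the same hours for valid labels, but it would not return -1 for unexpected ones, which is why the explicit chain is kept.

```python
import re

# Regular blocks look like "0600-0659"; the first block ("0001-0559") is special-cased
_BLOCK_RE = re.compile(r"(\d{2})00-(\d{2})59")


def dep_time_block_to_hour(block):
    """Hypothetical pure-Python equivalent of the F.when chain for DEP_TIME_BLK."""
    if block == "0001-0559":
        return 0
    match = _BLOCK_RE.fullmatch(block) if isinstance(block, str) else None
    if match and match.group(1) == match.group(2):
        hour = int(match.group(1))
        if 6 <= hour <= 23:  # the chain only lists blocks 06 through 23
            return hour
    return -1  # same fallback as .otherwise(-1)


print([dep_time_block_to_hour(b) for b in ("0001-0559", "0600-0659", "2300-2359", "bad")])
```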



```python
# Index CARRIER_NAME and one-hot encode it (OneHotEncoder drops the last category by default)
indexer = StringIndexer(inputCol="CARRIER_NAME", outputCol="CARRIER_NAME_index")
encoder = OneHotEncoder(inputCol="CARRIER_NAME_index", outputCol="CARRIER_NAME_onehot")

# The airport columns are only indexed, not one-hot encoded
airport_indexer1 = StringIndexer(inputCol="DEPARTING_AIRPORT", outputCol="DEPARTING_AIRPORT_index")
airport_indexer2 = StringIndexer(inputCol="PREVIOUS_AIRPORT", outputCol="PREVIOUS_AIRPORT_index")

# Numeric inputs, including the derived SEASON, TMAX_K and DEP_TIME_BLK
numerical_columns_to_scale = [
    'DISTANCE_GROUP', 'SEGMENT_NUMBER', 'CONCURRENT_FLIGHTS', 'NUMBER_OF_SEATS',
    'AIRPORT_FLIGHTS_MONTH', 'AIRLINE_FLIGHTS_MONTH', 'AIRLINE_AIRPORT_FLIGHTS_MONTH',
    'AVG_MONTHLY_PASS_AIRPORT', 'AVG_MONTHLY_PASS_AIRLINE', 'FLT_ATTENDANTS_PER_PASS',
    'GROUND_SERV_PER_PASS', 'PLANE_AGE', 'LATITUDE', 'LONGITUDE', 'PRCP', 'SNOW',
    'SNWD', 'TMAX_K', 'AWND', 'SEASON', 'DEP_TIME_BLK'
]

assembler = VectorAssembler(inputCols=numerical_columns_to_scale, outputCol="features")

# Defaults: withStd=True, withMean=False, so features are divided by their std but not centered
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
```

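To make the default `StandardScaler` behavior concrete, here is a plain-Python sketch of what it does to one feature: divide by the corrected (n - 1) sample standard deviation, without subtracting the mean, and emit 0.0 for a zero-variance feature. The `show()` output further down is consistent with this: `DISTANCE_GROUP` values 2.0 and 7.0 become 0.8395... and 2.9384..., a ratio of 3.5, exactly 7.0 / 2.0, which is what uncentered scaling produces. The function name `standard_scale` is hypothetical.

```python
import statistics


def standard_scale(values, with_mean=False, with_std=True):
    """Sketch of Spark's StandardScaler applied to a single feature column.

    Defaults mirror Spark's: divide by the sample standard deviation without
    centering; a feature with zero standard deviation maps to 0.0.
    """
    mean = statistics.fmean(values)
    std = statistics.stdev(values)  # corrected (n - 1) sample standard deviation
    scaled = []
    for v in values:
        x = v - mean if with_mean else v
        if with_std:
            x = x / std if std > 0 else 0.0
        scaled.append(x)
    return scaled


print(standard_scale([2.0, 7.0, 9.0, 4.0]))
```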



```python
# Chain all stages; fit() learns the label indices and the per-feature standard deviations
pipeline = Pipeline(stages=[
    indexer,
    encoder,
    airport_indexer1,
    airport_indexer2,
    assembler,
    scaler
])

model = pipeline.fit(df)
df_transformed = model.transform(df)
```

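The `(16,[0],[1.0])` values in the `show()` output below are sparse one-hot vectors of size 16. The sketch mimics the two categorical stages in plain Python: `StringIndexer` with its default `stringOrderType="frequencyDesc"` gives index 0.0 to the most frequent label (Spark 3.x breaks frequency ties alphabetically), and `OneHotEncoder` with its default `dropLast=True` produces `num_categories - 1` slots, so the last (least frequent) label becomes all zeros. Under that default, 16 slots imply 17 distinct carriers. `fit_string_indexer` and `one_hot` are hypothetical helpers.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Frequency-descending label -> index map (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense equivalent of OneHotEncoder's output for a single index."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:  # with drop_last, the last category has no slot
        vector[int(index)] = 1.0
    return vector


idx = fit_string_indexer(["b", "a", "b", "c", "a", "b"])
print(idx, one_hot(idx["c"], len(idx)))
```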



```python
# Drop the raw columns now represented by indices, the one-hot vector or scaled_features
df_transformed = df_transformed.drop(
    'CARRIER_NAME',
    'DEPARTING_AIRPORT',
    'PREVIOUS_AIRPORT',
    'LATITUDE',
    'LONGITUDE',
    'SEASON',
    'DEP_TIME_BLK',
    'TMAX_K',
    'DISTANCE_GROUP', 'SEGMENT_NUMBER', 'CONCURRENT_FLIGHTS', 'NUMBER_OF_SEATS',
    'AIRPORT_FLIGHTS_MONTH', 'AIRLINE_FLIGHTS_MONTH', 'AIRLINE_AIRPORT_FLIGHTS_MONTH',
    'AVG_MONTHLY_PASS_AIRPORT', 'AVG_MONTHLY_PASS_AIRLINE', 'FLT_ATTENDANTS_PER_PASS',
    'GROUND_SERV_PER_PASS', 'PLANE_AGE', 'PRCP', 'SNOW', 'SNWD', 'AWND',
    'CARRIER_NAME_index'
)
```




```python
df_transformed.show()
```

```text
+-----+-----------+---------+-------------------+-----------------------+----------------------+--------------------+--------------------+
|MONTH|DAY_OF_WEEK|DEP_DEL15|CARRIER_NAME_onehot|DEPARTING_AIRPORT_index|PREVIOUS_AIRPORT_index|            features|     scaled_features|
+-----+-----------+---------+-------------------+-----------------------+----------------------+--------------------+--------------------+
|    1|          7|        0|     (16,[0],[1.0])|                   10.0|                   0.0|[2.0,1.0,25.0,143...|[0.83954854654594...|
|    1|          7|        0|     (16,[1],[1.0])|                   10.0|                   0.0|[7.0,1.0,29.0,191...|[2.93841991291081...|
|    1|          7|        0|     (16,[1],[1.0])|                   10.0|                   0.0|[7.0,1.0,27.0,199...|[2.93841991291081...|
|    1|          7|        0|     (16,[1],[1.0])|                   10.0|                   0.0|[9.0,1.0,27.0,180...|[3.77796845945676...|
|    1|          7|        
```

```python
from pyspark.ml.functions import vector_to_array  # built into Spark >= 3.0; Glue 4.0 runs Spark 3.3

# Convert the ML vectors (sparse or dense) into plain arrays of doubles
df_transformed = (
    df_transformed
    .withColumn("carrier_name_onehot_array", vector_to_array(col("CARRIER_NAME_onehot")))
    .withColumn("scaled_features_array", vector_to_array(col("scaled_features")))
)

# Vector sizes are fixed by the fitted pipeline, so reading a single row is enough
sizes = df_transformed.select(
    size("carrier_name_onehot_array").alias("n_carrier"),
    size("scaled_features_array").alias("n_scaled"),
).first()
num_carrier_columns = sizes["n_carrier"]
num_scaled_features_columns = sizes["n_scaled"]

# Add one column per array element in a single select, with 1-based names
df_transformed = df_transformed.select(
    "*",
    *[col("carrier_name_onehot_array")[i].alias(f"carrier_name_one_hot_{i + 1}")
      for i in range(num_carrier_columns)],
    *[col("scaled_features_array")[i].alias(f"scaled_features_{i + 1}")
      for i in range(num_scaled_features_columns)],
)

df_transformed.show()
```


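What the expansion does to a single row can be sketched in plain Python. In the `show()` output, `(16,[1],[1.0])` is a sparse vector written as (size, indices, values): 16 slots with a single 1.0 at index 1. Densifying it yields 16 values, and each value becomes a column with a 1-based suffix. `sparse_to_dense` and `expand_to_columns` are hypothetical helpers, not Spark APIs.

```python
def sparse_to_dense(size, indices, values):
    """Densify the (size, [indices], [values]) form that show() prints for a SparseVector."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def expand_to_columns(prefix, array):
    """Name each element as the script does: prefix_1, prefix_2, ... (1-based)."""
    return {f"{prefix}_{i + 1}": value for i, value in enumerate(array)}


row = expand_to_columns("carrier_name_one_hot", sparse_to_dense(16, [1], [1.0]))
print(row["carrier_name_one_hot_2"], len(row))
```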

## Write

```python
# Create the target database in the Glue Data Catalog if it does not exist yet
spark.sql("CREATE DATABASE IF NOT EXISTS dwh")
```

```text
DataFrame[]
```


```python
# Drop the vector and helper columns so that only flat scalar columns remain
df_transformed = df_transformed.drop(
    "CARRIER_NAME_onehot", "features", "scaled_features",
    "carrier_name_onehot_array", "carrier_size",
    "scaled_features_array", "scaled_features_size",
)
df_transformed.printSchema()
```

```text
root
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- DEP_DEL15: integer (nullable = true)
 |-- DEPARTING_AIRPORT_index: double (nullable = false)
 |-- PREVIOUS_AIRPORT_index: double (nullable = false)
 |-- carrier_name_one_hot_1: double (nullable = true)
 |-- carrier_name_one_hot_2: double (nullable = true)
 |-- carrier_name_one_hot_3: double (nullable = true)
 |-- carrier_name_one_hot_4: double (nullable = true)
 |-- carrier_name_one_hot_5: double (nullable = true)
 |-- carrier_name_one_hot_6: double (nullable = true)
 |-- carrier_name_one_hot_7: double (nullable = true)
 |-- carrier_name_one_hot_8: double (nullable = true)
 |-- carrier_name_one_hot_9: double (nullable = true)
 |-- carrier_name_one_hot_10: double (nullable = true)
 |-- carrier_name_one_hot_11: double (nullable = true)
 |-- carrier_name_one_hot_12: double (nullable = true)
 |-- carrier_name_one_hot_13: double (nullable = true)
 |-- carrier_name_one_hot_14: double (nullable = true)
```

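```python
# Write Parquet to S3 and register it as dwh.flights_dataset in the Glue Data Catalog
# (the session runs with --enable-glue-datacatalog true); "overwrite" replaces earlier runs
df_transformed.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3://data-bucket-itba-delays/glue-flights/") \
    .saveAsTable("dwh.flights_dataset")
```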


