# 1 Quarantine Tables

En este entorno aún **no estamos en producción**, por lo que vamos a trabajar de forma **iterativa y experimental** sobre los datos que han sido enviados a la tabla de **cuarentena**.

El objetivo es **detectar los errores más frecuentes**, **probar correcciones paso a paso** y **evaluar sus efectos** antes de integrarlas definitivamente en el flujo de validación oficial.

A lo largo de este notebook, iremos aplicando transformaciones que conviertan los registros en cuarentena en datos **limpios, consistentes y listos para su reintegración en la capa Silver**.


## 1.1 Dim_Aircraft

Empecemos mostrando los registros que tenemos actualmente en la tabla Dim_Aircraft de Cuarentena

In [0]:
%sql
--drop table if exists sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine;

In [0]:
%sql
select *
from sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine

aircraft_id,tail_number,model,manufacturer,engine_model,first_service_dt,status,cycles_total,hours_total,fixed,type_fix,date_fix,fix_version
AC1000,TN-1000B,A320,Airbus,CFM56,2018-06-01,ACTIVE,12100,23600.0,,,,
,TN-NULL,A319,Airbus,CFM56,2017-03-10,INACTIVE,8000,15000.0,,,,
,TN-BLANK,B737,Boeing,LEAP-1B,2019-09-09,ACTIVE,9000,17000.0,,,,
AC2001,TN-2001,A321,Airbus,LEAP-1A,2020-02-02,ACTIVE,-5,10000.0,,,,
AC2002,TN-2002,737-800,Boeing,GEnx,2021-11-20,ACTIVE,5000,250000.0,,,,
AC2004,TN-2004,A320neo,Airbus,LEAP-1A,2020-08-08,ACTIVE,110000,12000.0,,,,
AC1000,TN-1000B,A320,Airbus,CFM56,2018-06-01,ACTIVE,12100,23600.0,,,,
,TN-NULL,A319,Airbus,CFM56,2017-03-10,INACTIVE,8000,15000.0,,,,
,TN-BLANK,B737,Boeing,LEAP-1B,2019-09-09,ACTIVE,9000,17000.0,,,,
AC2001,TN-2001,A321,Airbus,LEAP-1A,2020-02-02,ACTIVE,-5,10000.0,,,,


### 1.1.0 Creamos columnas de arreglos

%md
Para poder **gestionar de forma trazable** los errores y sus correcciones, añadiremos una serie de columnas a la tabla de **cuarentena**.  
Estas columnas nos permitirán registrar tanto el estado de cada registro como la evolución de las correcciones aplicadas a lo largo del tiempo.

A continuación, añadimos las siguientes columnas:

| Columna | Tipo | Descripción |
|----------|------|-------------|
| **fixed** | `BOOLEAN` | Indica si el registro ya ha sido corregido (`TRUE`) o sigue pendiente (`FALSE`). |
| **type_fix** | `STRING` | Describe el tipo de corrección aplicada (por ejemplo: *null_filled*, *format_adjusted*, *id_relinked*, etc.). |
| **date_fix** | `TIMESTAMP` | Fecha y hora en la que se realizó la corrección. Permite hacer seguimiento temporal de los cambios. |
| **fix_version** | `INT` | Número de versión o iteración de la corrección, útil si un mismo registro ha sido corregido más de una vez. |

Con esto, la tabla de cuarentena no solo almacena los registros problemáticos, sino también **su historial de resolución**, lo que facilita el **análisis de calidad de datos** y la **auditoría del proceso de validación**.


In [0]:
%sql
--drop catalog if exists layer20_bronze cascade;
--drop catalog if exists layer30_silver cascade;
--drop catalog if exists layer30_silver_quarantine cascade;


``/!\ EJERCICIO``

In [0]:
table = "sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine"
existing = {f.name.lower() for f in spark.table(table).schema}

to_add = []
if "fixed" not in existing:        to_add.append("fixed BOOLEAN")
if "type_fix" not in existing:     to_add.append("type_fix STRING")
if "date_fix" not in existing:     to_add.append("date_fix TIMESTAMP")
if "fix_version" not in existing:  to_add.append("fix_version INT")

if to_add:
    spark.sql(f"ALTER TABLE {table} ADD COLUMNS ({', '.join(to_add)})")



### 1.1.1 Caso de corrección: incoherencia entre horas y ciclos de vuelo

Los registros correspondientes a aeronaves del fabricante **Boeing** representan vuelos **nacionales**, caracterizados por un ciclo medio de duración comprendido entre **1 hora y 30 minutos y 2 horas y 15 minutos**.

Durante la validación se ha detectado que, en algunos casos, la columna `cycles_total` contiene valores registrados, mientras que la columna `hours_total` aparece vacía.  
Dado que existe un consenso técnico sobre la relación aproximada entre ciclos y horas de vuelo, se acordó **corregir estos registros asignando a `hours_total` el valor `2000 * 2`**.

Esta corrección permite mantener la coherencia interna de los datos y asegurar la consistencia entre ambas métricas operativas.


``/!\ EJERCICIO``

In [0]:
%sql
UPDATE sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine
SET 
    hours_total = cycles_total * 2,           -- regla de corrección
    fixed = true,                             -- marcamos como corregido
    type_fix = 'fix_hours_from_cycles',       -- tipo de arreglo aplicado
    date_fix = current_timestamp(),           -- timestamp del arreglo
    fix_version = 1                           -- versión de la regla
WHERE manufacturer = 'Boeing'
  AND (hours_total IS NULL OR hours_total < 0)
  AND cycles_total IS NOT NULL;





In [0]:
%sql
select *
from sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine




###  1.1.2 Caso de corrección: valores nulos en *Engine Model*

Se han identificado registros en la tabla de cuarentena con la columna `engine_model` vacía.  
Para estos casos, se deberá **consultar la tabla Silver** correspondiente y **asignar el valor de `engine_model` utilizado para el mismo modelo de aeronave**.


In [0]:
%sql
select *
from sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine
where engine_model is null
or trim(engine_model) = ''




- Creamos una vista con las valores unicos de modelo y su engine_model asociado 

``/!\ EJERCICIO``

In [0]:
%sql
CREATE OR REPLACE VIEW sesion1_2.layer30_silver.dim_aircraft_models AS
SELECT 
  model,
  first(engine_model, TRUE) AS engine_model
FROM sesion1_2.layer30_silver.dim_aircraft
WHERE engine_model IS NOT NULL
  AND trim(engine_model) <> ''
GROUP BY model;





- Veamos la vista que hemos creado

In [0]:
%sql
SELECT * 
FROM sesion1_2.layer30_silver.dim_aircraft_models
ORDER BY model
LIMIT 50;




- Por ultimo hagamos el merge trayendo a nuestra tabla ``dim_aircraft_quarantine`` los engine_model correspondientes desde la vista creada

``/!\ EJERCICIO``

In [0]:
%sql
-- Rellenar engine_model en cuarentena a partir de la vista de referencia por model
MERGE INTO sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine AS tgt
USING sesion1_2.layer30_silver.dim_aircraft_models                         AS src
ON  tgt.model = src.model

WHEN MATCHED
  AND (tgt.engine_model IS NULL OR trim(tgt.engine_model) = '')
  AND src.engine_model IS NOT NULL
THEN UPDATE SET
  tgt.engine_model = src.engine_model,
  tgt.fixed        = TRUE,
  tgt.type_fix     = 'engine_model_from_lookup_by_model',
  tgt.date_fix     = current_timestamp(),
  tgt.fix_version  = coalesce(tgt.fix_version, 0) + 1
;




In [0]:
%sql
select *
from sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine



### 1.1.3 Join a Tabla Principal

%md
### 🔄 Reintegración de registros corregidos en la capa Silver

Una vez completadas las correcciones en la tabla de **cuarentena**, los registros marcados como `fixed = TRUE` pueden ser **reincorporados** a la tabla principal de la capa **Silver**.  

Para ello, se realiza una operación de tipo **MERGE**, que compara ambas tablas (`Silver` y `Quarantine`) en base a la clave primaria `aircraft_id`.  
Este proceso actualiza los registros ya existentes y añade aquellos que aún no estaban presentes en la tabla destino.

El flujo resultante es:

1. **Seleccionar** los registros corregidos en cuarentena (`fixed = TRUE`).  
2. **Actualizar** en la tabla Silver los campos modificados.  
3. **Insertar** los nuevos registros que aún no existían.  

De esta forma, los datos recuperados pasan a formar parte de la tabla **`dim_aircraft`** validada, garantizando trazabilidad, control de versiones y consistencia con el proceso de validación general.


In [0]:
%sql
MERGE INTO sesion1_2.layer30_silver.dim_aircraft AS tgt
USING (
  SELECT
    aircraft_id, tail_number, model, manufacturer,
    engine_model, first_service_dt, status,
    cycles_total, hours_total
  FROM sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine
  WHERE fixed = TRUE
) AS src
ON tgt.aircraft_id = src.aircraft_id

WHEN MATCHED THEN UPDATE SET
  tgt.tail_number      = src.tail_number,
  tgt.model            = src.model,
  tgt.manufacturer     = src.manufacturer,
  tgt.engine_model     = src.engine_model,
  tgt.first_service_dt = src.first_service_dt,
  tgt.status           = src.status,
  tgt.cycles_total     = src.cycles_total,
  tgt.hours_total      = src.hours_total

WHEN NOT MATCHED THEN INSERT (
  aircraft_id, tail_number, model, manufacturer,
  engine_model, first_service_dt, status,
  cycles_total, hours_total
) VALUES (
  src.aircraft_id, src.tail_number, src.model, src.manufacturer,
  src.engine_model, src.first_service_dt, src.status,
  src.cycles_total, src.hours_total
);




In [0]:
%sql
DELETE FROM sesion1_2.layer30_silver_quarantine.dim_aircraft_quarantine
WHERE fixed = TRUE;



## 1.2 facts_sensors


Tras haber completado la recuperación y actualización de los registros en **`dim_aircraft`**, continuaremos con la revisión de la siguiente tabla del flujo.  
El objetivo es **aplicar la misma lógica de análisis, corrección y reintegración**, adaptándola a las particularidades de cada dataset.

En esta nueva sección, identificaremos los principales errores detectados, propondremos las reglas de corrección y prepararemos los datos para su posterior promoción a la capa **Silver**.


In [0]:
%sql
select *
from sesion1_2.layer30_silver_quarantine.fact_engine_sensor_quarantine



### 1.2.0 Creamos columnas de trazabilidad de fallos

de la misma manera creamos las mismas columnas que nos ayuden con la trazabilidad de errores

In [0]:
table = "sesion1_2.layer30_silver_quarantine.fact_engine_sensor_quarantine"
existing = {f.name.lower() for f in spark.table(table).schema}

to_add = []
if "fixed" not in existing:        to_add.append("fixed BOOLEAN")
if "type_fix" not in existing:     to_add.append("type_fix STRING")
if "date_fix" not in existing:     to_add.append("date_fix TIMESTAMP")
if "fix_version" not in existing:  to_add.append("fix_version INT")

if to_add:
    spark.sql(f"ALTER TABLE {table} ADD COLUMNS ({', '.join(to_add)})")



### 1.2.1 Caso de corrección: altitudes anómalas

Se han identificado registros con valores de **altitud superiores a 50.000 pies**, considerados atípicos para operaciones estándar.  
Sin embargo, ciertos modelos específicos de aeronaves sí pueden alcanzar dichas altitudes, por lo que **estos casos se consideran válidos**.

Por tanto, la regla de corrección consiste en:
- **Marcar como erróneos** los registros con `altitude > 50000`,  
- **Exceptuando** aquellos cuyo `model` pertenezca al grupo de aeronaves `737-800`.

Esta validación permite mantener la integridad de los datos sin penalizar los comportamientos esperables en modelos concretos.


``/!\ EJERCICIO``

In [0]:
%sql
MERGE INTO sesion1_2.layer30_silver_quarantine.fact_engine_sensor_quarantine AS tgt
USING sesion1_2.layer30_silver.dim_aircraft AS src
ON tgt.aircraft_id = src.aircraft_id

WHEN MATCHED
  AND src.model = '737-800'
  AND tgt.altitude_ft > 50000
THEN UPDATE SET
  tgt.fixed       = TRUE,
  tgt.type_fix    = 'altitude_flag_over_50000_for_b737',
  tgt.date_fix    = current_timestamp(),
  tgt.fix_version = 1
;




### 1.2.2 Caso de corrección: validación de velocidad indicada (*IAS_KTS*)

La columna `ias_kts` representa la **velocidad indicada en nudos** (*Indicated Air Speed*).  
En nuestro pipeline, el rango operativo definido se encuentra entre **0 y 600 nudos**, considerándose cualquier valor fuera de ese intervalo como anómalo.

No obstante, se han identificado **excepciones operativas** en las que ciertas combinaciones de *fase de vuelo* y *modelo de aeronave* pueden justificar valores fuera de rango.  

Por tanto, la regla de corrección es la siguiente:
- **Exceptuar** aquellos casos en los que la columna `flight_phase` sea `"CLIMB"` o `"DESCENT"`

De esta manera, se preserva la coherencia del dataset sin penalizar comportamientos técnicamente válidos bajo condiciones específicas.


``/!\ EJERCICIO``

In [0]:
%sql
-- Corregir registros con ias_kts > 600 en fases de despegue o aterrizaje
MERGE INTO sesion1_2.layer30_silver_quarantine.fact_engine_sensor_quarantine AS tgt
USING (
  SELECT reading_id
  FROM sesion1_2.layer30_silver_quarantine.fact_engine_sensor_quarantine
  WHERE ias_kts > 600
    AND lower(phase_of_flight) IN ('CLIMB', 'DESCENT')
) AS src
ON tgt.reading_id = src.reading_id

WHEN MATCHED THEN
  UPDATE SET
    tgt.fixed        = TRUE,
    tgt.type_fix     = 'ias_overlimit_takeoff_landing',
    tgt.date_fix     = current_timestamp(),
    tgt.fix_version  = 1;




### 1.2.3 Join a Tabla Principal

In [0]:
%sql
-- 1) Upsert de filas corregidas (fixed=TRUE) desde la cuarentena a silver
MERGE INTO sesion1_2.layer30_silver.fact_engine_sensor AS tgt
USING (
  SELECT
    reading_id,
    aircraft_id,
    event_ts,
    event_date,
    engine_pos,
    phase_of_flight,
    altitude_ft,
    ias_kts,
    egt_c,
    n1_pct,
    fuel_flow_kg_h,
    vib_ips
  FROM sesion1_2.layer30_silver_quarantine.fact_engine_sensor_quarantine
  WHERE fixed = TRUE
) AS src
ON tgt.reading_id = src.reading_id

WHEN MATCHED THEN UPDATE SET
  tgt.aircraft_id     = src.aircraft_id,
  tgt.event_ts        = src.event_ts,
  tgt.event_date      = src.event_date,
  tgt.engine_pos      = src.engine_pos,
  tgt.phase_of_flight = src.phase_of_flight,
  tgt.altitude_ft     = src.altitude_ft,
  tgt.ias_kts         = src.ias_kts,
  tgt.egt_c           = src.egt_c,
  tgt.n1_pct          = src.n1_pct,
  tgt.fuel_flow_kg_h  = src.fuel_flow_kg_h,
  tgt.vib_ips         = src.vib_ips

WHEN NOT MATCHED THEN INSERT (
  reading_id, aircraft_id, event_ts, event_date, engine_pos, phase_of_flight,
  altitude_ft, ias_kts, egt_c, n1_pct, fuel_flow_kg_h, vib_ips
) VALUES (
  src.reading_id, src.aircraft_id, src.event_ts, src.event_date, src.engine_pos, src.phase_of_flight,
  src.altitude_ft, src.ias_kts, src.egt_c, src.n1_pct, src.fuel_flow_kg_h, src.vib_ips
);

-- 2) (Opcional) limpiar cuarentena de lo ya aplicado
DELETE FROM sesion1_2.layer30_silver_quarantine.fact_engine_sensor_quarantine
WHERE fixed = TRUE;




## 1.3 facts_maintenance

In [0]:
%sql
select *
from sesion1_2.layer30_silver_quarantine.fact_maintenance_event_quarantine

