# A Simple Ingestion Engine for the Flights Dataset

This project implements the skeleton of an ingestion engine for JSON files that arrive daily. Each file contains the flights between pairs of airports in the United States that took place on the date given in its file name.
* The structure of each JSON can be consulted by opening any of the files with a text editor.
* The meaning of each column can be found in the config.json file located in the config folder of the repository.

This notebook only validates that the ingestion engine code from which the package was built is correct.

In [None]:
!pip install loguru==0.7.1



## Wheel Installation in the Cluster

In [None]:
!pip install --force-reinstall ../motor_ingesta-0.1.0-py3-none-any.whl

Processing /Users/orr21/Documents/UCM/Master/Spark/spark-tarea-final/motor_ingesta-0.1.0-py3-none-any.whl
Installing collected packages: motor-ingesta
  Attempting uninstall: motor-ingesta
    Found existing installation: motor-ingesta 0.1.0
    Uninstalling motor-ingesta-0.1.0:
      Successfully uninstalled motor-ingesta-0.1.0
Successfully installed motor-ingesta-0.1.0


## Testing File Ingestion

**Exercise 1 (2 points)**. The **`2023-01-01.json`** file is ingested with the complete ingestion engine: a `MotorIngesta` object is created and its `ingesta_fichero` method is called, leaving the result in the `flights_df` variable. The `flujo_diario` variable holds the configuration loaded from the configuration path above; it is used here only to read the configuration and pass it to the `motor_ingesta` object as its config argument.

* This exercise required having previously completed the Python package code and generated the .whl file, and therefore, the exercise score is due to that work.
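
The assertions in this exercise check column types and a column comment, which suggests that the engine applies per-column specifications taken from the configuration. The following is a minimal plain-Python sketch of that idea, not the package's code: the `COLUMN_SPECS` layout, the `apply_specs` and `column_comments` helpers, and the sample record are all hypothetical, since the actual structure of `config.json` is not shown here.

```python
# Hypothetical column specifications; the real config.json layout may differ.
COLUMN_SPECS = [
    {"name": "Dest", "type": "string",
     "comment": "Destination Airport IATA code (3 letters)"},
    {"name": "ArrTime", "type": "int", "comment": "placeholder comment"},
    {"name": "Diverted", "type": "boolean", "comment": "placeholder comment"},
]

# Map the type names used above to Python constructors (an assumption).
CASTERS = {"string": str, "int": int, "boolean": bool}


def apply_specs(record, specs):
    """Keep only the configured columns and cast each one to its type."""
    typed = {}
    for spec in specs:
        raw = record.get(spec["name"])
        typed[spec["name"]] = None if raw is None else CASTERS[spec["type"]](raw)
    return typed


def column_comments(specs):
    """Build per-column metadata in the shape checked by the notebook."""
    return {spec["name"]: {"comment": spec["comment"]} for spec in specs}


# Illustrative raw record; the values are made up for this sketch.
raw_record = {"Dest": "DTW", "ArrTime": "1215", "Diverted": 0, "Extra": "x"}
typed_record = apply_specs(raw_record, COLUMN_SPECS)
```

In this sketch, columns that are not listed in the specifications (such as `Extra`) are dropped, and missing values stay as `None` instead of being cast.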

In [None]:
from motor_ingesta.motor_ingesta import MotorIngesta
import json

path_config_flujo_diario = "../config/config.json"       # path to the config.json file, which does not belong to the package
path_json_primer_dia = "../data/landing/2023-01-01.json"          # path to the JSON file for a specific day we want to ingest, in our case 2023-01-01.json

with open(path_config_flujo_diario) as f:
    flujo_diario = json.load(f)
motor_ingesta = MotorIngesta(flujo_diario)
flights_df = motor_ingesta.ingesta_fichero(path_json_primer_dia)

flights_df.show(5)

25/09/11 16:22:21 WARN Utils: Your hostname, MacBook-Pro-de-Oscar.local resolves to a loopback address: 127.0.0.1; using 192.168.1.112 instead (on interface en0)
25/09/11 16:22:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/11 16:22:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/11 16:22:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+-----------------+---------------+------+---------------+-----------+-------------+----+---------------+---------+--------+-------+--------+-------+---------+--------+-------+--------+
|FlightDate|Reporting_Airline|OriginAirportID|Origin| OriginCityName|OriginState|DestAirportID|Dest|   DestCityName|DestState|DepDelay|DepTime|ArrDelay|ArrTime|Cancelled|Diverted|AirTime|Distance|
+----------+-----------------+---------------+------+---------------+-----------+-------------+----+---------------+---------+--------+-------+--------+-------+---------+--------+-------+--------+
|2023-01-01|               9E|          12884|   LAN|    Lansing, MI|         MI|        11433| DTW|    Detroit, MI|       MI|      -6|   1136|     -22|   1215|    false|   false|     21|      74|
|2023-01-01|               DL|          13487|   MSP|Minneapolis, MN|         MN|        13495| MSY|New Orleans, LA|       LA|      -1|   1015|     -17|   1244|    false|   false|    133|    1039|
|2023-01-01|   

In [None]:
assert flights_df.count() == 15856
assert len(flights_df.columns) == 18
dtypes = dict(flights_df.dtypes)
assert dtypes["Diverted"] == "boolean"
assert dtypes["ArrTime"] == "int"
assert flights_df.schema["Dest"].metadata == {"comment": "Destination Airport IATA code (3 letters)"}

## Testing the Function to Add Time in UTC Format

**Exercise 2 (2 points)** The function that adds the UTC time is tested on the `flights_df` DataFrame built above, and the result is stored in the `flights_with_utc` variable. Note that this is not strictly a unit test.

* This exercise required having previously completed the code for the `aniade_hora_utc` function in the Python package and generated the .whl file, and therefore, the exercise score is due to that work.
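
To illustrate the conversion, here is a minimal plain-Python sketch that combines `FlightDate` with the `hhmm` integer in `DepTime` and shifts the result to UTC. It is not the package's implementation: the `to_utc` helper is hypothetical, and it takes a fixed UTC offset as an argument, whereas a real implementation would need the time zone of each origin airport and its daylight saving rules.

```python
from datetime import datetime, timedelta, timezone


def to_utc(flight_date, dep_time, utc_offset_hours):
    """Combine a 'YYYY-MM-DD' date and an hhmm integer into a naive UTC datetime.

    utc_offset_hours is a fixed offset (an assumption made for this sketch).
    """
    if dep_time is None:
        # No departure time means no UTC timestamp can be computed.
        return None
    hours, minutes = divmod(dep_time, 100)
    local_tz = timezone(timedelta(hours=utc_offset_hours))
    midnight = datetime.strptime(flight_date, "%Y-%m-%d").replace(tzinfo=local_tz)
    local = midnight + timedelta(hours=hours, minutes=minutes)
    return local.astimezone(timezone.utc).replace(tzinfo=None)


# Lansing, MI is five hours behind UTC in January.
flight_time = to_utc("2023-01-01", 1136, -5)
print(flight_time)  # 2023-01-01 16:36:00
```

The sample values are taken from the first row displayed in the notebook, where a departure from LAN at 1136 appears with a `FlightTime` of 16:36:00.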

In [None]:
from motor_ingesta.agregaciones import aniade_hora_utc
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Actividad Spark") \
    .getOrCreate()

flights_with_utc = aniade_hora_utc(spark, flights_df)

flights_with_utc.show(5)

25/09/11 16:22:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+-----------------+---------------+------+---------------+-----------+-------------+----+---------------+---------+--------+-------+--------+-------+---------+--------+-------+--------+-------------------+
|FlightDate|Reporting_Airline|OriginAirportID|Origin| OriginCityName|OriginState|DestAirportID|Dest|   DestCityName|DestState|DepDelay|DepTime|ArrDelay|ArrTime|Cancelled|Diverted|AirTime|Distance|         FlightTime|
+----------+-----------------+---------------+------+---------------+-----------+-------------+----+---------------+---------+--------+-------+--------+-------+---------+--------+-------+--------+-------------------+
|2023-01-01|               9E|          12884|   LAN|    Lansing, MI|         MI|        11433| DTW|    Detroit, MI|       MI|      -6|   1136|     -22|   1215|    false|   false|     21|      74|2023-01-01 16:36:00|
|2023-01-01|               DL|          13487|   MSP|Minneapolis, MN|         MN|        13495| MSY|New Orleans, LA|       LA|      

In [None]:
from pyspark.sql import functions as F

assert flights_with_utc.where("FlightTime is null").count() == 266
assert flights_with_utc.dtypes[18] == ("FlightTime", "timestamp")

# Earliest UTC departure from airport 12884, cast to string for comparison
first_row = flights_with_utc.where("OriginAirportID = 12884").select(F.min("FlightTime").cast("string").alias("FlightTime")).first()
assert first_row.FlightTime == "2023-01-01 10:59:00"

## Testing the Function to Add Columns with Next Flight Time, Airline, and Elapsed Time Interval

**Exercise 3 (2.5 points)** The function that adds intervals by airport is called on the `flights_with_utc` variable from the previous section, and the DataFrame it returns is stored in the cached `df_with_next_flight` variable.

* This exercise required having previously completed the code for the `aniade_intervalos_por_aeropuerto` function in the Python package and generated the .whl file, and therefore, the exercise score is due to that work.
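
The logic being tested can be sketched in plain Python as follows: within each origin airport, flights are sorted by `FlightTime` and each one is paired with the flight that follows it. This is only an illustration under stated assumptions, not the package's code: the `add_next_flight` helper is hypothetical, all `FlightTime` values are assumed to be non-null, and the airline code `XX` is a placeholder. In Spark, the same effect is typically obtained with a window function partitioned by airport.

```python
from datetime import datetime
from itertools import groupby
from operator import itemgetter


def add_next_flight(flights):
    """Add FlightTime_next, Airline_next and diff_next (seconds) per airport."""
    ordered = sorted(flights, key=itemgetter("OriginAirportID", "FlightTime"))
    result = []
    for _, group in groupby(ordered, key=itemgetter("OriginAirportID")):
        rows = list(group)
        # Pair each flight with the one after it; the last one gets None.
        for current, following in zip(rows, rows[1:] + [None]):
            enriched = dict(current)
            if following is None:
                enriched.update(FlightTime_next=None, Airline_next=None, diff_next=None)
            else:
                gap = following["FlightTime"] - current["FlightTime"]
                enriched.update(
                    FlightTime_next=following["FlightTime"],
                    Airline_next=following["Reporting_Airline"],
                    diff_next=int(gap.total_seconds()),
                )
            result.append(enriched)
    return result


flights = [
    {"OriginAirportID": 12884, "Reporting_Airline": "9E",
     "FlightTime": datetime(2023, 1, 1, 16, 36)},
    {"OriginAirportID": 12884, "Reporting_Airline": "XX",  # placeholder airline
     "FlightTime": datetime(2023, 1, 1, 10, 59)},
]
with_next = add_next_flight(flights)
print(with_next[0]["diff_next"])  # 20220
```

The gap between 10:59:00 and 16:36:00 is 5 hours and 37 minutes, that is, 20220 seconds. The last flight of the airport keeps `None` in the three new columns because no later flight exists in the data.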

In [None]:
from motor_ingesta.agregaciones import aniade_intervalos_por_aeropuerto

df_with_next_flight = aniade_intervalos_por_aeropuerto(flights_with_utc)

df_with_next_flight.show(10, False)

+----------+-----------------+---------------+------+------------------------------+-----------+-------------+----+---------------------+---------+--------+-------+--------+-------+---------+--------+-------+--------+-------------------+-------------------+------------+---------+
|FlightDate|Reporting_Airline|OriginAirportID|Origin|OriginCityName                |OriginState|DestAirportID|Dest|DestCityName         |DestState|DepDelay|DepTime|ArrDelay|ArrTime|Cancelled|Diverted|AirTime|Distance|FlightTime         |FlightTime_next    |Airline_next|diff_next|
+----------+-----------------+---------------+------+------------------------------+-----------+-------------+----+---------------------+---------+--------+-------+--------+-------+---------+--------+-------+--------+-------------------+-------------------+------------+---------+
|2023-01-01|9E               |10135          |ABE   |Allentown/Bethlehem/Easton, PA|PA         |10397        |ATL |Atlanta, GA          |GA       |1       |6

In [None]:
assert df_with_next_flight.dtypes[19] == ("FlightTime_next", "timestamp")
assert df_with_next_flight.dtypes[20] == ("Airline_next", "string")
assert df_with_next_flight.dtypes[21] == ("diff_next", "bigint")

first_row = (
    df_with_next_flight.where("OriginAirportID = 12884")
    .select(
        F.col("FlightTime").cast("string"),
        F.col("FlightTime_next").cast("string"),
        F.col("Airline_next"),
        F.col("diff_next"),
    )
    .sort("FlightTime")
    .first()
)

assert first_row.FlightTime_next == "2023-01-01 16:36:00"
assert first_row.Airline_next == "9E"
assert first_row.diff_next == 20220

## Fixing Null Values in the 3 `_next` Columns for the Last Flight of Each Airport Each Day

**Exercise 4 (2.5 points)**

As the daily flow logic was initially implemented, the last flight of each day at each airport has nulls in the three `_next` columns, because the next day's data is not yet available. To resolve those nulls, the `procesa_diario` method is modified so that, before the current day's data is written, the three `_next` columns of the previous day's data are corrected. A simple (though not necessarily optimal) way to achieve this is:
* Read from the table the partition written the previous day, if the table and the partition exist.
* Add to the DataFrame returned by `aniade_hora_utc` the three columns it lacks to match the table's structure: `FlightTime_next`, `Airline_next`, and `diff_next` (in that order, if the `aniade_intervalos_por_aeropuerto` function adds them in that order). Fill them with None, casting each column to the appropriate data type so that it matches the existing table.
* Union the previous day's DataFrame with the one just calculated.
* Call `aniade_intervalos_por_aeropuerto` on the DataFrame resulting from the union.
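
The steps above can be sketched in plain Python for a single airport. This is a simplified illustration, not the code of `procesa_diario`: the `recompute_next` helper, the airline codes, and the timestamps are hypothetical, and plain lists of dictionaries stand in for the Spark DataFrames.

```python
from datetime import datetime

NEXT_COLUMNS = ("FlightTime_next", "Airline_next", "diff_next")


def recompute_next(rows):
    """Recompute the three _next columns over the rows of one airport."""
    ordered = sorted(rows, key=lambda row: row["FlightTime"])
    fixed = []
    for current, following in zip(ordered, ordered[1:] + [None]):
        row = dict(current)
        if following is None:
            row.update(dict.fromkeys(NEXT_COLUMNS))
        else:
            gap = following["FlightTime"] - current["FlightTime"]
            row["FlightTime_next"] = following["FlightTime"]
            row["Airline_next"] = following["Reporting_Airline"]
            row["diff_next"] = int(gap.total_seconds())
        fixed.append(row)
    return fixed


# Step 1: partition already written for the previous day (made-up values).
previous_day = [
    {"Reporting_Airline": "AA", "FlightTime": datetime(2023, 1, 1, 22, 0),
     "FlightTime_next": None, "Airline_next": None, "diff_next": None},
]
# Current day as it would look after adding the UTC time (made-up values).
current_day = [
    {"Reporting_Airline": "BB", "FlightTime": datetime(2023, 1, 2, 6, 0)},
]

# Step 2: pad the current day with the three missing columns, set to None.
padded = [{**row, **dict.fromkeys(NEXT_COLUMNS)} for row in current_day]
# Step 3: union both days.
combined = previous_day + padded
# Step 4: recompute the _next columns over the union.
fixed = recompute_next(combined)
print(fixed[0]["diff_next"])  # 28800
```

After the recomputation, the last flight of the previous day points to the first flight of the current day, eight hours (28800 seconds) later, while the last flight of the current day keeps its nulls until the following day is ingested. In Spark, the padding step additionally requires casting the null columns, for example with `F.lit(None).cast("timestamp")`, so that the union matches the table's schema.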

Apart from a unit test (optional and worth no points), the functionality is verified by calling `procesa_diario` from the daily flow on the files of two consecutive days, January 1 and 2, 2023, and then checking what has been written to the table after ingesting the second file.

* This exercise required having previously completed the Python package code and generated the .whl file, and therefore, the exercise score is due to that work.

In [None]:
from motor_ingesta.flujo_diario import FlujoDiario

ruta_config = "../config/config.json"
path_json_primer_dia = "../data/landing/2023-01-01.json"

flujo = FlujoDiario(ruta_config)
flujo.procesa_diario(path_json_primer_dia)

2025-09-11 16:22:30.056 | INFO     | motor_ingesta.flujo_diario:procesa_diario:46 - No se han podido leer datos del día 2022-12-31: [TABLE_OR_VIEW_NOT_FOUND] The table or view `default`.`flights` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.;
'UnresolvedRelation [default, flights], [], false

In [None]:
path_json_segundo_dia = "../data/landing/2023-01-02.json"  # path to the 2023-01-02.json file

flujo.procesa_diario(path_json_segundo_dia)

2025-09-11 16:22:31.780 | INFO     | motor_ingesta.flujo_diario:procesa_diario:44 - Leída partición del día 2023-01-01 con éxito


In [None]:
vuelos = spark.read.table("default.flights").sort("Origin", "FlightTime")
assert vuelos.count() == 33931
row = vuelos.where("FlightDate = '2023-01-01' and Origin = 'ABE' and DepTime = 1734").first()
assert row.diff_next == 44220

25/09/11 16:22:38 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Optional Exercise

**Optional Exercise (1 point)** The four unit tests found in the `test_ingesta.py` file are completed. No additional code is written in this notebook.

- A final grade of up to 9.0 points can be achieved without solving this exercise.