# Flights Brasil

In [19]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [109]:
class OverlapDetector:
    """Class to detect overlaps in a DataFrame using PySpark"""

    def __init__(self):
        self.spark = SparkSession.builder.appName("OverlapDetector").getOrCreate()

    def open_file(self, file: str):
        """
        Opens the file and returns a Spark DataFrame.
        
        Parameters:
        file (str): Path to the file to open.
        encoding (str): File encoding format.

        Returns:
        DataFrame: Spark DataFrame with the file's data.
        """
        data = self.spark.read.csv(file, header=True, inferSchema=True, encoding='utf-8')
        return data

    def filter_data_by_year(self, data, date_column: str, year_value: int):
        """
        Filters data by the given year.
        
        Parameters:
        data (DataFrame): Spark DataFrame.
        date_column (str): The name of the column containing date information.
        year_value (int): Year to filter the data by.

        Returns:
        DataFrame: Spark DataFrame filtered by the given year.
        """
        data = data.withColumn("year", F.year(F.col(date_column)))
        data_year = data.filter(F.col("year") == year_value)
        return data_year

    def detect_overlap(self, sample, start_column: str, end_column: str):
        """
        Detects overlaps in the Spark DataFrame using lag function.
        
        Parameters:
        sample (DataFrame): Spark DataFrame with the conditioned and filtered flight data.
        start_column (str): The name of the column containing the start time.
        end_column (str): The name of the column containing the end time.

        Returns:
        DataFrame: DataFrame with overlap details.
        """
        window_spec = Window.orderBy(start_column)
        
        # Lagging the start and end columns
        sample = sample.withColumn("prev_start", lag(start_column).over(window_spec))
        sample = sample.withColumn("prev_end", lag(end_column).over(window_spec))
        
        # Filtering where there is an overlap
        overlap_condition = (col(start_column) < col("prev_end")) & (col(end_column) > col("prev_start"))
        overlaps = sample.filter(overlap_condition)
        
        return overlaps

In [33]:
detector = OverlapDetector()

In [34]:
file="./vuelos_brasil.csv"

In [35]:
data = detector.open_file(file)

In [37]:
data = data.withColumn(f"`{date_column}`", F.to_timestamp(F.col(f"`{date_column}`")))
data = data.withColumn(f"`{start_column}`", F.to_timestamp(F.col(f"`{start_column}`")))
data = data.withColumn(f"`{end_column}`", F.to_timestamp(F.col(f"`{end_column}`")))

In [52]:
original_columns = data.columns
new_columns = [col.replace('.', '_') for col in original_columns]

# Renombrar las columnas
for original, new in zip(original_columns, new_columns):
    data = data.withColumnRenamed(original, new)

In [53]:
data.show()

+---------+--------------------+-----------------+-------------------+--------------------+-------------------+--------------------+------------+--------------------+----------------+--------------------+-------------+--------------+-----------------+--------------------+--------------+--------------+-----------+-----------+-----------+-----------+-------------------+-------------------+-------------------+
|     Voos|     Companhia_Aerea|Codigo_Tipo_Linha|   Partida_Prevista|        Partida_Real|   Chegada_Prevista|        Chegada_Real|Situacao_Voo|Codigo_Justificativa|Aeroporto_Origem|       Cidade_Origem|Estado_Origem|   Pais_Origem|Aeroporto_Destino|      Cidade_Destino|Estado_Destino|  Pais_Destino|   LongDest|    LatDest|   LongOrig|    LatOrig| `Partida_Prevista`|     `Partida_Real`|     `Chegada_Real`|
+---------+--------------------+-----------------+-------------------+--------------------+-------------------+--------------------+------------+--------------------+------------

## Data exploration

In [38]:
data.printSchema()

root
 |-- Voos: string (nullable = true)
 |-- Companhia.Aerea: string (nullable = true)
 |-- Codigo.Tipo.Linha: string (nullable = true)
 |-- Partida.Prevista: timestamp (nullable = true)
 |-- Partida.Real: string (nullable = true)
 |-- Chegada.Prevista: timestamp (nullable = true)
 |-- Chegada.Real: string (nullable = true)
 |-- Situacao.Voo: string (nullable = true)
 |-- Codigo.Justificativa: string (nullable = true)
 |-- Aeroporto.Origem: string (nullable = true)
 |-- Cidade.Origem: string (nullable = true)
 |-- Estado.Origem: string (nullable = true)
 |-- Pais.Origem: string (nullable = true)
 |-- Aeroporto.Destino: string (nullable = true)
 |-- Cidade.Destino: string (nullable = true)
 |-- Estado.Destino: string (nullable = true)
 |-- Pais.Destino: string (nullable = true)
 |-- LongDest: double (nullable = true)
 |-- LatDest: double (nullable = true)
 |-- LongOrig: double (nullable = true)
 |-- LatOrig: double (nullable = true)
 |-- `Partida.Prevista`: timestamp (nullable = true)


In [45]:
type(data)

pyspark.sql.dataframe.DataFrame

In [39]:
data.show(5)

+---------+--------------------+-----------------+-------------------+--------------------+-------------------+--------------------+------------+--------------------+----------------+--------------------+-------------+-----------+-----------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+-------------------+-------------------+-------------------+
|     Voos|     Companhia.Aerea|Codigo.Tipo.Linha|   Partida.Prevista|        Partida.Real|   Chegada.Prevista|        Chegada.Real|Situacao.Voo|Codigo.Justificativa|Aeroporto.Origem|       Cidade.Origem|Estado.Origem|Pais.Origem|Aeroporto.Destino|Cidade.Destino|Estado.Destino|  Pais.Destino|   LongDest|    LatDest|   LongOrig|    LatOrig| `Partida.Prevista`|     `Partida.Real`|     `Chegada.Real`|
+---------+--------------------+-----------------+-------------------+--------------------+-------------------+--------------------+------------+--------------------+----------------+-------------

In [41]:
data.head()

Row(Voos='AAL - 203', Companhia.Aerea='AMERICAN AIRLINES INC', Codigo.Tipo.Linha='Internacional', Partida.Prevista=datetime.datetime(2016, 1, 30, 8, 58), Partida.Real='2016-01-30T08:58:00Z', Chegada.Prevista=datetime.datetime(2016, 1, 30, 10, 35), Chegada.Real='2016-01-30T10:35:00Z', Situacao.Voo='Realizado', Codigo.Justificativa='NA', Aeroporto.Origem='Afonso Pena', Cidade.Origem='Sao Jose Dos Pinhais', Estado.Origem='PR', Pais.Origem='Brasil', Aeroporto.Destino='Salgado Filho', Cidade.Destino='Porto Alegre', Estado.Destino='RS', Pais.Destino='Brasil', LongDest=-51.1753811, LatDest=-29.9934732, LongOrig=-49.1724811, LatOrig=-25.5327132, `Partida.Prevista`=datetime.datetime(2016, 1, 30, 8, 58), `Partida.Real`=datetime.datetime(2016, 1, 30, 8, 58), `Chegada.Real`=datetime.datetime(2016, 1, 30, 10, 35))

In [54]:
data.describe().show()

+-------+----------+--------------------+-----------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+-------------+-------------+--------------------+--------------------+--------------+-------------+------------------+-------------------+-------------------+-------------------+
|summary|      Voos|     Companhia_Aerea|Codigo_Tipo_Linha|        Partida_Real|        Chegada_Real|Situacao_Voo|Codigo_Justificativa|    Aeroporto_Origem|       Cidade_Origem|Estado_Origem|  Pais_Origem|   Aeroporto_Destino|      Cidade_Destino|Estado_Destino| Pais_Destino|          LongDest|            LatDest|           LongOrig|            LatOrig|
+-------+----------+--------------------+-----------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+-------------+-------------+--------------------+--------------------+--------------+-------------+------------

In [56]:
conteo_situacao_voo = data.groupBy("Situacao_Voo").count()
conteo_situacao_voo.orderBy(F.desc("count")).show()

+------------+-------+
|Situacao_Voo|  count|
+------------+-------+
|   Realizado|2253323|
|   Cancelado| 289196|
+------------+-------+



In [57]:
conteo_Companhia_Aerea = data.groupBy("Companhia_Aerea").count()
conteo_Companhia_Aerea.orderBy(F.desc("count")).show()

+--------------------+------+
|     Companhia_Aerea| count|
+--------------------+------+
|                 GOL|759191|
|                AZUL|742495|
|                 TAM|554099|
|      AVIANCA BRASIL|183913|
|           PASSAREDO| 70436|
|AMERICAN AIRLINES...| 22975|
|COPA -COMPANIA PA...| 21034|
|    TAP AIR PORTUGAL| 18878|
|AEROLINEAS ARGENT...| 16522|
|   MAP LINHAS AEREAS| 15675|
|     UNITED AIRLINES|  9269|
|           LAN CHILE|  9141|
|      DELTA AIRLINES|  9054|
|AUSTRAL LINEAS A�...|  8941|
|                SETE|  7720|
|          AIR FRANCE|  6819|
|TRASAMERICA  AIRL...|  5660|
|            EMIRATES|  5646|
|             AVIANCA|  5553|
|           LUFTHANSA|  4974|
+--------------------+------+
only showing top 20 rows



In [58]:
conteo_Pais_Origem = data.groupBy("Pais_Origem").count()
conteo_Pais_Origem.orderBy(F.desc("count")).show()

+--------------------+-------+
|         Pais_Origem|  count|
+--------------------+-------+
|              Brasil|2382839|
|           Argentina|  35864|
|      Estados Unidos|  31700|
|               Chile|  11943|
|              Panama|  10516|
|            Portugal|   9643|
|             Uruguai|   6514|
|                Peru|   6342|
|             Espanha|   5405|
|              Franca|   4350|
|            Colombia|   4121|
|            Alemanha|   3919|
|            Paraguai|   3231|
|              Italia|   2972|
|         Reino Unido|   2885|
|Emirados Arabes U...|   2715|
|              Mexico|   2172|
|             Bolivia|   2037|
|Republica Dominicana|   1893|
|             Holanda|   1725|
+--------------------+-------+
only showing top 20 rows



In [59]:
conteo_Pais_Destino = data.groupBy("Pais_Destino").count()
conteo_Pais_Destino.orderBy(F.desc("count")).show()

+--------------------+-------+
|        Pais_Destino|  count|
+--------------------+-------+
|              Brasil|2382826|
|           Argentina|  35893|
|      Estados Unidos|  31482|
|               Chile|  12069|
|              Panama|  10518|
|            Portugal|   9655|
|             Uruguai|   6530|
|                Peru|   6390|
|             Espanha|   5402|
|              Franca|   4353|
|            Colombia|   4139|
|            Alemanha|   3916|
|            Paraguai|   3228|
|              Italia|   2970|
|         Reino Unido|   2890|
|Emirados Arabes U...|   2715|
|              Mexico|   2199|
|             Bolivia|   2039|
|Republica Dominicana|   1891|
|             Holanda|   1726|
+--------------------+-------+
only showing top 20 rows



## Implemento la clase

In [110]:
year_value = 2015
date_column = "Partida_Prevista"
start_column = "Partida_Real"
end_column = "Chegada_Real"

In [111]:
data_year = detector.filter_data_by_year(data, date_column, year_value)

In [112]:
sample = data_year.select("Voos", date_column, start_column, end_column, "Situacao_Voo")

In [114]:
overlaps = detector.detect_overlap(sample, start_column, end_column)

In [115]:
overlaps.show()

+----------+-------------------+--------------------+--------------------+------------+--------------------+--------------------+
|      Voos|   Partida_Prevista|        Partida_Real|        Chegada_Real|Situacao_Voo|          prev_start|            prev_end|
+----------+-------------------+--------------------+--------------------+------------+--------------------+--------------------+
|AZU - 4100|2015-01-01 02:27:00|2014-12-31T02:15:00Z|2014-12-31T03:00:00Z|   Realizado|2014-12-31T01:03:00Z|2014-12-31T07:17:00Z|
|GLO - 1844|2015-01-01 00:00:00|2015-01-01T00:00:00Z|2015-01-01T02:50:00Z|   Realizado|2014-12-31T23:59:00Z|2015-01-01T00:48:00Z|
| AAL - 214|2015-01-01 00:01:00|2015-01-01T00:01:00Z|2015-01-01T07:45:00Z|   Realizado|2015-01-01T00:00:00Z|2015-01-01T02:50:00Z|
|TAM - 8053|2015-01-01 00:05:00|2015-01-01T00:05:00Z|2015-01-01T08:22:00Z|   Realizado|2015-01-01T00:01:00Z|2015-01-01T07:45:00Z|
|TAM - 3557|2015-01-01 00:05:00|2015-01-01T00:05:00Z|2015-01-01T01:45:00Z|   Realizado|201

In [117]:
overlap_count = overlaps.count()

print(f"Total number of overlaps: {overlap_count}")

Total number of overlaps: 941593
