# Proyecto Semana 6: Implementar un proceso ETL

## Modelo a Desarrollar

Para esta entrega se utilizara el modelo proporcionado por el grupo de profesores y tutores:

![Modelos](./Modelo_aeropuertos_definitivo.png)

Para saber mas informacion sobre el contenido de cada dimension, atributo y medida, puede consultar el diccionario de datos Actualizado [AQUI](./Diccionario%20IV.xlsx)

## Diseño del ETL:

Antes de empezar la implementaciond debemos trabajar en el diseño del proceso de ETL, este diseño se puede observar en la siguiente imagen:

![Diseno ETL](./EntregaGrupalS6_Diseno.png)

## Paso #1: Setup

En esta seccion se hara el setup de la tarea, incluyendo librerias y estableciendo todo los prerequisitos para la correcta ejecucion de lo esperado en la actividad.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace, abs, percent_rank, row_number,concat,date_format,to_date, upper
from pyspark.sql import types as t
from pyspark.sql.window import Window
from pandas_profiling import ProfileReport
from datetime import datetime
from pyspark.sql.functions import max as spark_max
from pyspark.sql.functions import lit
from pyspark.sql.functions import array, struct, explode, floor, col, lit
from pyspark.sql import DataFrame
import pyspark.sql.functions as f
import matplotlib.pyplot as plt
import numpy as np
import os 
import pandas as pd
import math

# Configuración servidor base de datos transaccional
db_shiomar = 'Estudiante_103_202413'
pass_shiomar = 'MISO_aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/ProyectoTransaccional'
dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Proyecto_G5_202413'

path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver) \
    .set("spark.network.timeout", "600s") \
    .set("spark.executor.heartbeatInterval", "100s") \
    .set("spark.rpc.askTimeout", "600s") \
    .set("spark.rpc.lookupTimeout", "600s")

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

  from .autonotebook import tqdm as notebook_tqdm


## Paso #2: Dimensión Aeropuerto

En este paso aplicaremos ETL a la Dimension **Aeropuerto**, su fuente de datos viene de la tabla `aeropuertos` de la base de datos de ProyectoTransaccional.

Al final del proceso de ETL se espera obtener una nueva tabla llamada `G5_Aeropuerto` en la base de datos de Proyecto_G5_202413 conteniendo las siguiente columnas:
* idAeropuerto_DWH
* Sigla
* IATA
* Nombre
* Elevacion
* NombreMunicipio
* NombreDepartamento

#### Extraccion

En esta seccion obtendremos los datos de las fuentes identificadas en nuestro diseño.

In [2]:
sql_aeropuerto = '''(SELECT DISTINCT sigla AS Sigla, iata AS IATA, nombre AS Nombre, elevacion AS Elevacion, municipio AS NombreMunicipio, departamento AS NombreDepartamento FROM ProyectoTransaccional.aeropuertos) AS Temp_aeropuerto'''
aeropuerto_df = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuerto, db_shiomar, pass_shiomar)
print('La cantidad de Registros de la tabla Aeropuertos obtenidos fueron: ' + str(aeropuerto_df.count()))
print('La tabla extraida queda de la siguiente forma:')
aeropuerto_df.show()

La cantidad de Registros de la tabla Aeropuertos obtenidos fueron: 598
La tabla extraida queda de la siguiente forma:
+-----+----+--------------------+---------+--------------------+------------------+
|Sigla|IATA|              Nombre|Elevacion|     NombreMunicipio|NombreDepartamento|
+-----+----+--------------------+---------+--------------------+------------------+
|  7FO|    |             la isla|      538|       Puerto Gaitán|              meta|
|  7FU|    |        la escondida|      564|       Puerto Gaitán|              meta|
|  7FW|    |           morichito|      720|        Hato Corozal|          casanare|
|  7FX|    |carolina del prin...|     6004|            Carolina|         antioquia|
|  7FY|    |               dubai|       82|              Ayapel|           cordoba|
|  7FZ|    |          el triunfo|      475|              Orocué|          casanare|
|  7GA|    |  baru - hidropuerto|        0| Cartagena de Indias|           bolivar|
|  7GB|    |         la carolina|      690

#### Transformacion

T1. Eliminar duplicados totales de la tabla:

Esta transformacion fue aplicada en la sentencia SQL de la etapa de Extraccion.

In [3]:
duplicates = aeropuerto_df.count() - aeropuerto_df.distinct().count()
print('La cantidad de duplicados totales es de ' + str(duplicates))

La cantidad de duplicados totales es de 0


T2. Adecuación del Formato de la Tabla según la Expectativa de Salida:

Esta transformacion fue aplicada en la sentencia SQL de la etapa de Extraccion.

In [4]:
print('La tabla actual tiene el siguiente formato: ')
aeropuerto_df.schema

La tabla actual tiene el siguiente formato: 


StructType(List(StructField(Sigla,StringType,true),StructField(IATA,StringType,true),StructField(Nombre,StringType,true),StructField(Elevacion,IntegerType,true),StructField(NombreMunicipio,StringType,true),StructField(NombreDepartamento,StringType,true)))

T3. Creación de la columna idAeropuero_DWH.

In [5]:
aeropuerto_df = aeropuerto_df.coalesce(1).withColumn('idAeropuerto_DWH', f.monotonically_increasing_id() + 1)
aeropuerto_df = aeropuerto_df.select('idAeropuerto_DWH','Sigla', 'IATA', 'Nombre', 'Elevacion', 'NombreMunicipio', 'NombreDepartamento' )
print('La tabla Transformada queda de la siguiente forma:')
aeropuerto_df.show()

La tabla Transformada queda de la siguiente forma:
+----------------+-----+----+--------------------+---------+--------------------+------------------+
|idAeropuerto_DWH|Sigla|IATA|              Nombre|Elevacion|     NombreMunicipio|NombreDepartamento|
+----------------+-----+----+--------------------+---------+--------------------+------------------+
|               1|  7FO|    |             la isla|      538|       Puerto Gaitán|              meta|
|               2|  7FU|    |        la escondida|      564|       Puerto Gaitán|              meta|
|               3|  7FW|    |           morichito|      720|        Hato Corozal|          casanare|
|               4|  7FX|    |carolina del prin...|     6004|            Carolina|         antioquia|
|               5|  7FY|    |               dubai|       82|              Ayapel|           cordoba|
|               6|  7FZ|    |          el triunfo|      475|              Orocué|          casanare|
|               7|  7GA|    |  baru - hi

T4. Rellenar valores vacíos de la columna IATA con valores por defecto.

Para esta transformacion se utilizara el IATA de `ZZZ` ya que este no existe en el listado de IATAs asignados.

In [6]:
aeropuerto_df = aeropuerto_df.withColumn("IATA", when(col("IATA") == "", "ZZZ").otherwise(col('IATA')))
print('La tabla Transformada queda de la siguiente forma:')
aeropuerto_df.show()

La tabla Transformada queda de la siguiente forma:
+----------------+-----+----+--------------------+---------+--------------------+------------------+
|idAeropuerto_DWH|Sigla|IATA|              Nombre|Elevacion|     NombreMunicipio|NombreDepartamento|
+----------------+-----+----+--------------------+---------+--------------------+------------------+
|               1|  7FO| ZZZ|             la isla|      538|       Puerto Gaitán|              meta|
|               2|  7FU| ZZZ|        la escondida|      564|       Puerto Gaitán|              meta|
|               3|  7FW| ZZZ|           morichito|      720|        Hato Corozal|          casanare|
|               4|  7FX| ZZZ|carolina del prin...|     6004|            Carolina|         antioquia|
|               5|  7FY| ZZZ|               dubai|       82|              Ayapel|           cordoba|
|               6|  7FZ| ZZZ|          el triunfo|      475|              Orocué|          casanare|
|               7|  7GA| ZZZ|  baru - hi

In [7]:
print("Verificamos los valores de IATA de la tabla resultante:")
aeropuerto_df.groupBy("IATA").count().show()

Verificamos los valores de IATA de la tabla resultante:
+----+-----+
|IATA|count|
+----+-----+
| ZZZ|  494|
| ECO|    1|
| ACD|    1|
| ACL|    1|
| ACR|    1|
| ADZ|    1|
| AFI|    1|
| APO|    1|
| ARO|    1|
| ARQ|    1|
| AUC|    1|
| BAQ|    1|
| BOG|    1|
| BUN|    1|
| CIM|    1|
| CLO|    1|
| COG|    1|
| CPL|    1|
| CTG|    1|
| CUC|    1|
+----+-----+
only showing top 20 rows



T5. Creación del Aeropuerto con idAeropuerto_DWH cero, para ser usado con Aeropuertos Internacionales sin datos.

Esta transformacion tiene la intencio de crear un campo para los posibles valores en done la tabla de `HechosVuelos` no tenga valores  y evitar a toda costa que tenga campos vacios, null o NA.

In [8]:
aero_0 = [('0','INT','ZZZ','Missing','0','Internacional','Internacional')]
columns = ['idAeropuerto_DWH','Sigla', 'IATA', 'Nombre', 'Elevacion', 'NombreMunicipio', 'NombreDepartamento']
aero_0 = spark.createDataFrame(data=aero_0,schema=columns)
aeropuerto_df = aeropuerto_df.union(aero_0)
aeropuerto_df = aeropuerto_df.withColumn('idAeropuerto_DWH',col('idAeropuerto_DWH').cast('int')).orderBy(col('idAeropuerto_DWH'))
print('La tabla Transformada queda de la siguiente forma:')
aeropuerto_df.show()

La tabla Transformada queda de la siguiente forma:
+----------------+-----+----+--------------------+---------+--------------------+------------------+
|idAeropuerto_DWH|Sigla|IATA|              Nombre|Elevacion|     NombreMunicipio|NombreDepartamento|
+----------------+-----+----+--------------------+---------+--------------------+------------------+
|               0|  INT| ZZZ|             Missing|        0|       Internacional|     Internacional|
|               1|  7FO| ZZZ|             la isla|      538|       Puerto Gaitán|              meta|
|               2|  7FU| ZZZ|        la escondida|      564|       Puerto Gaitán|              meta|
|               3|  7FW| ZZZ|           morichito|      720|        Hato Corozal|          casanare|
|               4|  7FX| ZZZ|carolina del prin...|     6004|            Carolina|         antioquia|
|               5|  7FY| ZZZ|               dubai|       82|              Ayapel|           cordoba|
|               6|  7FZ| ZZZ|          e

#### Carga
Se carga el dataframe procesado en una nueva tabla segun el diseño de ETL.

In [9]:
guardar_db(dest_db_connection_string, aeropuerto_df,'Proyecto_G5_202413.G5_Aeropuerto', db_shiomar, pass_shiomar)

## Paso #3: Dimensión Tipo Vuelo

En este paso aplicaremos ETL a la Dimension **TipoVuelo**, su fuente de datos viene de la tabla `Vuelos` de la base de datos de ProyectoTransaccional.

Al final del proceso de ETL se espera obtener una nueva tabla llamada `G5_TipoVuelo` en la base de datos de Proyecto_G5_202413 conteniendo las siguiente columnas:
* idTipoVuelo_DWH
* idTipovuelo_T
* nombreTipo
* tipoEquipo

#### Extraccion

En esta seccion obtendremos los datos de las fuentes identificadas en nuestro diseño.

T1. Se crea un dataframe que contiene unicamente las columnas requeridas tipo_vuelo, tipo_equipo

In [10]:
sql_vuelos = '''(SELECT  tipo_vuelo, tipo_equipo FROM ProyectoTransaccional.vuelos) AS Temp_vuelos'''
vuelos_df = obtener_dataframe_de_bd(source_db_connection_string, sql_vuelos, db_shiomar, pass_shiomar)
print('La cantidad de Registros de la tabla Vuelos obtenidos fueron: ' + str(vuelos_df.count()))
print('El dataFrame extraido queda de la siguiente forma:')
vuelos_df.show()

La cantidad de Registros de la tabla Vuelos obtenidos fueron: 563265
El dataFrame extraido queda de la siguiente forma:
+----------+-----------+
|tipo_vuelo|tipo_equipo|
+----------+-----------+
|         R|       JS32|
|         R|       JS32|
|         R|       JS32|
|         R|       JS32|
|         R|       DHC6|
|         R|       DHC6|
|         R|       A306|
|         R|       DHC6|
|         R|       A306|
|         R|       DHC6|
|         R|       A306|
|         R|       DHC6|
|         R|       A306|
|         R|       DHC6|
|         R|       A306|
|         R|       DHC6|
|         R|       A306|
|         R|       DHC6|
|         R|       A306|
|         R|       DHC6|
+----------+-----------+
only showing top 20 rows



#### Transformacion

T2. Eliminar duplicados totales de la tabla:

In [11]:
duplicates = vuelos_df.count() - vuelos_df.distinct().count()
vuelos_df_whitout_duplicates = vuelos_df.distinct()
print('La cantidad de duplicados totales es de ' + str(duplicates))
print('Dataframe sin duplicados')
vuelos_df_whitout_duplicates.show()
print('Cantidad Final: '+str(vuelos_df_whitout_duplicates.count()))

La cantidad de duplicados totales es de 562893
Dataframe sin duplicados
+----------+-----------+
|tipo_vuelo|tipo_equipo|
+----------+-----------+
|         T|       D28T|
|         C|       A320|
|         C|       A343|
|         T|       GLF2|
|         R|       DC95|
|         R|       B733|
|         T|       AC90|
|         A|       DH8D|
|         A|       E170|
|         T|       AN26|
|         A|        A30|
|         R|        747|
|         A|       B734|
|         R|       C295|
|         T|       C170|
|         R|       B731|
|         T|       C25C|
|         A|        AT4|
|         R|       A318|
|         R|       DC10|
+----------+-----------+
only showing top 20 rows

Cantidad Final: 372


T3. Se crea columna nombretipo

In [12]:
#creamos DF con el diccionario dado por el negocio
df = pd.DataFrame({'tipo_vuelo': ['R','T','C','A'], 'nombreTipo': ['regular', 'taxi', 'chárter', 'adicionales']})

#mostrar DatraFrame
diccionarioTipoVuelo=spark.createDataFrame(df)
diccionarioTipoVuelo.show()

+----------+-----------+
|tipo_vuelo| nombreTipo|
+----------+-----------+
|         R|    regular|
|         T|       taxi|
|         C|    chárter|
|         A|adicionales|
+----------+-----------+



In [13]:
# Creamos un join con el diccionario tipovuelos
vuelos_df_whitout_duplicates = vuelos_df_whitout_duplicates.join(diccionarioTipoVuelo,how = 'inner',  on='tipo_vuelo')
# mostramos el DF final
print(vuelos_df_whitout_duplicates.show())

+----------+-----------+----------+
|tipo_vuelo|tipo_equipo|nombreTipo|
+----------+-----------+----------+
|         R|       A320|   regular|
|         R|       B720|   regular|
|         R|       B753|   regular|
|         R|       DH8D|   regular|
|         R|       DH8A|   regular|
|         R|        Y12|   regular|
|         R|       AT75|   regular|
|         R|        F50|   regular|
|         R|        DC3|   regular|
|         R|        DC8|   regular|
|         R|       MD83|   regular|
|         R|       DHC3|   regular|
|         R|       DC93|   regular|
|         R|       B463|   regular|
|         R|        F10|   regular|
|         R|       B773|   regular|
|         R|       CDC6|   regular|
|         R|        767|   regular|
|         R|       DHC2|   regular|
|         R|       B788|   regular|
+----------+-----------+----------+
only showing top 20 rows

None


T4. Adecuación del Formato de la Tabla según la Expectativa de Salida:

In [14]:
#Agregar columna idTipoVuelo_DWH
vuelos_df_whitout_duplicates = vuelos_df_whitout_duplicates.coalesce(1).withColumn('idTipoVuelo_DWH', f.monotonically_increasing_id() + 1)
#Agregar columna idTipovuelo_T
vuelos_df_whitout_duplicates = vuelos_df_whitout_duplicates.coalesce(1).withColumn('idTipovuelo_T', concat(vuelos_df_whitout_duplicates.tipo_vuelo,vuelos_df_whitout_duplicates.tipo_equipo))
# mostramos el DF final
print(vuelos_df_whitout_duplicates.show())

+----------+-----------+----------+---------------+-------------+
|tipo_vuelo|tipo_equipo|nombreTipo|idTipoVuelo_DWH|idTipovuelo_T|
+----------+-----------+----------+---------------+-------------+
|         R|       A320|   regular|              1|        RA320|
|         R|       B720|   regular|              2|        RB720|
|         R|       B753|   regular|              3|        RB753|
|         R|       DH8D|   regular|              4|        RDH8D|
|         R|       DH8A|   regular|              5|        RDH8A|
|         R|        Y12|   regular|              6|         RY12|
|         R|       AT75|   regular|              7|        RAT75|
|         R|        F50|   regular|              8|         RF50|
|         R|        DC3|   regular|              9|         RDC3|
|         R|        DC8|   regular|             10|         RDC8|
|         R|       MD83|   regular|             11|        RMD83|
|         R|       DHC3|   regular|             12|        RDHC3|
|         

In [15]:
print('La tabla actual tiene el siguiente formato: ')
vuelos_df_whitout_duplicates.schema

La tabla actual tiene el siguiente formato: 


StructType(List(StructField(tipo_vuelo,StringType,true),StructField(tipo_equipo,StringType,true),StructField(nombreTipo,StringType,true),StructField(idTipoVuelo_DWH,LongType,false),StructField(idTipovuelo_T,StringType,true)))

T5. Se seleccionan columnas de salida de la transformacion con los nombres finales.

In [16]:
#dataframe final
df_tipo_vuelos = vuelos_df_whitout_duplicates.selectExpr('idTipoVuelo_DWH AS idTipoVuelo_DWH', 'idTipovuelo_T AS idTipovuelo_T', 'nombreTipo AS nombreTipo', 'tipo_equipo AS tipoEquipo')
print('La tabla final: ')
df_tipo_vuelos.show()

La tabla final: 
+---------------+-------------+----------+----------+
|idTipoVuelo_DWH|idTipovuelo_T|nombreTipo|tipoEquipo|
+---------------+-------------+----------+----------+
|              1|        RA320|   regular|      A320|
|              2|        RB720|   regular|      B720|
|              3|        RB753|   regular|      B753|
|              4|        RDH8D|   regular|      DH8D|
|              5|        RDH8A|   regular|      DH8A|
|              6|         RY12|   regular|       Y12|
|              7|        RAT75|   regular|      AT75|
|              8|         RF50|   regular|       F50|
|              9|         RDC3|   regular|       DC3|
|             10|         RDC8|   regular|       DC8|
|             11|        RMD83|   regular|      MD83|
|             12|        RDHC3|   regular|      DHC3|
|             13|        RDC93|   regular|      DC93|
|             14|        RB463|   regular|      B463|
|             15|         RF10|   regular|       F10|
|          

#### Carga
Se carga el dataframe procesado en una nueva tabla segun el diseño de ETL.

In [17]:
# CARGUE
guardar_db(dest_db_connection_string, df_tipo_vuelos,'Proyecto_G5_202413.G5_TipoVuelo', db_shiomar, pass_shiomar)

## Paso #4: Dimensión Trafico

En este paso aplicaremos ETL a la **Dimension Trafico** ,  su fuente de datos viene de la tabla Vuelos de la base de datos de ProyectoTransaccional.

Al final del proceso de ETL se espera obtener una nueva tabla llamada G5_TipoVuelo en la base de datos de Proyecto_G5_202413 conteniendo las siguiente columnas:

* IdTrafico_DWH 
* idTrafico_T
* NombreTrafico

#### Extraccion

En esta seccion obtendremos los datos de las fuentes identificadas en nuestro diseño.

T1. Se crea un dataframe que contiene unicamente la columna requerida trafico

In [18]:
sql_vuelos_trafico = '''(SELECT  trafico FROM ProyectoTransaccional.vuelos) AS Temp_vuelos_trafico'''
vuelos_trafico_df = obtener_dataframe_de_bd(source_db_connection_string, sql_vuelos_trafico, db_shiomar, pass_shiomar)
print('La cantidad de Registros de la tabla Vuelos obtenidos fueron: ' + str(vuelos_trafico_df.count()))
print('El dataFrame extraido queda de la siguiente forma:')
vuelos_trafico_df.show()

La cantidad de Registros de la tabla Vuelos obtenidos fueron: 563265
El dataFrame extraido queda de la siguiente forma:
+-------+
|trafico|
+-------+
|      N|
|      N|
|      N|
|      N|
|      N|
|      N|
|      I|
|      N|
|      I|
|      N|
|      I|
|      N|
|      I|
|      N|
|      I|
|      N|
|      I|
|      N|
|      I|
|      N|
+-------+
only showing top 20 rows



Transformacion
T2. Eliminar duplicados totales de la tabla:

In [19]:
duplicates_trafico = vuelos_trafico_df.count() - vuelos_trafico_df.distinct().count()
vuelos_trafico_df_whitout_duplicates = vuelos_trafico_df.distinct()
print('La cantidad de duplicados totales es de ' + str(duplicates_trafico))
print('Dataframe sin duplicados')
vuelos_trafico_df_whitout_duplicates.show()
print('Cantidad Final: '+str(vuelos_trafico_df_whitout_duplicates.count()))

La cantidad de duplicados totales es de 563262
Dataframe sin duplicados
+-------+
|trafico|
+-------+
|      E|
|      N|
|      I|
+-------+

Cantidad Final: 3


T3. Se crea columna nombreTrafico

In [20]:
#creamos DF con el diccionario dado por el negocio
df_dic_trafico = pd.DataFrame({'trafico': ['E','N','I'], 'nombreTrafico': ['postal o urgentes', 'Nacional', 'Internacional']})

#mostrar DatraFrame
diccionarioTrafico=spark.createDataFrame(df_dic_trafico)
diccionarioTrafico.show()

+-------+-----------------+
|trafico|    nombreTrafico|
+-------+-----------------+
|      E|postal o urgentes|
|      N|         Nacional|
|      I|    Internacional|
+-------+-----------------+



In [21]:
# Creamos un join con el diccionario tipovuelos
vuelos_trafico_df_whitout_duplicates = vuelos_trafico_df_whitout_duplicates.join(diccionarioTrafico,how = 'inner',  on='trafico')
# mostramos el DF final
print(vuelos_trafico_df_whitout_duplicates.show())

+-------+-----------------+
|trafico|    nombreTrafico|
+-------+-----------------+
|      E|postal o urgentes|
|      N|         Nacional|
|      I|    Internacional|
+-------+-----------------+

None


T4. Adecuación del Formato de la Tabla según la Expectativa de Salida:

In [22]:
#Agregar columna idTipoVuelo_DWH
vuelos_trafico_df_whitout_duplicates = vuelos_trafico_df_whitout_duplicates.coalesce(1).withColumn('IdTrafico_DWH', f.monotonically_increasing_id() + 1)
# mostramos el DF final
print(vuelos_trafico_df_whitout_duplicates.show())

+-------+-----------------+-------------+
|trafico|    nombreTrafico|IdTrafico_DWH|
+-------+-----------------+-------------+
|      E|postal o urgentes|            1|
|      N|         Nacional|            2|
|      I|    Internacional|            3|
+-------+-----------------+-------------+

None


In [23]:
print('La tabla actual tiene el siguiente formato: ')
vuelos_trafico_df_whitout_duplicates.schema

La tabla actual tiene el siguiente formato: 


StructType(List(StructField(trafico,StringType,true),StructField(nombreTrafico,StringType,true),StructField(IdTrafico_DWH,LongType,false)))

T5. Se seleccionan columnas de salida de la transformacion con los nombres finales.

In [24]:
#dataframe final
df_trafico = vuelos_trafico_df_whitout_duplicates.selectExpr('IdTrafico_DWH AS IdTrafico_DWH', 'trafico AS idTrafico_T', 'nombreTrafico AS nombreTrafico')
print('La tabla final: ')
df_trafico.show()

La tabla final: 
+-------------+-----------+-----------------+
|IdTrafico_DWH|idTrafico_T|    nombreTrafico|
+-------------+-----------+-----------------+
|            1|          E|postal o urgentes|
|            2|          N|         Nacional|
|            3|          I|    Internacional|
+-------------+-----------+-----------------+



### Carga
Se carga el dataframe procesado en una nueva tabla segun el diseño de ETL.

In [25]:
# CARGUE
guardar_db(dest_db_connection_string, df_trafico,'Proyecto_G5_202413.G5_Trafico',db_shiomar, pass_shiomar)

## Paso #5: Dimensión Geografía con Demografía

### Extraccion

In [26]:
sql_proyecciones = '''(SELECT * FROM ProyectoTransaccional.proyecciones) AS Temp_proyecciones'''
sql_pib = '''(SELECT * FROM ProyectoTransaccional.pib) AS Temp_pib'''
sql_departamentos = '''(SELECT * FROM ProyectoTransaccional.divipola) AS Temp_divipola'''

departamentos = obtener_dataframe_de_bd(source_db_connection_string, sql_departamentos, db_shiomar, pass_shiomar)
pib = obtener_dataframe_de_bd(source_db_connection_string, sql_pib, db_shiomar, pass_shiomar)
proyecciones = obtener_dataframe_de_bd(source_db_connection_string, sql_proyecciones, db_shiomar, pass_shiomar)

### Transformacion

### divipola

Se eliminan duplicados totales de divipola

In [27]:
print((departamentos.count(),departamentos.distinct().count()))

(15758, 7879)


In [28]:
departamentos = departamentos.drop_duplicates()

In [29]:
print((departamentos.count(),departamentos.distinct().count()))

(7879, 7879)


Correcion nombre departamento bogota

In [30]:
#Replace part of string with another stringc0
departamentos = departamentos.withColumn('NombreDepartamento', regexp_replace('NombreDepartamento', 'd0,', 'distrito'))

In [31]:
departamentos = departamentos.withColumn('NombreDepartamento', regexp_replace('NombreDepartamento', 'c0,', 'capital'))

Correccion nombre municipio bogota

In [32]:
departamentos = departamentos.withColumn('NombreMunicipio', regexp_replace('NombreMunicipio', 'D0,', 'DISTRITO'))
departamentos = departamentos.withColumn('NombreMunicipio', regexp_replace('NombreDepartamento', 'C0,', 'CAPITAL'))

In [33]:
departamentos.filter(col("NombreDepartamento").contains("bogota")).show(5)

+------------------+---------------+-------------------+--------------------+--------------------+-------------------+-----------------+--------------+-------------+--------------------+-------------------------------------+-----------------------+
|CodigoDepartamento|CodigoMunicipio|CodigoCentroPoblado|  NombreDepartamento|     NombreMunicipio|NombreCentroPoblado|TipoCentroPoblado|      Longitud|      Latitud|     Nombre Distrito|Municipio/AreasNoMunicipalizadas(ANM)|NombreAreaMetropolitana|
+------------------+---------------+-------------------+--------------------+--------------------+-------------------+-----------------+--------------+-------------+--------------------+-------------------------------------+-----------------------+
|                11|          11001|           11001010|bogota, distrito ...|bogota, distrito ...|           LA UNIÓN|   CENTRO POBLADO|-74.3635089446|3.98536782367|BOGOTÁ, DISTRITO ...|                            MUNICIPIO|                   null|
|   

Cambiar NombreDepartamento y NombreMunicipio a mayuscula

In [34]:
departamentos = departamentos.withColumn('NombreDepartamento', upper('NombreDepartamento'))
departamentos = departamentos.withColumn('NombreMunicipio', upper('NombreMunicipio'))

In [35]:
departamentos.show(5)

+------------------+---------------+-------------------+------------------+---------------+--------------------+------------------+--------------+-------------+---------------+-------------------------------------+-----------------------+
|CodigoDepartamento|CodigoMunicipio|CodigoCentroPoblado|NombreDepartamento|NombreMunicipio| NombreCentroPoblado| TipoCentroPoblado|      Longitud|      Latitud|Nombre Distrito|Municipio/AreasNoMunicipalizadas(ANM)|NombreAreaMetropolitana|
+------------------+---------------+-------------------+------------------+---------------+--------------------+------------------+--------------+-------------+---------------+-------------------------------------+-----------------------+
|                18|          18460|           18460000|           CAQUETA|        CAQUETA|               MILÁN|CABECERA MUNICIPAL| -75.506962028|1.29042720175|           null|                            MUNICIPIO|                   null|
|                52|          52835|        

In [36]:
departamentosNuevo = departamentos.withColumn('FechaInicial',lit('2005-01-01'))
departamentosNuevo = departamentosNuevo.withColumn('FechaFinal',lit('2199-12-31'))
departamentosNuevo = departamentosNuevo.withColumn('Version', lit(1))
departamentosNuevo = departamentosNuevo.withColumnRenamed("CodigoMunicipio","ID_Municipio_T")
departamentosNuevo = departamentosNuevo.withColumn('ID_Municipio_DWH', f.monotonically_increasing_id() + 1)
departamentosNuevo.show(5)

+------------------+--------------+-------------------+------------------+---------------+--------------------+------------------+--------------+-------------+---------------+-------------------------------------+-----------------------+------------+----------+-------+----------------+
|CodigoDepartamento|ID_Municipio_T|CodigoCentroPoblado|NombreDepartamento|NombreMunicipio| NombreCentroPoblado| TipoCentroPoblado|      Longitud|      Latitud|Nombre Distrito|Municipio/AreasNoMunicipalizadas(ANM)|NombreAreaMetropolitana|FechaInicial|FechaFinal|Version|ID_Municipio_DWH|
+------------------+--------------+-------------------+------------------+---------------+--------------------+------------------+--------------+-------------+---------------+-------------------------------------+-----------------------+------------+----------+-------+----------------+
|                18|         18460|           18460000|           CAQUETA|        CAQUETA|               MILÁN|CABECERA MUNICIPAL| -75.5069

In [37]:
departamentosNuevo = departamentosNuevo.fillna({'Nombre Distrito':'missing'})
departamentosNuevo = departamentosNuevo.fillna({'NombreAreaMetropolitana':'missing'})
departamentosNuevo = departamentosNuevo.withColumnRenamed("NombreAreaMetropolitana", "AreaMetropolitana")
departamentosNuevo.show(5)

+------------------+--------------+-------------------+------------------+---------------+--------------------+------------------+--------------+-------------+---------------+-------------------------------------+-----------------+------------+----------+-------+----------------+
|CodigoDepartamento|ID_Municipio_T|CodigoCentroPoblado|NombreDepartamento|NombreMunicipio| NombreCentroPoblado| TipoCentroPoblado|      Longitud|      Latitud|Nombre Distrito|Municipio/AreasNoMunicipalizadas(ANM)|AreaMetropolitana|FechaInicial|FechaFinal|Version|ID_Municipio_DWH|
+------------------+--------------+-------------------+------------------+---------------+--------------------+------------------+--------------+-------------+---------------+-------------------------------------+-----------------+------------+----------+-------+----------------+
|                18|         18460|           18460000|           CAQUETA|        CAQUETA|               MILÁN|CABECERA MUNICIPAL| -75.506962028|1.2904272017

In [38]:
departamentosOriginal = departamentos.fillna({'Nombre Distrito':'missing'})
departamentosOriginal = departamentosOriginal.fillna({'NombreAreaMetropolitana':'missing'})
departamentosOriginal = departamentosOriginal.withColumnRenamed("NombreAreaMetropolitana", "AreaMetropolitana")

### pib

Reparar nombre de los departamentos usando diccionario y codigo departamento generado de divipola

In [39]:
departamentosDic = {15:'BOYACA', 54:'NORTE DE SANTANDER', 94:'GUAINIA', 73:'TOLIMA', 50:'META', 66:'RISARALDA', 47:'MAGDALENA', 85:'CASANARE', 91:'AMAZONAS', 11:'BOGOTA, DISTRITO CAPITAL', 95:'GUAVIARE', 25:'CUNDINAMARCA', 88:'ARCHIPIELAGO DE SAN ANDRES, PROVIDENCIA Y SANTA CATALINA', 76:'VALLE DEL CAUCA', 99:'VICHADA', 68:'SANTANDER', 27:'CHOCO', 86:'PUTUMAYO', 70:'SUCRE', 5:'ANTIOQUIA', 19:'CAUCA', 81:'ARAUCA', 13:'BOLIVAR', 41:'HUILA', 17:'CALDAS', 63:'QUINDIO', 97:'VAUPES', 20:'CESAR', 52:'NARIÑO', 23:'CORDOBA', 44:'LA GUAJIRA', 8:'ATLANTICO', 18:'CAQUETA'}

In [40]:
pibFixed = pib.withColumn("DEPARTAMENTOS_FIXED", f.udf(lambda dep_code: departamentosDic.get(dep_code, "missing"), t.StringType())(f.col('CodigoDepartamento(DIVIPOLA)')))

In [41]:
pibFixed = pibFixed.withColumn("DEPARTAMENTOS", col("DEPARTAMENTOS_FIXED"))

In [42]:
pibFixed = pibFixed.drop("DEPARTAMENTOS_FIXED")

In [43]:
pibFixed.show(5)

+----------------------------+------------------+-------+-------+------------------+-----------------+-------+-------+-----------------+-------+-------+-------+-------+-------+-------+
|CodigoDepartamento(DIVIPOLA)|     DEPARTAMENTOS|   2006|   2007|              2008|             2010|   2011|   2012|             2013|   2014|   2015|   2016|   2017|   2009|   2005|
+----------------------------+------------------+-------+-------+------------------+-----------------+-------+-------+-----------------+-------+-------+-------+-------+-------+-------+
|                          47|         MAGDALENA|4202994|4618524| 5293554.101685749|6119382.927756879|6441145|6942728|7530341.883201833|7620221|8285062|9081224|9488025|5858657|3938108|
|                          54|NORTE DE SANTANDER|4562230|5093601| 5756904.272168976|6377160.081745004|6845992|7152319|7716659.361075875|8310220|8890010|9693223|9984804|6139384|3928796|
|                          18|           CAQUETA|3575615|4060678| 4519574.1

Se eliminan duplicados totales de pib

In [44]:
print((pibFixed.count(),pibFixed.distinct().count()))

(78, 28)


In [45]:
pibFixed = pibFixed.drop_duplicates()

In [46]:
print((pibFixed.count(),pibFixed.distinct().count()))

(28, 28)


Crear nueva columna año y setiar a cada fila de datos su correspondiente año

In [47]:
pibFixedInt = pibFixed.withColumn("2005", floor(col("2005"))) \
             .withColumn("2006", floor(col("2006"))) \
             .withColumn("2007", floor(col("2007"))) \
             .withColumn("2008", floor(col("2008"))) \
             .withColumn("2009", floor(col("2009"))) \
             .withColumn("2010", floor(col("2010"))) \
             .withColumn("2011", floor(col("2011"))) \
             .withColumn("2012", floor(col("2012"))) \
             .withColumn("2013", floor(col("2013"))) \
             .withColumn("2014", floor(col("2014"))) \
             .withColumn("2015", floor(col("2015"))) \
             .withColumn("2016", floor(col("2016"))) \
             .withColumn("2017", floor(col("2017")))

In [48]:
# Crear traspuesto
pibTraspuesto = pibFixedInt.withColumn(
    "year_value",
    array(
        struct(lit("2005").alias("year"), col("2005").alias("value")),
        struct(lit("2006").alias("year"), col("2006").alias("value")),
        struct(lit("2007").alias("year"), col("2007").alias("value")),
        struct(lit("2008").alias("year"), col("2008").alias("value")),
        struct(lit("2009").alias("year"), col("2009").alias("value")),
        struct(lit("2010").alias("year"), col("2010").alias("value")),
        struct(lit("2011").alias("year"), col("2011").alias("value")),
        struct(lit("2012").alias("year"), col("2012").alias("value")),
        struct(lit("2013").alias("year"), col("2013").alias("value")),
        struct(lit("2014").alias("year"), col("2014").alias("value")),
        struct(lit("2015").alias("year"), col("2015").alias("value")),
        struct(lit("2016").alias("year"), col("2016").alias("value")),
        struct(lit("2017").alias("year"), col("2017").alias("value"))
    )
)


In [49]:
# Explode al array
df_long = pibTraspuesto.withColumn("year_value", explode(col("year_value")))

In [50]:
# Seleccionar and renombrar columnas
df_pib_final = df_long.select(
    col("CodigoDepartamento(DIVIPOLA)").alias("DP"),
    col("DEPARTAMENTOS"),
    col("year_value.year").alias("AÑO"),
    col("year_value.value").alias("PIB")
)

In [51]:
pibCleaned = df_pib_final

In [52]:
pibCleaned.show(5)

+---+-------------+----+-------+
| DP|DEPARTAMENTOS| AÑO|    PIB|
+---+-------------+----+-------+
| 63|      QUINDIO|2005|5378939|
| 63|      QUINDIO|2006|6350376|
| 63|      QUINDIO|2007|6908084|
| 63|      QUINDIO|2008|7449000|
| 63|      QUINDIO|2009|7790106|
+---+-------------+----+-------+
only showing top 5 rows



### Proyecciones

Se eliminan duplicados totales de proyecciones

In [53]:
print((proyecciones.count(),proyecciones.distinct().count()))

(2574, 1287)


In [54]:
proyecciones = proyecciones.drop_duplicates()

In [55]:
print((proyecciones.count(),proyecciones.distinct().count()))

(1287, 1287)


Se generan dos ecuaciones para interpolar proyecciones atipicas:

Total hombres DP=76 Valor atipico año 2016
$$
\begin{align}
y &= 11116854.003*ln(x) -82472192.439
\end{align}
$$
Total mujeres DP=99  Valor atipico año 2009
$$
\begin{align}
y &= 1137.848x - 3346260
\end{align}
$$


Eliminar valores invalidos y atipicos hombres 2016 DP=76 y mujeres 2014 DP = 99

In [56]:
# TRANSFORMACION
hombresIncorrecto = proyecciones.filter((col('AÑO')==2016)).filter((col('DP') == 76)).filter(col('AREA GEOGRAFICA')=="Total")
hombresCorrecto = proyecciones.filter((col('AÑO')!=2016) | (col('DP') != 76) | (col('AREA GEOGRAFICA')!="Total"))

In [57]:
print(proyecciones.count())
print(hombresIncorrecto.count())
print(hombresCorrecto.count())

1287
1
1286


In [58]:
hombresIncorrecto = hombresIncorrecto.withColumn("Total Hombres Fixed", f.udf(lambda anio: math.ceil(11116854.0030*math.log(anio)-82472192.43), t.IntegerType())(f.col('AÑO')))

In [59]:
hombresIncorrecto = hombresIncorrecto.withColumn("Total Hombres", col("Total Hombres Fixed"))

In [60]:
hombresIncorrecto = hombresIncorrecto.drop("Total Hombres Fixed")

In [61]:
hombresIncorrecto = hombresIncorrecto.union(hombresCorrecto)


In [62]:
hombresIncorrecto.count()

1287

In [63]:
proyeccionFixed = hombresIncorrecto

In [64]:
# TRANSFORMACION
mujeresIncorrecto = proyeccionFixed.filter((col('AÑO')==2009)).filter((col('DP') == 99)).filter(col('AREA GEOGRAFICA')=="Total")
mujeresCorrecto = proyeccionFixed.filter((col('AÑO')!=2009) | (col('DP') != 99) | (col('AREA GEOGRAFICA')!="Total"))

In [65]:
print(proyecciones.count())
print(mujeresIncorrecto.count())
print(mujeresCorrecto.count())

1287
1
1286


In [66]:
mujeresIncorrecto = mujeresIncorrecto.withColumn("Total Mujeres Fixed", f.udf(lambda anio: math.ceil(1137.848*anio-2246260.863), t.IntegerType())(f.col('AÑO')))
mujeresIncorrecto = mujeresIncorrecto.withColumn("Total Mujeres", col("Total Mujeres Fixed"))
mujeresIncorrecto = mujeresIncorrecto.drop("Total Mujeres Fixed")
mujeresIncorrecto = mujeresIncorrecto.union(mujeresCorrecto)
mujeresIncorrecto.count()

1287

In [67]:
proyeccionFixed = mujeresIncorrecto

In [68]:
proyeccionFixed = proyeccionFixed.filter(col('AREA GEOGRAFICA')=="Total")

In [69]:
colsRemove = ()
colsList = list(colsRemove)
for n in range(100):
    colsList.append("Hombres_"+ str(n))
    colsList.append("Mujeres_"+ str(n))
    colsList.append("Total_"+ str(n))
colsList.append("Hombres_100 y más")
colsList.append("Mujeres_100 y más")
colsList.append("Total_100 y más")
colsRemove = tuple(colsList)

In [70]:
proyeccionCleaned = proyeccionFixed.drop(*colsRemove)

In [71]:
proyeccionCleaned = proyeccionCleaned.withColumnRenamed("Total Hombres", "TotalHombres")
proyeccionCleaned = proyeccionCleaned.withColumnRenamed("Total Mujeres", "TotalMujeres")

In [72]:
proyeccionCleaned.show(5)

+---+---------------+----+---------------+------------+------------+-------+
| DP|          DPNOM| AÑO|AREA GEOGRAFICA|TotalHombres|TotalMujeres|  Total|
+---+---------------+----+---------------+------------+------------+-------+
| 99|        vichada|2009|          Total|       47300|       39676|  86991|
| 76|valle del cauca|2016|          Total|     2114512|     2304500|4414569|
| 19|          cauca|2011|          Total|      679140|      687618|1366758|
| 41|          huila|2008|          Total|      488493|      489035| 977528|
| 44|     la guajira|2010|          Total|      352424|      370489| 722913|
+---+---------------+----+---------------+------------+------------+-------+
only showing top 5 rows



### Union tablas primera version

In [73]:
pib2005 = pibCleaned.filter((col('AÑO')==2005))
proyeccion2005 = proyeccionCleaned.filter((col('AÑO')==2005))

In [74]:
geografiaConDemografia = departamentosNuevo.alias('dep').join(proyeccion2005.alias('proy'),departamentosNuevo.CodigoDepartamento == proyeccion2005.DP,'left_outer')\
                    .join(pib2005.alias('pib'),departamentosNuevo.CodigoDepartamento == pib2005.DP,'left_outer') \
                    .select([col('dep.ID_Municipio_DWH'),col('dep.ID_Municipio_T'),col('dep.NombreMunicipio'),
                                         col('dep.NombreDepartamento'),col('dep.AreaMetropolitana'),col('dep.Longitud'),col('dep.Latitud'),
                                         col('pib.PIB'),col('proy.TotalHombres'),col('proy.TotalMujeres'),
                             col('dep.FechaInicial'), col('dep.FechaFinal'), col('dep.Version')]) \
                                .fillna({'PIB': 0, 'TotalHombres': 0, 'TotalMujeres': 0})

In [75]:
geografiaConDemografia.show(5)

+----------------+--------------+---------------+------------------+-----------------+--------------+-------------+-------+------------+------------+------------+----------+-------+
|ID_Municipio_DWH|ID_Municipio_T|NombreMunicipio|NombreDepartamento|AreaMetropolitana|      Longitud|      Latitud|    PIB|TotalHombres|TotalMujeres|FechaInicial|FechaFinal|Version|
+----------------+--------------+---------------+------------------+-----------------+--------------+-------------+-------+------------+------------+------------+----------+-------+
|               1|         18460|        CAQUETA|           CAQUETA|          missing| -75.506962028|1.29042720175|3306217|      194127|      185146|  2005-01-01|2199-12-31|      1|
|               2|         52835|         NARIÑO|            NARIÑO|          missing|-78.7116527073|1.70135268948|      0|      746790|      763664|  2005-01-01|2199-12-31|      1|
|               3|         52473|         NARIÑO|            NARIÑO|          missing|-78.

In [76]:
# CARGUE
guardar_db(dest_db_connection_string, geografiaConDemografia,'Proyecto_G5_202413.geografiaConDemografia_Test', db_shiomar, pass_shiomar)

### Manejo de historia GeografiaConDemografia

In [77]:
#  DataFrame Inicial
geografiaConDemografiaInitial = geografiaConDemografia

# maximum ID_Municipio_DWH y version from del DataFrame original
max_id_dwh = geografiaConDemografiaInitial.agg(spark_max("ID_Municipio_DWH")).collect()[0][0]
max_version = geografiaConDemografiaInitial.agg(spark_max("Version")).collect()[0][0]

# Actualizar fechaFinal de DF original a 2005-12-31
geografiaConDemografiaInitial = geografiaConDemografiaInitial.withColumn("FechaFinal", lit("2005-12-31"))

initialYear = 2006
endYear = 2017

#def return_end_year(year: int):
#    if year < endYear:
#        return lit(f"{year}-12-31")
#    else:
#        return lit("2199-12-31")
# Funcion para crear un nuevo DF with ID_Municipio_DWH y version actualizados
def create_new_df(df_new_base: DataFrame, max_id_dwh: int, max_version: int, year: int):
    df_new_updated = df_new_base.withColumn("ID_Municipio_DWH", col("ID_Municipio_T") + max_id_dwh) \
        .withColumn("FechaInicial", lit(f"{year}-01-01")) \
        .withColumn("FechaFinal", lit(f"{year}-12-31") if year < endYear else lit("2199-12-31")) \
        .withColumn("Version", lit(max_version + 1))
    return df_new_updated

initialYear = 2006
endYear = 2017
for n in range(12):
    # Crear DF para el año
    pibN = pibCleaned.filter((col('AÑO')==initialYear+n))
    proyeccionN = proyeccionCleaned.filter((col('AÑO')==initialYear+n))

    geografiaConDemografiaN = departamentosNuevo.alias('dep').join(proyeccionN.alias('proy'),departamentosNuevo.CodigoDepartamento == proyeccionN.DP,'left_outer')\
                    .join(pibN.alias('pib'),departamentosNuevo.CodigoDepartamento == pibN.DP,'left_outer') \
                    .select([col('dep.ID_Municipio_T'),col('dep.NombreMunicipio'),
                                         col('dep.NombreDepartamento'),col('dep.AreaMetropolitana'),col('dep.Longitud'),col('dep.Latitud'),
                                         col('pib.PIB'),col('proy.TotalHombres'),col('proy.TotalMujeres')]) \
                                .fillna({'PIB': 0, 'TotalHombres': 0, 'TotalMujeres': 0})
    
    df_new = create_new_df(geografiaConDemografiaN, max_id_dwh, max_version, initialYear+n)
    geografiaConDemografiaInitial = geografiaConDemografiaInitial.unionByName(df_new)

    # actualizar max_id_dwh and max_version
    max_id_dwh += geografiaConDemografiaN.count()
    max_version += 1


# Show the result
geografiaConDemografiaInitial.show(truncate=False)

+----------------+--------------+---------------+------------------+-----------------+--------------+-------------+-------+------------+------------+------------+----------+-------+
|ID_Municipio_DWH|ID_Municipio_T|NombreMunicipio|NombreDepartamento|AreaMetropolitana|Longitud      |Latitud      |PIB    |TotalHombres|TotalMujeres|FechaInicial|FechaFinal|Version|
+----------------+--------------+---------------+------------------+-----------------+--------------+-------------+-------+------------+------------+------------+----------+-------+
|1               |18460         |CAQUETA        |CAQUETA           |missing          |-75.506962028 |1.29042720175|3306217|194127      |185146      |2005-01-01  |2005-12-31|1      |
|2               |52835         |NARIÑO         |NARIÑO            |missing          |-78.7116527073|1.70135268948|0      |746790      |763664      |2005-01-01  |2005-12-31|1      |
|3               |52473         |NARIÑO         |NARIÑO            |missing          |-78.

### Carga

In [78]:
# CARGUE
guardar_db(dest_db_connection_string, geografiaConDemografiaInitial,'Proyecto_G5_202413.G5_GeografiaConDemografia', db_shiomar, pass_shiomar)

## Paso #6: Dimensión Fecha

En este paso aplicaremos ETL a la **Dimension Fecha**, su fuente de datos viene de la tabla Vuelos y Aeropuertos de la base de datos de ProyectoTransaccional.

Al final del proceso de ETL se espera obtener una nueva tabla llamada G5_Fecha en la base de datos de Proyecto_G5_202413 conteniendo las siguiente columnas:

* idFecha
* descripcion
* Mes
* Anio
* Dia 

#### Extraccion
En esta seccion obtendremos los datos de las fuentes identificadas en nuestro diseño.

T1. Se crea un dataframe que contiene unicamente la columna fecha_construccion de la tabla aeropuertos y eliminar registros duplicados

In [79]:
sql_aeropuertos_fc = '''(SELECT DISTINCT fecha_construccion AS fecha FROM ProyectoTransaccional.aeropuertos) AS Temp_aeropuertos_fc'''
aeropuertos_fc_df = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuertos_fc, db_shiomar, pass_shiomar)
print('La cantidad de Registros de la tabla Vuelos obtenidos fueron: ' + str(aeropuertos_fc_df.count()))
print('El dataFrame extraido queda de la siguiente forma:')
aeropuertos_fc_df.show()

La cantidad de Registros de la tabla Vuelos obtenidos fueron: 213
El dataFrame extraido queda de la siguiente forma:
+----------+
|     fecha|
+----------+
|2015-06-05|
|2013-04-26|
|2009-05-07|
|2013-11-19|
|2013-03-20|
|2014-11-11|
|2012-10-25|
|2009-10-06|
|2012-08-16|
|2013-05-16|
|2014-02-28|
|2013-05-03|
|2013-12-05|
|2013-07-24|
|2010-08-19|
|2015-01-27|
|2013-09-24|
|2014-12-15|
|2014-01-21|
|2014-01-31|
+----------+
only showing top 20 rows



T2. Se crea un dataframe que contiene unicamente la columna fecha_vigencia de la tabla aeropuertos y eliminar registros duplicados

In [80]:
sql_aeropuertos_fv = '''(SELECT DISTINCT fecha_vigencia AS fecha FROM ProyectoTransaccional.aeropuertos) AS Temp_aeropuertos_fc'''
aeropuertos_fv_df = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuertos_fv, db_shiomar, pass_shiomar)
print('La cantidad de Registros de la tabla Vuelos obtenidos fueron: ' + str(aeropuertos_fv_df.count()))
print('El dataFrame extraido queda de la siguiente forma:')
aeropuertos_fv_df.show()

La cantidad de Registros de la tabla Vuelos obtenidos fueron: 85
El dataFrame extraido queda de la siguiente forma:
+----------+
|     fecha|
+----------+
|2018-06-11|
|2016-05-07|
|          |
|2015-11-07|
|2017-03-03|
|2016-05-10|
|2016-08-01|
|2018-02-06|
|2017-12-01|
|2018-03-05|
|2015-12-03|
|2017-01-09|
|2017-07-09|
|2017-11-05|
|2017-12-02|
|2015-10-10|
|2016-11-12|
|2016-05-09|
|2017-06-11|
|2015-10-09|
+----------+
only showing top 20 rows



T3. Se crea un dataframe que contiene unicamente la columna ano, mes de la tabla vuelos y eliminar registros duplicados

In [81]:
sql_vuelos_fecha = '''(SELECT  ano, mes FROM ProyectoTransaccional.vuelos) AS Temp_vuelos_trafico'''
vuelos_fecha_df = obtener_dataframe_de_bd(source_db_connection_string, sql_vuelos_fecha, db_shiomar, pass_shiomar)
print('La cantidad de Registros de la tabla Vuelos obtenidos fueron: ' + str(vuelos_fecha_df.count()))
print('El dataFrame extraido queda de la siguiente forma:')
vuelos_fecha_df.show()

La cantidad de Registros de la tabla Vuelos obtenidos fueron: 563265
El dataFrame extraido queda de la siguiente forma:
+----+---+
| ano|mes|
+----+---+
|2012|  9|
|2012| 11|
|2012| 12|
|2012| 11|
|2012|  1|
|2012|  2|
|2004|  1|
|2012|  3|
|2004|  2|
|2012|  4|
|2004|  3|
|2012|  5|
|2004|  4|
|2012|  6|
|2004|  5|
|2012|  7|
|2004|  6|
|2012|  8|
|2004|  7|
|2012| 10|
+----+---+
only showing top 20 rows



#### Transformacion

In [82]:
# Eliminar duplicados
duplicates_vuelos_fecha = vuelos_fecha_df.count() - vuelos_fecha_df.distinct().count()
vuelos_fecha_df_whitout_duplicates = vuelos_fecha_df.distinct()
print('La cantidad de duplicados totales es de ' + str(duplicates_vuelos_fecha))
print('Dataframe sin duplicados')
vuelos_fecha_df_whitout_duplicates.show()
print('Cantidad Final: '+str(vuelos_fecha_df_whitout_duplicates.count()))

La cantidad de duplicados totales es de 563097
Dataframe sin duplicados
+----+---+
| ano|mes|
+----+---+
|2012| 10|
|2005|  5|
|2007|  6|
|2010|  7|
|2010| 12|
|2015|  2|
|2017|  3|
|2008|  8|
|2017|  8|
|2004|  6|
|2014|  4|
|2009| 11|
|2005| 10|
|2017| 10|
|2015| 12|
|2016|  7|
|2004|  8|
|2016| 11|
|2012|  8|
|2013|  2|
+----+---+
only showing top 20 rows

Cantidad Final: 168


In [83]:
#creamos DF con el diccionario auxiliar
df_dic_fecha = pd.DataFrame({'mes': ['1','2','3','4','5','6','7','8','9','10','11','12'], 'NewMes': ['01', '02', '03','04','05','06','07','08','09','10','11','12']})
#mostrar DatraFrame
diccionarioFecha=spark.createDataFrame(df_dic_fecha)
diccionarioFecha.show()
# Se agrega columna dia con valor 01 indicado por el negocio 
vuelos_fecha_df_whitout_duplicates = vuelos_fecha_df_whitout_duplicates.coalesce(1).withColumn("dia",f.lit('01').alias('dia'))

# Creamos un join con el diccionario 
vuelos_fecha_df_whitout_duplicates = vuelos_fecha_df_whitout_duplicates.join(diccionarioFecha,how = 'inner',  on='mes')
# Mostramos el resultado del join con diccionario fecha
vuelos_fecha_df_whitout_duplicates.show()

+---+------+
|mes|NewMes|
+---+------+
|  1|    01|
|  2|    02|
|  3|    03|
|  4|    04|
|  5|    05|
|  6|    06|
|  7|    07|
|  8|    08|
|  9|    09|
| 10|    10|
| 11|    11|
| 12|    12|
+---+------+

+---+----+---+------+
|mes| ano|dia|NewMes|
+---+----+---+------+
|  1|2017| 01|    01|
|  1|2010| 01|    01|
|  1|2011| 01|    01|
|  1|2015| 01|    01|
|  1|2013| 01|    01|
|  1|2004| 01|    01|
|  1|2006| 01|    01|
|  1|2012| 01|    01|
|  1|2016| 01|    01|
|  1|2005| 01|    01|
|  1|2014| 01|    01|
|  1|2009| 01|    01|
|  1|2008| 01|    01|
|  1|2007| 01|    01|
|  3|2016| 01|    03|
|  3|2007| 01|    03|
|  3|2004| 01|    03|
|  3|2012| 01|    03|
|  3|2008| 01|    03|
|  3|2006| 01|    03|
+---+----+---+------+
only showing top 20 rows



In [84]:
# Se crea nueva columna fecha para el data frame de vuelos.
vuelos_fecha_df_whitout_duplicates = vuelos_fecha_df_whitout_duplicates.withColumn('fecha', date_format(to_date(concat(col("dia"),col("NewMes"),col("ano")), "ddMMyyyy"), "yyyy-MM-dd"))

print(vuelos_fecha_df_whitout_duplicates.show())

+---+----+---+------+----------+
|mes| ano|dia|NewMes|     fecha|
+---+----+---+------+----------+
|  1|2017| 01|    01|2017-01-01|
|  1|2010| 01|    01|2010-01-01|
|  1|2011| 01|    01|2011-01-01|
|  1|2015| 01|    01|2015-01-01|
|  1|2013| 01|    01|2013-01-01|
|  1|2004| 01|    01|2004-01-01|
|  1|2006| 01|    01|2006-01-01|
|  1|2012| 01|    01|2012-01-01|
|  1|2016| 01|    01|2016-01-01|
|  1|2005| 01|    01|2005-01-01|
|  1|2014| 01|    01|2014-01-01|
|  1|2009| 01|    01|2009-01-01|
|  1|2008| 01|    01|2008-01-01|
|  1|2007| 01|    01|2007-01-01|
|  3|2016| 01|    03|2016-03-01|
|  3|2007| 01|    03|2007-03-01|
|  3|2004| 01|    03|2004-03-01|
|  3|2012| 01|    03|2012-03-01|
|  3|2008| 01|    03|2008-03-01|
|  3|2006| 01|    03|2006-03-01|
+---+----+---+------+----------+
only showing top 20 rows

None


In [85]:
#se crea data frame de vuelos final
data_frame_vuelos_final = vuelos_fecha_df_whitout_duplicates.selectExpr('fecha AS fecha')
print('El data_frame_vuelos_final : ')
data_frame_vuelos_final.show()

El data_frame_vuelos_final : 
+----------+
|     fecha|
+----------+
|2017-01-01|
|2010-01-01|
|2011-01-01|
|2015-01-01|
|2013-01-01|
|2004-01-01|
|2006-01-01|
|2012-01-01|
|2016-01-01|
|2005-01-01|
|2014-01-01|
|2009-01-01|
|2008-01-01|
|2007-01-01|
|2016-03-01|
|2007-03-01|
|2004-03-01|
|2012-03-01|
|2008-03-01|
|2006-03-01|
+----------+
only showing top 20 rows



#### T4. Se unen dafarames de aeropuertos y vuelos ,se eliminan duplicados

In [86]:
df_union_fechas1 = aeropuertos_fc_df.withColumnRenamed('fecha', 'descripcion') \
        .union(aeropuertos_fv_df.withColumnRenamed('fecha', 'descripcion'))
df_union_fechas1.show()
print("Total registros aeropuertos_fc_df::"+str(aeropuertos_fc_df.count()))
print("Total registros aeropuertos_fv_df::"+str(aeropuertos_fv_df.count()))

print("Total registros union1::"+str(df_union_fechas1.count()))

+-----------+
|descripcion|
+-----------+
| 2015-06-05|
| 2013-04-26|
| 2009-05-07|
| 2013-11-19|
| 2013-03-20|
| 2014-11-11|
| 2012-10-25|
| 2009-10-06|
| 2012-08-16|
| 2013-05-16|
| 2014-02-28|
| 2013-05-03|
| 2013-12-05|
| 2013-07-24|
| 2010-08-19|
| 2015-01-27|
| 2013-09-24|
| 2014-12-15|
| 2014-01-21|
| 2014-01-31|
+-----------+
only showing top 20 rows

Total registros aeropuertos_fc_df::213
Total registros aeropuertos_fv_df::85
Total registros union1::298


In [87]:
df_union_fechas2 = df_union_fechas1.withColumnRenamed('fecha', 'descripcion') \
        .union(data_frame_vuelos_final.withColumnRenamed('fecha', 'descripcion'))
df_union_fechas2.show()
print("Total registros df_union_fechas1::"+str(df_union_fechas1.count()))
print("Total registros aeropuertos_fv_df::"+str(data_frame_vuelos_final.count()))

print("Total registros union2::"+str(df_union_fechas2.count()))

+-----------+
|descripcion|
+-----------+
| 2015-06-05|
| 2013-04-26|
| 2009-05-07|
| 2013-11-19|
| 2013-03-20|
| 2014-11-11|
| 2012-10-25|
| 2009-10-06|
| 2012-08-16|
| 2013-05-16|
| 2014-02-28|
| 2013-05-03|
| 2013-12-05|
| 2013-07-24|
| 2010-08-19|
| 2015-01-27|
| 2013-09-24|
| 2014-12-15|
| 2014-01-21|
| 2014-01-31|
+-----------+
only showing top 20 rows

Total registros df_union_fechas1::298
Total registros aeropuertos_fv_df::168
Total registros union2::466


In [88]:
# Eliminar registros duplicados
df_union_fechas2 = df_union_fechas2.distinct()
print("Total registros final::"+str(df_union_fechas2.count()))

Total registros final::406


#### T5. crear columna descripcion.

In [89]:
# TRANSFORMACION
fecha_final = df_union_fechas2.selectExpr('descripcion AS descripcion')
fecha_final = fecha_final.coalesce(1).withColumn("idFecha", date_format(col("descripcion"), "yyyyMMdd"))


#### T6. crear columna mes, anio, dia

In [90]:
fecha_final = fecha_final.coalesce(1).withColumn("Dia", date_format(col("descripcion"), "dd"))
fecha_final = fecha_final.coalesce(1).withColumn("Mes", date_format(col("descripcion"), "MM"))
fecha_final = fecha_final.coalesce(1).withColumn("Anio", date_format(col("descripcion"), "yyyy"))
fecha_final = fecha_final.select('idFecha','descripcion','Dia','Mes','Anio')
fecha_final.show(5)
print("Total registros final::"+str(fecha_final.count()))

+--------+-----------+---+---+----+
| idFecha|descripcion|Dia|Mes|Anio|
+--------+-----------+---+---+----+
|20150227| 2015-02-27| 27| 02|2015|
|20070101| 2007-01-01| 01| 01|2007|
|20050201| 2005-02-01| 01| 02|2005|
|20091001| 2009-10-01| 01| 10|2009|
|20120817| 2012-08-17| 17| 08|2012|
+--------+-----------+---+---+----+
only showing top 5 rows

Total registros final::406


## Carga
Se carga el dataframe procesado en una nueva tabla segun el diseño de ETL.

In [91]:
# CARGUE
guardar_db(dest_db_connection_string, fecha_final,'Proyecto_G5_202413.G5_Fecha', db_shiomar, pass_shiomar)

## Paso #7: Mini Dimensión Aeropuerto

En este paso aplicaremos ETL a la Dimension **MiniDimensionAeropuerto**, su fuente de datos viene de la tabla `aeropuertos` de la base de datos de ProyectoTransaccional.

Al final del proceso de ETL se espera obtener una nueva tabla llamada `G5_MiniDimensionAeropuerto` en la base de datos de Proyecto_G5_202413 conteniendo las siguiente columnas:
* idMini_DWH
* rangoLongitudPista
* rangoAnchoPista
* rangoNumeroVuelosOrigen
* clase
* tipo

#### Extraccion

En esta seccion obtendremos los datos de las fuentes identificadas en nuestro diseño.

In [92]:
#sql_aeropuerto = '''(SELECT DISTINCT ae.sigla, ae.longitud_pista, ae.ancho_pista, ae.numero_vuelos_origen, ae.clase, ae.tipo, ae.anio FROM ProyectoTransaccional.aeropuertos ae ORDER BY ae.sigla, ae.Anio ASC LIMIT 1000) AS Temp_Aeropuertos'''
sql_aeropuerto = '''(SELECT DISTINCT ga.idAeropuerto_DWH, ae.sigla, ae.longitud_pista, ae.ancho_pista, ae.numero_vuelos_origen, ae.clase, ae.tipo, ae.Anio 
FROM ProyectoTransaccional.aeropuertos ae JOIN Proyecto_G5_202413.G5_Aeropuerto ga ON ae.sigla = ga.Sigla) as Temp_G5Aeropuertos'''
aeropuertos_df = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuerto, db_shiomar, pass_shiomar)
print('La cantidad de registros obtenidos de la union de las tablas G5_Aeropuertos y Aeropuertos fueron: ' + str(aeropuertos_df.count()))
print('La tabla extraida queda de la siguiente forma:')
aeropuertos_df.show(10)

La cantidad de registros obtenidos de la union de las tablas G5_Aeropuertos y Aeropuertos fueron: 1194
La tabla extraida queda de la siguiente forma:
+----------------+-----+--------------+-----------+--------------------+-----+----------+----+
|idAeropuerto_DWH|sigla|longitud_pista|ancho_pista|numero_vuelos_origen|clase|      tipo|Anio|
+----------------+-----+--------------+-----------+--------------------+-----+----------+----+
|               1|  7FO|          1170|         14|              171525|   1A|   Privado|2015|
|               1|  7FO|          1273|         15|              571675|   1A|   Privado|2016|
|               1|  7FO|          1597|         18|              994420|   1A|   Privado|2017|
|               2|  7FU|           667|         16|              252325|   1A|Fumigación|2015|
|               2|  7FU|           337|         13|              126667|   1A|Fumigación|2016|
|               2|  7FU|           178|         13|              175953|   1A|Fumigación|2

#### Transformacion

T1. Eliminar duplicados totales de la tabla:

Esta transformacion fue aplicada en la sentencia SQL de la etapa de Extraccion.

In [93]:
aeropuertos_dt = aeropuertos_df.count() - aeropuertos_df.distinct().count()
print('La cantidad de duplicados totales es de ' + str(aeropuertos_dt))

La cantidad de duplicados totales es de 0


T2. Tomar la información de la columna numero_vuelos_origen multiplicando por -1 cuando vengan valores negativos:

In [94]:
aeropuertos_df = aeropuertos_df.withColumn("numero_vuelos_origen", when(col("numero_vuelos_origen") < 0, col("numero_vuelos_origen") * -1).otherwise(col("numero_vuelos_origen")))
aeropuertos_df.show(10)

+----------------+-----+--------------+-----------+--------------------+-----+----------+----+
|idAeropuerto_DWH|sigla|longitud_pista|ancho_pista|numero_vuelos_origen|clase|      tipo|Anio|
+----------------+-----+--------------+-----------+--------------------+-----+----------+----+
|               1|  7FO|          1170|         14|              171525|   1A|   Privado|2015|
|               1|  7FO|          1273|         15|              571675|   1A|   Privado|2016|
|               1|  7FO|          1597|         18|              994420|   1A|   Privado|2017|
|               2|  7FU|           667|         16|              252325|   1A|Fumigación|2015|
|               2|  7FU|           337|         13|              126667|   1A|Fumigación|2016|
|               2|  7FU|           178|         13|              175953|   1A|Fumigación|2017|
|               3|  7FW|           514|          8|              444936|   1A|   Privado|2015|
|               3|  7FW|           302|          4

In [95]:
# Función para generar el rango cuartil con base a la información ingresada

def generar_rango_cuartil(df, partition_by, order_by, p_rank, column_name):
    windowSpec = Window.partitionBy(partition_by).orderBy(order_by)
    aeropuertos = df.withColumn(p_rank, percent_rank().over(windowSpec))
    aeropuertos = aeropuertos.withColumn(
        column_name,
        when(col(p_rank) < 0.25, "R1")
        .when((col(p_rank) >= 0.25) & (col(p_rank) < 0.50), "R2")
        .when((col(p_rank) >= 0.50) & (col(p_rank) < 0.75), "R3")
        .otherwise("R4")
    )
    return aeropuertos

T3. Crear la columna rangoNumeroVuelosOrigen categorizando la información en R1:[0-valor del 25%), R2:[Valor del 25%,Valor del 50%), R3: [Valor del 50%, Valor del 75%), R4: > valor del 75%:

In [96]:
aeropuertos_df = generar_rango_cuartil(aeropuertos_df, 'sigla', 'numero_vuelos_origen', 'pr_vuelos_p', 'rangoNumeroVuelosOrigen')
aeropuertos_df.show(10)

+----------------+-----+--------------+-----------+--------------------+-----+----------+----+-----------+-----------------------+
|idAeropuerto_DWH|sigla|longitud_pista|ancho_pista|numero_vuelos_origen|clase|      tipo|Anio|pr_vuelos_p|rangoNumeroVuelosOrigen|
+----------------+-----+--------------+-----------+--------------------+-----+----------+----+-----------+-----------------------+
|               1|  7FO|          1170|         14|              171525|   1A|   Privado|2015|        0.0|                     R1|
|               1|  7FO|          1273|         15|              571675|   1A|   Privado|2016|        0.5|                     R3|
|               1|  7FO|          1597|         18|              994420|   1A|   Privado|2017|        1.0|                     R4|
|               2|  7FU|           337|         13|              126667|   1A|Fumigación|2016|        0.0|                     R1|
|               2|  7FU|           178|         13|              175953|   1A|Fumig

T4. Crear la columna rangoLongitudPista categorizando la información en R1:[0-valor del 25%), R2:[Valor del 25%,Valor del 50%), R3: [Valor del 50%, Valor del 75%), R4: > valor del 75%:

In [97]:
aeropuertos_df = generar_rango_cuartil(aeropuertos_df, 'sigla', 'longitud_pista', 'pr_longitud_p', 'rangoLongitudPista')
aeropuertos_df.show(10)

+----------------+-----+--------------+-----------+--------------------+-----+----------+----+-----------+-----------------------+-------------+------------------+
|idAeropuerto_DWH|sigla|longitud_pista|ancho_pista|numero_vuelos_origen|clase|      tipo|Anio|pr_vuelos_p|rangoNumeroVuelosOrigen|pr_longitud_p|rangoLongitudPista|
+----------------+-----+--------------+-----------+--------------------+-----+----------+----+-----------+-----------------------+-------------+------------------+
|               1|  7FO|          1170|         14|              171525|   1A|   Privado|2015|        0.0|                     R1|          0.0|                R1|
|               1|  7FO|          1273|         15|              571675|   1A|   Privado|2016|        0.5|                     R3|          0.5|                R3|
|               1|  7FO|          1597|         18|              994420|   1A|   Privado|2017|        1.0|                     R4|          1.0|                R4|
|               

T5. Crear la columna rangoAnchoPista categorizando la información en R1:[0-valor del 25%), R2:[Valor del 25%,Valor del 50%), R3: [Valor del 50%, Valor del 75%), R4: > valor del 75%:

In [98]:
aeropuertos_df = generar_rango_cuartil(aeropuertos_df, 'sigla', 'ancho_pista', 'pr_ancho_p', 'rangoAnchoPista')
aeropuertos_df.show(10)

+----------------+-----+--------------+-----------+--------------------+-----+----------+----+-----------+-----------------------+-------------+------------------+----------+---------------+
|idAeropuerto_DWH|sigla|longitud_pista|ancho_pista|numero_vuelos_origen|clase|      tipo|Anio|pr_vuelos_p|rangoNumeroVuelosOrigen|pr_longitud_p|rangoLongitudPista|pr_ancho_p|rangoAnchoPista|
+----------------+-----+--------------+-----------+--------------------+-----+----------+----+-----------+-----------------------+-------------+------------------+----------+---------------+
|               1|  7FO|          1170|         14|              171525|   1A|   Privado|2015|        0.0|                     R1|          0.0|                R1|       0.0|             R1|
|               1|  7FO|          1273|         15|              571675|   1A|   Privado|2016|        0.5|                     R3|          0.5|                R3|       0.5|             R3|
|               1|  7FO|          1597|      

T6. Rellenar valores vacíos de la columna Tipo con valores por defecto 'Sin tipo':

In [99]:
aeropuertos_df = aeropuertos_df.withColumn('clase',f.when(f.col('clase').isNull() | (f.col('clase') == ''), 'Sin clase').otherwise(f.col('clase')))

T7. Rellenar valores vacíos de la columna Clase con valores por defecto 'Sin clase':

In [100]:
aeropuertos_df = aeropuertos_df.withColumn('tipo',f.when(f.col('tipo').isNull() | (f.col('tipo') == ''), 'Sin tipo').otherwise(f.col('tipo')))

T8. Creación de la columna idMini_DWH:

In [101]:
mini_dimension_aeropuerto_df = aeropuertos_df.select('rangoLongitudPista', 'rangoAnchoPista', 'rangoNumeroVuelosOrigen', 'clase', 'tipo')
mini_dimension_aeropuerto_df = mini_dimension_aeropuerto_df.distinct()
mini_dimension_aeropuerto_df = mini_dimension_aeropuerto_df.coalesce(1).withColumn('idMini_DWH', f.monotonically_increasing_id() + 1)
mini_dimension_aeropuerto_df = mini_dimension_aeropuerto_df.select('idMini_DWH','rangoLongitudPista', 'rangoAnchoPista', 'rangoNumeroVuelosOrigen', 'clase', 'tipo')
print('La tabla Transformada queda de la siguiente forma: ')
print('Numero de regiustros en minidimension: ' + str(mini_dimension_aeropuerto_df.count()))
mini_dimension_aeropuerto_df.show()

La tabla Transformada queda de la siguiente forma: 
Numero de regiustros en minidimension: 194
+----------+------------------+---------------+-----------------------+-----+----------+
|idMini_DWH|rangoLongitudPista|rangoAnchoPista|rangoNumeroVuelosOrigen|clase|      tipo|
+----------+------------------+---------------+-----------------------+-----+----------+
|         1|                R3|             R3|                     R4|   3C| Aerocivil|
|         2|                R1|             R1|                     R1|   0B| Aerocivil|
|         3|                R3|             R3|                     R4|   1A|Fumigación|
|         4|                R1|             R1|                     R1|   2C|   Público|
|         5|                R1|             R1|                     R4|   2A|   Privado|
|         6|                R4|             R4|                     R3|   4C| Aerocivil|
|         7|                R4|             R4|                     R3|   1A|Fumigación|
|         8|   

#### Carga
Se carga el dataframe procesado en una nueva tabla segun el diseño de ETL.

In [102]:
guardar_db(dest_db_connection_string, mini_dimension_aeropuerto_df,'Proyecto_G5_202413.G5_MiniDimensionAeropuerto', db_shiomar, pass_shiomar)

## Paso #8: Hecho Historia Cambio

En este paso aplicaremos ETL a la Dimension **HechoHistoriaCambios**, su fuente de datos viene de las tablas `G5_Aeropuerto`, `G5_Aeropuerto`, `G5_miniDimensionAeropuerto` y  `G5_Fecha` de la base de datos de Proyecto_G5_202413.

Al final del proceso de ETL se espera obtener una nueva tabla llamada `G5_HechoHistoriaCambios` en la base de datos de Proyecto_G5_202413 conteniendo las siguiente columnas:
* cambio
* fechaInicio
* fechaFin
* idMini_DWH
* idAeropuerto_DWH

#### Extraccion

En esta seccion obtendremos los datos de las fuentes identificadas en nuestro diseño.

In [103]:
hecho_hist_cambios_df = mini_dimension_aeropuerto_df.join(aeropuertos_df, on=["rangoLongitudPista", "rangoAnchoPista", "rangoNumeroVuelosOrigen", "clase", "tipo"],
    how="inner")
print('La cantidad de Registros de la tabla Aeropuertos obtenidos fueron: ' + str(hecho_hist_cambios_df.count()))
print('La tabla extraida queda de la siguiente forma:')
hecho_hist_cambios_df = hecho_hist_cambios_df.select('idAeropuerto_DWH','idMini_DWH', 'anio')
hecho_hist_cambios_df.show(10)

La cantidad de Registros de la tabla Aeropuertos obtenidos fueron: 1194
La tabla extraida queda de la siguiente forma:
+----------------+----------+----+
|idAeropuerto_DWH|idMini_DWH|anio|
+----------------+----------+----+
|               1|        82|2015|
|               1|       122|2016|
|               1|        63|2017|
|               2|        23|2017|
|               2|       153|2016|
|               2|        81|2015|
|               3|       133|2017|
|               3|       115|2016|
|               3|        63|2015|
|               4|       144|2017|
+----------------+----------+----+
only showing top 10 rows



#### Transformacion

T1. Creamos la columna `fechaInicio` a partir de la columna `anio` concatenando `0101`:

In [104]:
hecho_hist_cambios_df = hecho_hist_cambios_df.withColumn("fechaInicio", f.concat(f.col('anio'), f.lit('0101')))
hecho_hist_cambios_df.show(10)

+----------------+----------+----+-----------+
|idAeropuerto_DWH|idMini_DWH|anio|fechaInicio|
+----------------+----------+----+-----------+
|             288|         1|2016|   20160101|
|             274|         1|2016|   20160101|
|             225|         2|2015|   20150101|
|             220|         3|2016|   20160101|
|             173|         3|2016|   20160101|
|             172|         3|2016|   20160101|
|             169|         3|2016|   20160101|
|             158|         3|2016|   20160101|
|             157|         3|2016|   20160101|
|             156|         3|2016|   20160101|
+----------------+----------+----+-----------+
only showing top 10 rows



T2. Creamos la columna `fechaFin` a partir de la columna `anio` concatenando `0101`:

In [105]:
hecho_hist_cambios_df = hecho_hist_cambios_df.withColumn("fechaFin", f.concat(f.col('anio'), f.lit('0101')))
hecho_hist_cambios_df.show(10)

+----------------+----------+----+-----------+--------+
|idAeropuerto_DWH|idMini_DWH|anio|fechaInicio|fechaFin|
+----------------+----------+----+-----------+--------+
|             288|         1|2016|   20160101|20160101|
|             274|         1|2016|   20160101|20160101|
|             225|         2|2015|   20150101|20150101|
|             220|         3|2016|   20160101|20160101|
|             173|         3|2016|   20160101|20160101|
|             172|         3|2016|   20160101|20160101|
|             169|         3|2016|   20160101|20160101|
|             158|         3|2016|   20160101|20160101|
|             157|         3|2016|   20160101|20160101|
|             156|         3|2016|   20160101|20160101|
+----------------+----------+----+-----------+--------+
only showing top 10 rows



T3. Creamos la columna `cambio` con el valor constante `1`:

In [106]:
hecho_hist_cambios_df = hecho_hist_cambios_df.withColumn("cambio", f.lit('1'))
hecho_hist_cambios_df = hecho_hist_cambios_df.select('cambio', 'idAeropuerto_DWH', 'idMini_DWH','fechaInicio','fechaFin')
hecho_hist_cambios_df.show(10)

+------+----------------+----------+-----------+--------+
|cambio|idAeropuerto_DWH|idMini_DWH|fechaInicio|fechaFin|
+------+----------------+----------+-----------+--------+
|     1|             288|         1|   20160101|20160101|
|     1|             274|         1|   20160101|20160101|
|     1|             225|         2|   20150101|20150101|
|     1|             220|         3|   20160101|20160101|
|     1|             173|         3|   20160101|20160101|
|     1|             172|         3|   20160101|20160101|
|     1|             169|         3|   20160101|20160101|
|     1|             158|         3|   20160101|20160101|
|     1|             157|         3|   20160101|20160101|
|     1|             156|         3|   20160101|20160101|
+------+----------------+----------+-----------+--------+
only showing top 10 rows



#### Carga
Se carga el dataframe procesado en una nueva tabla segun el diseño de ETL.

In [107]:
guardar_db(dest_db_connection_string, hecho_hist_cambios_df,'Proyecto_G5_202413.G5_HechoHistoriaCambios', db_shiomar, pass_shiomar)

## Paso #9: Hecho Vuelo

En este paso aplicaremos ETL para la creacion del **HehcoVuelo**, su fuente de datos viene de las tablas `vuelos` de la base de datos de WWImportersProyectoTransaccionalTransactional y de las Tablas `G5_Aeropuerto`, `G5_miniDimensionAeropuerto`, `G5_Fecha`, `G5_GeografiaconDemografia`, `G5_Trafico` y `G5_TipoVuelo` de la base de datos de Proyecto_G5_202413.

Al final del proceso de ETL se espera obtener una nueva tabla llamada `G5_HechoVuelo` en la base de datos de Estudiante_103_202413 conteniendo las siguiente columnas:
* idFecha
* idAeropuertoOrigen_DWH
* idAeropuertoDestino_DWH
* idMiniMomentoDelHechoOrigen_DWH
* idMunicipioOrigen_DWH
* idMunicipioDestino_DWH
* idTrafico_DWH
* idTipoVuelo_DWH
* Vuelos
* Sillas
* CargaOfrecida
* Pasajeros
* CargaAbordo

#### Extraccion

En esta seccion obtendremos los datos de las fuentes identificadas en nuestro diseño.

In [119]:
sql_vuelos = '''(SELECT DISTINCT ano, mes, origen, destino,tipo_vuelo,trafico,tipo_equipo, vuelos AS Vuelos, sillas AS Sillas, carga_ofrecida AS CargaOfrecida, pasajeros AS Pasajeros, carga_bordo AS CargaAbordo FROM ProyectoTransaccional.vuelos) AS Temp_vuelos'''
vuelos_temp = obtener_dataframe_de_bd(source_db_connection_string, sql_vuelos, db_shiomar, pass_shiomar)
print('La cantidad de Registros de Transaccion de Movimientos obtenidas fueron: ' + str(vuelos_temp.count()))
print('La tabla extraida queda de la siguiente forma:')
vuelos_temp.show()
print('-------------------------')
sql_aero = '''(SELECT DISTINCT sigla, gcd_municipio FROM ProyectoTransaccional.aeropuertos) AS Temp_aeropuerto'''
aero_df = obtener_dataframe_de_bd(source_db_connection_string, sql_aero, db_shiomar, pass_shiomar)
print('La cantidad de Registros de la tabla Aeropuertos obtenidos fueron: ' + str(aero_df.count()))
print('La tabla extraida queda de la siguiente forma:')
aero_df.show()

La cantidad de Registros de Transaccion de Movimientos obtenidas fueron: 562430
La tabla extraida queda de la siguiente forma:
+----+---+------+-------+----------+-------+-----------+------+------+-------------+---------+-----------+
| ano|mes|origen|destino|tipo_vuelo|trafico|tipo_equipo|Vuelos|Sillas|CargaOfrecida|Pasajeros|CargaAbordo|
+----+---+------+-------+----------+-------+-----------+------+------+-------------+---------+-----------+
|2012|  9|   uib|    axm|         R|      N|       JS32|     2|    35|        412.0|        2|        0.0|
|2012| 11|   uib|    axm|         R|      N|       JS32|     2|    35|        416.0|        2|        0.0|
|2012| 12|   uib|    axm|         R|      N|       JS32|     4|    69|        830.0|        6|        0.0|
|2012| 11|   uib|    bga|         R|      N|       JS32|     1|    17|        206.0|        1|        0.0|
|2012|  1|   uib|    bsc|         R|      N|       DHC6|    14|   223|       2786.0|      126|      143.0|
|2012|  2|   uib|

#### Transformacion

T1. Eliminar Duplicados Totales la tabla de Vuelos

Esta transformacion fue aplicada en la sentencia SQL de la etapa de Extraccion.

In [120]:
duplicates = vuelos_temp.count() - vuelos_temp.distinct().count()
print('La cantidad de duplicados totales es de ' + str(duplicates))

La cantidad de duplicados totales es de 0


T2. Preparación para Conexión de Tablas de dimensiones.

In [None]:
# Preparacion de tabla
df_dic_fecha = pd.DataFrame({'mes': ['1','2','3','4','5','6','7','8','9','10','11','12'], 'NewMes': ['01', '02', '03','04','05','06','07','08','09','10','11','12']})
diccionarioFecha=spark.createDataFrame(df_dic_fecha)
vuelos_temp = vuelos_temp.join(diccionarioFecha,how = 'inner',  on='mes')
vuelos_temp = vuelos_temp.drop('mes')
vuelos_temp = vuelos_temp.withColumnRenamed('NewMes', 'mes')
vuelos_temp = vuelos_temp.withColumn("Dia", lit('01'))

df = pd.DataFrame({'tipo_vuelo': ['R','T','C','A'], 'nombreTipo': ['regular', 'taxi', 'chárter', 'adicionales']})
diccionarioTipoVuelo=spark.createDataFrame(df)
vuelos_temp = vuelos_temp.join(diccionarioTipoVuelo,how = 'inner',  on='tipo_vuelo')
vuelos_temp = vuelos_temp.drop('tipo_vuelo')

vuelos_temp = vuelos_temp.withColumn('origen', upper(vuelos_temp['origen']))
vuelos_temp = vuelos_temp.withColumn('destino', upper(vuelos_temp['destino']))

print('La tabla Transformada queda de la siguiente forma:')
vuelos_temp.show()

La tabla Transformada queda de la siguiente forma:


T3. Conexión tablas dimensiones con Tabla Hechos.

In [None]:
vuelos_temp = vuelos_temp.alias('v').join(df_trafico.alias('tr'), (vuelos_temp.trafico == df_trafico.idTrafico_T),'left') \
                    .join(df_tipo_vuelos.alias('tv'), ((vuelos_temp.nombreTipo == df_tipo_vuelos.nombreTipo) & (vuelos_temp.tipo_equipo == df_tipo_vuelos.tipoEquipo)),'left') \
                    .join(aeropuerto_df.alias('a'), (vuelos_temp.origen == aeropuerto_df.Sigla),'left') \
                    .join(aero_df.alias('aero'), (vuelos_temp.origen == upper(aero_df.sigla)),'left')   \
                    .join(fecha_final.alias('fe'), ((vuelos_temp.mes == fecha_final.Mes) & (vuelos_temp.ano == fecha_final.Anio) & (vuelos_temp.Dia == fecha_final.Dia)) ,'left') \
                    .select([col('fe.idFecha'),col('a.idAeropuerto_DWH'),col('v.destino'), col('aero.gcd_municipio'),col('tv.idTipoVuelo_DWH'),col('tr.IdTrafico_DWH'),
                             col('v.Vuelos'),col('v.Sillas'),col('v.CargaOfrecida'),col('v.Pasajeros'), col('v.CargaAbordo')]) \
                    .fillna({'gcd_municipio': 0, 'idAeropuerto_DWH': 0, })

vuelos_temp = vuelos_temp.withColumnRenamed('idAeropuerto_DWH', 'idAeropuertoOrigen_DWH')
vuelos_temp = vuelos_temp.withColumnRenamed('gcd_municipio', 'gcd_municipio_origen')
                    
print('La tabla Transformada queda de la siguiente forma:')
vuelos_temp.show()

In [None]:
geo = geografiaConDemografiaInitial.dropDuplicates(['ID_Municipio_T'])
vuelos_temp = vuelos_temp.alias('v').join(aeropuerto_df.alias('a'), (vuelos_temp.destino == aeropuerto_df.Sigla),'left') \
                    .join(aero_df.alias('aero'), (vuelos_temp.destino == upper(aero_df.sigla)),'left')   \
                    .join(geo.alias('g'), (vuelos_temp.gcd_municipio_origen == geo.ID_Municipio_T),'left') \
                    .select([col('v.idFecha'),col('v.idAeropuertoOrigen_DWH'),col('a.idAeropuerto_DWH'), col('g.ID_Municipio_DWH'), col('aero.gcd_municipio'),col('v.idTipoVuelo_DWH'),col('v.IdTrafico_DWH'),
                             col('v.Vuelos'),col('v.Sillas'),col('v.CargaOfrecida'),col('v.Pasajeros'), col('v.CargaAbordo')]) \
                    .fillna({'gcd_municipio': 0, 'idAeropuerto_DWH': 0,'ID_Municipio_DWH':0 })

vuelos_temp = vuelos_temp.withColumnRenamed('idAeropuerto_DWH', 'idAeropuertoDestino_DWH')
vuelos_temp = vuelos_temp.withColumnRenamed('gcd_municipio', 'gcd_municipio_destino')
vuelos_temp = vuelos_temp.withColumnRenamed('ID_Municipio_DWH', 'idMunicipioOrigen_DWH')
                    
print('La tabla Transformada queda de la siguiente forma:')
vuelos_temp.show()

In [None]:
vuelos_temp = vuelos_temp.alias('v').join(geo.alias('g'), (vuelos_temp.gcd_municipio_destino == geo.ID_Municipio_T),'left') \
                    .select([col('v.idFecha'),col('v.idAeropuertoOrigen_DWH'),col('v.idAeropuertoDestino_DWH'), col('v.idMunicipioOrigen_DWH'), col('g.ID_Municipio_DWH'),col('v.idTipoVuelo_DWH'),col('v.IdTrafico_DWH'),
                             col('v.Vuelos'),col('v.Sillas'),col('v.CargaOfrecida'),col('v.Pasajeros'), col('v.CargaAbordo')]) \
                    .fillna({'ID_Municipio_DWH': 0})
vuelos_temp = vuelos_temp.withColumnRenamed('ID_Municipio_DWH', 'idMunicipioDestino_DWH')
                    
print('La tabla Transformada queda de la siguiente forma:')
vuelos_temp.show()

In [None]:
p = Window.partitionBy('idAeropuerto_DWH')

mini_anio = aeropuertos_df.withColumn('maxAnio', spark_max('Anio').over(p))\
    .where(col('Anio') == col('maxAnio'))\
    .drop('maxAnio')
#mini_anio.show()
mini_anio.count() #598

In [None]:
 vuelos_2 = vuelos_temp.alias('v').join(mini_anio.alias('mini'), (vuelos_temp.idAeropuertoOrigen_DWH == mini_anio.idAeropuerto_DWH), 'left')\
                                .select([col('v.idFecha'),col('v.idAeropuertoOrigen_DWH'),col('v.idAeropuertoDestino_DWH'), col('v.idMunicipioOrigen_DWH'), col('v.idMunicipioDestino_DWH'),col('v.idTipoVuelo_DWH'),col('v.IdTrafico_DWH'),
                             col('v.Vuelos'),col('v.Sillas'),col('v.CargaOfrecida'),col('v.Pasajeros'), col('v.CargaAbordo'), col('mini.rangoNumeroVuelosOrigen'), col('mini.rangoLongitudPista'), col('mini.rangoAnchoPista'), col('mini.tipo'), col('mini.clase')]) 

print('La tabla Transformada queda de la siguiente forma:')
vuelos_2.show()

In [None]:
 vuelos_2 = vuelos_2.alias('v').join(mini_dimension_aeropuerto_df.alias('mini'),  ( (vuelos_2.rangoNumeroVuelosOrigen == mini_dimension_aeropuerto_df.rangoNumeroVuelosOrigen) & (vuelos_2.rangoLongitudPista == mini_dimension_aeropuerto_df.rangoLongitudPista) & (vuelos_2.rangoAnchoPista == mini_dimension_aeropuerto_df.rangoAnchoPista) & (vuelos_2.tipo == mini_dimension_aeropuerto_df.tipo) & (vuelos_2.clase == mini_dimension_aeropuerto_df.clase) ), 'left')\
                                .select([col('v.idFecha'),col('v.idAeropuertoOrigen_DWH'),col('v.idAeropuertoDestino_DWH'), col('v.idMunicipioOrigen_DWH'), col('v.idMunicipioDestino_DWH'),col('v.idTipoVuelo_DWH'),col('v.IdTrafico_DWH'),
                             col('v.Vuelos'),col('v.Sillas'),col('v.CargaOfrecida'),col('v.Pasajeros'), col('v.CargaAbordo'), col('mini.idMini_DWH')]) \
                            .fillna({'idMini_DWH': 0})
    
vuelos_2 = vuelos_2.withColumnRenamed('idMini_DWH', 'idMiniMomentoDelHechoOrigen_DWH')

print('La tabla Transformada queda de la siguiente forma:')
vuelos_2.show()
vuelos_2.count()

#### Carga
Se carga el dataframe procesado en una nueva tabla segun el diseño de ETL.

In [None]:
guardar_db(dest_db_connection_string, vuelos_2,'Proyecto_G5_202413.G5_HechoVuelo', db_shiomar, pass_shiomar)

## Paso #10: Clean Up
En esta seccion haremos limpieza de la sesion de Spark, ya que hemos concluido con lo esperado en la actividad.

In [118]:
#spark.stop()