# Creación de ETLs con PySpark - Aeropuertos - HechoVuelo

## Proceso de ETL para una dimensión.

Las llaves ID_XXXX presentes en el modelo hacen referencia a las llaves de la bodega. Por otra parte, en el proceso de ETL se van a tener en cuenta las llaves transaccionales (**ProyectoTransaccional**). La nomenclatura para utilizar es:

1.   ID_XXXX_DWH, para las llaves de la bodega.
2.   ID_XXXX_T, para las llaves transaccionales.


Proceso de Conexion e inicialización

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = 'Estudiante_68_202314'
db_psswd = 'aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/ProyectoTransaccional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Proyecto_G3_202314'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql.types import FloatType

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

if 'spark_context' in locals():
    spark_context.stop()

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE - Aeropuertos
Empezamos con el bloque de la dimensión <i>Aeropuertos</i>, su fuente de datos viene de la tabla transaccional <i>Aeropuertos</i>.

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, es decir en memoria. De la tabla de aeropuertos, En este paso, solo nos interesan la informacion de los aeropuertos (sigla, iata, nombre, elevación, municipio, departamento), por lo cual se hace un filtro por medio del WHERE, buscando la informacion necesaria para los aeropuertos.

In [6]:
sql_aeropuertos = '''(SELECT DISTINCT sigla, iata, nombre, elevacion, municipio, departamento, numero_vuelos_origen FROM aeropuertos) AS Temp_aeropuertos'''
aeropuertos = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuertos, db_user, db_psswd)
aeropuertos.show(10)

+-----+----+--------------------+---------+-------------+------------+--------------------+
|sigla|iata|              nombre|elevacion|    municipio|departamento|numero_vuelos_origen|
+-----+----+--------------------+---------+-------------+------------+--------------------+
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              171525|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              571675|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              994420|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              252325|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              126667|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              175953|
|  7FW|    |           morichito|      720| Hato Corozal|    casanare|              444936|
|  7FW|    |           morichito|      720| Hato Corozal|    casanare|          

#### Transformación

1. Limpieza de Datos: En ese paso la idea es poder limpiar los datos asociados a vacios, valores negativos, y completar fechas.

In [7]:
# TRANSFORMACION

aeropuertos = aeropuertos.withColumn('numero_vuelos_origen', f.when(aeropuertos["numero_vuelos_origen"] < 0, aeropuertos["numero_vuelos_origen"] * -1).otherwise(aeropuertos["numero_vuelos_origen"]))
print('valores sin negativos')
aeropuertos.show(10)


valores sin negativos
+-----+----+--------------------+---------+-------------+------------+--------------------+
|sigla|iata|              nombre|elevacion|    municipio|departamento|numero_vuelos_origen|
+-----+----+--------------------+---------+-------------+------------+--------------------+
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              171525|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              571675|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              994420|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              252325|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              126667|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              175953|
|  7FW|    |           morichito|      720| Hato Corozal|    casanare|              444936|
|  7FW|    |           morichito|      720| Hato Corozal| 

In [10]:
aeropuertos = aeropuertos.coalesce(1).withColumn('ID_Aeropuerto_DWH', f.monotonically_increasing_id() + 1)


In [11]:
aeropuertos = aeropuertos.withColumn('numero_vuelos_origen', f.when(aeropuertos["numero_vuelos_origen"] == '', '0').otherwise(aeropuertos["numero_vuelos_origen"]))
print('Numero_vuelos_origen no vacias')
aeropuertos.show(10)

Numero_vuelos_origen no vacias
+-----+----+--------------------+---------+-------------+------------+--------------------+-------+-----------------+
|sigla|iata|              nombre|elevacion|    municipio|departamento|numero_vuelos_origen|cambios|ID_Aeropuerto_DWH|
+-----+----+--------------------+---------+-------------+------------+--------------------+-------+-----------------+
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              171525|      1|                1|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              571675|      1|                2|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              994420|      1|                3|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              252325|      1|                4|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              126667|      1|                5|
|  7FU|    |        la es

2. Se debe agregar la fila de cambios como version 1

In [12]:
aeropuertos = aeropuertos.withColumn('cambios', f.lit(1))
print('se agrega la columna de cambios con V1')
aeropuertos.show(10)

se agrega la columna de cambios con V1
+-----+----+--------------------+---------+-------------+------------+--------------------+-------+-----------------+
|sigla|iata|              nombre|elevacion|    municipio|departamento|numero_vuelos_origen|cambios|ID_Aeropuerto_DWH|
+-----+----+--------------------+---------+-------------+------------+--------------------+-------+-----------------+
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              171525|      1|                1|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              571675|      1|                2|
|  7FO|    |             la isla|      538|Puerto Gaitán|        meta|              994420|      1|                3|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              252325|      1|                4|
|  7FU|    |        la escondida|      564|Puerto Gaitán|        meta|              126667|      1|                5|
|  7FU|    |     

In [13]:
print((aeropuertos.count(),aeropuertos.distinct().count()))

(1194, 1194)


In [14]:
aeropuertos = aeropuertos.drop_duplicates()

In [15]:
print((aeropuertos.count(),aeropuertos.distinct().count()))

(1194, 1194)


#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

In [16]:
# CARGUE
guardar_db(dest_db_connection_string, aeropuertos,'Proyecto_G3_202314.Aeropuertos', db_user, db_psswd)
print(aeropuertos.count())

1194


Verifique los resultados usando MySQL Workbench

### BLOQUE HechoVuelos

La idea de este bloque es poder formar la informacion de <i>HechoVuelos</i> con los datos de las tablas de <i>Aeropuertos, Divipola, PIB, Proyecciones y Vuelos</i>. Para esto debemos sacar la informacion necesaria de cada tabla.

#### Extracción

In [17]:
# Configuración servidor base de datos Proyecto_G3_202314
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = 'Estudiante_68_202314'
db_psswd = 'aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Proyecto_G3_202314'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Proyecto_G3_202314'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [18]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql.types import FloatType

In [19]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

if 'spark_context' in locals():
    spark_context.stop()

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

# Conexión y Carga de Datos

In [20]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

BLOQUE - HechoVuelos
Empezamos con el bloque de la dimensión HechoVuelos, su fuente de datos viene de las tablas Aeropuertos, GeografiaConDemografia, TipoVuelo y Vuelos.

Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, es decir en memoria. De la tabla de aeropuertos, En este paso, nos interesa toda la informacion de las diferentes tablas.

In [74]:
#EXTRACCION

sql_vuelos =  '''( SELECT * FROM Proyecto_G3_202314.Vuelos) AS Temp_vuelos '''
vuelos = obtener_dataframe_de_bd(source_db_connection_string, sql_vuelos, db_user, db_psswd)
#vuelos.show(10)

sql_aeropuertos = '''( SELECT * FROM Proyecto_G3_202314.Aeropuertos) AS Temp_aeropuertos '''
aeropuertos_b = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuertos, db_user, db_psswd)
#aeropuertos_b.show(10)

sql_geocondemo = '''( SELECT * FROM Proyecto_G3_202314.GeografiaConDemografia) AS Temp_geocondemo '''
geocondemo = obtener_dataframe_de_bd(source_db_connection_string, sql_geocondemo, db_user, db_psswd)
#geocondemo.show(10)

sql_tipovuelo = '''( SELECT * FROM Proyecto_G3_202314.TipoVuelo) AS Temp_tipovuelo '''
tipo_vuelo = obtener_dataframe_de_bd(source_db_connection_string, sql_tipovuelo, db_user, db_psswd).limit(1000)


#### Transformación

Hacer 2 Join de Vuelos a Aeropuertos por origen y destino.<br>
La siguiente es la primera transformación para hacer el join entre aeropuertos y vuelos por el origen.

In [75]:
resultado1 = vuelos.alias("vu")\
    .join(aeropuertos_b.alias("aer_o"), f.lower(col("vu.origen")) == f.lower(col("aer_o.sigla")), "inner")

resultado1 = resultado1.withColumnRenamed("municipio","municipio_o").withColumnRenamed("ID_Aeropuerto_DWH","idAeropuertoOrigen_DWH")
resultado1.show(5)

+----+---+------+-------+-----------+----------+-------+---------+------+------+--------------+---------+-----------+-----+----+-----------+---------+------------------+------------------+--------------------+-------+----------------------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|  empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|sigla|iata|     nombre|elevacion|       municipio_o|      departamento|numero_vuelos_origen|cambios|idAeropuertoOrigen_DWH|
+----+---+------+-------+-----------+----------+-------+---------+------+------+--------------+---------+-----------+-----+----+-----------+---------+------------------+------------------+--------------------+-------+----------------------+
|2012| 12|   cuc|    bog|       B732|         R|      N|AEROSUCRE|     1|     0|       15000.0|        0|     1214.0|  CUC| CUC|camilo daza|     1027|San José de Cúcuta|norte de santander|               55734|      1|                   718|
|2012| 11|   cuc|    bog|       B732

La siguiente es la segunda transformación para hacer el join entre aeropuertos y vuelos por el destino.

In [76]:
resultado2 = resultado1.alias("vu")\
    .join(aeropuertos_b.alias("aer_d"), f.lower(col("vu.destino")) == f.lower(col("aer_d.sigla")), "inner")

resultado2 = resultado2.withColumnRenamed("municipio","municipio_d").withColumnRenamed("ID_Aeropuerto_DWH","idAeropuertoDestino_DWH")

resultado2.show(5)

+----+---+------+-------+-----------+----------+-------+---------+------+------+--------------+---------+-----------+-----+----+-----------+---------+------------------+------------------+--------------------+-------+----------------------+-----+----+---------+---------+------------+------------+--------------------+-------+-----------------------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|  empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|sigla|iata|     nombre|elevacion|       municipio_o|      departamento|numero_vuelos_origen|cambios|idAeropuertoOrigen_DWH|sigla|iata|   nombre|elevacion| municipio_d|departamento|numero_vuelos_origen|cambios|idAeropuertoDestino_DWH|
+----+---+------+-------+-----------+----------+-------+---------+------+------+--------------+---------+-----------+-----+----+-----------+---------+------------------+------------------+--------------------+-------+----------------------+-----+----+---------+---------+------------+------------+-

Hacer 2 Join de Aeropuertos por municipio y departamento por origen y destino <br>
El siguiente es el Join por el municipio de origen.


In [77]:
resultado3 = resultado2.alias("vu")\
    .join(geocondemo.alias("gcd_o"),  f.lower(col("vu.municipio_o")) ==  f.lower(col("gcd_o.nombreMunicipio")), "inner") 

resultado3 = resultado3.withColumnRenamed("idMunicipio_DWH","idMunicipioOrigen_DWH")
resultado3.show(5)

+----+---+------+-------+-----------+----------+-------+-------+------+------+--------------+---------+-----------+-----+----+--------------------+---------+------------+------------+--------------------+-------+----------------------+-----+----+-------------+---------+-----------+------------+--------------------+-------+-----------------------+---------------------+-------------+---------------+------------------+-----------------+--------------+-------------+------------------+------------+------------+------------+----------+--------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|sigla|iata|              nombre|elevacion| municipio_o|departamento|numero_vuelos_origen|cambios|idAeropuertoOrigen_DWH|sigla|iata|       nombre|elevacion|municipio_d|departamento|numero_vuelos_origen|cambios|idAeropuertoDestino_DWH|idMunicipioOrigen_DWH|idMunicipio_T|nombreMunicipio|nombreDepartamento|areaMetropolitana|      longitud|   

El siguiente es el Join por el municipio de destino.

In [78]:
resultado4 = resultado3.alias("vu")\
    .join(geocondemo.alias("gcd_d"),  f.lower(col("vu.municipio_d")) ==  f.lower(col("gcd_d.nombreMunicipio")), "inner") 
resultado4 = resultado4.withColumnRenamed("idMunicipio_DWH","idMunicipioDestino_DWH")
resultado4.show(5)

+----+---+------+-------+-----------+----------+-------+-------+------+------+--------------+---------+-----------+-----+----+-------------+---------+-----------+------------+--------------------+-------+----------------------+-----+----+--------------------+---------+------------+------------+--------------------+-------+-----------------------+---------------------+-------------+---------------+------------------+-----------------+--------------+-------------+-------------------+------------+------------+------------+----------+--------+----------------------+-------------+---------------+------------------+-----------------+--------------+-------------+------------------+------------+------------+------------+----------+--------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|sigla|iata|       nombre|elevacion|municipio_o|departamento|numero_vuelos_origen|cambios|idAeropuertoOrigen_DWH|sigla|iata|              nombr

In [79]:
resultado5 = resultado4.alias("vu")\
    .join(tipo_vuelo.alias("tv"), 
          (f.substring(col("tv.nombreTipo"), 1,1) == col("vu.tipo_vuelo")) & 
          (col("tv.tipoEquipo") == col("vu.tipo_equipo")), "inner")

resultado5.show(5)

+----+---+------+-------+-----------+----------+-------+-------+------+------+--------------+---------+-----------+-----+----+---------+---------+---------------+------------+--------------------+-------+----------------------+-----+----+---------+---------+-----------+------------+--------------------+-------+-----------------------+---------------------+-------------+---------------+------------------+-----------------+-------------+-------------+-----------+------------+------------+------------+----------+--------+----------------------+-------------+---------------+------------------+-----------------+--------------+-------------+-----------+------------+------------+------------+----------+--------+---------------+-------------+----------+----------+
| ano|mes|origen|destino|tipo_equipo|tipo_vuelo|trafico|empresa|vuelos|sillas|carga_ofrecida|pasajeros|carga_bordo|sigla|iata|   nombre|elevacion|    municipio_o|departamento|numero_vuelos_origen|cambios|idAeropuertoOrigen_DWH|sigla|

Transformación para normalizar la fecha, concatenando al año y el mes el día 01

In [80]:
resultado5 = resultado5.withColumn("idFecha", f.concat(f.format_string("%04d", col("ano")), 
                                              f.format_string("%02d", col("mes")), 
                                              f.lit("01")))

Transformación para Cambiar el nombre de columna trafico a ID_Trafico_T

In [81]:
resultado5 = resultado5.withColumn("miniMomentoDelHechoOrigen_DWH", f.lit(0))
resultado5 = resultado5.withColumnRenamed("trafico","id_Trafico_T")


Selección de campos para cargar en la Base de Datos

In [82]:
resultado5 = resultado5.select("idAeropuertoOrigen_DWH","idAeropuertoDestino_DWH","idMunicipioOrigen_DWH","idMunicipioDestino_DWH","miniMomentoDelHechoOrigen_DWH","idFecha","id_Trafico_T", "idTipoVuelo_DWH", "vuelos","sillas","carga_ofrecida","carga_bordo","pasajeros")  

resultado5.show(5)

+----------------------+-----------------------+---------------------+----------------------+-----------------------------+--------+------------+---------------+------+------+--------------+-----------+---------+
|idAeropuertoOrigen_DWH|idAeropuertoDestino_DWH|idMunicipioOrigen_DWH|idMunicipioDestino_DWH|miniMomentoDelHechoOrigen_DWH| idFecha|id_Trafico_T|idTipoVuelo_DWH|vuelos|sillas|carga_ofrecida|carga_bordo|pasajeros|
+----------------------+-----------------------+---------------------+----------------------+-----------------------------+--------+------------+---------------+------+------+--------------+-----------+---------+
|                   780|                    928|           8589946432|           25769845641|                            0|20040501|           N|         276303|     1|     0|           0.0|        0.0|        5|
|                   780|                    928|           8589946432|           25769845641|                            0|20040501|           N|   

Agregar id incremental idVuelo_DWH

In [83]:
resultado6 = resultado5.coalesce(1).withColumn("idVuelo_DWH", f.monotonically_increasing_id() + 1)
resultado6.show(5)

+----------------------+-----------------------+---------------------+----------------------+-----------------------------+--------+------------+---------------+------+------+--------------+-----------+---------+-----------+
|idAeropuertoOrigen_DWH|idAeropuertoDestino_DWH|idMunicipioOrigen_DWH|idMunicipioDestino_DWH|miniMomentoDelHechoOrigen_DWH| idFecha|id_Trafico_T|idTipoVuelo_DWH|vuelos|sillas|carga_ofrecida|carga_bordo|pasajeros|idVuelo_DWH|
+----------------------+-----------------------+---------------------+----------------------+-----------------------------+--------+------------+---------------+------+------+--------------+-----------+---------+-----------+
|                   780|                    928|           8589946432|           25769845641|                            0|20040501|           N|         276303|     1|     0|           0.0|        0.0|        5|          1|
|                   780|                    928|           8589946432|           25769845641|       

In [85]:
resultado6 = resultado6.limit(1000)
print(resultado6.count())

1000


# Carga de Datos

In [86]:
HechoVuelo = resultado6
guardar_db(dest_db_connection_string, HechoVuelo,'Proyecto_G3_202314.HechoVuelo', db_user, db_psswd)
print(HechoVuelo.count())

1000
