# Tutorial: creación de ETLs con PySpark

## 1. Introducción	
    ¿Qué aprenderá?
	En este tutorial aprenderá cómo puede utilizar PySpark para crear un proceso de ETL básico.

	¿Qué construirá?     
        Construirá un ETL que toma los datos desde la base de datos transacional de WideWorldImporters (WWImportersTransactional), los transforma  a una representación cercana al análisis y los  almacena en la base de datos relacional WWImportersDWH.
    
	¿Para qué?
	La construcción de ETLs que se ajusten a modelos multidimensionales es un paso necesario dentro de un proceso de analìtica 1.0 , pues permite tomar los datos crudos de una fuente, generalmente transaccional, para transformarlos en datos limpios que puedan utilizarse para la toma de decisiones.
    
    ¿Qué necesita?
    1. Python 3 con pip instalado
    2. Jupyter notebook
    3. Paquetes: Pyspark (3.0.1) y pandas (1.2.1)
    4. Controlador Connector J de MySQL (ya se encuentra instalado)
    5. Servidor SQL con base de datos relacional "WWImportersTransactional" y base de datos relacional que corresponde a la bodega de WWI "Estudiante_i"
	

## 2. Proceso de ETL para una dimensión.

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = 'Estudiante_8_202413'
db_psswd = 'MISO_aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/ProyectoTransaccional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Proyecto_G7_202413'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace,date_format,substring,dayofweek, expr, concat, lit
from pyspark.sql.window import Window
from datetime import datetime, date

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1
Empezamos con el bloque 1: la dimensión <i>Aeropuerto</i>, su fuente de datos viene de la tabla transaccional <i>aeropuertos</i>.

#### Extracción


In [5]:
#EXTRACCION
sql_aeropuertos = '''(SELECT * FROM ProyectoTransaccional.aeropuertos) AS Temp_aeropuertos'''
aeropuertos = obtener_dataframe_de_bd(source_db_connection_string, sql_aeropuertos, db_user, db_psswd)
aeropuertos.show(5)

print(aeropuertos.columns)

+-----+----+------------+-------------+------------+---------+-------+--------+--------------------+--------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|sigla|iata|      nombre|    municipio|departamento|categoria|latitud|longitud|         propietario|    explotador|longitud_pista|ancho_pista|pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|      tipo|numero_vuelos_origen|gcd_departamento|gcd_municipio|Anio|
+-----+----+------------+-------------+------------+---------+-------+--------+--------------------+--------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|  7FO|    |     la isla|Puerto Gaitán|        meta|Aeródromo| 4.4211|-71.6271|LA ISLA Y EL ROSA...| LA CEIBA S.A.|          1170|         14|3000|      538|   1325000| 

#### Transformación

In [6]:
# TRANSFORMACION
aeropuertosTemp = aeropuertos.coalesce(1).withColumn('idAeropuerto_DWH', f.monotonically_increasing_id() + 1)
aeropuertosTemp = aeropuertosTemp.selectExpr('idAeropuerto_DWH', 'sigla', 'iata', 'nombre', 'municipio as nombreMunicipio', 'departamento as nombreDepartamento', 'categoria', 'latitud', 'longitud', 'propietario', 'explotador', 'longitud_pista', 'ancho_pista', 'pbmo', 'elevacion', 'resolucion', 'fecha_construccion', 'fecha_vigencia', 'clase', 'tipo', 'numero_vuelos_origen', 'gcd_departamento', 'gcd_municipio', 'Anio')
aeropuertosTemp.show(5)

+----------------+-----+----+------------+---------------+------------------+---------+-------+--------+--------------------+--------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|idAeropuerto_DWH|sigla|iata|      nombre|nombreMunicipio|nombreDepartamento|categoria|latitud|longitud|         propietario|    explotador|longitud_pista|ancho_pista|pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|      tipo|numero_vuelos_origen|gcd_departamento|gcd_municipio|Anio|
+----------------+-----+----+------------+---------------+------------------+---------+-------+--------+--------------------+--------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|               1|  7FO|    |     la isla|  Puerto Gaitán|              meta|Aeródromo| 4.4211

In [7]:
aeropuertosDim = aeropuertosTemp.select('idAeropuerto_DWH','sigla','iata','nombre','elevacion','nombreMunicipio','nombreDepartamento')
aeropuertosDim.show(5)

+----------------+-----+----+------------+---------+---------------+------------------+
|idAeropuerto_DWH|sigla|iata|      nombre|elevacion|nombreMunicipio|nombreDepartamento|
+----------------+-----+----+------------+---------+---------------+------------------+
|               1|  7FO|    |     la isla|      538|  Puerto Gaitán|              meta|
|               2|  7FO|    |     la isla|      538|  Puerto Gaitán|              meta|
|               3|  7FO|    |     la isla|      538|  Puerto Gaitán|              meta|
|               4|  7FU|    |la escondida|      564|  Puerto Gaitán|              meta|
|               5|  7FU|    |la escondida|      564|  Puerto Gaitán|              meta|
+----------------+-----+----+------------+---------+---------------+------------------+
only showing top 5 rows



#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino


In [8]:
# CARGUE
guardar_db(dest_db_connection_string, aeropuertosDim,'Estudiante_8_202413.Aeropuerto', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 2
Empezamos el bloque 2: <i>MiniDimensiónAeropuerto</i>. Su fuente de datos viene de la tabla transaccional <i>aeropuertos</i>.

#### Extracción

In [9]:
#EXTRACCION
aeropuertosTemp = aeropuertosTemp.withColumn('numero_vuelos_origen', f.abs('numero_vuelos_origen'))
aeropuertosTemp.show(5)

+----------------+-----+----+------------+---------------+------------------+---------+-------+--------+--------------------+--------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|idAeropuerto_DWH|sigla|iata|      nombre|nombreMunicipio|nombreDepartamento|categoria|latitud|longitud|         propietario|    explotador|longitud_pista|ancho_pista|pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|clase|      tipo|numero_vuelos_origen|gcd_departamento|gcd_municipio|Anio|
+----------------+-----+----+------------+---------------+------------------+---------+-------+--------+--------------------+--------------+--------------+-----------+----+---------+----------+------------------+--------------+-----+----------+--------------------+----------------+-------------+----+
|               1|  7FO|    |     la isla|  Puerto Gaitán|              meta|Aeródromo| 4.4211

#### Transformación

In [10]:
# TRANSFORMACION
cuartiles_longitud = aeropuertosTemp.approxQuantile('longitud_pista', [0.25, 0.5, 0.75], 0)
r1_longitud_max = cuartiles_longitud[0]
r2_longitud_max = cuartiles_longitud[1]
r3_longitud_max = cuartiles_longitud[2]

cuartiles_ancho = aeropuertosTemp.approxQuantile('ancho_pista', [0.25, 0.5, 0.75], 0)
r1_ancho_max = cuartiles_ancho[0]
r2_ancho_max = cuartiles_ancho[1]
r3_ancho_max = cuartiles_ancho[2]

cuartiles_vuelos_origen = aeropuertosTemp.approxQuantile('numero_vuelos_origen', [0.25, 0.5, 0.75], 0)
r1_vuelos_max = cuartiles_vuelos_origen[0]
r2_vuelos_max = cuartiles_vuelos_origen[1]
r3_vuelos_max = cuartiles_vuelos_origen[2]

aeropuertosTemp = aeropuertosTemp.withColumn(
    "rangoLongitudPista",
    expr(
        f"CASE WHEN longitud_pista < {r1_longitud_max} THEN 'R1' "  \
        f"WHEN longitud_pista < {r2_longitud_max} THEN 'R2' "  \
        f"WHEN longitud_pista < {r3_longitud_max} THEN 'R3' "  \
        "ELSE 'R4' END"
    )
)

aeropuertosTemp = aeropuertosTemp.withColumn(
    "rangoAnchoPista",
    expr(
        f"CASE WHEN ancho_pista < {r1_ancho_max} THEN 'R1' "  \
        f"WHEN ancho_pista < {r2_ancho_max} THEN 'R2' "  \
        f"WHEN ancho_pista < {r3_ancho_max} THEN 'R3' "  \
        "ELSE 'R4' END"
    )
)

aeropuertosTemp = aeropuertosTemp.withColumn(
    "rangoNumeroVuelosOrigen",
    expr(
        f"CASE WHEN numero_vuelos_origen < {r1_vuelos_max} THEN 'R1' "  \
        f"WHEN numero_vuelos_origen < {r2_vuelos_max} THEN 'R2' "  \
        f"WHEN numero_vuelos_origen < {r3_vuelos_max} THEN 'R3' "  \
        "ELSE 'R4' END"
    )
)

df_mini_aeropuerto = aeropuertosTemp.select('rangoLongitudPista','rangoAnchoPista','rangoNumeroVuelosOrigen','clase','tipo')
df_mini_aeropuerto = df_mini_aeropuerto.distinct()
df_mini_aeropuerto = df_mini_aeropuerto.coalesce(1).withColumn('idMini_DWH', f.monotonically_increasing_id() + 1)
df_mini_aeropuerto = df_mini_aeropuerto.select('idMini_DWH','rangoLongitudPista','rangoAnchoPista','rangoNumeroVuelosOrigen','clase','tipo')

df_mini_aeropuerto.show(10)

print(df_mini_aeropuerto.count())

+----------+------------------+---------------+-----------------------+-----+----------+
|idMini_DWH|rangoLongitudPista|rangoAnchoPista|rangoNumeroVuelosOrigen|clase|      tipo|
+----------+------------------+---------------+-----------------------+-----+----------+
|         1|                R4|             R3|                     R3|   1A|   Privado|
|         2|                R4|             R3|                     R4|   1A|   Privado|
|         3|                R2|             R3|                     R3|   1A|Fumigación|
|         4|                R1|             R2|                     R2|   1A|Fumigación|
|         5|                R1|             R2|                     R3|   1A|Fumigación|
|         6|                R2|             R2|                     R4|   1A|   Privado|
|         7|                R1|             R1|                     R3|   1A|   Privado|
|         8|                R1|             R2|                     R2|   1A|   Público|
|         9|         

In [11]:
print(aeropuertos.columns)
print(aeropuertos.count())

print(aeropuertosTemp.columns)
print(aeropuertosTemp.count())

print(df_mini_aeropuerto.columns)
print(df_mini_aeropuerto.count())

['sigla', 'iata', 'nombre', 'municipio', 'departamento', 'categoria', 'latitud', 'longitud', 'propietario', 'explotador', 'longitud_pista', 'ancho_pista', 'pbmo', 'elevacion', 'resolucion', 'fecha_construccion', 'fecha_vigencia', 'clase', 'tipo', 'numero_vuelos_origen', 'gcd_departamento', 'gcd_municipio', 'Anio']
1414
['idAeropuerto_DWH', 'sigla', 'iata', 'nombre', 'nombreMunicipio', 'nombreDepartamento', 'categoria', 'latitud', 'longitud', 'propietario', 'explotador', 'longitud_pista', 'ancho_pista', 'pbmo', 'elevacion', 'resolucion', 'fecha_construccion', 'fecha_vigencia', 'clase', 'tipo', 'numero_vuelos_origen', 'gcd_departamento', 'gcd_municipio', 'Anio', 'rangoLongitudPista', 'rangoAnchoPista', 'rangoNumeroVuelosOrigen']
1414
['idMini_DWH', 'rangoLongitudPista', 'rangoAnchoPista', 'rangoNumeroVuelosOrigen', 'clase', 'tipo']
341


#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [12]:
# CARGUE
guardar_db(dest_db_connection_string, df_mini_aeropuerto,'Estudiante_8_202413.MiniDimensionAeropuerto', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 3
Empezamos con el bloque 3: la dimensión <i>Fecha</i>, su fuente de datos viene de la tabla transaccional <i>vuelos</i>.

#### Extracción

In [13]:
#EXTRACCION
sql_fecha = '''(SELECT DISTINCT mes, ano AS anio FROM ProyectoTransaccional.vuelos) AS Temp_fecha'''
fecha = obtener_dataframe_de_bd(source_db_connection_string, sql_fecha, db_user, db_psswd)
fecha.show(5)

print(fecha.columns)

+---+----+
|mes|anio|
+---+----+
|  9|2012|
| 11|2012|
| 12|2012|
|  1|2012|
|  2|2012|
+---+----+
only showing top 5 rows

['mes', 'anio']


#### Transformación

In [14]:
# TRANSFORMACION
fecha = fecha.withColumn('dia', f.lit(1))
fecha = fecha.select('anio', 'mes', 'dia')
fecha.show(5)

+----+---+---+
|anio|mes|dia|
+----+---+---+
|2012|  9|  1|
|2012| 11|  1|
|2012| 12|  1|
|2012|  1|  1|
|2012|  2|  1|
+----+---+---+
only showing top 5 rows



In [15]:
fecha_str = f.concat_ws('-',
                        f.col('anio'),
                        f.lpad(f.col('mes'), 2, '0'),
                        f.lpad(f.col('dia'), 2, '0'))
fecha = fecha.withColumn('descripcion', f.to_date(fecha_str, 'yyyy-MM-dd'))
fecha.show(5)

+----+---+---+-----------+
|anio|mes|dia|descripcion|
+----+---+---+-----------+
|2012|  9|  1| 2012-09-01|
|2012| 11|  1| 2012-11-01|
|2012| 12|  1| 2012-12-01|
|2012|  1|  1| 2012-01-01|
|2012|  2|  1| 2012-02-01|
+----+---+---+-----------+
only showing top 5 rows



In [16]:
fecha = fecha.withColumn('idFecha', date_format(fecha['descripcion'], 'yyyyMMdd'))
fecha.show(5)

+----+---+---+-----------+--------+
|anio|mes|dia|descripcion| idFecha|
+----+---+---+-----------+--------+
|2012|  9|  1| 2012-09-01|20120901|
|2012| 11|  1| 2012-11-01|20121101|
|2012| 12|  1| 2012-12-01|20121201|
|2012|  1|  1| 2012-01-01|20120101|
|2012|  2|  1| 2012-02-01|20120201|
+----+---+---+-----------+--------+
only showing top 5 rows



In [17]:
fecha = fecha.select('idFecha', 'descripcion', 'anio', 'mes', 'dia')
fecha.show(5)

+--------+-----------+----+---+---+
| idFecha|descripcion|anio|mes|dia|
+--------+-----------+----+---+---+
|20120901| 2012-09-01|2012|  9|  1|
|20121101| 2012-11-01|2012| 11|  1|
|20121201| 2012-12-01|2012| 12|  1|
|20120101| 2012-01-01|2012|  1|  1|
|20120201| 2012-02-01|2012|  2|  1|
+--------+-----------+----+---+---+
only showing top 5 rows



#### Carga

In [18]:
# CARGUE
guardar_db(dest_db_connection_string, fecha,'Estudiante_8_202413.FechaS6', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 4
Empezamos con el bloque 4: la dimensión <i>HechoHistoriaCambios</i>, su fuente de datos viene de las tablas <i>Aeropuerto</i>, <i>MiniDimensiónAeropuerto</i> y <i>Fecha</i>.

#### Extracción

In [19]:
#EXTRACCION
cols = ['rangoLongitudPista', 'rangoAnchoPista', 'rangoNumeroVuelosOrigen', 'clase', 'tipo']
df_hecho_temp = aeropuertosTemp.join(df_mini_aeropuerto, cols)
df_hecho_temp.show(5)

print(df_hecho_temp.columns)

+------------------+---------------+-----------------------+-----+----+----------------+-----+----+--------------------+---------------+------------------+---------+---------+-----------+-----------+----------+--------------+-----------+----+---------+----------+------------------+--------------+--------------------+----------------+-------------+----+----------+
|rangoLongitudPista|rangoAnchoPista|rangoNumeroVuelosOrigen|clase|tipo|idAeropuerto_DWH|sigla|iata|              nombre|nombreMunicipio|nombreDepartamento|categoria|  latitud|   longitud|propietario|explotador|longitud_pista|ancho_pista|pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|numero_vuelos_origen|gcd_departamento|gcd_municipio|Anio|idMini_DWH|
+------------------+---------------+-----------------------+-----+----+----------------+-----+----+--------------------+---------------+------------------+---------+---------+-----------+-----------+----------+--------------+-----------+----+---------+----------+-----

#### Transformación

In [20]:
# TRANSFORMACION
df_hecho_temp = df_hecho_temp.withColumn('cambio', f.lit(1))
df_hecho_temp.show(5)

+------------------+---------------+-----------------------+-----+----+----------------+-----+----+--------------------+---------------+------------------+---------+---------+-----------+-----------+----------+--------------+-----------+----+---------+----------+------------------+--------------+--------------------+----------------+-------------+----+----------+------+
|rangoLongitudPista|rangoAnchoPista|rangoNumeroVuelosOrigen|clase|tipo|idAeropuerto_DWH|sigla|iata|              nombre|nombreMunicipio|nombreDepartamento|categoria|  latitud|   longitud|propietario|explotador|longitud_pista|ancho_pista|pbmo|elevacion|resolucion|fecha_construccion|fecha_vigencia|numero_vuelos_origen|gcd_departamento|gcd_municipio|Anio|idMini_DWH|cambio|
+------------------+---------------+-----------------------+-----+----+----------------+-----+----+--------------------+---------------+------------------+---------+---------+-----------+-----------+----------+--------------+-----------+----+---------+--

In [21]:
hechoHistoriaCambiosTemp = df_hecho_temp.select('idAeropuerto_DWH', 'idMini_DWH', 'sigla', 'anio', 'cambio')

sigla_counts = df_hecho_temp.groupBy('sigla').count()
siglas_repetidas = sigla_counts.filter(f.col('count') > 1).select('sigla')

repetidos = hechoHistoriaCambiosTemp.join(siglas_repetidas, on='sigla', how='inner')

window_spec = Window.partitionBy('sigla').orderBy('anio')

repetidos = repetidos.withColumn('siglaRepetida', f.max('anio').over(window_spec))
repetidos = repetidos.withColumn('fechaInicio', f.expr("to_date(concat(anio, '-01-01'))"))
repetidos = repetidos.withColumn('fechaFin',
                                 f.when(f.lead('sigla', 1).over(window_spec) == f.col('sigla'), 
                                        f.expr("last_day(to_date(concat(anio, '-12-31')))"))
                                  .otherwise(f.lit('2199-12-31')))

repetidos.show(6)

+-----+----------------+----------+----+------+-------------+-----------+----------+
|sigla|idAeropuerto_DWH|idMini_DWH|anio|cambio|siglaRepetida|fechaInicio|  fechaFin|
+-----+----------------+----------+----+------+-------------+-----------+----------+
|  7FO|               1|         1|2015|     1|         2015| 2015-01-01|2015-12-31|
|  7FO|               2|         2|2016|     1|         2016| 2016-01-01|2016-12-31|
|  7FO|               3|         2|2017|     1|         2017| 2017-01-01|2199-12-31|
|  7FU|               4|         3|2015|     1|         2015| 2015-01-01|2015-12-31|
|  7FU|               5|         4|2016|     1|         2016| 2016-01-01|2016-12-31|
|  7FU|               6|         5|2017|     1|         2017| 2017-01-01|2199-12-31|
+-----+----------------+----------+----+------+-------------+-----------+----------+
only showing top 6 rows



In [22]:
repetidos = repetidos.withColumn('cambio',
                                     f.when(f.year('fechaFin') == 2199, 0)
                                      .otherwise(f.col('cambio')))

repetidos.show(6)

+-----+----------------+----------+----+------+-------------+-----------+----------+
|sigla|idAeropuerto_DWH|idMini_DWH|anio|cambio|siglaRepetida|fechaInicio|  fechaFin|
+-----+----------------+----------+----+------+-------------+-----------+----------+
|  7FO|               1|         1|2015|     1|         2015| 2015-01-01|2015-12-31|
|  7FO|               2|         2|2016|     1|         2016| 2016-01-01|2016-12-31|
|  7FO|               3|         2|2017|     0|         2017| 2017-01-01|2199-12-31|
|  7FU|               4|         3|2015|     1|         2015| 2015-01-01|2015-12-31|
|  7FU|               5|         4|2016|     1|         2016| 2016-01-01|2016-12-31|
|  7FU|               6|         5|2017|     0|         2017| 2017-01-01|2199-12-31|
+-----+----------------+----------+----+------+-------------+-----------+----------+
only showing top 6 rows



In [23]:
hechoHistoriaCambios = repetidos.select('idAeropuerto_DWH', 'idMini_DWH', 'fechaInicio', 'fechaFin', 'cambio')
hechoHistoriaCambios.show(6)

+----------------+----------+-----------+----------+------+
|idAeropuerto_DWH|idMini_DWH|fechaInicio|  fechaFin|cambio|
+----------------+----------+-----------+----------+------+
|               1|         1| 2015-01-01|2015-12-31|     1|
|               2|         2| 2016-01-01|2016-12-31|     1|
|               3|         2| 2017-01-01|2199-12-31|     0|
|               4|         3| 2015-01-01|2015-12-31|     1|
|               5|         4| 2016-01-01|2016-12-31|     1|
|               6|         5| 2017-01-01|2199-12-31|     0|
+----------------+----------+-----------+----------+------+
only showing top 6 rows



#### Carga

In [24]:
# CARGUE
guardar_db(dest_db_connection_string, hechoHistoriaCambios,'Estudiante_8_202413.HechoHistoriaCambios', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 5
Bloque 5: <i>GeografiaConDemografia</i>

#### Extracción

#### Transformación

In [14]:
# TRANSFORMACION 


+--------------+------------+-----------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|           Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+-----------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             1|         803|       Bala Dixit|   Novelty Shop|          Missing|           803|           33799|         2000|2013-01-01 00:00:00|       7|
|             2|         805|     Ratan Poddar|   Novelty Shop|          Missing|           805|           10194|         3300|2013-01-01 00:00:00|       7|
|             3|         806|           Shi Tu|   Novelty Shop|          Missing|           806|           31685|         3000|2013-01-01 00:00:00|       7|
|             4|         811|    Surendra Sahu|   Novelty 

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [27]:
# CARGUE
guardar_db(dest_db_connection_string,clientes,'Estudiante_31_202413.ETL_Cliente', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 6
Bloque 6: <i>TipoVuelo</i>

#### Extracción

#### Transformación


Se hace una verificación de los valores de la tasa de impuesto

Se hace verificación de consistencia

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [22]:
# TRANSFORMACION


20034 73595
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+-----------+------------------------+--------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Comentarios|Instrucciones_de_entrega|Comentarios_internos|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+-----------+------------------------+--------------------+------------

TypeError: col should be Column

Descripciones


#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [47]:
guardar_db(dest_db_connection_string, ordenes,'Estudiante_31_202413.Hecho_Orden', db_user, db_psswd)

### BLOQUE 7
Bloque 7: <i>Trafico</i>

#### Extracción

#### Transformación

#### Carga

### BLOQUE 8
Bloque 8: <i>HechoVuelo</i>