# Tutorial: creación de ETLs con PySpark

## 1. Introducción	
    ¿Qué aprenderá?
	En este tutorial aprenderá cómo puede utilizar PySpark para crear un proceso de ETL básico.

	¿Qué construirá?     
        Construirá un ETL que toma los datos desde la base de datos transacional de WideWorldImporters (WWImportersTransactional), los transforma  a una representación cercana al análisis y los  almacena en la base de datos relacional WWImportersDWH.
    
	¿Para qué?
	La construcción de ETLs que se ajusten a modelos multidimensionales es un paso necesario dentro de un proceso de analìtica 1.0 , pues permite tomar los datos crudos de una fuente, generalmente transaccional, para transformarlos en datos limpios que puedan utilizarse para la toma de decisiones.
    
    ¿Qué necesita?
    1. Python 3 con pip instalado
    2. Jupyter notebook
    3. Paquetes: Pyspark (3.0.1) y pandas (1.2.1)
    4. Controlador Connector J de MySQL (ya se encuentra instalado)
    5. Servidor SQL con base de datos relacional "WWImportersTransactional" y base de datos relacional que corresponde a la bodega de WWI "Estudiante_i"
	

## 2. Proceso de ETL para una dimensión.

En este proceso de ETL, se extraen los datos de las **órdenes de compra** de una base de datos transaccional y se almacenan en otra base de datos que corresponde a la bodega de datos, siguiendo una aproximación ROLAP. A continuación, se presenta el modelo multidimensional que es el modelo conceptual que representa el proceso de registro de órdenes de compra. Este modelo se utilizó para crear las tablas en la bodega de datos que representan el proceso de negocio y que serán cargadas como resultado del proceso ETL. 

Tenga en cuenta que las llaves ID_XXXX presentes en el modelo hacen referencia a las llaves de la bodega. Por otra parte, en el proceso de ETL se van a tener en cuenta las llaves transaccionales (**WWImportersTransactional**). La nomenclatura para utilizar es:

1.   ID_XXXX_DWH, para las llaves de la bodega.
2.   ID_XXXX_T, para las llaves transaccionales.


![Modelo ordenes](./WWI_modelo_ordenes.png)

El proceso de ETL debe ser diseñado antes de implementarse. A partir de las conclusiones del entendimiento de datos sabemos las fuentes que se van a  utilizar y la relación entre las fuentes. Adicionalmente, se cuenta con las respuestas de la organización a las preguntas, resultado del entendimiento de datos. De esa manera sabemos cómo se deben manipular los datos. 

Este proceso de ETL lo dividimos en seis bloques, uno para cada dimensión o <i>tabla de hechos</i> del modelo, con la única excepción de la dimensión de fecha que, por ser una dimensión especial que se genera de forma independiente, no se incluye aquí:

![ETL](./Disenio_ETL.PNG)

Recuerde que este es el diseño general. En el diseño completo se deben incluir las transformaciones realizadas a los datos a utilizarse en las dimensiones y tablas de hecho del modelo multidimensional, de acuerdo con lo que se muestra en la infografía de arquitectura de componentes (Componente proceso ETL) 

In [15]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = ''
db_psswd = ''
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_58_202413'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [18]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace, upper, min as min_, unix_timestamp, to_timestamp , from_unixtime, date_format, weekofyear, to_date, year
from pyspark.sql.window import Window

from datetime import datetime

In [20]:
#Configuración de la sesión
spark.stop()
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [21]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1
Empezamos con el bloque 1: la dimensión <i>TipoTransaccion</i>, su fuente de datos viene de la tabla transaccional <i>TiposTransaccion</i>. 

#### Extracción

In [39]:
sql_tt = '''(SELECT DISTINCT `TipoTransaccionID` AS ID_Tipo_transaccion, `TipoTransaccionNombre` AS Tipo  FROM WWImportersTransactional.TiposTransaccion) AS Temp_tt'''
tt = obtener_dataframe_de_bd(source_db_connection_string, sql_tt, db_user, db_psswd)
tt.show(10)

+-------------------+--------------------+
|ID_Tipo_transaccion|                Tipo|
+-------------------+--------------------+
|                  2|Customer Credit Note|
|                  3|Customer Payment ...|
|                  4|     Customer Refund|
|                  5|    Supplier Invoice|
|                  6|Supplier Credit Note|
|                  7|Supplier Payment ...|
|                  8|     Supplier Refund|
|                  9|      Stock Transfer|
|                 10|         Stock Issue|
|                 11|       Stock Receipt|
+-------------------+--------------------+
only showing top 10 rows



#### Transformación
Recuerde que, puede hacer uso de selectExpr, filter, where entre otras de PySpark para modificar los datos cargados. Por ejemplo, el siguiente código utiliza <i>selectExpr</i> para renombrar la columna ID_Empleado por ID_Empleado_T, esta es la convención que vamos a utilizar: "_T" para indicar que el ID es el que estaba en la base de datos transaccional y "_DWH" para indicar que son ID's propios de la bodega. Usamos withColumn y monotonicallu_increasing_id para crear un ID acumulativo para cada registro en el dataframe

In [40]:
# Renombro columnas y creo ID_Tipo_transaccion_DWH
tt = tt.selectExpr('ID_Tipo_transaccion as ID_Tipo_transaccion_T','Tipo')
tt = tt.coalesce(1).withColumn('ID_Tipo_transaccion_DWH', f.monotonically_increasing_id() + 1)
tt = tt.select('ID_Tipo_transaccion_DWH','ID_Tipo_transaccion_T','Tipo')
tt.show()

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
|                      5|                    6|Supplier Credit Note|
|                      6|                    7|Supplier Payment ...|
|                      7|                    8|     Supplier Refund|
|                      8|                    9|      Stock Transfer|
|                      9|                   10|         Stock Issue|
|                     10|                   11|       Stock Receipt|
|                     11|                   12|Stock Adjustment ...|
|                     12|         

In [42]:
# Crea el registro para el id = 0
tt_0 = [('0','0','Missing')]
columns = ['ID_Tipo_transaccion_DWH','ID_Tipo_transaccion_T','Tipo']
tt_0 = spark.createDataFrame(data=tt_0,schema=columns)
tt = tt.union(tt_0)

tt.show()

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
|                      5|                    6|Supplier Credit Note|
|                      6|                    7|Supplier Payment ...|
|                      7|                    8|     Supplier Refund|
|                      8|                    9|      Stock Transfer|
|                      9|                   10|         Stock Issue|
|                     10|                   11|       Stock Receipt|
|                     11|                   12|Stock Adjustment ...|
|                     12|         

#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [43]:
# CARGUE
guardar_db(dest_db_connection_string, tt,'Estudiante_58_202413.TipoTransaccion', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 3
Empezamos el bloque 3: dimensión Producto. Su fuente de datos es la tabla <i>Producto</i>. De esta tabla solo se general los Ids y un nombre para la tabla de hechos

In [8]:
#### Extracción

In [48]:
sql_prod = '''(SELECT DISTINCT `ID_producto`, NombreProducto  FROM WWImportersTransactional.Producto ) AS Temp_prod'''
prod = obtener_dataframe_de_bd(source_db_connection_string, sql_prod, db_user, db_psswd)
prod.show(10)

+-----------+--------------------+
|ID_producto|      NombreProducto|
+-----------+--------------------+
|          1|USB missile launc...|
|          2|USB rocket launch...|
|          3|Office cube peris...|
|          4|USB food flash dr...|
|          5|USB food flash dr...|
|          6|USB food flash dr...|
|          7|USB food flash dr...|
|          8|USB food flash dr...|
|          9|USB food flash dr...|
|         10|USB food flash dr...|
+-----------+--------------------+
only showing top 10 rows



#### Transformación

In [49]:
# Renombro columnas y creo ID_Producto_DWH
prod = prod.selectExpr('ID_producto as ID_Producto_T', 'NombreProducto as Nombre')
prod = prod.coalesce(1).withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
prod = prod.select('ID_Producto_DWH','ID_Producto_T', 'Nombre').distinct()

# Crea el registro para el id = 0
prod_0 = [('0','0','Missing')]
columns = ['ID_Producto_DWH','ID_Producto_T','Nombre']
prod_0 = spark.createDataFrame(data=prod_0,schema=columns)
prod = prod.union(prod_0)

prod.show()

+---------------+-------------+--------------------+
|ID_Producto_DWH|ID_Producto_T|              Nombre|
+---------------+-------------+--------------------+
|              1|            1|USB missile launc...|
|              2|            2|USB rocket launch...|
|              3|            3|Office cube peris...|
|              4|            4|USB food flash dr...|
|              5|            5|USB food flash dr...|
|              6|            6|USB food flash dr...|
|              7|            7|USB food flash dr...|
|              8|            8|USB food flash dr...|
|              9|            9|USB food flash dr...|
|             10|           10|USB food flash dr...|
|             11|           11|USB food flash dr...|
|             12|           12|USB food flash dr...|
|             13|           13|USB food flash dr...|
|             14|           14|USB food flash dr...|
|             15|           15|USB food flash dr...|
|             16|           16|DBA joke mug - 

#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [51]:
# CARGUE
guardar_db(dest_db_connection_string, prod,'Estudiante_58_202413.Producto', db_user, db_psswd)

### BLOQUE 4
Empezamos el bloque 4: dimensión Client. Su fuente de datos es la tabla <i>Clientes</i>. De esta tabla solo se general los Ids y un nombre para la tabla de hechos

#### Extracción

In [67]:
sql_cli = '''(SELECT DISTINCT `ID_Cliente`, `Nombre`  FROM WWImportersTransactional.Clientes) AS Temp_cli'''
cli = obtener_dataframe_de_bd(source_db_connection_string, sql_cli, db_user, db_psswd)
cli.show(10)

+----------+--------------------+
|ID_Cliente|              Nombre|
+----------+--------------------+
|         1|Tailspin Toys (He...|
|         2|Tailspin Toys (Sy...|
|         3|Tailspin Toys (Pe...|
|         4|Tailspin Toys (Me...|
|         5|Tailspin Toys (Ga...|
|         6|Tailspin Toys (Je...|
|         7|Tailspin Toys (Fr...|
|         8|Tailspin Toys (Bo...|
|         9|Tailspin Toys (Ne...|
|        10|Tailspin Toys (Wi...|
+----------+--------------------+
only showing top 10 rows



#### Transformación

In [68]:
# Renombro columnas y creo ID_Producto_DWH
cli = cli.selectExpr('ID_Cliente as ID_Cliente_T', 'Nombre')
cli = cli.coalesce(1).withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
cli = cli.select('ID_Cliente_DWH','ID_Cliente_T', 'Nombre').distinct()

# Crea el registro para el id = 0
cli_0 = [('0','0','Missing')]
columns = ['ID_Cliente_DWH','ID_Cliente_T', 'Nombre']
cli_0 = spark.createDataFrame(data=cli_0,schema=columns)
cli = cli.union(cli_0)

cli.show()

+--------------+------------+--------------------+
|ID_Cliente_DWH|ID_Cliente_T|              Nombre|
+--------------+------------+--------------------+
|             1|           1|Tailspin Toys (He...|
|             2|           2|Tailspin Toys (Sy...|
|             3|           3|Tailspin Toys (Pe...|
|             4|           4|Tailspin Toys (Me...|
|             5|           5|Tailspin Toys (Ga...|
|             6|           6|Tailspin Toys (Je...|
|             7|           7|Tailspin Toys (Fr...|
|             8|           8|Tailspin Toys (Bo...|
|             9|           9|Tailspin Toys (Ne...|
|            10|          10|Tailspin Toys (Wi...|
|            11|          11|Tailspin Toys (De...|
|            12|          12|Tailspin Toys (Bi...|
|            13|          13|Tailspin Toys (St...|
|            14|          14|Tailspin Toys (Lo...|
|            15|          15|Tailspin Toys (Ba...|
|            16|          16|Tailspin Toys (Co...|
|            17|          17|Ta

In [None]:
#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [70]:
# CARGUE
guardar_db(dest_db_connection_string, cli,'Estudiante_58_202413.Cliente', db_user, db_psswd)

### BLOQUE 5
Empezamos el bloque 5: dimensión Proveedor. Su fuente de datos es la tabla <i>proveedoresCopia</i>

#### Extracción

In [74]:
#EXTRACCION
sql_prov = '''(SELECT DISTINCT ProveedorID AS ID_Proveedor, NombreProveedor, CategoriaProveedorID AS Categoria, PersonaContactoPrincipalID AS Contacto_principal, DiasPago AS Dias_pago, CodigoPostal Codigo_postal FROM WWImportersTransactional.proveedoresCopia ORDER BY NombreProveedor) AS Temp_prov'''

prov = obtener_dataframe_de_bd(source_db_connection_string, sql_prov, db_user, db_psswd)

prov.show()

+------------+--------------------+---------+------------------+---------+-------------+
|ID_Proveedor|     NombreProveedor|Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+------------+--------------------+---------+------------------+---------+-------------+
|           1| A Datum Corporation|        2|                21|      -14|        46077|
|          16|A Datum Corporati...|        2|                21|       14|        46077|
|           3|Consolidated Mess...|        6|                25|      -30|        94101|
|           2|        Contoso Ltd.|        2|                23|       -7|        98253|
|           4|       Fabrikam Inc.|        4|                27|       30|        40351|
|          20|   Fabrikam Inc. Ltd|        4|                27|       30|        40351|
|           5|Graphic Design In...|        2|                29|       14|        64847|
|           6| Humongous Insurance|        9|                31|      -14|        37770|
|           7|       

#### Transformación

In [75]:
# Normalizo Dias_pago
prov = prov.withColumn("Dias_pago", when(col("Dias_pago") < 0, col("Dias_pago") * -1).otherwise(col("Dias_pago")))

# Normalizo NombreProveedor y elimino duplicados
prov = prov.withColumn("NombreN", regexp_replace(col("NombreProveedor"), r"\s*(Inc. Ltd|Inc|Ltd)\.*\s*$", ""))
window_spec = Window.partitionBy("NombreN").orderBy("ID_Proveedor")
prov = prov.withColumn("Min_ID", min_("ID_Proveedor").over(window_spec))
prov = prov.where(col("ID_Proveedor") == col("Min_ID")).drop("Min_ID")

# Renombro columnas y creo ID_Proveedor_DWH
prov = prov.selectExpr('ID_Proveedor as ID_Proveedor_T', 'NombreProveedor as Nombre','Categoria','Contacto_principal','Dias_pago','Codigo_postal')
prov = prov.coalesce(1).withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
prov = prov.select('ID_Proveedor_DWH','ID_Proveedor_T','Nombre','Categoria','Contacto_principal','Dias_pago','Codigo_postal').orderBy(col('Nombre'))

# Crea el registro para el id = 0
prov_0 = [('0','0','Missing', '','','0','')]
columns = ['ID_Proveedor_DWH','ID_Proveedor_T','Nombre', 'Categoria','Contacto_principal','Dias_pago','Codigo_postal']
prov_0 = spark.createDataFrame(data=prov_0,schema=columns)
prov = prov.union(prov_0)

prov.show()

+----------------+--------------+--------------------+---------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|              Nombre|Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+--------------------+---------+------------------+---------+-------------+
|               9|             1| A Datum Corporation|        2|                21|       14|        46077|
|              12|             3|Consolidated Mess...|        6|                25|       30|        94101|
|               5|             2|        Contoso Ltd.|        2|                23|        7|        98253|
|               8|             4|       Fabrikam Inc.|        4|                27|       30|        40351|
|              11|             5|Graphic Design In...|        2|                29|       14|        64847|
|               3|             6| Humongous Insurance|        9|                31|       14|        37770|
|               4|          

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [76]:
# CARGUE
guardar_db(dest_db_connection_string, prov,'Estudiante_58_202413.Proveedor', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 5
Bloque 5: dimensión Fecha. Su fuente de datos es la tabla transaccional <i>movimientos_v2</i> campo <i>FechaTransaccion</i>

#### Extracción

In [77]:
#EXTRACCION
sql_fechas = '''(SELECT DISTINCT FechaTransaccion FROM WWImportersTransactional.movimientos_v2) AS Temp_Fechas'''
fechas = obtener_dataframe_de_bd(source_db_connection_string, sql_fechas, db_user, db_psswd)
fechas.show(20)

+----------------+
|FechaTransaccion|
+----------------+
|     Jan 20,2014|
|     Jan 28,2014|
|     Feb 01,2014|
|     Mar 25,2014|
|     May 01,2014|
|     May 02,2014|
|     May 10,2014|
|     May 26,2014|
|     Jun 02,2014|
|     Jul 08,2014|
|     Jul 17,2014|
|     Jul 24,2014|
|     Aug 19,2014|
|     Sep 16,2014|
|     Sep 17,2014|
|     Sep 23,2014|
|     Oct 04,2014|
|     Oct 21,2014|
|     Oct 30,2014|
|     Nov 07,2014|
+----------------+
only showing top 20 rows



#### Transformación

In [79]:
# Encuentro las que no cumplen el formato pautado
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = fechas.filter(fechas["FechaTransaccion"].rlike(regex))
noCumplenFormato = fechas.filter(~fechas["FechaTransaccion"].rlike(regex))

# Las corrijo
noCumplenFormato = noCumplenFormato.withColumn('FechaTransaccion', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d 12:00:00.0000000'), t.StringType())(f.col('FechaTransaccion')))
fechas_ok = noCumplenFormato.union(cumplenFormato)

# Desgloso la fecha como exige la dimensión
fechas_ok = fechas_ok.withColumn(
    "FechaTransaccion", 
    to_timestamp("FechaTransaccion", "yyyy-MM-dd HH:mm:ss.SSSSSSS")
)
fechas_ok = fechas_ok.withColumn("Fecha", to_date("FechaTransaccion")) \
       .withColumn("Dia", date_format("FechaTransaccion", "d"))  \
       .withColumn("Mes", date_format("FechaTransaccion", "M")) \
       .withColumn("Anio", date_format("FechaTransaccion", "yyyy")) \
       .withColumn("Numero_semana_ISO", weekofyear("FechaTransaccion"))

fechas_ok = fechas_ok.withColumn("ID_Fecha", unix_timestamp("Fecha"))
fechas_ok = fechas_ok.select('ID_Fecha', 'Fecha', 'Dia', 'Mes', 'Anio', 'Numero_semana_ISO').distinct().orderBy('ID_Fecha')

fechas_ok.show()

+----------+----------+---+---+----+-----------------+
|  ID_Fecha|     Fecha|Dia|Mes|Anio|Numero_semana_ISO|
+----------+----------+---+---+----+-----------------+
|1357016400|2013-01-01|  1|  1|2013|                1|
|1357102800|2013-01-02|  2|  1|2013|                1|
|1357189200|2013-01-03|  3|  1|2013|                1|
|1357275600|2013-01-04|  4|  1|2013|                1|
|1357362000|2013-01-05|  5|  1|2013|                1|
|1357534800|2013-01-07|  7|  1|2013|                2|
|1357621200|2013-01-08|  8|  1|2013|                2|
|1357707600|2013-01-09|  9|  1|2013|                2|
|1357794000|2013-01-10| 10|  1|2013|                2|
|1357880400|2013-01-11| 11|  1|2013|                2|
|1357966800|2013-01-12| 12|  1|2013|                2|
|1358139600|2013-01-14| 14|  1|2013|                3|
|1358226000|2013-01-15| 15|  1|2013|                3|
|1358312400|2013-01-16| 16|  1|2013|                3|
|1358398800|2013-01-17| 17|  1|2013|                3|
|135848520

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [80]:
# CARGUE
guardar_db(dest_db_connection_string, fechas_ok,'Estudiante_58_202413.Fecha', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 6
Bloque 6: Hecho orden. Su fuente de datos es la combinación entre las tablas transaccionales Ordenes y detalles de orden

#### Extracción

In [90]:
prod = obtener_dataframe_de_bd(source_db_connection_string, '''(SELECT ID_Producto_DWH, ID_Producto_T FROM Estudiante_58_202413.Producto) AS Temp_prod''', db_user, db_psswd)
prov = obtener_dataframe_de_bd(source_db_connection_string, '''(SELECT ID_Proveedor_DWH, ID_Proveedor_T FROM Estudiante_58_202413.Proveedor) AS Temp_prov''', db_user, db_psswd)
cli = obtener_dataframe_de_bd(source_db_connection_string, '''(SELECT ID_Cliente_DWH, ID_Cliente_T FROM Estudiante_58_202413.Cliente) AS Temp_cli''', db_user, db_psswd)
tt = obtener_dataframe_de_bd(source_db_connection_string, '''(SELECT ID_Tipo_transaccion_DWH, ID_Tipo_transaccion_T FROM Estudiante_58_202413.TipoTransaccion) AS Temp_tt''', db_user, db_psswd)
mov = obtener_dataframe_de_bd(source_db_connection_string, '''(SELECT DISTINCT FechaTransaccion, ProductoID as ID_Producto_T, ProveedorID as ID_Proveedor_T, ClienteID as ID_Cliente_T, TipoTransaccionID as ID_Tipo_transaccion_T, Cantidad FROM WWImportersTransactional.movimientos_v2) AS Temp_mov''', db_user, db_psswd)


mov.show(5)

+----------------+-------------+--------------+------------+---------------------+--------+
|FechaTransaccion|ID_Producto_T|ID_Proveedor_T|ID_Cliente_T|ID_Tipo_transaccion_T|Cantidad|
+----------------+-------------+--------------+------------+---------------------+--------+
|     Jan 20,2014|          108|          null|       185.0|                   10|   -10.0|
|     Jan 28,2014|          162|           4.0|         0.0|                   11|    10.0|
|     Jan 28,2014|          216|          null|       474.0|                   10|   -10.0|
|     Jan 28,2014|           22|           7.0|         0.0|                   11|    10.0|
|     Jan 28,2014|           25|           7.0|         0.0|                   11|    10.0|
+----------------+-------------+--------------+------------+---------------------+--------+
only showing top 5 rows



#### Transformación

Se hace una verificación de los valores de la tasa de impuesto

In [97]:
# Realizo el join con las dimensiones
mov_t = mov.join(prod, how = 'left', on = 'ID_Producto_T')
mov_t = mov_t.join(prov, how = 'left', on = 'ID_Proveedor_T')
mov_t = mov_t.join(cli, how = 'left', on = 'ID_Cliente_T')
mov_t = mov_t.join(tt, how = 'left', on = 'ID_Tipo_transaccion_T')

# Reemplazo las referencias vacías por 0
mov_t = mov_t.fillna({'ID_Producto_DWH': 0, 'ID_Proveedor_DWH': 0, 'ID_Cliente_DWH': 0, 'ID_Tipo_transaccion_DWH': 0})

# Corrijo y referencio las fechas
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = mov_t.filter(mov_t["FechaTransaccion"].rlike(regex))
noCumplenFormato = mov_t.filter(~mov_t["FechaTransaccion"].rlike(regex))
noCumplenFormato = noCumplenFormato.withColumn('FechaTransaccion', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d 12:00:00.0000000'), t.StringType())(f.col('FechaTransaccion')))
mov_t = noCumplenFormato.union(cumplenFormato)
mov_t = mov_t.withColumn("FechaTransaccion", to_timestamp("FechaTransaccion", "yyyy-MM-dd HH:mm:ss.SSSSSSS")) \
    .withColumn("Fecha", to_date("FechaTransaccion")) \
    .withColumn("ID_Fecha", unix_timestamp("Fecha"))

mov_t = mov_t.selectExpr('ID_Fecha','ID_Producto_DWH','ID_Proveedor_DWH','ID_Cliente_DWH','ID_Tipo_transaccion_DWH','Cantidad').distinct()

mov_t.show(5)


+----------+---------------+----------------+--------------+-----------------------+--------+
|  ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_transaccion_DWH|Cantidad|
+----------+---------------+----------------+--------------+-----------------------+--------+
|1407992400|             26|               0|            73|                      9|   -10.0|
|1444194000|             29|               0|            66|                      9|   -10.0|
|1460523600|             65|               0|           237|                      9|   -10.0|
|1408942800|             65|               0|           440|                      9|    -6.0|
|1389330000|             22|               0|           155|                      9|   -10.0|
+----------+---------------+----------------+--------------+-----------------------+--------+
only showing top 5 rows



Se hace una verificación del rango de fechas disponible en los datos

#### Carga

**OJO** Recuerde antes de guardar los datos que la tabla no exista o este vacía, para que no se guarden los mismos datos varias veces y no ocupar más espacio.

In [98]:
guardar_db(dest_db_connection_string, mov_t,'Estudiante_58_202413.Hecho_Movimiento', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

## 4. Cierre
Completado este tutorial, ya sabe cómo realizar ETL básicos en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

El Capítulo 2 del libro <i>Learn PySpark : Build Python-based Machine Learning and Deep Learning Models, New York: Apress. 2019</i> de Pramod Singh contiene muchos ejemplos útiles, puede encontrarlo en la biblioteca virtual de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.