# Tarea ETL

## Conexion Pyspark

In [1]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime

In [2]:
# Configuración servidor base de datos transaccional
db_user = 'Estudiante_1_202413'
db_psswd = 'aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.120:8080/WWImportersTransactional'
dest_db_connection_string = 'jdbc:mysql://157.253.236.120:8080/Estudiante_1_202413'
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession




In [4]:
#spark.stop()

In [5]:
def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()


## Proceso ETL dimension Proveedor

### ETL dimension Proveedor

#### Extraccion

Se extraen los datos de la base de datos WWImportersTransactional. Especificamente se traen las tablas proveedoresCopia, CategoriasProveedores y Personas. Adicionalmente se cambia el nombre de la columna ProveedorID as ID_Proveedor_T y CategoriaProveedorID as CategoriaProveedorID_T.

In [6]:
sql_proveedoresCopia = '''(SELECT DISTINCT ProveedorID as ID_Proveedor_T, NombreProveedor as Nombre, PersonaContactoPrincipalID, DiasPago as Dias_pago, CodigoPostal as Codigo_postal, CategoriaProveedorID as CategoriaProveedorID_FK FROM WWImportersTransactional.proveedoresCopia) AS Temp_proveedores'''
Proveedor = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedoresCopia, db_user, db_psswd)
sql_categorias = '''(SELECT DISTINCT CategoriaProveedorID as CategoriaProveedorID_T, CategoriaProveedor as Categoria FROM WWImportersTransactional.CategoriasProveedores) AS Temp_categorias'''
CategoriasProveedor = obtener_dataframe_de_bd(source_db_connection_string, sql_categorias, db_user, db_psswd)
sql_personas = '''(SELECT DISTINCT ID_persona, NombreCompleto as Contacto_principal FROM WWImportersTransactional.Personas) AS Temp_personas'''
Personas = obtener_dataframe_de_bd(source_db_connection_string, sql_personas, db_user, db_psswd)
Proveedor.show(5)
CategoriasProveedor.show(5)
Personas.show(5)

+--------------+--------------------+--------------------------+---------+-------------+-----------------------+
|ID_Proveedor_T|              Nombre|PersonaContactoPrincipalID|Dias_pago|Codigo_postal|CategoriaProveedorID_FK|
+--------------+--------------------+--------------------------+---------+-------------+-----------------------+
|             4|       Fabrikam Inc.|                        27|       30|        40351|                      4|
|             5|Graphic Design In...|                        29|       14|        64847|                      2|
|             7|        Litware Inc.|                        33|       30|        95245|                      5|
|             9|      Nod Publishers|                        37|        7|        27906|                      2|
|            10|Northwind Electri...|                        39|       30|         7860|                      3|
+--------------+--------------------+--------------------------+---------+-------------+--------

#### Transformacion

Se realiza la union de los dataframes Proveedor y  CategoriasProveedor. Adicionalmente se crea la columna ID_Proveedor_DWH y se adiciona indice. Por ultimo se filtran las columnas que se necesitan para el analisis.

In [7]:
Proveedor = Proveedor.join(CategoriasProveedor, Proveedor["CategoriaProveedorID_FK"] == CategoriasProveedor["CategoriaProveedorID_T"], "inner")
Proveedor = Proveedor.join(Personas, Proveedor["PersonaContactoPrincipalID"] == Personas["ID_persona"], "inner")
Proveedor = Proveedor.coalesce(1).withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
Proveedor = Proveedor.select('ID_Proveedor_DWH', 'ID_Proveedor_T', 'Nombre', 'Categoria', 'Contacto_principal', 'Dias_pago', 'Codigo_postal')
# Mostrar los primeros 5 registros del resultado
Proveedor.show(5)


+----------------+--------------+-------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|             Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+-------------------+--------------------+------------------+---------+-------------+
|               1|             6|Humongous Insurance|servicios de seguros|Madelaine  Cartier|      -14|        37770|
|               2|            20|  Fabrikam Inc. Ltd|                ropa|       Bill Lawson|       30|        40351|
|               3|             4|      Fabrikam Inc.|                ropa|       Bill Lawson|       30|        40351|
|               4|            26|  Trey Research Inc|servicios de mark...|      Donald Jones|        7|        57543|
|               5|            11|      Trey Research|servicios de mark...|      Donald Jones|       -7|        57543|
+----------------+--------------+-------------------+---

Sigiendo la regla de negocio donde indican que no deben existir dias de pago negativos, se realiza la transformacion de los valores de la columna Dias_Pago de tal forma que no existan valores negativos. Esto se realiza multiplicando por -1 los valores negativos.

In [8]:
Proveedor = Proveedor.withColumn("Dias_pago",when(col("Dias_pago") < 0, col("Dias_pago") * -1).otherwise(col("Dias_pago")))

Siguiendo la regla de negocio que indica que hay proveedores duplicados (mismo nombre, pero les adicionaron Inc o Ltd), se procede a identificar proveedores duplicados que discrepen unicamente por las palabras Inc o Ltd para seguidamente eliminar uno de los registros y tener en la tabla de Proveedores del DWH registros unicos.

In [9]:
print("Cantidad de registros contando probveedores duplicados: " + str(Proveedor.count()))
Proveedor.select("ID_Proveedor_T", "Nombre").distinct().orderBy("Nombre").show()

Proveedor = Proveedor.withColumn("Nombre", regexp_replace(col("Nombre"), r"\s*Inc\.?\s*|\s*Ltd\.?\s*", "")).dropDuplicates(["Nombre"])
print("Cantidad de registros sin  probveedores duplicados: " + str(Proveedor.count()))
valores_unicos = Proveedor.select("ID_Proveedor_T", "Nombre").distinct().orderBy("Nombre").show()


Cantidad de registros contando probveedores duplicados: 19
+--------------+--------------------+
|ID_Proveedor_T|              Nombre|
+--------------+--------------------+
|             1| A Datum Corporation|
|            16|A Datum Corporati...|
|             3|Consolidated Mess...|
|             2|        Contoso Ltd.|
|             4|       Fabrikam Inc.|
|            20|   Fabrikam Inc. Ltd|
|             5|Graphic Design In...|
|             6| Humongous Insurance|
|             7|        Litware Inc.|
|            23|    Litware Inc. Ltd|
|             8|  Lucerne Publishing|
|             9|      Nod Publishers|
|            10|Northwind Electri...|
|            25|Northwind Electri...|
|            12|   The Phone Company|
|            11|       Trey Research|
|            26|   Trey Research Inc|
|            13|      Woodgrove Bank|
|            29|  Woodgrove Bank Ltd|
+--------------+--------------------+

Cantidad de registros sin  probveedores duplicados: 13
+----------

En total se eliminaron 6 registros. Se puede evidenciar de las tablas anteriores que los registros que se eliminaron fueron los ID_proveedor_T: 1 (Proveedor A Datum Corporation), 4 (Proveedor Fabrikam ), 7 (Proveedor Litware), 10 (Proveedor Northwind Electric Cars), 11 (Proveedor Trey research) y 13 (Woodgrove Bank). Estas informacion de IDs transaccionales es importante por que en la tabla de movimientos deberan identificarse los movimientos asociados a estos proveedores y remplazar el ID por los IDs que quedaron como IDs transaccionales finales para estos proveedores (16, 20, 23, 25, 26 y 29 respectivamente). Debe validarse con el negocio la opcion de asegurar que las tablas transaccionales se arreglen para evitar tener proveedores duplicados y asi afectar los registros de la tabla de movimientos.

#### Carga

Se procede a cargar la tabla de Proveedores al DWH. El nombre de la tabla es Proveedor.

In [10]:
guardar_db(dest_db_connection_string, Proveedor,'Estudiante_1_202413.Proveedor', db_user, db_psswd)
Proveedor.show(5)

+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|              Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|              16|            16| A Datum Corporation| productos novedosos|        Reio Kabin|       14|        46077|
|              14|             3|Consolidated Mess...|servicios de mens...|      Kerstin Parn|       30|        94101|
|               9|             2|             Contoso| productos novedosos|   Hanna Mihhailov|        7|        98253|
|               2|            20|            Fabrikam|                ropa|       Bill Lawson|       30|        40351|
|              15|             5|Graphic Design In...| productos novedosos|        Penny Buck|       14|        64847|
+----------------+--------------+---------------

Se valida en WorkBench MySQL que la tabla quedara cargada con los campos requeridos: ID_Proveedorm_DWH, ID_Proveedor_T, Nombre, Categoria, Contacto_principal, Dias_pago, Codigo_postal. 

![image.png](attachment:4149d54e-a4d4-4d91-a4fd-77b203097881.png)

### ETL dimension tipoTransaccion

#### Extraccion

Se extraen los datos de la base de datos WWImportersTransactional. Especificamente se trae la tabla TiposTransaccion

In [11]:
sql_TiposTransaccion = '''(SELECT DISTINCT TipoTransaccionID, TipoTransaccionNombre FROM WWImportersTransactional.TiposTransaccion) AS Temp_TipoTransaccion'''
TipoTransaccion = obtener_dataframe_de_bd(source_db_connection_string, sql_TiposTransaccion, db_user, db_psswd)
TipoTransaccion.show(5)

+-----------------+---------------------+
|TipoTransaccionID|TipoTransaccionNombre|
+-----------------+---------------------+
|                2| Customer Credit Note|
|                3| Customer Payment ...|
|                4|      Customer Refund|
|                5|     Supplier Invoice|
|                6| Supplier Credit Note|
+-----------------+---------------------+
only showing top 5 rows



#### Transformacion

Se procede a realizar un renombramiento de la columna TipoTransaccionID por ID_Tipo_transaccion_T y TipoTransaccionNombre por Tipo. Adicionalmente se crea la columna ID_Tipo_transaccion_DWH y se adiciona indice. Por ultimo se filtran las columnas que se necesitan para el analisis. 

In [12]:
TipoTransaccion = TipoTransaccion.withColumnRenamed("TipoTransaccionID", "ID_Tipo_transaccion_T").withColumnRenamed("TipoTransaccionNombre", "Tipo")
TipoTransaccion = TipoTransaccion.coalesce(1).withColumn('ID_Tipo_transaccion_DWH', f.monotonically_increasing_id() + 1)
TipoTransaccion = TipoTransaccion.select("ID_Tipo_transaccion_DWH", "ID_Tipo_transaccion_T", "Tipo")
TipoTransaccion.show(5)

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
|                      5|                    6|Supplier Credit Note|
+-----------------------+---------------------+--------------------+
only showing top 5 rows



#### Carga

Se procede a cargar la tabla de Proveedores al DWH. El nombre de la tabla es TipoTransaccion.

In [13]:
guardar_db(dest_db_connection_string, TipoTransaccion,'Estudiante_1_202413.TipoTransaccion', db_user, db_psswd)
TipoTransaccion.show(5)

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
|                      5|                    6|Supplier Credit Note|
+-----------------------+---------------------+--------------------+
only showing top 5 rows



Se valida en WorkBench MySQL que la tabla quedara cargada con los campos requeridos: ID_Tipo_transaccion_DWH, ID_Tipo_transaccion_T, Tipo.

![image.png](attachment:04e5bb9b-69df-42f1-bd78-7551fa260e84.png)

### ETL dimension Fecha 

#### Extraccion

Se extraen los datos de la base de datos WWImportersTransactional. Especificamente se trae de la tabla movimientosCopia la columna FechaTransaccion. De esta forma podemos cubrir todas las fechas de transacciones que ocurrieran de acuerdo a los registro de la compañia. Adicionañlmente se cambia el nombre de la columna FechaTransaccion por Fecha.

In [14]:
sql_fecha = '''(SELECT DISTINCT FechaTransaccion as Fecha FROM WWImportersTransactional.movimientosCopia) AS Temp_Fecha'''
Fecha = obtener_dataframe_de_bd(source_db_connection_string, sql_fecha, db_user, db_psswd)
Fecha.show(5)

+-----------+
|      Fecha|
+-----------+
|Apr 25,2014|
|Dec 10,2015|
|Dec 04,2015|
|Dec 23,2015|
|Jul 27,2015|
+-----------+
only showing top 5 rows



#### Transformacion

Se procede inicialmente a identificar las fechas que cumplen con el estandat definido por la compañia, el cual es YYYYMMDD. En esta validacion se usael patron definido en regex para identificar las fechas que cumplan con este formato.

In [15]:
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = Fecha.filter(Fecha["Fecha"].rlike(regex))
noCumplenFormato = Fecha.filter(~Fecha["Fecha"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))

758 1397
+-----------+
|      Fecha|
+-----------+
|Apr 25,2014|
|Dec 10,2015|
|Dec 04,2015|
|Dec 23,2015|
|Jul 27,2015|
+-----------+
only showing top 5 rows

None


Procedemos a transformar las fechas que no cumplen con el formato YYYYMMDD. Para esto creamos una nueva columna Fecha_convertida la cual va a tomar las fechas que no cumplen con el formato y crear un nuevo registro con un objeto tipo date que tendra el formato YYYY-MM-DD. EL tipo de dato de la columna Fecha es string y el tipo de dato de la columna Fecha_convertida es date.

In [16]:
#Transformacion fechas que no cumplen el formato
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
from datetime import datetime

# Función para convertir el formato "Mon DD, YYYY" a un objeto datetime.date
def convertir_fecha(fecha):
    return datetime.strptime(fecha, "%b %d,%Y").date()

# Registrar la función UDF con tipo de retorno DateType
convertir_fecha_udf = udf(convertir_fecha, DateType())
df_converted_1 = noCumplenFormato.withColumn("Fecha_convertida", convertir_fecha_udf(col("Fecha")))
df_converted_1.show(4)
df_converted_1.dtypes

+-----------+----------------+
|      Fecha|Fecha_convertida|
+-----------+----------------+
|Apr 25,2014|      2014-04-25|
|Dec 10,2015|      2015-12-10|
|Dec 04,2015|      2015-12-04|
|Dec 23,2015|      2015-12-23|
+-----------+----------------+
only showing top 4 rows



[('Fecha', 'string'), ('Fecha_convertida', 'date')]

Adicionalmente para las fechas que si cumplen el patron definido por la organizacion, se procede a crear una nueva columna para crear fechas con tipo de objeto date.

In [17]:
#Transformacion fechas que si cumplen el formato
from pyspark.sql.functions import col, substring, to_date, concat_ws, expr, dayofmonth, month, year, weekofyear
df_converted_2 = cumplenFormato.withColumn("Fecha_convertida", to_date(substring(col("Fecha"), 1, 10)))
df_converted_2.show(4)
df_converted_2.dtypes

+--------------------+----------------+
|               Fecha|Fecha_convertida|
+--------------------+----------------+
|2014-01-20 12:00:...|      2014-01-20|
|2014-01-28 12:00:...|      2014-01-28|
|2014-02-01 12:00:...|      2014-02-01|
|2014-03-25 12:00:...|      2014-03-25|
+--------------------+----------------+
only showing top 4 rows



[('Fecha', 'string'), ('Fecha_convertida', 'date')]

Ahora unimos las dos tablas, la de las fechas que no cumplian el formato y la de las fechas que si lo cumplian. Adicionalmente se adiciona una columna llamada ID_Fecha que tenga la fecha en formato YYYYMMDD. Esta nueva columna es numerica (int) y se usara para la tabla de hechos.

In [18]:
#Union de tablas
Fecha = df_converted_1.union(df_converted_2)
#Transformacion para crear columna ID_Fecha
Fecha = Fecha.withColumn("ID_Fecha", expr("int(concat_ws('', year(Fecha_convertida), lpad(month(Fecha_convertida), 2, '0'), lpad(dayofmonth(Fecha_convertida), 2, '0')))"))
Fecha.show(4)

+-----------+----------------+--------+
|      Fecha|Fecha_convertida|ID_Fecha|
+-----------+----------------+--------+
|Apr 25,2014|      2014-04-25|20140425|
|Dec 10,2015|      2015-12-10|20151210|
|Dec 04,2015|      2015-12-04|20151204|
|Dec 23,2015|      2015-12-23|20151223|
+-----------+----------------+--------+
only showing top 4 rows



Finalmente se procede a adicionar las columnas Fecha, Dia, Mes, Anio y Numersemana_ISO. Se deja la tabla Fecha con las columnas que se necesitan cargar al DWH.

In [19]:
Fecha = Fecha.withColumn("Dia", dayofmonth(col("Fecha_convertida"))) \
    .withColumn("Mes", month(col("Fecha_convertida"))) \
    .withColumn("Anioa", year(col("Fecha_convertida"))) \
    .withColumn("Numero_semana_ISO", weekofyear(col("Fecha_convertida")))

Fecha = Fecha.select("ID_fecha", "Fecha", "Dia", "Mes", "Anioa", "Numero_semana_ISO")
Fecha.show(4)

+--------+-----------+---+---+-----+-----------------+
|ID_fecha|      Fecha|Dia|Mes|Anioa|Numero_semana_ISO|
+--------+-----------+---+---+-----+-----------------+
|20140425|Apr 25,2014| 25|  4| 2014|               17|
|20151210|Dec 10,2015| 10| 12| 2015|               50|
|20151204|Dec 04,2015|  4| 12| 2015|               49|
|20151223|Dec 23,2015| 23| 12| 2015|               52|
+--------+-----------+---+---+-----+-----------------+
only showing top 4 rows



#### Carga

Se procede a cargar la tabla de Proveedores al DWH. El nombre de la tabla es Fecha.

In [20]:
guardar_db(dest_db_connection_string, Fecha,'Estudiante_1_202413.Fecha', db_user, db_psswd)

Se valida en WorkBench MySQL que la tabla quedara cargada con los campos requeridos: ID_fecha, Fecha, Dia, Mes, Anioa, Numero_semana_ISO. 

![image.png](attachment:3e9337cb-e74c-4f78-8936-19b20bf1c306.png)

### ETL Hecho Movimientos

#### Extraccion

Se extraen los datos de la base de datos WWImportersTransactional. Especificamente se trae la tabla movimientosCopia que tiene el registro de todos los movimientos de la empresa. Tambien se traen las tablas de Producto y Cliente, las cuales segun el modelo multidimencional se requieren. Estas tablas se les adiciona el ID del DWH ya que las tablas originales que estaban el el DWH son tablas de otro ejercicio y no correspondian a las tablas originales de la base de datos transaccional.

In [21]:
sql_movimientos = '''(SELECT ProductoID, TipoTransaccionID, ClienteID, ProveedorID, FechaTransaccion, Cantidad FROM WWImportersTransactional.movimientosCopia) AS Temp_movimientos'''
Hecho_Movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)
#Hecho_Movimientos = Hecho_Movimientos.withColumn("ProveedorID", col("ProveedorID").cast("int"))
sql_producto = '''(SELECT ID_Producto as ID_Producto_T  FROM WWImportersTransactional.Producto) AS Temp_producto'''
Producto = obtener_dataframe_de_bd(source_db_connection_string, sql_producto, db_user, db_psswd)
Producto = Producto.coalesce(1).withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
sql_cliente = '''(SELECT ID_Cliente as ID_Cliente_T  FROM WWImportersTransactional.Clientes) AS Temp_clientes'''
Cliente = obtener_dataframe_de_bd(source_db_connection_string, sql_cliente, db_user, db_psswd)
Cliente = Cliente.coalesce(1).withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
Hecho_Movimientos.show(2)
Producto.show(2)
Cliente.show(2)

+----------+-----------------+---------+-----------+----------------+--------+
|ProductoID|TipoTransaccionID|ClienteID|ProveedorID|FechaTransaccion|Cantidad|
+----------+-----------------+---------+-----------+----------------+--------+
|       217|               10|    476.0|           |     Apr 25,2014|   -40.0|
|       135|               10|     33.0|           |     Dec 10,2015|    -7.0|
+----------+-----------------+---------+-----------+----------------+--------+
only showing top 2 rows

+-------------+---------------+
|ID_Producto_T|ID_Producto_DWH|
+-------------+---------------+
|            1|              1|
|            2|              2|
+-------------+---------------+
only showing top 2 rows

+------------+--------------+
|ID_Cliente_T|ID_Cliente_DWH|
+------------+--------------+
|           1|             1|
|           2|             2|
+------------+--------------+
only showing top 2 rows



Validamos la cantidad de registros de la tabla Hecho_Movimientos para contrastar la informacion que se cargue al subir los datos al DWH con el fin de asegurar que no se pierdan registros.

In [22]:
Hecho_Movimientos.count()

204292

#### Transformacion

Se realiza al cambio de tipo de objeto para la columna ProveedorID de la tabla Hecho_Movimientos. Esto ya que actualmente es un string y ára comparar contra la tabla de Proveedores debe ser de tipo numerico.

In [23]:
Hecho_Movimientos = Hecho_Movimientos.withColumn("ProveedorID", col("ProveedorID").cast("int"))

Se realiza join entre las tablas Hecho_Movimientos y Fecha comparando las columnas FechaTransaccion y Fecha respectivamente. Despues se realiza join con la tabla Producto comparando contra ProductoID  de la tabla de hechos y ID_Producto_T de la tabla de Producto.

In [24]:
Hecho_Movimientos = Hecho_Movimientos.join(Fecha, Hecho_Movimientos["FechaTransaccion"] == Fecha["Fecha"], "inner")
Hecho_Movimientos = Hecho_Movimientos.join(Producto, Hecho_Movimientos["ProductoID"] == Producto["ID_Producto_T"], "inner")
Hecho_Movimientos.count()

204292

Para poder adicionar la columna de ID_Proveedor_DWH es necesario ajustar los IDs transaccionales de los proveedores en la tabla de Hecho_Movimeintos. Esto se debe a que en la etapa de ETL de la tabla proveedor se eliminaron proveedores duplicados de acuerdo a la regla de negocio. Primero identificamos con que proveedores se han realizado transacciones para luego poder identificar el ID que debe remplazarse.

In [25]:
Hecho_Movimientos.select("ProveedorID").distinct().show()

+-----------+
|ProveedorID|
+-----------+
|       null|
|          1|
|          4|
|          7|
+-----------+



Los IDs transaccionales de los proveedores que tiene la tabla de Hecho_Movimiento son el 1, 4 y 7. Se revisa la correspondencia de los IDs Transaccionales que se identificaron en el proceso de ETL de la tabla de Proveedores y se mapea que IDs transaccionales se eliminaron, y cuales son los IDs actuales que representan el mismo proveedor. De esta validacion se identifica:

ID Transaccional eliminado 1 --> ID transaccional definitivo 16 

ID Transaccional eliminado 4 --> ID transaccional definitivo 20

ID Transaccional eliminado 7 --> ID transaccional definitivo 23

Una vez se tiene esta correspondencia se hace el mapeo sobre la tabla de hechos para reflejar los IDs transaccionales definitivos de esos proveedores.

In [26]:
# Crear la condición para la transformación
reemplazos = {1: 16, 4: 20, 7: 23}
condicion = [(col("ProveedorID") == k) for k in reemplazos.keys()]
transformacion = [reemplazos[k] for k in reemplazos.keys()]

# Aplicar la transformación usando when() y otherwise()
Hecho_Movimientos = Hecho_Movimientos.withColumn("ProveedorID", 
    when(condicion[0], transformacion[0])
    .when(condicion[1], transformacion[1])
    .when(condicion[2], transformacion[2])
    .otherwise(col("ProveedorID")))



In [27]:
Hecho_Movimientos.select("ProveedorID").distinct().show()

+-----------+
|ProveedorID|
+-----------+
|       null|
|         16|
|         20|
|         23|
+-----------+



Se hace join entre las tablas Hecho_Movimientos y  Proveedor comparando las columnas ProveedorID y ID_Proveedor_T. Despues se hace join con la tabla de Cliente y finalmente con la tabla TipoTransaccion.

In [28]:
Hecho_Movimientos = Hecho_Movimientos.join(Proveedor, Hecho_Movimientos["ProveedorID"] == Proveedor["ID_Proveedor_T"], "left")
Hecho_Movimientos = Hecho_Movimientos.join(Cliente, Hecho_Movimientos["ClienteID"] == Cliente["ID_Cliente_T"], "left")
Hecho_Movimientos = Hecho_Movimientos.join(TipoTransaccion, Hecho_Movimientos["TipoTransaccionID"] == TipoTransaccion["ID_Tipo_transaccion_T"], "left")

In [29]:
Hecho_Movimientos.count()

204292

Finalmente se selecciona las columnas que van a quedar definitivas en la tabla Hechos_Movimientos

In [30]:
Hecho_Movimientos = Hecho_Movimientos.select("ID_Fecha", "ID_Producto_DWH", "ID_Proveedor_DWH", "ID_Cliente_DWH", "ID_Tipo_transaccion_DWH", "Cantidad")

In [31]:
Hecho_Movimientos.show(5)

+--------+---------------+----------------+--------------+-----------------------+--------+
|ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_transaccion_DWH|Cantidad|
+--------+---------------+----------------+--------------+-----------------------+--------+
|20140227|             40|            null|           285|                      9|    -9.0|
|20140227|             74|            null|           451|                      9|    -4.0|
|20140227|            146|            null|           526|                      9|   -24.0|
|20140227|            129|            null|           276|                      9|    -6.0|
|20140227|            176|            null|            37|                      9|    -7.0|
+--------+---------------+----------------+--------------+-----------------------+--------+
only showing top 5 rows



#### Carga

Se procede a cargar la tabla de Proveedores al DWH. El nombre de la tabla es Hecho_Movimiento.

In [32]:
guardar_db(dest_db_connection_string, Hecho_Movimientos,'Estudiante_1_202413.Hecho_Movimiento', db_user, db_psswd)

Se valida en WorkBench MySQL que la tabla quedara cargada con los campos requeridos: ID_Fecha, ID_Producto_DWH, ID_Proveedor_DWH, ID_Cliente_DWH, ID_Tipo_transaccion_DWH, Cantidad.

![image.png](attachment:d09a37b2-bdb4-438d-b624-6e917a99da01.png)

Adicionalmente se valida que la cantidad de registros de mivimientos se igual al que se cargo desde la base de datos transaccional, es decir: 204292.

![image.png](attachment:b265064a-0b34-4425-9f37-32eae3783812.png)