# Tarea: Diseñar y construir un proceso ETL (T)

## 1. Módelo y diseño ETL
[link módelo y diseño ](https://github.com/MISW-4402-Analisis-y-Modelado-de-datos/Estudiante_42/blob/main/ETL/README.md)

## 2. Implementación

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = ''
db_psswd = ''
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_42'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType
from pyspark.sql.functions import udf, col, length, isnan, when, count, year, date_format, to_date, unix_timestamp, from_unixtime
import pyspark.sql.functions as f
import os 
from datetime import datetime
from pyspark.sql import types as t
from pandas_profiling import ProfileReport
#import matplotlib.pyplot as plt
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### PROVEEDORES
Empezamos con  la dimensión <i>Proveedor</i>, Su fuente de datos es una combinación de las tablas transaccionales <i>Proveedores</i> y <i>CategoriasProveedores</i>.

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark.

In [5]:
sql_proveedores = '''(SELECT ProveedorID AS ID_Proveedor_T, NombreProveedor AS Nombre, CategoriaProveedorID , PersonaContactoPrincipalID AS Contacto_principal, DiasPago AS Dias_pago_Temp, CodigoPostal AS Codigo_postal
FROM  WWImportersTransactional.Proveedores p) AS proveedor_Temp'''
proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores, db_user, db_psswd)
proveedores.show(5)

+--------------+--------------------+--------------------+------------------+--------------+-------------+
|ID_Proveedor_T|              Nombre|CategoriaProveedorID|Contacto_principal|Dias_pago_Temp|Codigo_postal|
+--------------+--------------------+--------------------+------------------+--------------+-------------+
|             4|      Fabrikam, Inc.|                   4|                27|            30|        40351|
|             5|Graphic Design In...|                   2|                29|            14|        64847|
|             7|       Litware, Inc.|                   5|                33|            30|        95245|
|             9|      Nod Publishers|                   2|                37|             7|        27906|
|            10|Northwind Electri...|                   3|                39|            30|         7860|
+--------------+--------------------+--------------------+------------------+--------------+-------------+
only showing top 5 rows



In [6]:
sql_categoria_proveedores = '''(SELECT CategoriaProveedorID, CategoriaProveedor AS Categoria FROM WWImportersTransactional.CategoriasProveedores) AS proveedores_categorias_Temp'''
categoria_proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_categoria_proveedores, db_user, db_psswd)
categoria_proveedores.show(5)

+--------------------+-------------------+
|CategoriaProveedorID|          Categoria|
+--------------------+-------------------+
|                   1|     otro mayorista|
|                   2|productos novedosos|
|                   3|           juguetes|
|                   4|               ropa|
|                   5|           embalaje|
+--------------------+-------------------+
only showing top 5 rows



#### Transformación
* Unir tablas Proveedores y CategoriasProveedores
* Modificar nombres estandar modelo
* Todo número negativo en el campo Dias_pago se multiplica por -1 
* agregar autonumérico para el DWH


In [7]:
# TRANSFORMACION
proveedores = proveedores.join(categoria_proveedores, how = 'inner', on = 'CategoriaProveedorID')
proveedores = proveedores.withColumn("Dias_pago", when(proveedores.Dias_pago_Temp < 0, proveedores.Dias_pago_Temp * -1).otherwise( proveedores.Dias_pago_Temp)).drop("Dias_pago_Temp")
proveedores = proveedores.withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
proveedores.show()

+--------------------+--------------+--------------------+------------------+-------------+--------------------+---------+----------------+
|CategoriaProveedorID|ID_Proveedor_T|              Nombre|Contacto_principal|Codigo_postal|           Categoria|Dias_pago|ID_Proveedor_DWH|
+--------------------+--------------+--------------------+------------------+-------------+--------------------+---------+----------------+
|                   2|             5|Graphic Design In...|                29|        64847| productos novedosos|       14|               1|
|                   2|             9|      Nod Publishers|                37|        27906| productos novedosos|        7|               2|
|                   2|            12|   The Phone Company|                43|        56732| productos novedosos|       30|               3|
|                   2|             2|       Contoso, Ltd.|                23|        98253| productos novedosos|        7|               4|
|                   

#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

In [10]:
# CARGUE
guardar_db(dest_db_connection_string, proveedores,'Estudiante_42.Proveedor', db_user, db_psswd)

### TIPO TRANSACCION
Empezamos con  la dimensión <i>TipoTransaccion</i>. Su fuente de dato es de la tabla transaccional <i>TipoTransaccion</i>

#### Extracción

In [11]:
sql_tipo_transaccion = '''(SELECT TipoTransaccionID AS ID_Tipo_transaccion_T, TipoTransaccionNombre AS Tipo FROM WWImportersTransactional.TiposTransaccion) AS Tipo_transaccion_Temp'''
tipo_transaccion = obtener_dataframe_de_bd(source_db_connection_string, sql_tipo_transaccion, db_user, db_psswd)
tipo_transaccion.show(5)

+---------------------+--------------------+
|ID_Tipo_transaccion_T|                Tipo|
+---------------------+--------------------+
|                    2|Customer Credit Note|
|                    3|Customer Payment ...|
|                    4|     Customer Refund|
|                    5|    Supplier Invoice|
|                    6|Supplier Credit Note|
+---------------------+--------------------+
only showing top 5 rows



#### Transformación
* Modificar nombres estandar modelo
* Agregar autonumérico para el DWH

In [12]:
# TRANSFORMACION
tipo_transaccion = tipo_transaccion.withColumn('ID_Tipo_transaccion_DWH', f.monotonically_increasing_id() + 1)
tipo_transaccion.show(5)

+---------------------+--------------------+-----------------------+
|ID_Tipo_transaccion_T|                Tipo|ID_Tipo_transaccion_DWH|
+---------------------+--------------------+-----------------------+
|                    2|Customer Credit Note|                      1|
|                    3|Customer Payment ...|                      2|
|                    4|     Customer Refund|                      3|
|                    5|    Supplier Invoice|                      4|
|                    6|Supplier Credit Note|                      5|
+---------------------+--------------------+-----------------------+
only showing top 5 rows



#### Carga

In [13]:
# CARGUE
guardar_db(dest_db_connection_string, tipo_transaccion,'Estudiante_42.TipoTransaccion', db_user, db_psswd)

### HECHO MOVIMIENTO
Empezamos con  la tabla <i>Hecho_movimientos</i>. Su fuente de dato es de la tabla transaccional <i>movimientos</i>

#### Extracción

In [14]:
#EXTRACCION
sql_hecho_movimiento = '''(SELECT StockItemTransactionID AS ID_Transaccion_T, TransactionOccurredWhen AS Fecha_movimiento_Temp, StockItemID AS ID_producto, SupplierID AS ID_Proveedor, CustomerID AS ID_Cliente, Quantity AS Cantidad
FROM WWImportersTransactional.movimientos) AS Temp_Hecho_Movimiento'''
hecho_movimiento = obtener_dataframe_de_bd(source_db_connection_string, sql_hecho_movimiento, db_user, db_psswd)
hecho_movimiento.show(5)

+----------------+---------------------+-----------+------------+----------+--------+
|ID_Transaccion_T|Fecha_movimiento_Temp|ID_producto|ID_Proveedor|ID_Cliente|Cantidad|
+----------------+---------------------+-----------+------------+----------+--------+
|           94344|          Jan 20,2014|        108|            |     185.0|   -10.0|
|           96548|          Jan 28,2014|        162|            |     176.0|   -10.0|
|           96560|          Jan 28,2014|        216|            |     474.0|   -10.0|
|           96568|          Jan 28,2014|         22|            |     901.0|   -10.0|
|           96648|          Jan 28,2014|         25|            |     926.0|   -10.0|
+----------------+---------------------+-----------+------------+----------+--------+
only showing top 5 rows



#### Transformación
* Modificar nombres estandar modelo
* Eliminar duplicados
* Estandarizar fechas
* Agregar autonumérico para el DWH

In [15]:
hecho_movimiento = hecho_movimiento.dropDuplicates()
hecho_movimiento.show()

+----------------+---------------------+-----------+------------+----------+--------+
|ID_Transaccion_T|Fecha_movimiento_Temp|ID_producto|ID_Proveedor|ID_Cliente|Cantidad|
+----------------+---------------------+-----------+------------+----------+--------+
|          280212|          Nov 18,2015|         45|            |      33.0|   -10.0|
|          200365|          Feb 21,2015|         40|            |     577.0|   -10.0|
|          129490|          Jun 03,2014|         17|            |     892.0|   -10.0|
|          129677|          Jun 04,2014|         13|            |      92.0|   -10.0|
|          265767|          Sep 29,2015|         11|            |     572.0|   -10.0|
|          250434|          Aug 06,2015|         69|            |      11.0|   -10.0|
|          307982|          Feb 25,2016|        214|            |     529.0|   -10.0|
|          117255|          Apr 19,2014|         23|            |     429.0|   -10.0|
|          221409|          May 01,2015|        162|  

In [16]:
regex = "[0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1])"
hecho_movimiento = hecho_movimiento.withColumn("Fecha_movimiento", when(~hecho_movimiento["Fecha_movimiento_Temp"].rlike(regex),  to_date(from_unixtime(unix_timestamp(hecho_movimiento.Fecha_movimiento_Temp, "MMM dd,yyyy")))).otherwise( hecho_movimiento.Fecha_movimiento_Temp)).drop("Fecha_movimiento_Temp")
hecho_movimiento.show(5)

+----------------+-----------+------------+----------+--------+----------------+
|ID_Transaccion_T|ID_producto|ID_Proveedor|ID_Cliente|Cantidad|Fecha_movimiento|
+----------------+-----------+------------+----------+--------+----------------+
|          280212|         45|            |      33.0|   -10.0|      2015-11-18|
|          200365|         40|            |     577.0|   -10.0|      2015-02-21|
|          129490|         17|            |     892.0|   -10.0|      2014-06-03|
|          129677|         13|            |      92.0|   -10.0|      2014-06-04|
|          265767|         11|            |     572.0|   -10.0|      2015-09-29|
+----------------+-----------+------------+----------+--------+----------------+
only showing top 5 rows



In [17]:
hecho_movimiento = hecho_movimiento.withColumn('ID_Transaccion_T_DWH', f.monotonically_increasing_id() + 1)
hecho_movimiento.show(5)

+----------------+-----------+------------+----------+--------+----------------+--------------------+
|ID_Transaccion_T|ID_producto|ID_Proveedor|ID_Cliente|Cantidad|Fecha_movimiento|ID_Transaccion_T_DWH|
+----------------+-----------+------------+----------+--------+----------------+--------------------+
|          280212|         45|            |      33.0|   -10.0|      2015-11-18|                   1|
|          200365|         40|            |     577.0|   -10.0|      2015-02-21|                   2|
|          129490|         17|            |     892.0|   -10.0|      2014-06-03|                   3|
|          129677|         13|            |      92.0|   -10.0|      2014-06-04|                   4|
|          265767|         11|            |     572.0|   -10.0|      2015-09-29|                   5|
+----------------+-----------+------------+----------+--------+----------------+--------------------+
only showing top 5 rows



#### Carga

In [18]:
# CARGUE
guardar_db(dest_db_connection_string, hecho_movimiento,'Estudiante_42.Hecho_movimiento', db_user, db_psswd)

## VERIFICACIONES

* Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.

In [27]:
proveedores.filter(proveedores["Dias_pago"] < 0 ).count()

0

* Sobre “La cantidad máxima de productos movidos es 50 millones por viaje”, encontramos que efectivamente gracias a los avances ya podemos cargar más que la cantidad de 50 millones por viajes.

No se ha aclarado que es un viaje y como se representa

* La falta de datos antes del 2014 es un error de extracción de datos. Los nuevos datos incluyen este año.

In [24]:
hecho_movimiento.select(year('Fecha_movimiento').alias('year')).distinct().show()

+----+
|year|
+----+
|2015|
|2013|
|2014|
|2016|
+----+



* Nuestro análisis concluye que la información que se ha duplicado totalmente no es útil. Por favor no tenerlos en cuenta.

In [26]:
hecho_movimiento.count() ==  hecho_movimiento.distinct().count()

True

* El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD”: En cuanto a formatos de fechas estamos de acuerdo con que los estandarizemos y el formato sea el especificado en la regla de negocio

In [28]:
regex = "[0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1])"
hecho_movimiento.filter(~hecho_movimiento["Fecha_movimiento"].rlike(regex)).count()

0

* Existen proveedores que tienen 2 filas una con un nombre y otra con el mismo nombre mas un “Inc” o “Ltd”. Unimos estos a un solo proveedor dado que se trató de un error de digitación.

In [30]:
proveedores.select("Nombre").show()

+--------------------+
|              Nombre|
+--------------------+
|Graphic Design In...|
|      Nod Publishers|
|   The Phone Company|
|       Contoso, Ltd.|
|  Lucerne Publishing|
| A Datum Corporation|
|Northwind Electri...|
|      Fabrikam, Inc.|
|       Litware, Inc.|
|Consolidated Mess...|
|      Woodgrove Bank|
|       Trey Research|
| Humongous Insurance|
+--------------------+



* El código postal igual para todos nuestros proveedores es un error que también fue corregido.

In [33]:
proveedores.select("Codigo_postal").show()

+-------------+
|Codigo_postal|
+-------------+
|        64847|
|        27906|
|        56732|
|        98253|
|        37659|
|        46077|
|         7860|
|        40351|
|        95245|
|        94101|
|        94101|
|        57543|
|        37770|
+-------------+



* Cantidades negativas significan salidas de productos del inventario

In [34]:
hecho_movimiento.select('Cantidad').describe().show()

+-------+------------------+
|summary|          Cantidad|
+-------+------------------+
|  count|            236667|
|   mean| 551.2711108857593|
| stddev|4080.6268075975277|
|    min|            -360.0|
|    max|           67368.0|
+-------+------------------+

