# Tutorial: creación de ETLs con PySpark

## 1. Introducción	
    ¿Qué aprenderá?
	En este tutorial aprenderá cómo puede utilizar PySpark para crear un proceso de ETL básico.

	¿Qué construirá?     
        Construirá un ETL que toma los datos desde la base de datos transacional de WideWorldImporters (WWImportersTransactional), los transforma  a una representación cercana al análisis y los  almacena en la base de datos relacional WWImportersDWH.
    
	¿Para qué?
	La construcción de ETLs que se ajusten a modelos multidimensionales es un paso necesario dentro de un proceso de analìtica 1.0 , pues permite tomar los datos crudos de una fuente, generalmente transaccional, para transformarlos en datos limpios que puedan utilizarse para la toma de decisiones.
    
    ¿Qué necesita?
    1. Python 3 con pip instalado
    2. Jupyter notebook
    3. Paquetes: Pyspark (3.0.1) y pandas (1.2.1)
    4. Controlador Connector J de MySQL (ya se encuentra instalado)
    5. Servidor SQL con base de datos relacional "WWImportersTransactional" y base de datos relacional que corresponde a la bodega de WWI "Estudiante_i"
	

## 2. Proceso de ETL para una dimensión.

En este proceso de ETL, se extraen los datos de las **órdenes de compra** de una base de datos transaccional y se almacenan en otra base de datos que corresponde a la bodega de datos, siguiendo una aproximación ROLAP. A continuación, se presenta el modelo multidimensional que es el modelo conceptual que representa el proceso de registro de órdenes de compra. Este modelo se utilizó para crear las tablas en la bodega de datos que representan el proceso de negocio y que serán cargadas como resultado del proceso ETL. 

Tenga en cuenta que las llaves ID_XXXX presentes en el modelo hacen referencia a las llaves de la bodega. Por otra parte, en el proceso de ETL se van a tener en cuenta las llaves transaccionales (**WWImportersTransactional**). La nomenclatura para utilizar es:

1.   ID_XXXX_DWH, para las llaves de la bodega.
2.   ID_XXXX_T, para las llaves transaccionales.


![Modelo ordenes](./WWI_modelo_ordenes.png)

El proceso de ETL debe ser diseñado antes de implementarse. A partir de las conclusiones del entendimiento de datos sabemos las fuentes que se van a  utilizar y la relación entre las fuentes. Adicionalmente, se cuenta con las respuestas de la organización a las preguntas, resultado del entendimiento de datos. De esa manera sabemos cómo se deben manipular los datos. 

Este proceso de ETL lo dividimos en seis bloques, uno para cada dimensión o <i>tabla de hechos</i> del modelo, con la única excepción de la dimensión de fecha que, por ser una dimensión especial que se genera de forma independiente, no se incluye aquí:

![ETL](./Disenio_ETL.PNG)

Recuerde que este es el diseño general. En el diseño completo se deben incluir las transformaciones realizadas a los datos a utilizarse en las dimensiones y tablas de hecho del modelo multidimensional, de acuerdo con lo que se muestra en la infografía de arquitectura de componentes (Componente proceso ETL) 

In [1]:
# Configuración servidor base de datos transaccional
# Recuerde usar Estudiante_i como usuario y la contraseña asigana en el excel de conexión a maquina virtual como contraseña
db_user = 'Estudiante_50_202314'
db_psswd = 'aabb1122'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'

dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_50_202314'

# Driver de conexion
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [2]:
import os 
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace, to_date, dayofmonth, month, year, weekofyear, date_format, trim
from datetime import datetime

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession



### Conexión y carga de datos

Se define la función para conexión y cargue de dataframes desde la base de datos origen y luego la función para guardar un dataframe en una tabla de la base de datos destino.

In [4]:
def obterner_dataframe_desde_csv(_PATH, _sep):
    return spark.read.load(_PATH, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

### BLOQUE 1
Empezamos con el bloque 1: la dimensión <i>Empleado</i>, su fuente de datos viene de la tabla transaccional <i>Personas</i>. En la sentencia SQL filtramos usando WHERE para seleccionar solo las personas que sean vendedores y recuperamos únicamente los atributos que queremos, de acuerdo con  modelo dimensional. Recuerde que también puede usar el **.select()** de pyspark si no conoce los atributos de las tablas transaccionales. Sin embargo, es más eficiente aplicar el filtro en la consulta, ya que no trae a memoria más información de la necesaria.

#### Extracción
A continuación, nos conectamos a la base de datos y extraemos la información deseada por medio de SQL, cargandola en un DataFrame PySpark, es decir en memoria. Note que aquí se pueden renombrar los atributos con la estructura <i>nombreActual AS nuevoNombre</i>. De la tabla de personas, En este paso, solo nos interesan los empleados, por lo cual se hace un filtro por medio del WHERE, buscando las personas cuyo atributo EsVendedor sea igual a 1.

In [5]:
sql_empleados = '''(SELECT DISTINCT ID_persona AS ID_Empleado, NombreCompleto AS Nombre, EsVendedor FROM WWImportersTransactional.Personas WHERE EsVendedor=1) AS Temp_empleados'''
empleados = obtener_dataframe_de_bd(source_db_connection_string, sql_empleados, db_user, db_psswd)
empleados.show(10)

+-----------+------------------+----------+
|ID_Empleado|            Nombre|EsVendedor|
+-----------+------------------+----------+
|          2|    Kayla Woodcock|      true|
|          3|     Hudson Onslow|      true|
|          6|     Sophia Hinton|      true|
|          7|         Amy Trefl|      true|
|          8|    Anthony Grosse|      true|
|         13|Hudson Hollinworth|      true|
|         14|         Lily Code|      true|
|         15|         Taj Shand|      true|
|         16|     Archer Lamble|      true|
|         20|       Jack Potter|      true|
+-----------+------------------+----------+



#### Transformación
Recuerde que, puede hacer uso de selectExpr, filter, where entre otras de PySpark para modificar los datos cargados. Por ejemplo, el siguiente código utiliza <i>selectExpr</i> para renombrar la columna ID_Empleado por ID_Empleado_T, esta es la convención que vamos a utilizar: "_T" para indicar que el ID es el que estaba en la base de datos transaccional y "_DWH" para indicar que son ID's propios de la bodega. Usamos withColumn y monotonicallu_increasing_id para crear un ID acumulativo para cada registro en el dataframe

In [6]:
# TRANSFORMACION
empleados = empleados.selectExpr('ID_Empleado as ID_Empleado_T','Nombre')
empleados = empleados.coalesce(1).withColumn('ID_Empleado_DWH', f.monotonically_increasing_id() + 1)
empleados = empleados.select('ID_Empleado_DWH','ID_Empleado_T','Nombre')
empleados.show(5)

+---------------+-------------+--------------+
|ID_Empleado_DWH|ID_Empleado_T|        Nombre|
+---------------+-------------+--------------+
|              1|            2|Kayla Woodcock|
|              2|            3| Hudson Onslow|
|              3|            6| Sophia Hinton|
|              4|            7|     Amy Trefl|
|              5|            8|Anthony Grosse|
+---------------+-------------+--------------+
only showing top 5 rows



#### Carga
Una vez realizado esto, se guardan los resultados en la base de datos destino

In [7]:
# CARGUE
guardar_db(dest_db_connection_string, empleados,'Empleado', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 2
Empezamos el bloque 2: dimensión ciudad. Su fuente de datos es una combinación de las tablas transaccionales <i>paises, provinciasEstados y ciudades</i>

#### Extracción

In [8]:
#EXTRACCION
sql_paises = '''(SELECT DISTINCT ID_Pais, Nombre, Continente, Region, Subregion FROM WWImportersTransactional.Paises) AS Temp_paises'''
sql_provincias_estados = '''(SELECT DISTINCT ID_EstadosProvincias AS ID_EstadoProvincia, NombreEstadoProvincia, TerritorioVentas, ID_Pais FROM WWImportersTransactional.EstadosProvincias) AS Temp_estados_provincias'''
sql_ciudades = '''(SELECT DISTINCT ID_ciudad as ID_ciudad_T, NombreCiudad, ID_EstadoProvincia, Poblacion FROM WWImportersTransactional.Ciudades) AS Temp_ciudades'''

paises = obtener_dataframe_de_bd(source_db_connection_string, sql_paises, db_user, db_psswd)
provincias_estados = obtener_dataframe_de_bd(source_db_connection_string, sql_provincias_estados, db_user, db_psswd)
ciudades = obtener_dataframe_de_bd(source_db_connection_string, sql_ciudades, db_user, db_psswd)

print(ciudades.columns, paises.columns, provincias_estados.columns)

['ID_ciudad_T', 'NombreCiudad', 'ID_EstadoProvincia', 'Poblacion'] ['ID_Pais', 'Nombre', 'Continente', 'Region', 'Subregion'] ['ID_EstadoProvincia', 'NombreEstadoProvincia', 'TerritorioVentas', 'ID_Pais']


#### Transformación

In [9]:
# TRANSFORMACION
ciudades = ciudades.join(provincias_estados, how = 'left', on = 'ID_EstadoProvincia')
ciudades = ciudades.join(paises, how = 'left', on = 'ID_Pais')
ciudades = ciudades.coalesce(1).withColumn('ID_Ciudad_DWH', f.monotonically_increasing_id() + 1)
ciudades = ciudades.select('ID_Ciudad_DWH','ID_ciudad_T','NombreCiudad','Continente','Nombre','Poblacion',
                          'Region','TerritorioVentas','NombreEstadoProvincia','Subregion') \
                    .withColumnRenamed('Nombre','Pais')
ciudades.show(5)

+-------------+-----------+------------+-------------+-------------+---------+--------+----------------+---------------------+----------------+
|ID_Ciudad_DWH|ID_ciudad_T|NombreCiudad|   Continente|         Pais|Poblacion|  Region|TerritorioVentas|NombreEstadoProvincia|       Subregion|
+-------------+-----------+------------+-------------+-------------+---------+--------+----------------+---------------------+----------------+
|            1|         49|     Absecon|North America|United States|     8411|Americas|         Mideast|           New Jersey|Northern America|
|            2|        150|    Adelphia|North America|United States|     null|Americas|         Mideast|           New Jersey|Northern America|
|            3|        336|      Albion|North America|United States|     null|Americas|         Mideast|           New Jersey|Northern America|
|            4|        458|   Allamuchy|North America|United States|       78|Americas|         Mideast|           New Jersey|Northern A

#### Carga

In [10]:
# CARGUE
guardar_db(dest_db_connection_string, ciudades,'Ciudad', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 3
Bloque 3: dimensión paquete. Su fuente de datos es la tabla transaccional <i>Paquetes</i>

#### Extracción

In [11]:
#EXTRACCION
sql_paquetes = '''(SELECT DISTINCT ID_TipoPaquete AS ID_TipoPaquete_T, TipoPaquete AS Nombre FROM WWImportersTransactional.Paquetes) AS Temp_Paquetes'''
paquetes = obtener_dataframe_de_bd(source_db_connection_string, sql_paquetes, db_user, db_psswd)

#### Transformación

In [12]:
# TRANSFORMACION
paquetes = paquetes.coalesce(1).withColumn('ID_TipoPaquete_DWH', f.monotonically_increasing_id() + 1)
paquetes = paquetes.select('ID_TipoPaquete_DWH','ID_TipoPaquete_T','Nombre')
paquetes.show(5)

+------------------+----------------+------+
|ID_TipoPaquete_DWH|ID_TipoPaquete_T|Nombre|
+------------------+----------------+------+
|                 1|               1|   Bag|
|                 2|               2| Block|
|                 3|               3|Bottle|
|                 4|               4|   Box|
|                 5|               5|   Can|
+------------------+----------------+------+
only showing top 5 rows



#### Carga

In [13]:
# CARGUE
guardar_db(dest_db_connection_string, paquetes,'TipoPaquete', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 4
Bloque 4: dimensión producto, su fuente de datos es la combinación entre las tablas transaccionales Productos y Colores

#### Extracción

In [14]:
sql_productos = '''(SELECT DISTINCT ID_Producto as ID_Producto_T, ID_Color, NombreProducto, Marca, Necesita_refrigeracion, Dias_tiempo_entrega, Impuesto, PrecioUnitario, PrecioRecomendado FROM WWImportersTransactional.Producto) AS Temp_productos'''
sql_colores = '''(SELECT DISTINCT ID_Color, Color FROM WWImportersTransactional.Colores) AS Temp_colores'''

productos = obtener_dataframe_de_bd(source_db_connection_string, sql_productos, db_user, db_psswd)
colores = obtener_dataframe_de_bd(source_db_connection_string, sql_colores, db_user, db_psswd)

#### Transformación

In [15]:
# TRANSFORMACION
productos = productos.join(colores, how = 'left', on = 'ID_Color').fillna({'Color': 'Missing'})
productos = productos.coalesce(1).withColumn('ID_Producto_DWH', f.monotonically_increasing_id() + 1)
productos = productos.select('ID_Producto_DWH','ID_Producto_T','NombreProducto','Marca','Color','Necesita_refrigeracion','Dias_tiempo_entrega','PrecioRecomendado','Impuesto','PrecioUnitario')
productos.show(5)

+---------------+-------------+--------------------+---------+-----+----------------------+-------------------+-----------------+--------+--------------+
|ID_Producto_DWH|ID_Producto_T|      NombreProducto|    Marca|Color|Necesita_refrigeracion|Dias_tiempo_entrega|PrecioRecomendado|Impuesto|PrecioUnitario|
+---------------+-------------+--------------------+---------+-----+----------------------+-------------------+-----------------+--------+--------------+
|              1|           59|RC toy sedan car ...|Northwind|  Red|                     0|                 14|               37|      15|            25|
|              2|           64|RC vintage Americ...|Northwind|  Red|                     0|                 14|               45|      15|            30|
|              3|           68|Ride on toy sedan...|Northwind|  Red|                     0|                 14|              344|      15|           230|
|              4|           73|Ride on vintage A...|Northwind|  Red|        

#### Carga

In [16]:
# CARGUE
guardar_db(dest_db_connection_string, productos,'Producto', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 5
Bloque 5: dimensión cliente. Su fuente de datos es la combinación entre las tablas transaccionales Categorias de cliente, Grupos de compra y Clientes

#### Extracción

In [17]:
sql_categoriasCliente = '''(SELECT DISTINCT ID_Categoria, NombreCategoria FROM WWImportersTransactional.CategoriasCliente) AS Temp_categoriasclientes'''
sql_gruposCompra = '''(SELECT DISTINCT ID_GrupoCompra, NombreGrupoCompra FROM WWImportersTransactional.GruposCompra) AS Temp_gruposcompra'''
sql_clientes = '''(SELECT DISTINCT ID_Cliente as ID_Cliente_T, Nombre, ClienteFactura, ID_Categoria, ID_GrupoCompra, ID_CiudadEntrega, LimiteCredito, FechaAperturaCuenta, DiasPago FROM WWImportersTransactional.Clientes) AS Temp_clientes'''

categoriasCliente = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasCliente, db_user, db_psswd)
gruposCompra = obtener_dataframe_de_bd(source_db_connection_string, sql_gruposCompra, db_user, db_psswd)
clientes = obtener_dataframe_de_bd(source_db_connection_string, sql_clientes, db_user, db_psswd)

#### Transformación

In [18]:
# TRANSFORMACION 
# EL SUPUESTO QUE SE TIENE ES QUE TODOS LOS CLIENTES TIENEN TODOS SUS DATOS DE CATEGORIA Y GRUPO Y NO SE ESTÁN PERDIENDO CLIENTES AL REALIZAR ESTE JOIN
clientes = clientes.join(gruposCompra, how = 'left', on = 'ID_GrupoCompra')
clientes = clientes.alias('cl').join(categoriasCliente.alias('ct'), how = 'left', on = 'ID_Categoria') \
.select([col('cl.ID_Cliente_T'),col('cl.Nombre'),col('ct.NombreCategoria'),col('cl.NombreGrupoCompra') \
        ,col('cl.ClienteFactura'),col('cl.ID_CiudadEntrega'),col('cl.LimiteCredito'),col('cl.FechaAperturaCuenta'),col('cl.DiasPago')])
clientes = clientes.coalesce(1).withColumn('ID_Cliente_DWH', f.monotonically_increasing_id() + 1)
clientes = clientes.select('ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
                          'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago')

clientes = clientes.fillna({'NombreCategoria':'Missing','NombreGrupoCompra':'Missing'})
clientes.show(5)

+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|              Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             1|         801|         Eric Torres|      Corporate|          Missing|           801|           31321|         3000|2013-01-01 00:00:00|       7|
|             2|         802|        Cosmina Vlad|      Corporate|          Missing|           802|            5192|         2940|2013-01-01 00:00:00|       7|
|             3|         803|          Bala Dixit|   Novelty Shop|          Missing|           803|           33799|         2000|2013-01-01 00:00:00|       7|
|             4|         804|Aleksandrs 

In [19]:
# Crea el registro para el id = 0
clientes_0 = [('0','','Missing','Missing','Missing','0','0','','','')]
columns = ['ID_Cliente_DWH','ID_Cliente_T','Nombre','NombreCategoria','NombreGrupoCompra','ClienteFactura',
            'ID_CiudadEntrega','LimiteCredito','FechaAperturaCuenta','DiasPago']
cliente_0 = spark.createDataFrame(data=clientes_0,schema=columns)
cliente_0.show()

+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T| Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |Missing|        Missing|          Missing|             0|               0|             |                   |        |
+--------------+------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+



In [20]:
clientes = clientes.union(cliente_0)
clientes.show(5)

+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|              Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             1|         801|         Eric Torres|      Corporate|          Missing|           801|           31321|         3000|2013-01-01 00:00:00|       7|
|             2|         802|        Cosmina Vlad|      Corporate|          Missing|           802|            5192|         2940|2013-01-01 00:00:00|       7|
|             3|         803|          Bala Dixit|   Novelty Shop|          Missing|           803|           33799|         2000|2013-01-01 00:00:00|       7|
|             4|         804|Aleksandrs 

In [21]:
# Se ordena por el identificador DWH
clientes = clientes.withColumn('ID_Cliente_DWH',col('ID_Cliente_DWH').cast('int')).orderBy(col('ID_Cliente_DWH'))
clientes.show(5)

+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|ID_Cliente_DWH|ID_Cliente_T|              Nombre|NombreCategoria|NombreGrupoCompra|ClienteFactura|ID_CiudadEntrega|LimiteCredito|FechaAperturaCuenta|DiasPago|
+--------------+------------+--------------------+---------------+-----------------+--------------+----------------+-------------+-------------------+--------+
|             0|            |             Missing|        Missing|          Missing|             0|               0|             |                   |        |
|             1|         801|         Eric Torres|      Corporate|          Missing|           801|           31321|         3000|2013-01-01 00:00:00|       7|
|             2|         802|        Cosmina Vlad|      Corporate|          Missing|           802|            5192|         2940|2013-01-01 00:00:00|       7|
|             3|         803|          B

#### Carga

In [22]:
# CARGUE
guardar_db(dest_db_connection_string,clientes,'Cliente', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

### BLOQUE 6
Bloque 6: Hecho orden. Su fuente de datos es la combinación entre las tablas transaccionales Ordenes y detalles de orden

#### Extracción

In [23]:
sql_ordenes = '''(SELECT DISTINCT * FROM WWImportersTransactional.Ordenes) AS Temp_ordenes'''
sql_detallesOrdenes = '''(SELECT DISTINCT * FROM WWImportersTransactional.DetallesOrdenes) AS Temp_detallesordenes'''
ordenes = obtener_dataframe_de_bd(source_db_connection_string, sql_ordenes, db_user, db_psswd)
detallesOrdenes = obtener_dataframe_de_bd(source_db_connection_string, sql_detallesOrdenes, db_user, db_psswd)

#### Transformación
Estas son las respuestas de Wide World Importers a los conclusiones obtenidas en el entendimiento de los datos:
- La regla de negocio "La tasa de impuesto es de 10% o 15%" es correcta, pero habian errores en la tabla original, que fueron corregidos. 
- Para la segunda regla de negocio: "Son 73.595 órdenes detalladas en 231.412 lineas de detalle de órdenes realizadas desde 2013", si faltaban datos, los cuales fueron completados, y nos dicen que en cuanto a consistencia ellos revisaron las tablas e hicieron correcciones, pero que los duplicados completos de ordenes los eliminemos
- "El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD": En cuanto a formatos de fechas estan de acuerdo con que los estandarizemos y el formato sea el especificado en la regla
- Para las descripciones de productos que eran "a", se actualizaron a los valores reales. 
- Se pueden eliminar las columnas Comenarios, Instrucciones_de_entrega y comentarios_internos porque estan vacias. 
- A pesar de estar en un proceso de mejorar la calidad de los datos y mantener los nulos nos ayudaría a reflejar esa calidad, de la mano con el grupo de analitica de WWI se decide imputar por la media el valor extremo de la variable Cantidad
- Para las ordenes las columnas Seleccionado_por_ID_de_persona, ID_de_pedido_pendiente, Seleccion_completada_cuando, y para las columnas Seleccion_completada_cuando de la tabla detalles de ordenes, se decide mantener los valores vacíos, sin embargo para la variable Precio_unitario el negocio reviso y complemento los valores faltantes

Las tablas usadas en el tutorial de entendimiento de datos estaran disponibles para su revision con los siguientes nombres: OrdenesCopia y DetallesOrdenesCopia. 

Para este tutorial vamos a trabajar con unas tablas que dadas las conclusiones del tutorial de entendimiento, WWImporters revisó los datos originales, creo tablas y las llamo "Ordenes" y "DetallesOrdenes"

Se hace una verificación de los valores de la tasa de impuesto

In [24]:
detallesOrdenes.select("Tasa_de_impuesto").distinct().show()

+----------------+
|Tasa_de_impuesto|
+----------------+
|              10|
|              15|
+----------------+



Se hace una verificación del rango de fechas disponible en los datos

In [25]:
ordenes.agg({"Fecha_de_pedido": "min"}).show()

+--------------------+
|min(Fecha_de_pedido)|
+--------------------+
|          2013-01-01|
+--------------------+



Se elimina columnas Comenarios, Instrucciones_de_entrega y comentarios_internos

In [26]:
ordenes = ordenes.drop(*["Comentarios", "Instrucciones_de_entrega","comentarios_internos"])

Se eliminan duplicados totales de ordenes

In [27]:
print((ordenes.count(),ordenes.distinct().count()))

(93629, 93629)


In [28]:
ordenes = ordenes.drop_duplicates()

In [29]:
print((ordenes.count(),ordenes.distinct().count()))

(93629, 93629)


Se hace verificación de consistencia

In [30]:
#consistencia: revisar genially: definicion de consistencia
ids_ordenes = set([x.ID_de_pedido for x in ordenes.select('ID_de_pedido').collect()])
ids_detalles = set([x.ID_de_pedido for x in detallesOrdenes.select('ID_de_pedido').collect()])

len(ids_ordenes-ids_detalles), len(ids_detalles-ids_ordenes)

(0, 0)

En el siguiente código para el manejo de fechas, pasamos del formato MM dd,YYYY al formato establecido en la regla de negocio<br>

In [31]:
# TRANSFORMACION
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = ordenes.filter(ordenes["Fecha_de_pedido"].rlike(regex))
noCumplenFormato = ordenes.filter(~ordenes["Fecha_de_pedido"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('Fecha_de_pedido', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('Fecha_de_pedido')))
ordenes = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes.count()

20034 73595
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|    Jan 28,2014|               201

(20034, 93629)

Descripciones


In [32]:
detallesOrdenes.where(length(col("Descripcion")) <= 10).show()

+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+
+----------------+------------+-----------+-----------+---------------+--------+---------------+----------------+---------------------+---------------------------+



Imputar valor maximo de cantidad

In [33]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[1]

Row(Cantidad=360)

In [34]:
detallesOrdenes = detallesOrdenes.replace( 10000000, 360, 'Cantidad')

In [35]:
detallesOrdenes.select('Cantidad').sort(col("Cantidad").desc()).collect()[0]

Row(Cantidad=360)

In [36]:
detallesOrdenes.show(5)

+----------------+------------+-----------+--------------------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|Detalle_orden_ID|ID_de_pedido|ID_Producto|         Descripcion|ID_Tipo_Paquete|Cantidad|Precio_unitario|Tasa_de_impuesto|Cantidad_seleccionada|Seleccion_completada_cuando|
+----------------+------------+-----------+--------------------+---------------+--------+---------------+----------------+---------------------+---------------------------+
|               1|          45|        164|32 mm Double side...|              7|      50|            112|              15|                   50|        2013-01-02 11:00:00|
|               2|           1|         67|Ride on toy sedan...|              7|      10|            230|              15|                   10|        2013-01-01 11:00:00|
|               3|           2|         50|Developer joke mu...|              7|       9|             13|              15|             

In [37]:
ordenes.show(5)

+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|ID_de_pedido|ID_de_cliente|ID_de_vendedor|Seleccionado_por_ID_de_persona|ID_de_persona_de_contacto|ID_de_pedido_pendiente|Fecha_de_pedido|Fecha_de_entrega_esperada|Numero_de_pedido_de_compra_del_cliente|Pedido_pendiente_de_suministro_insuficiente|Seleccion_completada_cuando|
+------------+-------------+--------------+------------------------------+-------------------------+----------------------+---------------+-------------------------+--------------------------------------+-------------------------------------------+---------------------------+
|       20972|          132|             6|                             8|                     1263|                  null|     2014-01-28|               2014-01-29|    

Se unen los dos dataframes en un nuevo dataframe, se verifica que no haya duplicados y si los hay se eliminan. Se crea un nuevo dataframe que va a tener toda la información del hecho orden transformada y lista para continuar el proceso de cargue.

In [38]:
ordenes_tmp =ordenes
ordenes_tmp = ordenes_tmp.join(detallesOrdenes, how = 'inner', on = 'ID_de_pedido')
ordenes_tmp = ordenes_tmp.withColumn('Valor_total',col('Precio_unitario')*col('Cantidad'))
ordenes_tmp = ordenes_tmp.withColumn('Impuestos',col('Valor_total')*col('Tasa_de_impuesto'))
ordenes_tmp = ordenes_tmp.selectExpr('ID_de_pedido as ID_de_pedido_T','ID_Producto','Fecha_de_pedido','ID_de_cliente','ID_de_vendedor','ID_Tipo_Paquete','Cantidad','Valor_total', 'Impuestos')

print((ordenes_tmp.count(),ordenes_tmp.distinct().count()))

ordenes_tmp = ordenes_tmp.drop_duplicates()
ordenes_tmp.show(5)

(294314, 231412)
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+
|ID_de_pedido_T|ID_Producto|Fecha_de_pedido|ID_de_cliente|ID_de_vendedor|ID_Tipo_Paquete|Cantidad|Valor_total|Impuestos|
+--------------+-----------+---------------+-------------+--------------+---------------+--------+-----------+---------+
|           148|        203|     2013-01-02|          812|            13|              7|      40|       1280|    19200|
|           463|         64|     2013-01-09|          555|             3|              7|       1|         30|      450|
|           463|         10|     2013-01-09|          555|             3|              7|       8|        256|     3840|
|           463|         16|     2013-01-09|          555|             3|              7|      10|        130|     1950|
|           463|         57|     2013-01-09|          555|             3|              7|       3|         39|      585|
+--------------

In [39]:
guardar_db(dest_db_connection_string, ordenes_tmp,'Hecho_Orden_Tmp', db_user, db_psswd)

Cree la tabla de Fecha según el material compartido y adicione el left join al crear la tabla de ordenes para que quede completa

In [40]:
# El idPedido representa la dimensión degenerada Pedido
# Si hay campos nulos en ordenes_tmp al hacer join por el left outer join no se perderan y se utiliza como comodín un id=0 que debe existir en todas las dimensiones.
# Ese comodín representa el registro sin Dato.
# Debe adicionarle a todas las tablas el registro con identificador 0, como se muestra para la tabla de clientes
# Recuerde que falta incluir el join con la tabla de Fecha para que la tabla quede completa.

ordenes = ordenes_tmp.alias('o').join(clientes.alias('cl'), ordenes_tmp.ID_de_cliente == clientes.ID_Cliente_T,'left')\
                    .join(ciudades.alias('ciu'), clientes.ID_CiudadEntrega == ciudades.ID_ciudad_T,'left') \
                    .join(empleados.alias('e'), ordenes_tmp.ID_de_vendedor == empleados.ID_Empleado_T,'left') \
                    .join(paquetes.alias('p'), ordenes_tmp.ID_Tipo_Paquete == paquetes.ID_TipoPaquete_T,'left') \
                    .join(productos.alias('pr'), (ordenes_tmp.ID_Producto == productos.ID_Producto_T) ,'left') \
                    .select([col('o.ID_de_pedido_T'),col('cl.ID_Cliente_DWH'),col('ciu.ID_Ciudad_DWH'),
                             col('e.ID_Empleado_DWH'),col('pr.ID_Producto_DWH'),col('p.ID_TipoPaquete_DWH'),
                             col('o.Cantidad'),col('o.Valor_total'),col('o.Impuestos')]) \
                    .fillna({'ID_Cliente_DWH': 0, 'ID_Ciudad_DWH': 0, 'ID_Empleado_DWH': 0, 'ID_Producto_DWH': 0,
                             'ID_TipoPaquete_DWH': 0})
ordenes.show(5)

+--------------+--------------+-------------+---------------+---------------+------------------+--------+-----------+---------+
|ID_de_pedido_T|ID_Cliente_DWH|ID_Ciudad_DWH|ID_Empleado_DWH|ID_Producto_DWH|ID_TipoPaquete_DWH|Cantidad|Valor_total|Impuestos|
+--------------+--------------+-------------+---------------+---------------+------------------+--------+-----------+---------+
|           148|            12|        35925|              6|             89|                 7|      40|       1280|    19200|
|           463|           617|        29242|              2|              2|                 7|       1|         30|      450|
|           463|           617|        29242|              2|             22|                 7|       8|        256|     3840|
|           463|           617|        29242|              2|            165|                 7|      10|        130|     1950|
|           463|           617|        29242|              2|            135|                 7|       3

In [41]:
ordenes = ordenes.select('ID_de_pedido_T','ID_Ciudad_DWH','ID_Cliente_DWH','ID_Empleado_DWH','ID_Producto_DWH',
                         'ID_TipoPaquete_DWH','Cantidad','Impuestos','Valor_total') \
                    .withColumnRenamed('Valor_total','Total')
ordenes.show(5)

+--------------+-------------+--------------+---------------+---------------+------------------+--------+---------+-----+
|ID_de_pedido_T|ID_Ciudad_DWH|ID_Cliente_DWH|ID_Empleado_DWH|ID_Producto_DWH|ID_TipoPaquete_DWH|Cantidad|Impuestos|Total|
+--------------+-------------+--------------+---------------+---------------+------------------+--------+---------+-----+
|           148|        35925|            12|              6|             89|                 7|      40|    19200| 1280|
|           463|        29242|           617|              2|              2|                 7|       1|      450|   30|
|           463|        29242|           617|              2|             22|                 7|       8|     3840|  256|
|           463|        29242|           617|              2|            165|                 7|      10|     1950|  130|
|           463|        29242|           617|              2|            135|                 7|       3|      585|   39|
+--------------+--------

#### Carga

In [70]:
guardar_db(dest_db_connection_string, ordenes,'Hecho_Orden', db_user, db_psswd)

Verifique los resultados usando MySQL Workbench

# Resultado de consultas
Corresponde a las consultas realizadas sobre las tablas, para mostrar el estado final de las tablas pobladas como resultado del proceso de ETL.

# 3. Tarea ETL
Espacio para desarrollar la tarea planteada

## Dimensión Proveedor
Su fuente de datos es la combinación entre las tablas transaccionales 'proveedores', 'CategoriasProveedores' y 'Personas'.

### Extracción

In [43]:
#EXTRACCION
sql_proveedores = '''(SELECT DISTINCT ProveedorID AS ID_Proveedor_T, NombreProveedor AS Nombre, CategoriaProveedorID, PersonaContactoPrincipalID, DiasPago AS Dias_pago, CodigoPostal AS Codigo_postal FROM WWImportersTransactional.proveedores) AS Temp_Proveedores'''
sql_categoriasProveedores = '''(SELECT DISTINCT * FROM WWImportersTransactional.CategoriasProveedores) AS Temp_CategoriasProveedores'''
sql_personas = '''(SELECT DISTINCT ID_persona, NombreCompleto FROM WWImportersTransactional.Personas) AS Temp_Personas'''

proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores, db_user, db_psswd)
categoriasProveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_categoriasProveedores, db_user, db_psswd)
personas = obtener_dataframe_de_bd(source_db_connection_string, sql_personas, db_user, db_psswd)

### Transformación
Insights relevantes
- Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.
- Nuestro análisis concluye que la información que se ha duplicado totalmente no es útil. Por favor no tenerlos en cuenta. 
- Existen proveedores que tienen 2 filas una con un nombre y otra con el mismo nombre mas un "Inc" o "Ltd". Unimos estos a un solo proveedor dado que se trató de un error de digitación.
- El código postal igual para todos nuestros proveedores es un error que también fue corregido.
- El negocio indica que las tablas de categoriasProveedores y TiposTransaccion fueron analizadas previamente, por su grupo de consultores.

In [44]:
proveedores.show(5)

+--------------+--------------------+--------------------+--------------------------+---------+-------------+
|ID_Proveedor_T|              Nombre|CategoriaProveedorID|PersonaContactoPrincipalID|Dias_pago|Codigo_postal|
+--------------+--------------------+--------------------+--------------------------+---------+-------------+
|             4|      Fabrikam, Inc.|                   4|                        27|       30|        40351|
|             5|Graphic Design In...|                   2|                        29|       14|        64847|
|             7|       Litware, Inc.|                   5|                        33|       30|        95245|
|             9|      Nod Publishers|                   2|                        37|        7|        27906|
|            10|Northwind Electri...|                   3|                        39|       30|         7860|
+--------------+--------------------+--------------------+--------------------------+---------+-------------+
only showi

Empezamos eliminando las entradas duplicadas por el mismo nombre con la adición de 'Inc' o 'Ltd'.

In [45]:
# Retiramos sufijos ', Inc.' o ', Ltd.' de la columna 'Nombre'
proveedores = proveedores.withColumn(
    "Nombre",
    when(proveedores["Nombre"].endswith(", Inc."), regexp_replace(proveedores["Nombre"], ", Inc.$", ""))
    .when(proveedores["Nombre"].endswith(", Ltd."), regexp_replace(proveedores["Nombre"], ", Ltd.$", ""))
    .otherwise(proveedores["Nombre"])
)

# Eliminamos duplicados por la columna nombre manteniendo la primera entrada
proveedores = proveedores.dropDuplicates(subset=['Nombre'])

#Revisamos el cambio
proveedores.show(5)

+--------------+-------------------+--------------------+--------------------------+---------+-------------+
|ID_Proveedor_T|             Nombre|CategoriaProveedorID|PersonaContactoPrincipalID|Dias_pago|Codigo_postal|
+--------------+-------------------+--------------------+--------------------------+---------+-------------+
|            12|  The Phone Company|                   2|                        43|       30|        56732|
|            11|      Trey Research|                   8|                        41|       -7|        57543|
|             6|Humongous Insurance|                   9|                        31|      -14|        37770|
|             7|            Litware|                   5|                        33|       30|        95245|
|             2|            Contoso|                   2|                        23|       -7|        98253|
+--------------+-------------------+--------------------+--------------------------+---------+-------------+
only showing top 5 

Procedemos ahora a mantener el valor absoluto de la columna 'Dias_pago' por regla de negocio

In [46]:
proveedores = proveedores.withColumn("Dias_pago", when(col("Dias_pago") < 0, col("Dias_pago") * -1).otherwise(col("Dias_pago")))

#Revisamos el cambio
proveedores.show(5)

+--------------+-------------------+--------------------+--------------------------+---------+-------------+
|ID_Proveedor_T|             Nombre|CategoriaProveedorID|PersonaContactoPrincipalID|Dias_pago|Codigo_postal|
+--------------+-------------------+--------------------+--------------------------+---------+-------------+
|            12|  The Phone Company|                   2|                        43|       30|        56732|
|            11|      Trey Research|                   8|                        41|        7|        57543|
|             6|Humongous Insurance|                   9|                        31|       14|        37770|
|             7|            Litware|                   5|                        33|       30|        95245|
|             2|            Contoso|                   2|                        23|        7|        98253|
+--------------+-------------------+--------------------+--------------------------+---------+-------------+
only showing top 5 

Cambiamos ahora la columna 'CategoriaProvedorId' por 'Categoria' con un join con el df categoriasProveedores 

In [47]:
# Unir ambos DataFrames basado en 'CategoriaProveedorId'
proveedores = proveedores.join(categoriasProveedores, on="CategoriaProveedorID", how="left")

# Renombrar la columna a 'Categoria'
proveedores = proveedores.withColumnRenamed("CategoriaProveedor", "Categoria")

# Eliminar columna 'CategoriaProveedorID' 
proveedores = proveedores.drop("CategoriaProveedorID")

# Revisamos el cambio
proveedores.show(5)

+--------------+-------------------+--------------------------+---------+-------------+--------------------+
|ID_Proveedor_T|             Nombre|PersonaContactoPrincipalID|Dias_pago|Codigo_postal|           Categoria|
+--------------+-------------------+--------------------------+---------+-------------+--------------------+
|             7|            Litware|                        33|       30|        95245|            embalaje|
|             6|Humongous Insurance|                        31|       14|        37770|servicios de seguros|
|            11|      Trey Research|                        41|        7|        57543|servicios de mark...|
|            13|     Woodgrove Bank|                        45|        7|        94101|servicios financi...|
|            12|  The Phone Company|                        43|       30|        56732| productos novedosos|
+--------------+-------------------+--------------------------+---------+-------------+--------------------+
only showing top 5 

Cambiamos ahora la columna 'PersonaContactoPrincipalId' por 'Contacto_principal' con un join con el df personas 

In [48]:
# Unir ambos DataFrames basado en el ID de la persona (contacto principal)
proveedores = proveedores.join(personas, proveedores["PersonaContactoPrincipalId"] == personas["ID_persona"], "left")

# Renombrar la columna a 'Contacto_principal'
proveedores = proveedores.withColumnRenamed("NombreCompleto", "Contacto_principal")

# Eliminar columnas 'PersonaContactoPrincipalID' e 'ID_persona' 
proveedores = proveedores.drop("PersonaContactoPrincipalID", "ID_persona")

# Revisamos el cambio
proveedores.show(5)

+--------------+-------------------+---------+-------------+--------------------+------------------+
|ID_Proveedor_T|             Nombre|Dias_pago|Codigo_postal|           Categoria|Contacto_principal|
+--------------+-------------------+---------+-------------+--------------------+------------------+
|             2|            Contoso|        7|        98253| productos novedosos|   Hanna Mihhailov|
|             6|Humongous Insurance|       14|        37770|servicios de seguros|Madelaine  Cartier|
|             7|            Litware|       30|        95245|            embalaje|     Elias Myllari|
|            11|      Trey Research|        7|        57543|servicios de mark...|      Donald Jones|
|            12|  The Phone Company|       30|        56732| productos novedosos|           Hai Dam|
+--------------+-------------------+---------+-------------+--------------------+------------------+
only showing top 5 rows



Ahora solo queda generar la llave de la bodega 'ID_Proveedor_DWH', ordenar las columnas y ordenar las filas por la llave de la bodega. 

In [49]:
proveedores = proveedores.coalesce(1).withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
proveedores = proveedores.select('ID_Proveedor_DWH','ID_Proveedor_T','Nombre','Categoria','Contacto_principal','Dias_pago','Codigo_postal')
proveedores.show(5)

+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|ID_Proveedor_DWH|ID_Proveedor_T|              Nombre|           Categoria|Contacto_principal|Dias_pago|Codigo_postal|
+----------------+--------------+--------------------+--------------------+------------------+---------+-------------+
|               1|             1| A Datum Corporation| productos novedosos|        Reio Kabin|       14|        46077|
|               2|             2|             Contoso| productos novedosos|   Hanna Mihhailov|        7|        98253|
|               3|             3|Consolidated Mess...|servicios de mens...|      Kerstin Parn|       30|        94101|
|               4|             4|            Fabrikam|                ropa|       Bill Lawson|       30|        40351|
|               5|             5|Graphic Design In...| productos novedosos|        Penny Buck|       14|        64847|
+----------------+--------------+---------------

#### Carga

In [50]:
guardar_db(dest_db_connection_string, proveedores, 'Proveedor', db_user, db_psswd)

## Dimensión Fecha
Su fuente de datos es la tabla transaccional 'Movimientos'.

### Extracción

In [51]:
#EXTRACCION
sql_movimientos = '''(SELECT DISTINCT FechaTransaccion FROM WWImportersTransactional.movimientos) AS Temp_Movimientos'''

fechas = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)

### Transformación

Insights Relevantes
- "El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD": En cuanto a formatos de fechas estamos de acuerdo con que los estandarizemos y el formato sea el especificado en la regla de negocio.

In [52]:
fechas.show(5)

+----------------+
|FechaTransaccion|
+----------------+
|     Jan 20,2014|
|     Jan 28,2014|
|     Feb 01,2014|
|     Mar 25,2014|
|     May 01,2014|
+----------------+
only showing top 5 rows



In [53]:
fechas.printSchema()

root
 |-- FechaTransaccion: string (nullable = true)



Procedemos a crear una nueva columna de tipo Date con nombre 'Fecha' y convertimos el string de FechaTransaccion en date con to_date de PySpark

In [54]:
# Creamos nueva columna a partir del parsing de la FechaTransaccion
fechas = fechas.withColumn("Fecha", to_date(col("FechaTransaccion"), "MMM dd,yyyy"))

#Eliminamos la columna original
fechas = fechas.drop("FechaTransaccion")

# Comprobamos el cambio
fechas.show(5)

+----------+
|     Fecha|
+----------+
|2014-01-20|
|2014-01-28|
|2014-02-01|
|2014-03-25|
|2014-05-01|
+----------+
only showing top 5 rows



In [55]:
fechas.printSchema()

root
 |-- Fecha: date (nullable = true)



Ahora obtendremos y almacenaremos el Dia, Mes, Anio y Numero de semana ISO

In [56]:
#Calculamos dia, mes y anio
fechas = fechas.withColumn("Dia", dayofmonth(col("Fecha")))
fechas = fechas.withColumn("Mes", month(col("Fecha")))
fechas = fechas.withColumn("Anio", year(col("Fecha")))

# Calculamos el numero de semana ISO
fechas = fechas.withColumn("Numero_semana_ISO", weekofyear(col("Fecha")))

# Comprobamos el cambio
fechas.show(5)

+----------+---+---+----+-----------------+
|     Fecha|Dia|Mes|Anio|Numero_semana_ISO|
+----------+---+---+----+-----------------+
|2014-01-20| 20|  1|2014|                4|
|2014-01-28| 28|  1|2014|                5|
|2014-02-01|  1|  2|2014|                5|
|2014-03-25| 25|  3|2014|               13|
|2014-05-01|  1|  5|2014|               18|
+----------+---+---+----+-----------------+
only showing top 5 rows



Ahora solo queda generar la columna ID_Fecha que corresponde a un numero del formato YYYYMMDD y ordenar las columnas.

In [57]:
fechas = fechas.withColumn("ID_Fecha", date_format(col("Fecha"), "yyyyMMdd").cast("int"))
fechas = fechas.select('ID_Fecha','Fecha','Dia','Mes','Anio','Numero_semana_ISO')
fechas = fechas.dropna()
fechas = fechas.orderBy(col("ID_Fecha"))
fechas.show(5)

+--------+----------+---+---+----+-----------------+
|ID_Fecha|     Fecha|Dia|Mes|Anio|Numero_semana_ISO|
+--------+----------+---+---+----+-----------------+
|20131231|2013-12-31| 31| 12|2013|                1|
|20140101|2014-01-01|  1|  1|2014|                1|
|20140102|2014-01-02|  2|  1|2014|                1|
|20140103|2014-01-03|  3|  1|2014|                1|
|20140104|2014-01-04|  4|  1|2014|                1|
+--------+----------+---+---+----+-----------------+
only showing top 5 rows



### Carga

In [58]:
guardar_db(dest_db_connection_string, fechas, 'Fecha', db_user, db_psswd)

## Dimensión TipoTransacción
Su fuente de datos es la tabla transaccional 'TiposTransaccion'.

### Extracción

In [59]:
#EXTRACCION
sql_tipoTransaccion = '''(SELECT DISTINCT TipoTransaccionID AS ID_Tipo_transaccion_T, TipoTransaccionNombre AS Tipo FROM WWImportersTransactional.TiposTransaccion) AS Temp_TiposTransaccion'''

tipoTransaccion = obtener_dataframe_de_bd(source_db_connection_string, sql_tipoTransaccion, db_user, db_psswd)

### Transformación
Insights relavantes
- El negocio indica que las tablas de categoriasProveedores y TiposTransaccion fueron analizadas previamente, por su grupo de consultores.

In [60]:
tipoTransaccion.show()

+---------------------+--------------------+
|ID_Tipo_transaccion_T|                Tipo|
+---------------------+--------------------+
|                    2|Customer Credit Note|
|                    3|Customer Payment ...|
|                    4|     Customer Refund|
|                    5|    Supplier Invoice|
|                    6|Supplier Credit Note|
|                    7|Supplier Payment ...|
|                    8|     Supplier Refund|
|                    9|      Stock Transfer|
|                   10|         Stock Issue|
|                   11|       Stock Receipt|
|                   12|Stock Adjustment ...|
|                   13|     Customer Contra|
+---------------------+--------------------+



Como esta tabla fue revisada previamente, solo quedaria aniadir la llave de la bodega 'ID_Tipo_transaccion_DWH'

In [61]:
tipoTransaccion = tipoTransaccion.orderBy(col("ID_Tipo_transaccion_T"))
tipoTransaccion = tipoTransaccion.coalesce(1).withColumn('ID_Tipo_transaccion_DWH', f.monotonically_increasing_id() + 1)
tipoTransaccion = tipoTransaccion.select('ID_Tipo_transaccion_DWH','ID_Tipo_transaccion_T','Tipo')
tipoTransaccion.show(5)

+-----------------------+---------------------+--------------------+
|ID_Tipo_transaccion_DWH|ID_Tipo_transaccion_T|                Tipo|
+-----------------------+---------------------+--------------------+
|                      1|                    2|Customer Credit Note|
|                      2|                    3|Customer Payment ...|
|                      3|                    4|     Customer Refund|
|                      4|                    5|    Supplier Invoice|
|                      5|                    6|Supplier Credit Note|
+-----------------------+---------------------+--------------------+
only showing top 5 rows



### Carga

In [62]:
guardar_db(dest_db_connection_string, tipoTransaccion, 'TipoTransaccion', db_user, db_psswd)

## Hecho Movimiento
Su fuente de datos es la tabla transaccional 'movimientos'

### Extracción

In [63]:
# EXTRACCION
sql_movimientos = '''(SELECT DISTINCT FechaTransaccion, ProductoID AS ID_Producto_T, ProveedorID AS ID_Proveedor_T, ClienteID AS ID_Cliente_T, TipoTransaccionID AS ID_Tipo_transaccion_T, Cantidad FROM WWImportersTransactional.movimientos) AS Temp_Movimientos'''

movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)

### Transformación
Insights relevantes:
- Cantidades negativas significan salidas de productos del inventario

In [64]:
movimientos.show(10)

+----------------+-------------+--------------+------------+---------------------+--------+
|FechaTransaccion|ID_Producto_T|ID_Proveedor_T|ID_Cliente_T|ID_Tipo_transaccion_T|Cantidad|
+----------------+-------------+--------------+------------+---------------------+--------+
|     Jan 20,2014|          108|              |       185.0|                   10|   -10.0|
|     Jan 28,2014|          162|              |       176.0|                   10|   -10.0|
|     Jan 28,2014|          216|              |       474.0|                   10|   -10.0|
|     Jan 28,2014|           22|              |       901.0|                   10|   -10.0|
|     Jan 28,2014|           25|              |       926.0|                   10|   -10.0|
|     Feb 01,2014|           14|              |       444.0|                   10|   -10.0|
|     Feb 01,2014|           75|              |       168.0|                   10|   -10.0|
|     Mar 25,2014|           20|              |       802.0|                   1

Primero convertimos la fecha a Date para unirla con la tabla fechas y dejar unicamente el ID_Fecha

In [65]:
# Creamos nueva columna a partir del parsing de la FechaTransaccion
movimientos = movimientos.withColumn("Fecha", to_date(col("FechaTransaccion"), "MMM dd,yyyy"))

#Eliminamos la columna original
movimientos = movimientos.drop("FechaTransaccion")

# Unir ambos DataFrames basado en 'Fecha'
movimientos = movimientos.join(fechas, on="Fecha", how="left")

# Drop a columnas que no necesitamos
movimientos = movimientos.drop("Fecha", "Dia", "Mes", "Anio", "Numero_semana_ISO")

#Verificamos la transformacion
movimientos.show(10)

+-------------+--------------+------------+---------------------+--------+--------+
|ID_Producto_T|ID_Proveedor_T|ID_Cliente_T|ID_Tipo_transaccion_T|Cantidad|ID_Fecha|
+-------------+--------------+------------+---------------------+--------+--------+
|          162|              |       176.0|                   10|   -10.0|20140128|
|          216|              |       474.0|                   10|   -10.0|20140128|
|           22|              |       901.0|                   10|   -10.0|20140128|
|           25|              |       926.0|                   10|   -10.0|20140128|
|          108|              |       185.0|                   10|   -10.0|20140120|
|           20|              |       802.0|                   10|   -10.0|20140325|
|           65|              |       975.0|                   10|   -10.0|20140325|
|          130|              |       487.0|                   10|   -10.0|20140325|
|          171|              |       129.0|                   10|   -10.0|20

Procedemos ahora a convertir ID_Producto_T en ID_Producto_DWH esto con un join con la tabla Producto.

In [66]:
# Unir ambos DataFrames basado en 'ID_Producto_T'
movimientos = movimientos.join(productos, on="ID_Producto_T", how="left")

# Drop a columnas que no necesitamos
movimientos = movimientos.drop("ID_Producto_T", "NombreProducto", "Marca", "Color", "Necesita_refrigeracion", "Dias_tiempo_entrega", "PrecioRecomendado", "Impuesto", "PrecioUnitario")

#Verificamos la transformacion
movimientos.show(10)

+--------------+------------+---------------------+--------+--------+---------------+
|ID_Proveedor_T|ID_Cliente_T|ID_Tipo_transaccion_T|Cantidad|ID_Fecha|ID_Producto_DWH|
+--------------+------------+---------------------+--------+--------+---------------+
|              |       176.0|                   10|   -10.0|20140128|             60|
|              |       474.0|                   10|   -10.0|20140128|            102|
|              |       901.0|                   10|   -10.0|20140128|            168|
|              |       926.0|                   10|   -10.0|20140128|            119|
|              |       185.0|                   10|   -10.0|20140120|            202|
|              |       802.0|                   10|   -10.0|20140325|            167|
|              |       975.0|                   10|   -10.0|20140325|            137|
|              |       487.0|                   10|   -10.0|20140325|            160|
|              |       129.0|                   10|   

Continuamos transformando ID_Proveedor_T en ID_Proveedor_DWH, pero antes verifiquemos la cuenta de vacios.

In [67]:
total_rows = movimientos.count()
empty_entries = movimientos.filter(
    col("ID_Proveedor_T").isNull() | 
    col("ID_Proveedor_T").eqNullSafe("") | 
    isnan(col("ID_Proveedor_T")) | 
    (trim(col("ID_Proveedor_T")) == "")
).count()
percentage_empty = (empty_entries / total_rows) * 100

print(f"Porcentaje de entradas vacias en 'ID_Proveedor_T': {percentage_empty:.2f}%")

Porcentaje de entradas vacias en 'ID_Proveedor_T': 96.47%


In [68]:
non_empty_entries = movimientos.filter(col("ID_Proveedor_T").isNotNull() & (col("ID_Proveedor_T") != ""))
non_empty_entries.show(5)

+--------------+------------+---------------------+--------+--------+---------------+
|ID_Proveedor_T|ID_Cliente_T|ID_Tipo_transaccion_T|Cantidad|ID_Fecha|ID_Producto_DWH|
+--------------+------------+---------------------+--------+--------+---------------+
|           4.0|         0.0|                   11|  4788.0|20140101|            190|
|           4.0|         0.0|                   11|  4884.0|20140107|            190|
|           4.0|         0.0|                   11|  5436.0|20140120|            190|
|           4.0|         0.0|                   11|   108.0|20140120|            190|
|           4.0|         0.0|                   11|  5616.0|20140121|            190|
|           4.0|         0.0|                   11|  5520.0|20140123|            190|
|           4.0|         0.0|                   11|  5592.0|20140124|            190|
|           4.0|         0.0|                   11|  5652.0|20140128|            190|
|           4.0|         0.0|                   11|  5

A pesar de que el porcentaje de vacios es muy alto, procedemos a convertir de ID_Proveedor_T a ID_Proveedor_DWH, no sin antes transformar los ID_Proveedor_T vacios a None

In [72]:
# Transformar vacios a None
movimientos = movimientos.withColumn("ID_Proveedor_T", when(col("ID_Proveedor_T") != "", col("ID_Proveedor_T")).otherwise(None))

# Unir ambos DataFrames basado en 'ID_Proveedor_T'
movimientos = movimientos.join(proveedores, on="ID_Proveedor_T", how="left")

# Drop a columnas que no necesitamos
movimientos = movimientos.drop("Nombre", "Categoria", "Contacto_principal", "Dias_pago", "Codigo_postal", "ID_Proveedor_T")

#Verificamos la transformacion
movimientos.show(10)

+------------+---------------------+--------+--------+---------------+----------------+
|ID_Cliente_T|ID_Tipo_transaccion_T|Cantidad|ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|
+------------+---------------------+--------+--------+---------------+----------------+
|       176.0|                   10|   -10.0|20140128|             60|            null|
|       474.0|                   10|   -10.0|20140128|            102|            null|
|       901.0|                   10|   -10.0|20140128|            168|            null|
|       926.0|                   10|   -10.0|20140128|            119|            null|
|       185.0|                   10|   -10.0|20140120|            202|            null|
|       802.0|                   10|   -10.0|20140325|            167|            null|
|       975.0|                   10|   -10.0|20140325|            137|            null|
|       487.0|                   10|   -10.0|20140325|            160|            null|
|       129.0|                  

Continuamos transformando ID_Cliente_T en ID_Cliente_DWH

In [74]:
# Unir ambos DataFrames basado en 'ID_Cliente_T'
movimientos = movimientos.join(clientes, on="ID_Cliente_T", how="left")

# Drop a columnas que no necesitamos
movimientos = movimientos.drop("ID_Cliente_T", "Nombre", "NombreCategoria", "NombreGrupoCompra", "ClienteFactura", "ID_CiudadEntrega", "LimiteCredito", "FechaAperturaCuenta", "DiasPago")

#Verificamos la transformacion
movimientos.show(10)

+---------------------+--------+--------+---------------+----------------+--------------+
|ID_Tipo_transaccion_T|Cantidad|ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|
+---------------------+--------+--------+---------------+----------------+--------------+
|                   10|   -10.0|20140128|             60|            null|           437|
|                   10|   -10.0|20140128|            102|            null|           536|
|                   10|   -10.0|20140128|            168|            null|           101|
|                   10|   -10.0|20140128|            119|            null|           126|
|                   10|   -10.0|20140120|            202|            null|           446|
|                   10|   -10.0|20140325|            167|            null|             2|
|                   10|   -10.0|20140325|            137|            null|           175|
|                   10|   -10.0|20140325|            160|            null|           549|
|         

Finalmente transformamos ID_Tipo_transaccion_T en ID_Tipo_transaccion_DWH

In [76]:
# Unir ambos DataFrames basado en 'ID_Tipo_transaccion_T'
movimientos = movimientos.join(tipoTransaccion, on="ID_Tipo_transaccion_T", how="left")

# Drop a columnas que no necesitamos
movimientos = movimientos.drop("ID_Tipo_transaccion_T", "Tipo")

#Verificamos la transformacion
movimientos.show(10)

+--------+--------+---------------+----------------+--------------+-----------------------+
|Cantidad|ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_transaccion_DWH|
+--------+--------+---------------+----------------+--------------+-----------------------+
|   -10.0|20140128|             60|            null|           437|                      9|
|   -10.0|20140128|            102|            null|           536|                      9|
|   -10.0|20140128|            168|            null|           101|                      9|
|   -10.0|20140128|            119|            null|           126|                      9|
|   -10.0|20140120|            202|            null|           446|                      9|
|   -10.0|20140325|            167|            null|             2|                      9|
|   -10.0|20140325|            137|            null|           175|                      9|
|   -10.0|20140325|            160|            null|           549|             

Finalmente, solo falta reorganizar las columnas

In [79]:
movimientos = movimientos.select('ID_Fecha','ID_Producto_DWH','ID_Proveedor_DWH','ID_Cliente_DWH','ID_Tipo_transaccion_DWH', 'Cantidad')
movimientos.show(5)

+--------+---------------+----------------+--------------+-----------------------+--------+
|ID_Fecha|ID_Producto_DWH|ID_Proveedor_DWH|ID_Cliente_DWH|ID_Tipo_transaccion_DWH|Cantidad|
+--------+---------------+----------------+--------------+-----------------------+--------+
|20140128|             60|            null|           437|                      9|   -10.0|
|20140120|            202|            null|           446|                      9|   -10.0|
|20140201|             26|            null|           506|                      9|   -10.0|
|20140128|            102|            null|           536|                      9|   -10.0|
|20140128|            168|            null|           101|                      9|   -10.0|
+--------+---------------+----------------+--------------+-----------------------+--------+
only showing top 5 rows



### Carga

In [80]:
guardar_db(dest_db_connection_string, movimientos, 'Hecho_Movimiento', db_user, db_psswd)

Con esto hemos terminado el proceso ETL para las dimensiones Proveedor, Fecha y TipoTransaccion, ademas de la tabla de hechos: Hecho_Movimiento

## 4. Cierre
Completado este tutorial, ya sabe cómo realizar ETL básicos en PySpark.


## 5. Información adicional

Si quiere conocer más sobre PySpark la guía más detallada es la documentación oficial, la cual puede encontrar acá: https://spark.apache.org/docs/latest/api/python/index.html <br>
Para ir directamente a la documentación de PySpark SQL, donde está la información sobre los DataFrames, haga clic en este enlace: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html <br>

El Capítulo 2 del libro <i>Learn PySpark : Build Python-based Machine Learning and Deep Learning Models, New York: Apress. 2019</i> de Pramod Singh contiene muchos ejemplos útiles, puede encontrarlo en la biblioteca virtual de la universidad.

## 6. Preguntas frecuentes

- Si al intentar escribir un <i>dataframe</i> obtiene un error en el formato: 
    ```
    path file:<PATH>/dw/<PATH> already exists.;
    ```
    Borre la carpeta indicada en el error y vuelva a intentar.

- Si al ejecutar su código obtiene el error: 
    ```
    ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=tutorial ETL PySpark, master=local) created by __init__ at <ipython-input-4-64455da959dd>:92 

    ```
    reinicie el kernel del notebook y vuelva a intentar.