# Diseño de ETL

<img src="DiseñoProceso.png">

# Creación de ETLs con PySpark

## Configuración

### Importe de Librerias

In [1]:
from pyspark.sql import functions as f, types as t
from pyspark.sql.functions import lit
from pyspark import SparkContext, SparkConf, SQLContext
from datetime import datetime

### Conexión

In [2]:
db_user = 'Estudiante_4_202214'
db_psswd = 'NT8ZK03Q4H'
source_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'
dest_db_connection_string = 'jdbc:mysql://157.253.236.116:8080/Estudiante_4_202214'
path_jar_driver = '../mysql-connector-java-8.0.28.jar'

In [3]:
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/08 13:49:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Carga de datos

In [4]:
def obterner_dataframe_desde_csv(path, _sep):
    return spark.read.load(path, format="csv", sep=_sep, inferSchema="true", header='true')

def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()

## Dimensión Proveedor

Empezamos con la dimensión *Proveedor*. Sus fuentes de datos son las tablas:
- transaccionales
- proveedores
- CategoriasProveedores
- Personas

### Extracción

In [5]:
sql_proveedores = '''(SELECT ProveedorID, NombreProveedor, CategoriaProveedorID, PersonaContactoPrincipalID as ID_persona, DiasPago, CodigoPostal FROM WWImportersTransactional.proveedores) AS Temp_proveedores'''
sql_categorias = '''(SELECT CategoriaProveedorID, CategoriaProveedor FROM WWImportersTransactional.CategoriasProveedores) AS Temp_categorias'''
sql_personas = '''(SELECT ID_persona, NombreCompleto FROM WWImportersTransactional.Personas) AS Temp_personas'''

In [6]:
proveedores = obtener_dataframe_de_bd(source_db_connection_string, sql_proveedores, db_user, db_psswd)
categorias = obtener_dataframe_de_bd(source_db_connection_string, sql_categorias, db_user, db_psswd)
personas = obtener_dataframe_de_bd(source_db_connection_string, sql_personas, db_user, db_psswd)

In [7]:
print(f"cols provedores {proveedores.columns}\ncols categorias {categorias.columns}\ncols personas {personas.columns}")

cols provedores ['ProveedorID', 'NombreProveedor', 'CategoriaProveedorID', 'ID_persona', 'DiasPago', 'CodigoPostal']
cols categorias ['CategoriaProveedorID', 'CategoriaProveedor']
cols personas ['ID_persona', 'NombreCompleto']



### Transformacion


In [8]:
proveedores = proveedores.join(categorias, how = 'inner', on = 'CategoriaProveedorID')
proveedores = proveedores.join(personas, how = 'inner', on = 'ID_persona')
proveedores = proveedores.selectExpr('ProveedorID as ID_Proveedor_T','NombreProveedor as Nombre', 'CategoriaProveedor as Categoria', 'NombreCompleto as Contacto_principal', 'DiasPago', 'CodigoPostal as Codigo_postal')
proveedores = proveedores.withColumn('ID_Proveedor_DWH', f.monotonically_increasing_id() + 1)
updateDiasPago = (f.when(f.col('DiasPago') < 0, f.col('DiasPago') * -1).otherwise(f.col('DiasPago')))
proveedores = proveedores.withColumn('DiasPago', updateDiasPago)
proveedores.show(5)


                                                                                

+--------------+--------------------+-------------------+------------------+--------+-------------+----------------+
|ID_Proveedor_T|              Nombre|          Categoria|Contacto_principal|DiasPago|Codigo_postal|ID_Proveedor_DWH|
+--------------+--------------------+-------------------+------------------+--------+-------------+----------------+
|             5|Graphic Design In...|productos novedosos|        Penny Buck|      14|        64847|               1|
|             9|      Nod Publishers|productos novedosos|      Marcos Costa|       7|        27906|               2|
|            12|   The Phone Company|productos novedosos|           Hai Dam|      30|        56732|               3|
|             2|       Contoso, Ltd.|productos novedosos|   Hanna Mihhailov|       7|        98253|               4|
|             8|  Lucerne Publishing|productos novedosos|       Prem Prabhu|      30|        37659|               5|
+--------------+--------------------+-------------------+-------

### Carga

In [9]:
guardar_db(dest_db_connection_string, proveedores,'Estudiante_4_202214.Proveedor', db_user, db_psswd)

                                                                                

## Dimensión TipoTransaccion

Empezamos con la dimensión *TipoTransaccion*. Sus fuentes de datos son las tabl:
- TiposTransaccion

### Extracción

In [10]:
sql_tipo_transaccion = '''(SELECT TipoTransaccionID as ID_Tipo_transaccion_T, TipoTransaccionNombre as Tipo FROM WWImportersTransactional.TiposTransaccion) AS Temp_tipo_transaccion'''

In [11]:
tipo_transaccion = obtener_dataframe_de_bd(source_db_connection_string, sql_tipo_transaccion, db_user, db_psswd)

In [12]:
print(f"cols tipo_transaccion {tipo_transaccion.columns}")

cols tipo_transaccion ['ID_Tipo_transaccion_T', 'Tipo']


### Transformación

In [13]:
tipo_transaccion = tipo_transaccion.withColumn('ID_Tipo_transaccion_DWH', f.monotonically_increasing_id() + 1)
tipo_transaccion.show(5)

[Stage 16:>                                                         (0 + 1) / 1]

+---------------------+--------------------+-----------------------+
|ID_Tipo_transaccion_T|                Tipo|ID_Tipo_transaccion_DWH|
+---------------------+--------------------+-----------------------+
|                    2|Customer Credit Note|                      1|
|                    3|Customer Payment ...|                      2|
|                    4|     Customer Refund|                      3|
|                    5|    Supplier Invoice|                      4|
|                    6|Supplier Credit Note|                      5|
+---------------------+--------------------+-----------------------+
only showing top 5 rows



                                                                                

### Carga

In [14]:
guardar_db(dest_db_connection_string, tipo_transaccion,'Estudiante_4_202214.TipoTransaccion', db_user, db_psswd)

                                                                                

## Hecho Movimiento

Hecho movimiento. Sus fuentes de datos son la tabla:
- Movimientos

### Extracción

In [15]:
sql_movimientos = '''(SELECT * FROM WWImportersTransactional.movimientos) AS Temp_movimientos'''

In [16]:
movimientos = obtener_dataframe_de_bd(source_db_connection_string, sql_movimientos, db_user, db_psswd)

Se eliminan duplicados totales de movimientos

In [17]:
print(f"movimientos {movimientos.count()}\nmovimientos distintos {movimientos.distinct().count()}")

[Stage 23:>                                                         (0 + 6) / 6]

movimientos 267300
movimientos distintos 236667


                                                                                

In [18]:
movimientos = movimientos.drop_duplicates()

In [19]:
print(f"movimientos {movimientos.count()}\nmovimientos distintos {movimientos.distinct().count()}")

[Stage 33:>                                                         (0 + 1) / 1]

movimientos 236667
movimientos distintos 236667


                                                                                

In [20]:
movimientos1 = movimientos

### Transformacion

En el siguiente código para el manejo de fechas, pasamos del formato fecha al formato establecido en la regla de negocio

In [21]:
regex = "([0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = movimientos.filter(movimientos["FechaTransaccion"].rlike(regex))
noCumplenFormato = movimientos.filter(~movimientos["FechaTransaccion"].rlike(regex))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('FechaTransaccion', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('FechaTransaccion')))
movimientos = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), movimientos.count()
movimientos1 = movimientos

                                                                                

64254 172413


                                                                                

+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|TransaccionProductoID|ProductoID|TipoTransaccionID|ClienteID|InvoiceID|ProveedorID|OrdenDeCompraID|FechaTransaccion|Cantidad|
+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|               191841|       112|               10|    153.0|  40177.0|           |               |     Jan 21,2015|   -10.0|
|               243245|        58|               10|    514.0|  50949.0|           |               |     Jul 14,2015|   -10.0|
|               197126|        62|               10|     72.0|  41283.0|           |               |     Feb 10,2015|   -10.0|
|               325175|       173|               10|    599.0|  68194.0|           |               |     Apr 26,2016|   -10.0|
|               335577|       102|               10|    993.0|  70368.0|           |               |     May 30

                                                                                

Se sacan los campos que se necesitan de movimientos, se verifica que no haya duplicados y si los hay se eliminan.
El ID_Fecha no será tomado en cuenta, como se mencionó por slack. Se quedará con el campo fecha en la tabla de hecho

In [22]:
movimientos = movimientos1
movimientos = movimientos.selectExpr('ProductoID as ID_Producto_T','TipoTransaccionID as ID_Tipo_transaccion_T','ClienteID as ID_Cliente_T','ProveedorID as ID_Proveedor_T','Cantidad','FechaTransaccion as Fecha')

print((movimientos.count(),movimientos.distinct().count()))
movimientos = movimientos.drop_duplicates()
print((movimientos.count(),movimientos.distinct().count()))
movimientos1 = movimientos

                                                                                

(236667, 236656)




(236656, 236656)


                                                                                

In [23]:
movimientos = movimientos1

sql_proveedor = '''(SELECT ID_Proveedor_T, ID_Proveedor_DWH FROM Estudiante_38_202214.Proveedor) AS Temp_proveedor'''
proveedor = obtener_dataframe_de_bd(dest_db_connection_string, sql_proveedor, db_user, db_psswd)

sql_producto = '''(SELECT ID_Producto_T, ID_Producto_DWH FROM Estudiante_38_202214.Producto) AS Temp_producto'''
producto = obtener_dataframe_de_bd(dest_db_connection_string, sql_producto, db_user, db_psswd)

sql_cliente = '''(SELECT ID_Cliente_T, ID_Cliente_DWH FROM Estudiante_38_202214.Cliente) AS Temp_cliente'''
cliente = obtener_dataframe_de_bd(dest_db_connection_string, sql_cliente, db_user, db_psswd)

sql_tipoTransaccion = '''(SELECT ID_Tipo_transaccion_T, ID_Tipo_transaccion_DWH FROM Estudiante_38_202214.TipoTransaccion) AS Temp_tipoTransaccion'''
tipoTransaccion = obtener_dataframe_de_bd(dest_db_connection_string, sql_tipoTransaccion, db_user, db_psswd)

movimientos.show(5)


movimientos_cliente = movimientos.join(cliente, how = 'inner', on = 'ID_Cliente_T')
movimientos_cliente = movimientos_cliente.withColumn('ID_Proveedor_DWH', lit(""))
movimientos_cliente = movimientos_cliente.selectExpr('ID_Cliente_T','ID_Producto_T','ID_Tipo_transaccion_T','ID_Proveedor_T','Cantidad','Fecha','ID_Proveedor_T','ID_Proveedor_DWH','ID_Cliente_DWH')
movimientos_cliente.show(5)


movimientos_proveedor = movimientos.join(proveedor, how = 'inner', on = 'ID_Proveedor_T')
movimientos_proveedor = movimientos_proveedor.withColumn('ID_Cliente_DWH', lit(""))
movimientos_proveedor = movimientos_proveedor.selectExpr('ID_Cliente_T','ID_Producto_T','ID_Tipo_transaccion_T','ID_Proveedor_T','Cantidad','Fecha','ID_Proveedor_T','ID_Proveedor_DWH','ID_Cliente_DWH')
movimientos_proveedor.show(5)

movimientos = movimientos_cliente.union(movimientos_proveedor)
movimientos = movimientos.join(producto, how = 'inner', on = 'ID_Producto_T')
movimientos = movimientos.join(tipoTransaccion, how = 'inner', on = 'ID_Tipo_transaccion_T')
movimientos = movimientos.selectExpr('ID_Producto_DWH','ID_Proveedor_DWH','ID_Cliente_DWH','ID_Tipo_transaccion_DWH','Cantidad','Fecha')
movimientos.show(5)


Py4JJavaError: An error occurred while calling o151.load.
: java.sql.SQLSyntaxErrorException: SELECT command denied to user 'Estudiante_4_202214'@'190.248.168.196' for table 'Proveedor'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1009)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:67)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:57)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:239)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


### Carga

In [None]:
# CARGUE
inferior = 0
superior = 999
j=0
total = movimientos.count()/1000
print(total)
collected = movimientos.collect()
while j<total:
    if j%50==0:
        print(j)
    j += 1
    aux = spark.createDataFrame(collected[inferior:superior],movimientos.columns)
    guardar_db(dest_db_connection_string, aux,'Estudiante_38_202214.Hecho_Movimiento', db_user, db_psswd)
    inferior+=1000
    superior+=1000