### 1. Instalación e inicialización de Spark

In [None]:
# Actualización de los repositorios de UBUNTU
!sudo apt-get update

In [None]:
# Instalación de JAVA
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Descarga de SPARK
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz

In [None]:
# Des-zipeado del instalador
!tar xf spark-2.4.8-bin-hadoop2.7.tgz

In [None]:
# Instalación de Spark en Python
!pip install -q findspark

In [None]:
# Configuración de variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

In [None]:
# Inicialización de Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


In [None]:
# Verificación de la sesión de Spark
spark

### 2. Creación de estructura de directorios para la capa UNIVERSAL

In [None]:
# Librería para manipulación del sistema de archivos
import os

In [None]:
# Importamos los objetos "StructType" y el "StructField"
# Estos objetos nos ayudaran a definir la metadata
from pyspark.sql.types import StructType, StructField

# Libreria para definir tipos de datos
from pyspark.sql.types import IntegerType, DoubleType, StringType

# Importamos la libreria de funciones clásicas
import pyspark.sql.functions as f

In [None]:
# Vamos a modelar la tabla
#
# - ENTIDAD TRANSACCIONES_BANCARIAS

In [None]:
# Creamos el directorio que va almacenar los registros correctos (cumplen reglas de calidad) provenientes de los datos de TRANSACCIONES_BANCARIAS
os.mkdir('/content/drive/MyDrive/DATALAKE/UNIVERSAL/TRANSACCIONES_BANCARIAS')

In [None]:
# Creamos el directorio que va almacenar los registros incorrectos (no cumplen reglas de calidad) provenientes de los datos de TRANSACCIONES_BANCARIAS
os.mkdir('/content/drive/MyDrive/DATALAKE/UNIVERSAL/TRANSACCIONES_BANCARIAS_REJ')

### 3. Lectura, selección, casteo, limpieza, rejectados [TRANSACCIONES_BANCARIAS]

In [None]:
# [LECTURA]

In [None]:
# Leemos la entidad "TRANSACCIONES_BANCARIAS" casteando a los tipos de datos correctos
dfTransaccionesBancariasLanding = spark.read.format("parquet").load("/content/drive/MyDrive/DATALAKE/LANDING/VISA/TRANSACCIONES_BANCARIAS/2018-04-19")

# Mostramos los datos
dfTransaccionesBancariasLanding.show(truncate=False)

+--------------+--------------------------+--------------------+
|EMPRESA       |PERSONA                   |TRANSACCION         |
+--------------+--------------------------+--------------------+
|{6, Google}   |{24, 24, Amaya, 1801.0}   |{2018-12-05, 1745.0}|
|{10, Sony}    |{32, 1, Carl, 20095.0}    |{2018-04-19, 238.0} |
|{2, Microsoft}|{34, 65, Nehru, 12423.0}  |{2018-04-19, 4097.0}|
|{5, Amazon}   |{23, 71, Doris, 11538.0}  |{2018-12-05, 1548.0}|
|{5, Amazon}   |{45, 83, Giselle, 2503.0} |{2018-04-19, 2233.0}|
|{4, Toyota}   |{42, 96, Amos, 15855.0}   |{2018-04-19, 2887.0}|
|{9, IBM}      |{70, 19, Laura, 17403.0}  |{2018-04-19, 286.0} |
|{4, Toyota}   |{67, 40, Ross, 14285.0}   |{2018-04-19, 974.0} |
|{8, HP}       |{57, 100, Cynthia, 8682.0}|{2018-04-19, 2698.0}|
|{2, Microsoft}|{22, 22, Kibo, 7449.0}    |{2018-12-05, 1398.0}|
|{7, Samsung}  |{23, 8, Jonah, 17040.0}   |{2018-12-05, 2538.0}|
|{4, Toyota}   |{42, 73, Fiona, 9960.0}   |{2018-11-28, 3878.0}|
|{5, Amazon}   |{34, 76, 

In [None]:
# Mostramos el esquema de los datos
dfTransaccionesBancariasLanding.printSchema()

root
 |-- EMPRESA: struct (nullable = true)
 |    |-- ID_EMPRESA: string (nullable = true)
 |    |-- NOMBRE_EMPRESA: string (nullable = true)
 |-- PERSONA: struct (nullable = true)
 |    |-- EDAD: long (nullable = true)
 |    |-- ID_PERSONA: string (nullable = true)
 |    |-- NOMBRE_PERSONA: string (nullable = true)
 |    |-- SALARIO: double (nullable = true)
 |-- TRANSACCION: struct (nullable = true)
 |    |-- FECHA: string (nullable = true)
 |    |-- MONTO: double (nullable = true)



In [None]:
# [SELECCIÓN]

In [None]:
# Se necesita la siguiente estructura de campos
# PERSONA.ID_PERSONA
# EMPRESA.ID_EMPRESA
# MONTO
# FECHA

In [None]:
dfTransaccionesBancarias1 = dfTransaccionesBancariasLanding.select(
    dfTransaccionesBancariasLanding["PERSONA.ID_PERSONA"].alias("ID_CLIENTE"),
    dfTransaccionesBancariasLanding["EMPRESA.ID_EMPRESA"].alias("ID_EMPRESA"),
    dfTransaccionesBancariasLanding["TRANSACCION.FECHA"].alias("FECHA_TRANSACCION"),
    dfTransaccionesBancariasLanding["TRANSACCION.MONTO"].alias("MONTO_TRANSACCION")    
)

dfTransaccionesBancarias1.show(truncate=False)

+----------+----------+-----------------+-----------------+
|ID_CLIENTE|ID_EMPRESA|FECHA_TRANSACCION|MONTO_TRANSACCION|
+----------+----------+-----------------+-----------------+
|24        |6         |2018-12-05       |1745.0           |
|1         |10        |2018-04-19       |238.0            |
|65        |2         |2018-04-19       |4097.0           |
|71        |5         |2018-12-05       |1548.0           |
|83        |5         |2018-04-19       |2233.0           |
|96        |4         |2018-04-19       |2887.0           |
|19        |9         |2018-04-19       |286.0            |
|40        |4         |2018-04-19       |974.0            |
|100       |8         |2018-04-19       |2698.0           |
|22        |2         |2018-12-05       |1398.0           |
|8         |7         |2018-12-05       |2538.0           |
|73        |4         |2018-11-28       |3878.0           |
|76        |5         |2018-11-28       |375.0            |
|63        |5         |2018-11-28       

In [None]:
# Mostramos el esquema de los datos
dfTransaccionesBancarias1.printSchema()

root
 |-- ID_CLIENTE: string (nullable = true)
 |-- ID_EMPRESA: string (nullable = true)
 |-- FECHA_TRANSACCION: string (nullable = true)
 |-- MONTO_TRANSACCION: double (nullable = true)



In [None]:
# [CASTEAR]

In [None]:
# Casteamos los datos de los campos seleccionados
dfTransaccionesBancarias2 = dfTransaccionesBancarias1.\
                            withColumn("ID_CLIENTE", dfTransaccionesBancarias1["ID_CLIENTE"].cast(StringType())).\
                            withColumn("ID_EMPRESA", dfTransaccionesBancarias1["ID_EMPRESA"].cast(StringType())).\
                            withColumn("FECHA_TRANSACCION", dfTransaccionesBancarias1["FECHA_TRANSACCION"].cast(StringType())).\
                            withColumn("MONTO_TRANSACCION", dfTransaccionesBancarias1["MONTO_TRANSACCION"].cast(DoubleType()))

# Mostramos los datos
dfTransaccionesBancarias2.show(truncate=False)

+----------+----------+-----------------+-----------------+
|ID_CLIENTE|ID_EMPRESA|FECHA_TRANSACCION|MONTO_TRANSACCION|
+----------+----------+-----------------+-----------------+
|24        |6         |2018-12-05       |1745.0           |
|1         |10        |2018-04-19       |238.0            |
|65        |2         |2018-04-19       |4097.0           |
|71        |5         |2018-12-05       |1548.0           |
|83        |5         |2018-04-19       |2233.0           |
|96        |4         |2018-04-19       |2887.0           |
|19        |9         |2018-04-19       |286.0            |
|40        |4         |2018-04-19       |974.0            |
|100       |8         |2018-04-19       |2698.0           |
|22        |2         |2018-12-05       |1398.0           |
|8         |7         |2018-12-05       |2538.0           |
|73        |4         |2018-11-28       |3878.0           |
|76        |5         |2018-11-28       |375.0            |
|63        |5         |2018-11-28       

In [None]:
# Mostramos el esquema de los datos
dfTransaccionesBancarias2.printSchema()

root
 |-- ID_CLIENTE: string (nullable = true)
 |-- ID_EMPRESA: string (nullable = true)
 |-- FECHA_TRANSACCION: string (nullable = true)
 |-- MONTO_TRANSACCION: double (nullable = true)



In [None]:
# [REGLAS DE CALIDAD (LIMPIEZA)]

In [None]:
# Ahora viene la aplicación de las reglas de calidad, esto por supuesto ya es muy relativo va a depender de qué es lo que entienda como 
# reglas de calidad una empresa. 
# Datos de TRANSACCIONES_BANCARIAS que cumplen las reglas de calidad
dfTransaccionesBancariasUniversal = dfTransaccionesBancarias2.filter(
    (dfTransaccionesBancarias2["ID_CLIENTE"].isNotNull()) &
    (dfTransaccionesBancarias2["ID_EMPRESA"].isNotNull()) &
    (dfTransaccionesBancarias2["FECHA_TRANSACCION"].isNotNull()) &
    (dfTransaccionesBancarias2["MONTO_TRANSACCION"].isNotNull()) &  
    (dfTransaccionesBancarias2["MONTO_TRANSACCION"] >= 0)
)

# Mostramos los datos
dfTransaccionesBancariasUniversal.show(truncate=False)

+----------+----------+-----------------+-----------------+
|ID_CLIENTE|ID_EMPRESA|FECHA_TRANSACCION|MONTO_TRANSACCION|
+----------+----------+-----------------+-----------------+
|24        |6         |2018-12-05       |1745.0           |
|1         |10        |2018-04-19       |238.0            |
|65        |2         |2018-04-19       |4097.0           |
|71        |5         |2018-12-05       |1548.0           |
|83        |5         |2018-04-19       |2233.0           |
|96        |4         |2018-04-19       |2887.0           |
|19        |9         |2018-04-19       |286.0            |
|40        |4         |2018-04-19       |974.0            |
|100       |8         |2018-04-19       |2698.0           |
|22        |2         |2018-12-05       |1398.0           |
|8         |7         |2018-12-05       |2538.0           |
|73        |4         |2018-11-28       |3878.0           |
|76        |5         |2018-11-28       |375.0            |
|63        |5         |2018-11-28       

In [None]:
# Datos de CLIENTE que NO cumplen las reglas de calidad
# Estos datos luego debiesen ser casteados al tipo STRING y posterior a eso escribirlos en el directorio.
dfTransaccionesBancariasRejectados = dfTransaccionesBancarias2.filter(
    ~(
        (dfTransaccionesBancarias2["ID_CLIENTE"].isNotNull()) &
        (dfTransaccionesBancarias2["ID_EMPRESA"].isNotNull()) &
        (dfTransaccionesBancarias2["FECHA_TRANSACCION"].isNotNull()) &
        (dfTransaccionesBancarias2["MONTO_TRANSACCION"].isNotNull()) &  
        (dfTransaccionesBancarias2["MONTO_TRANSACCION"] >= 0)   
     )                   
)

# Mostramos los datos
dfTransaccionesBancariasRejectados.show(truncate=False)

+----------+----------+-----------------+-----------------+
|ID_CLIENTE|ID_EMPRESA|FECHA_TRANSACCION|MONTO_TRANSACCION|
+----------+----------+-----------------+-----------------+
+----------+----------+-----------------+-----------------+



### 4. Escribimos los datos modelados

In [None]:
# Realizamos la escritura de los datos de "TRANSACCIONES_BANCARIAS" modelados y lo guardamos en el directorio TRANSACCIONES_BANCARIAS en la capa UNIVERSAL,
# más especificamente, en el subdirectorio de partición para la fecha '2018-04-19' 
dfTransaccionesBancariasUniversal.write.format("parquet").mode("overwrite").save("/content/drive/MyDrive/DATALAKE/UNIVERSAL/TRANSACCIONES_BANCARIAS/2018-04-19")

In [None]:
# Realizamos la escritura de los datos de "TRANSACCIONES_BANCARIAS_REJ" modelados y lo guardamos en el directorio TRANSACCIONES_BANCARIAS_REJ en la capa UNIVERSAL,
# más especificamente, en el subdirectorio de partición para la fecha '2018-04-19' 
dfTransaccionesBancariasRejectados.write.format("parquet").mode("overwrite").save("/content/drive/MyDrive/DATALAKE/UNIVERSAL/TRANSACCIONES_BANCARIAS_REJ/2018-04-19")