# POBLANDO DATALAKE - INTRODUCCIÓN APACHE SPARK

## POBLANDO CAPA LANDING
**1° PASO** Importamos módulos de apache spark

In [84]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

**2° PASO** Creamos las session de apache spark en una variable

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

**3° PASO** Verificamos la versión de apache spark

In [4]:
spark

### CAPA LANDING - PERSONAS

**4° PASO** Crear un dataframe

* Crear la estructura del dataframe
* Declarar en una variable la ruta del archivo
* Leer el archivo de origen
* Mostrar la estructura del dataframe
* mostrar los datos del dataframe
* Cantidad de registros del dataframe
* Mostrar las estadísticas básicas de un campo determinado

In [62]:
# 4.1 Estructura del dataframe.
df_schema = StructType([
StructField("ID", StringType(),True),
StructField("NOMBRE", StringType(),True),
StructField("TELEFONO", StringType(),True),
StructField("CORREO", StringType(),True),
StructField("FECHA_INGRESO", StringType(),True),
StructField("EDAD", IntegerType(),True),
StructField("SALARIO", DoubleType(),True),
StructField("ID_EMPRESA", StringType(),True),
])



In [63]:
# 4.2 Definimos ruta del archivo

ruta_archivo_google_cloud = "gs://introduccion-apache-spark/datalake/workload/personas/persona.data"

ruta_archivo_databricks = "/FileStore/tables/persona.data"

ruta_archivo_hdfs = "hdfs:/introduccion-apache-spark/datalake/workload/personas/persona.data"


In [64]:
#4.3 Definimos ruta del archivo
df_personas = spark.read.format("CSV").option("header","true").option("delimiter","|").schema(df_schema).load(ruta_archivo_google_cloud)

In [65]:
#4.4 Mostramos la estructura del dataframe.
df_personas.printSchema()

root
 |-- ID: string (nullable = true)
 |-- NOMBRE: string (nullable = true)
 |-- TELEFONO: string (nullable = true)
 |-- CORREO: string (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- EDAD: integer (nullable = true)
 |-- SALARIO: double (nullable = true)
 |-- ID_EMPRESA: string (nullable = true)



In [66]:
# 4.5 Mostraremos todos los datos del dataframe.
df_personas.show(5)

+---+---------+--------------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|      TELEFONO|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
|  1|     Carl|1-745-633-9145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|      155-2498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|1-204-956-8594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|1-719-862-9385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|      839-8044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
only showing top 5 rows



In [67]:
# 4.6 Mostraremos todos los datos del dataframe.
num_rows = df_personas.count()

print("La cantidad de registro del dataframe es: ", num_rows)

La cantidad de registro del dataframe es:  100


In [68]:
# 4.7 Estadísticas de un campo determinado.
df_personas.describe('salario').show()

+-------+-----------------+
|summary|          salario|
+-------+-----------------+
|  count|              100|
|   mean|         11684.55|
| stddev|6841.493958437246|
|    min|           1256.0|
|    max|          24575.0|
+-------+-----------------+



**5° PASO** Guardar el dataframe en un ruta de la capa landing

In [35]:
ruta_destino_google_cloud = "gs://introduccion-apache-spark/datalake/landing/personas/"

ruta_destino_databricks = "/FileStore/tables/landing/personas/"

ruta_destino_hdfs = "hdfs:/introduccion-apache-spark/datalake/landing/personas/"

In [69]:
df_personas.write.mode("overwrite").format("parquet").save(ruta_destino_google_cloud)

### CAPA LANDING -  EMPRESAS 
**6° PASO** Realizar la ingesta de Empresas en la capa landing
* Crear estructura del dataframe.
* Definir la ruta del archivo.
* Crear la estructura del dataframe
* Declarar en una variable la ruta del archivo
* Leer el archivo de origen
* Mostrar la información del dataframe
* Guardar el dataframe en un ruta de la capa landing

In [70]:
# 6.1 Estructura del dataframe.
df_schema_empresas = StructType([
StructField("ID", StringType(),True),
StructField("EMPRESA_NAME", StringType(),True)
])

In [71]:
# 6.2 Definimos ruta del archivo

ruta_archivo_google_cloud = "gs://introduccion-apache-spark/datalake/workload/empresas/empresa.data"

ruta_archivo_databricks = "/FileStore/tables/empresa.data"

ruta_archivo_hdfs = "hdfs:/introduccion-apache-spark/datalake/workload/empresas/empresa.data"

In [72]:
# 6.3 Creamos el dataframe 
df_empresas = spark.read.format("CSV").option("header","true").option("delimiter","|").schema(df_schema_empresas).load(ruta_archivo_google_cloud)

In [73]:
# 6.4 Mostramos registros del dataframe
df_empresas.show(10)

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  1|     Walmart|
|  2|   Microsoft|
|  3|       Apple|
|  4|      Toyota|
|  5|      Amazon|
|  6|      Google|
|  7|     Samsung|
|  8|          HP|
|  9|         IBM|
| 10|        Sony|
+---+------------+



In [74]:
# 6.5 Definimos la ruta de almacenamiento

ruta_destino_google_cloud = "gs://introduccion-apache-spark/datalake/landing/empresas/"

ruta_destino_databricks = "/FileStore/tables/landing/empresas/"

ruta_destino_hdfs = "hdfs:/introduccion-apache-spark/datalake/landing/empresas/"


In [75]:
# 6.6 Guardamos el archivo en formato parquet
df_empresas.repartition(2).write.mode("overwrite").format("parquet").save(ruta_destino_google_cloud)

## POBLANDO CAPA CURATED

### PERSONAS
**7° PASO** Definimos ruta del archivo

In [90]:
ruta_landing_google_cloud = "gs://introduccion-apache-spark/datalake/landing/personas/"

ruta_landing_databricks = "/FileStore/tables/landing/personas/"

ruta_landing_hdfs = "hdfs:/introduccion-apache-spark/datalake/landing/personas/"

**8° PASO** Creamos el dataframe de Persona

In [91]:
df_personas_landing = spark.read.format("parquet").option("header","true").load(ruta_landing_google_cloud)

**9° PASO** Mostramos el dataframe cargado en memoria

In [92]:
df_personas_landing.show(10)

+---+---------+--------------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|      TELEFONO|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
|  1|     Carl|1-745-633-9145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|      155-2498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|1-204-956-8594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|1-719-862-9385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|      839-8044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
|  6|     Bert|      797-4453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|
|  7|     Mark|1-680-102-6792|Quisque.ac@placer...|   2006-04-21|  52| 8112.0|         5|
|  8|    Jonah|      214-2975|eu.ultrices.sit@v...|   2017-10-07|  23|17040.0|         5|
|  9|    H

**10° PASO** Mostramos el schema del dataframe

In [93]:
df_personas_landing.printSchema()

root
 |-- ID: string (nullable = true)
 |-- NOMBRE: string (nullable = true)
 |-- TELEFONO: string (nullable = true)
 |-- CORREO: string (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- EDAD: integer (nullable = true)
 |-- SALARIO: double (nullable = true)
 |-- ID_EMPRESA: string (nullable = true)



**11° PASO** Realizamos la limpieza del dataframe

In [94]:
df_personas_procesado = df_personas_landing.withColumn('telefono', regexp_replace('telefono', '-', ''))
df_personas_procesado.show(10)

+---+---------+-----------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+-----------+--------------------+-------------+----+-------+----------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
|  6|     Bert|    7974453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|
|  7|     Mark|16801026792|Quisque.ac@placer...|   2006-04-21|  52| 8112.0|         5|
|  8|    Jonah|    2142975|eu.ultrices.sit@v...|   2017-10-07|  23|17040.0|         5|
|  9|    Hanae|    9352277|          eu@Nun

**12° PASO** Definir ruta de almacenamiento en la capa curated

In [95]:
ruta_curated_personas_google_cloud = "gs://introduccion-apache-spark/datalake/curated/personas/"

ruta_curated_personas_databricks = "/FileStore/tables/curated/personas/"

ruta_curated_personas_hdfs = "hdfs:/introduccion-apache-spark/datalake/curated/personas/"


**13° PASO** Definir ruta de almacenamiento en la capa curated

In [96]:
df_personas_procesado.repartition(1).write.mode("overwrite").format("parquet").save(ruta_curated_personas_google_cloud)

### EMPRESAS
**14° PASO** Realizar la ingesta de Empresas en Curated

* Declarar en una variable la ruta del archivo
* Leer el archivo de la capa landing
* Mostrar la estructura del dataframe
* mostrar los datos del dataframe
* Realizar una limpieza al dataframe
* Guardar el dataframe en un ruta de la capa curated

In [89]:
# 14.1 variable la ruta del archivo
ruta_landing_empresas_google_cloud = "gs://introduccion-apache-spark/datalake/landing/empresas/"

ruta_landing_empresas_databricks = "/FileStore/tables/landing/empresas/"

ruta_landing_empresas_hdfs = "hdfs:/introduccion-apache-spark/datalake/landing/empresas/"

# 14.2 Leer el archivo de la capa landing
df_empresas_landing = spark.read.format("parquet").option("header","true").load(ruta_landing_empresas_google_cloud)

# 14.3 Mostrar la estructura del dataframe
df_empresas_landing.printSchema()

#14.4 Mostrar los datos del dataframe
df_empresas_landing.show(5)

#14.5 Realizar limpieza a dataframe
df_empresas_procesado = df_empresas_landing.withColumn('EMPRESA_NAME',upper(col('EMPRESA_NAME')))

#14.6 Mostrar los datos procesado
df_empresas_procesado.show(5)

#14. Definir ruta de curated
ruta_curated_empresas_google_cloud = "gs://introduccion-apache-spark/datalake/curated/empresas/"

ruta_curated_empresas_databricks = "/FileStore/tables/curated/empresas/"

ruta_curated_empresas_hdfs = "hdfs:/introduccion-apache-spark/datalake/curated/empresas/"

# 14.6 Guardar daframe en capa curated

df_empresas_procesado.repartition(1).write.mode("overwrite").format("parquet").save(ruta_curated_empresas_google_cloud)


root
 |-- ID: string (nullable = true)
 |-- EMPRESA_NAME: string (nullable = true)

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  5|      Amazon|
|  6|      Google|
|  4|      Toyota|
|  7|     Samsung|
|  3|       Apple|
+---+------------+
only showing top 5 rows

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  5|      AMAZON|
|  6|      GOOGLE|
|  4|      TOYOTA|
|  7|     SAMSUNG|
|  3|       APPLE|
+---+------------+
only showing top 5 rows



## POBLANDO CAPA FUNCTIONAL

**PASO 15°** Obteniendo información requeridad para el analisis de **Salario x Empresa**

**15.1** Definimos ruta tablas requeridas apuntando a la capa curated 

In [99]:
ruta_curated_empresas_google_cloud = "gs://introduccion-apache-spark/datalake/curated/empresas/"

ruta_curated_personas_google_cloud = "gs://introduccion-apache-spark/datalake/curated/personas/"

**15.2** Creamos el dataframe para cada tabla.

In [100]:
df_personas_curated = spark.read.format("parquet").option("header","true").load(ruta_curated_personas_google_cloud)

df_empresas_curated = spark.read.format("parquet").option("header","true").load(ruta_curated_empresas_google_cloud)

**15.3** Mostramos datos de los dataframes

In [101]:
df_personas_curated.show(5)
df_empresas_curated.show(5)

+---+---------+-----------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+-----------+--------------------+-------------+----+-------+----------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
+---+---------+-----------+--------------------+-------------+----+-------+----------+
only showing top 5 rows

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  5|      AMAZON|
|  6|      GOOGLE|
|  4|      TOYOTA|
|  7|     SAMSUNG|
|  3|       APPLE|
+---+------------+
only showing top 5 ro

**15.4** Realizar la unión (join) de ambas tablas 

In [103]:
df_join = df_personas_curated.join(df_empresas_curated, df_personas_curated.ID_EMPRESA == df_empresas_curated.ID)
df_join.show()

+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA| ID|EMPRESA_NAME|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|  5|      AMAZON|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|  2|   MICROSOFT|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|  3|       APPLE|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10| 10|        SONY|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|  1|     WALMART|
|  6|     Bert|    7974453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|  7|     SAMSUNG|
|  7|     Mark|16801026792|Quisque.ac@placer...|   2006-04-21|  

**15.5**  Realizar la unión (join) de ambas tablas utilizando **Spark SQL**

In [104]:
df_personas_curated.createOrReplaceTempView("tb_personas")
df_empresas_curated.createOrReplaceTempView("tb_empresas")

df_sql = spark.sql("SELECT * FROM tb_personas p inner join tb_empresas e on e.ID = p.ID_EMPRESA")
df_sql.show(5)

+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA| ID|EMPRESA_NAME|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|  5|      AMAZON|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|  2|   MICROSOFT|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|  3|       APPLE|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10| 10|        SONY|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|  1|     WALMART|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
only showing top 5 rows



**15.6** Seleccionar campos requeridos para el análisis 

In [105]:
df_select = df_join.select(col('EDAD'),col('SALARIO'),col('EMPRESA_NAME'))
df_select.show()

+----+-------+------------+
|EDAD|SALARIO|EMPRESA_NAME|
+----+-------+------------+
|  32|20095.0|      AMAZON|
|  34| 9298.0|   MICROSOFT|
|  27|10853.0|       APPLE|
|  29| 3387.0|        SONY|
|  41|22102.0|     WALMART|
|  70| 7800.0|     SAMSUNG|
|  52| 8112.0|      AMAZON|
|  23|17040.0|      AMAZON|
|  69| 6834.0|       APPLE|
|  19| 7996.0|     SAMSUNG|
|  48| 4913.0|          HP|
|  24|19943.0|          HP|
|  34| 9501.0|      AMAZON|
|  59|16289.0|   MICROSOFT|
|  27| 1539.0|      AMAZON|
|  26| 3377.0|   MICROSOFT|
|  60| 6851.0|      GOOGLE|
|  34| 4759.0|     SAMSUNG|
|  70|17403.0|      TOYOTA|
|  24|18752.0|         IBM|
+----+-------+------------+
only showing top 20 rows



**15.7** Definimos la ruta de functional y persistimos los registros

In [106]:
ruta_functional = "gs://introduccion-apache-spark/datalake/functional/salario_empresa/"

df_select.repartition(1).write.mode("overwrite").format("parquet").save(ruta_functional)

**15.8** Consultamos la información almacenada en Funcional

In [107]:
df_salario_empresa = spark.read.format("parquet").option("header","true").load(ruta_functional)
df_salario_empresa.show(10)

+----+-------+------------+
|EDAD|SALARIO|EMPRESA_NAME|
+----+-------+------------+
|  32|20095.0|      AMAZON|
|  34| 9298.0|   MICROSOFT|
|  27|10853.0|       APPLE|
|  29| 3387.0|        SONY|
|  41|22102.0|     WALMART|
|  70| 7800.0|     SAMSUNG|
|  52| 8112.0|      AMAZON|
|  23|17040.0|      AMAZON|
|  69| 6834.0|       APPLE|
|  19| 7996.0|     SAMSUNG|
+----+-------+------------+
only showing top 10 rows




# Felicitaciones por completar el curso de Introducción Apache Spark
#### **Elaborado por** Juan Salinas
#### Linkedin: https://www.linkedin.com/in/juan-salinas/