## 1° PASO: Importamos módulos de apache spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

## 2° PASO: Creamos las session de apache spark en una variable

In [None]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

## 3° PASO: Verificamos la versión de apache spark

In [3]:
spark

## 4° PASO: Crear la estructura del dataframe

## 5° Definimos ruta del archivo

In [48]:
ruta_lectura = "hdfs:/datalake/landing/personas/personas.parquet"

## 6° Creamos el dataframe de Persona

In [49]:
df_with_schema_parquet = spark.read.format("parquet").option("header","true").load(ruta_lectura)

## 7° Mostramos el dataframe cargado en memoria

In [50]:
df_with_schema_parquet.show()

+---+---------+--------------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|      TELEFONO|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
|  1|     Carl|1-745-633-9145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|      155-2498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|1-204-956-8594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|1-719-862-9385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|      839-8044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
|  6|     Bert|      797-4453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|
|  7|     Mark|1-680-102-6792|Quisque.ac@placer...|   2006-04-21|  52| 8112.0|         5|
|  8|    Jonah|      214-2975|eu.ultrices.sit@v...|   2017-10-07|  23|17040.0|         5|
|  9|    H

## 8° Definimos la ruta en hdfs donde almacenaremos el archivo

In [51]:
df_nuevo = df_with_schema_parquet.withColumn('telefono', regexp_replace('telefono', '-', ''))

## 9° Guardamos el archivo en formato parquet

In [52]:
df_nuevo.show()

+---+---------+-----------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+-----------+--------------------+-------------+----+-------+----------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
|  6|     Bert|    7974453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|
|  7|     Mark|16801026792|Quisque.ac@placer...|   2006-04-21|  52| 8112.0|         5|
|  8|    Jonah|    2142975|eu.ultrices.sit@v...|   2017-10-07|  23|17040.0|         5|
|  9|    Hanae|    9352277|          eu@Nun

In [53]:
ruta_destino = "hdfs:/datalake/curated/personas/personas.parquet"
df_nuevo.repartition(1).write.mode("overwrite").format("parquet").save(ruta_destino)

# Curated Empresa

In [54]:
ruta_lectura_empresa = "hdfs:/datalake/landing/empresas/empresa.parquet"

In [55]:
df_with_schema_parquet2 = spark.read.format("parquet").option("header","true").load(ruta_lectura_empresa)

In [56]:
df_with_schema_parquet2.show()

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  1|     Walmart|
|  2|   Microsoft|
|  3|       Apple|
|  4|      Toyota|
|  5|      Amazon|
|  6|      Google|
|  7|     Samsung|
|  8|          HP|
|  9|         IBM|
| 10|        Sony|
+---+------------+



In [59]:
df_nuevo_2 = df_with_schema_parquet2.withColumn('EMPRESA_NAME',upper(col('EMPRESA_NAME')))

In [60]:
df_nuevo_2.show()

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  1|     WALMART|
|  2|   MICROSOFT|
|  3|       APPLE|
|  4|      TOYOTA|
|  5|      AMAZON|
|  6|      GOOGLE|
|  7|     SAMSUNG|
|  8|          HP|
|  9|         IBM|
| 10|        SONY|
+---+------------+



In [61]:
ruta_destino_2 = "hdfs:/datalake/curated/empresas/empresa.parquet"
df_nuevo_2.repartition(1).write.mode("overwrite").format("parquet").save(ruta_destino_2)

# FUNCTIONAL

### Dataframe Personas

In [62]:
ruta_personas = "hdfs:/datalake/curated/personas/personas.parquet"
df_personas = spark.read.format("parquet").option("header","true").load(ruta_personas)
df_personas.show(5)

+---+---------+-----------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+-----------+--------------------+-------------+----+-------+----------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
+---+---------+-----------+--------------------+-------------+----+-------+----------+
only showing top 5 rows



In [63]:
ruta_empresas = "hdfs:/datalake/curated/empresas/empresa.parquet"
df_empresas = spark.read.format("parquet").option("header","true").load(ruta_empresas)
df_empresas.show(5)

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  1|     WALMART|
|  2|   MICROSOFT|
|  3|       APPLE|
|  4|      TOYOTA|
|  5|      AMAZON|
+---+------------+
only showing top 5 rows



In [74]:
df_personas.createOrReplaceTempView("tb_personas")
df_empresas.createOrReplaceTempView("tb_empresas")

df_sql = spark.sql("SELECT * FROM tb_personas p inner join tb_empresas e on e.ID = p.ID_EMPRESA")
df_sql.show(5)

+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA| ID|EMPRESA_NAME|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|  5|      AMAZON|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|  2|   MICROSOFT|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|  3|       APPLE|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10| 10|        SONY|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|  1|     WALMART|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
only showing top 5 rows



In [67]:
df_join = df_personas.join(df_empresas, df_personas.ID_EMPRESA == df_empresas.ID)
df_join.show()

+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA| ID|EMPRESA_NAME|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|  5|      AMAZON|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|  2|   MICROSOFT|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|  3|       APPLE|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10| 10|        SONY|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|  1|     WALMART|
|  6|     Bert|    7974453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|  7|     SAMSUNG|
|  7|     Mark|16801026792|Quisque.ac@placer...|   2006-04-21|  

In [76]:
df_select = df_join.select(col('NOMBRE'),col('EDAD'),col('SALARIO'),col('EMPRESA_NAME'))
df_select.show()

+---------+----+-------+------------+
|   NOMBRE|EDAD|SALARIO|EMPRESA_NAME|
+---------+----+-------+------------+
|     Carl|  32|20095.0|      AMAZON|
|Priscilla|  34| 9298.0|   MICROSOFT|
|  Jocelyn|  27|10853.0|       APPLE|
|    Aidan|  29| 3387.0|        SONY|
|  Leandra|  41|22102.0|     WALMART|
|     Bert|  70| 7800.0|     SAMSUNG|
|     Mark|  52| 8112.0|      AMAZON|
|    Jonah|  23|17040.0|      AMAZON|
|    Hanae|  69| 6834.0|       APPLE|
|   Cadman|  19| 7996.0|     SAMSUNG|
|  Melyssa|  48| 4913.0|          HP|
|   Tanner|  24|19943.0|          HP|
|   Trevor|  34| 9501.0|      AMAZON|
|    Allen|  59|16289.0|   MICROSOFT|
|    Wanda|  27| 1539.0|      AMAZON|
|    Alden|  26| 3377.0|   MICROSOFT|
|     Omar|  60| 6851.0|      GOOGLE|
|     Owen|  34| 4759.0|     SAMSUNG|
|    Laura|  70|17403.0|      TOYOTA|
|    Emery|  24|18752.0|         IBM|
+---------+----+-------+------------+
only showing top 20 rows



In [79]:
ruta_functional= "hdfs:/datalake/functional/sueldo_empleados/sueldos_empleados.parquet"
df_select.repartition(1).write.mode("overwrite").format("parquet").save(ruta_functional)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:39949)
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
  File "/opt/conda/anaconda/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:39949)