In [0]:
#Desde la librería "pyspark.sql.types" importamos los utilitarios "StructType" y el "StructField"
#"StrucType" nos permite modificar el esquema de metadatos de un dataframe
#"StructField" nos permite modificar a un campo del esquema de metadatos.
#Tambien es necesario importar los tipos de datos que utilizaremos
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [0]:
#Tambien, es posible hacer esto -> Importacion de todos los utilitarios de forma conjunta
from pyspark.sql.types import *

In [0]:
#Leemos el archivo de persona, con un esquema de metadatos predefinido
dfPersona = spark.read.format("csv").option("header", "true").option("delimiter", "|").schema(
    StructType(
        [
            StructField("ID", StringType(), True),
            StructField("NOMBRE", StringType(), True),
            StructField("TELEFONO", StringType(), True),
            StructField("CORREO", StringType(), True),
            StructField("FECHA_INGRESO", StringType(), True),
            StructField("EDAD", IntegerType(), True),
            StructField("SALARIO", DoubleType(), True),
            StructField("ID_EMPRESA", StringType(), True)
        ]
    )
).load("dbfs:/FileStore/Bigdata-factory/persona.data")

In [0]:
#Mostramos los datos
dfPersona.show(10)

+---+---------+--------------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|      TELEFONO|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
|  1|     Carl|1-745-633-9145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|      155-2498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|1-204-956-8594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|1-719-862-9385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|      839-8044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
|  6|     Bert|      797-4453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|
|  7|     Mark|1-680-102-6792|Quisque.ac@placer...|   2006-04-21|  52| 8112.0|         5|
|  8|    Jonah|      214-2975|eu.ultrices.sit@v...|   2017-10-07|  23|17040.0|         5|
|  9|    H

In [0]:
#Creacion de la vista temporal para utilizar con Spark SQL
dfPersona.createOrReplaceTempView("dfPersona"),
#Imprimir listado de catalog
#print(spark.catalog.listTables())


Out[24]: (None,)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit

# Crear un DataFrame para empresas que vamos asociar a empleados
dfEmpresa = spark.createDataFrame([
    ("1", "Finakad SAS", "Tecnología"),
    ("2", "A su contador SAS", "Finanzas"),
    ("3", "Medical True SAS", "Salud")
], ["ID_EMPRESA", "NOMBRE_EMPRESA", "SECTOR"])

# Crear tabla temporal de empresas
dfEmpresa.createOrReplaceTempView("df_empresa_temp")

In [0]:
# Consulta para ver todos los datos en dfEmpresa
dfEmpresa.show(5)

+----------+-----------------+----------+
|ID_EMPRESA|   NOMBRE_EMPRESA|    SECTOR|
+----------+-----------------+----------+
|         1|      Finakad SAS|Tecnología|
|         2|A su contador SAS|  Finanzas|
|         3| Medical True SAS|     Salud|
+----------+-----------------+----------+



In [0]:
# Consulto las ambas tablas por separado
print("Datos de persona_temp:")
spark.sql("SELECT * FROM dfPersona LIMIT 5").show()
#
print("Datos de empresa_temp:")
spark.sql("SELECT * FROM df_empresa_temp").show()

Datos de persona_temp:
+---+---------+--------------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|      TELEFONO|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
|  1|     Carl|1-745-633-9145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|      155-2498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|1-204-956-8594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|1-719-862-9385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|      839-8044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
+---+---------+--------------+--------------------+-------------+----+-------+----------+

Datos de empresa_temp:
+----------+-----------------+----------+
|ID_EMPRESA|   NOMBRE_EMPRESA|    SECTOR|
+----------+-----------------+----------+
|         1|     

In [0]:
union_query = """
SELECT ID, NOMBRE, ID_EMPRESA FROM dfPersona
UNION
SELECT ID_EMPRESA, NOMBRE_EMPRESA, ID_EMPRESA FROM empresa_temp
"""
union_result = spark.sql(union_query)

print("Resultado de UNION:")
union_result.show()

Resultado de UNION:
+---+---------+----------+
| ID|   NOMBRE|ID_EMPRESA|
+---+---------+----------+
|  9|    Hanae|         3|
| 13|   Trevor|         5|
| 17|     Omar|         6|
|  8|    Jonah|         5|
|  2|Priscilla|         2|
| 20|    Emery|         9|
|  7|     Mark|         5|
|  5|  Leandra|         1|
| 15|    Wanda|         5|
|  1|     Carl|         5|
| 12|   Tanner|         8|
|  4|    Aidan|        10|
| 10|   Cadman|         7|
| 19|    Laura|         4|
| 16|    Alden|         2|
| 14|    Allen|         2|
| 21|  Carissa|        10|
|  3|  Jocelyn|         3|
| 18|     Owen|         7|
| 11|  Melyssa|         8|
+---+---------+----------+
only showing top 20 rows



In [0]:
# Ejemplo de INNER JOIN
inner_join_query = """
SELECT p.ID, p.NOMBRE, p.SALARIO, e.NOMBRE_EMPRESA, e.SECTOR
FROM dfPersona p
INNER JOIN df_empresa_temp e ON p.ID_EMPRESA = e.ID_EMPRESA
"""
inner_join_result = spark.sql(inner_join_query)
#
print("Resultado de INNER JOIN:")
inner_join_result.show()

Resultado de INNER JOIN:
+---+-------+-------+-----------------+----------+
| ID| NOMBRE|SALARIO|   NOMBRE_EMPRESA|    SECTOR|
+---+-------+-------+-----------------+----------+
| 99|    Ray| 5570.0|      Finakad SAS|Tecnología|
| 93| Althea| 8818.0|      Finakad SAS|Tecnología|
| 87|  Karly| 3715.0|      Finakad SAS|Tecnología|
| 70|   Suki|12029.0|      Finakad SAS|Tecnología|
| 54|   Lars|20573.0|      Finakad SAS|Tecnología|
| 32| Gisela| 6497.0|      Finakad SAS|Tecnología|
|  5|Leandra|22102.0|      Finakad SAS|Tecnología|
| 96|   Amos|15855.0|A su contador SAS|  Finanzas|
| 83|Giselle| 2503.0|A su contador SAS|  Finanzas|
| 81|    Joy| 1256.0|A su contador SAS|  Finanzas|
| 78| Lenore| 1483.0|A su contador SAS|  Finanzas|
| 67|  Buffy|15116.0|A su contador SAS|  Finanzas|
| 66| Adrian|22953.0|A su contador SAS|  Finanzas|
| 60|Bernard|10825.0|A su contador SAS|  Finanzas|
| 43|  Yetta|21452.0|A su contador SAS|  Finanzas|
| 33|    Jin|22038.0|A su contador SAS|  Finanzas|
| 29| 

In [0]:
spark.sql("""
SELECT p.NOMBRE as Empleado, e.NOMBRE_EMPRESA as Empresa_Cliente, e.SECTOR
FROM dfPersona p
JOIN df_empresa_temp e ON p.ID_EMPRESA = e.ID_EMPRESA
ORDER BY e.NOMBRE_EMPRESA
""").show()

+---------+-----------------+----------+
| Empleado|  Empresa_Cliente|    SECTOR|
+---------+-----------------+----------+
|    Yetta|A su contador SAS|  Finanzas|
|      Jin|A su contador SAS|  Finanzas|
|    Allen|A su contador SAS|  Finanzas|
|   Adrian|A su contador SAS|  Finanzas|
|  Bernard|A su contador SAS|  Finanzas|
|     Jana|A su contador SAS|  Finanzas|
|    Alden|A su contador SAS|  Finanzas|
|     Kibo|A su contador SAS|  Finanzas|
|      Joy|A su contador SAS|  Finanzas|
|    Buffy|A su contador SAS|  Finanzas|
|   Lenore|A su contador SAS|  Finanzas|
|     Amos|A su contador SAS|  Finanzas|
|  Giselle|A su contador SAS|  Finanzas|
|Priscilla|A su contador SAS|  Finanzas|
|     Lars|      Finakad SAS|Tecnología|
|   Gisela|      Finakad SAS|Tecnología|
|      Ray|      Finakad SAS|Tecnología|
|  Leandra|      Finakad SAS|Tecnología|
|   Althea|      Finakad SAS|Tecnología|
|    Karly|      Finakad SAS|Tecnología|
+---------+-----------------+----------+
only showing top