In [1]:
!pip install geopandas

Collecting geopandas
  Downloading geopandas-0.13.2-py3-none-any.whl (1.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
[?25hCollecting fiona>=1.8.19
  Downloading fiona-1.9.6-cp38-cp38-manylinux2014_x86_64.whl (15.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.7/15.7 MB[0m [31m45.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting pyproj>=3.0.1
  Downloading pyproj-3.5.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m19.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
Collecting shapely>=1.7.1
  Downloading shapely-2.0.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m65.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m
Collecting cligj>=0.5
  Download

In [13]:
!hdfs dfs -mkdir /raw

mkdir: `/raw': File exists


In [12]:
!hadoop fs -rm -r /raw/data_raw.parquet

Deleted /raw/data_raw.parquet


In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import json

# Single Spark session for the whole notebook.
# Hive support and the warehouse location are configured here, at first creation,
# because they are static settings: a later SparkSession.builder...getOrCreate()
# in this kernel returns this same session and cannot switch its catalog.
spark = SparkSession \
  .builder \
  .appName("streaming") \
  .master("local[*]") \
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
  .enableHiveSupport() \
  .getOrCreate()

spark

In [15]:
# Schema of a single order event received from the socket.
static_schema = StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("date", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("employee_id", IntegerType(), True),
    StructField("quantity_products", IntegerType(), True),
    StructField("order_id", StringType(), True)
])

# Each socket message is parsed as an array of events (it is exploded later),
# so the parsing schema is an array of the per-event struct. Deriving it from
# static_schema keeps the two definitions from drifting apart.
schema = ArrayType(static_schema)

In [16]:
# Collect order events from the socket over BATCH_ROUNDS short-lived streaming
# queries and append them to HDFS in chunks of more than FLUSH_THRESHOLD rows.
BATCH_ROUNDS = 30        # number of streaming sessions to open
FLUSH_THRESHOLD = 20     # write once the buffer holds more rows than this
RAW_PATH = '/raw/data_raw.parquet'

# The streaming plan is the same every round, so build it once;
# each .start() below launches a new query against the socket source.
streaming_df = spark.readStream.format("socket").option("host", "localhost").option("port", "8000").load()
json_df = (
    streaming_df
    .select(from_json(col("value"), schema).alias("data"))
    .selectExpr("explode(data) as dict")
    .select("dict.*")
)

static_df = spark.createDataFrame([], static_schema)
for _ in range(BATCH_ROUNDS):
    writing_df = json_df.writeStream.format("memory").queryName("socketData").outputMode("update").start()
    try:
        static_stream_df = spark.sql("SELECT * FROM socketData")
        static_df = static_df.union(static_stream_df)
        writing_df.awaitTermination(2)
    finally:
        # Stop the query even if the wait is interrupted, so the next round can
        # reuse the "socketData" query name.
        writing_df.stop()
    if static_df.count() > FLUSH_THRESHOLD:
        static_df.write.mode('append').parquet(RAW_PATH)
        static_df = spark.createDataFrame([], static_schema)
    time.sleep(5)

# Flush whatever is still buffered after the last round; otherwise rows that
# never pushed the buffer past the threshold would be silently dropped.
if static_df.head(1):
    static_df.write.mode('append').parquet(RAW_PATH)

                                                                                

In [17]:
# Reload everything the streaming loop persisted and list the newest order ids first.
df = spark.read.parquet("/raw/data_raw.parquet")
row_total = df.count()
print("Tabla estatica:", row_total)
df = df.orderBy(col("order_id").desc())
df.show(truncate=False)

Tabla estatica: 231
+------------------+------------------+-------------------+-----------+-----------+-----------------+------------------------------------+
|latitude          |longitude         |date               |customer_id|employee_id|quantity_products|order_id                            |
+------------------+------------------+-------------------+-----------+-----------+-----------------+------------------------------------+
|6.337055484171179 |-75.69237343917386|18/06/2024 21:26:29|8837       |1679       |265              |d8b9b417-b098-4344-b137-362894e4dcky|
|6.237450077477118 |-75.4922645802722 |18/06/2024 21:26:29|7267       |9726       |634              |d8b9b417-b098-4344-b137-362894e4dckx|
|6.211663524422019 |-75.66130353520231|18/06/2024 21:26:29|9569       |1114       |372              |d8b9b417-b098-4344-b137-362894e4dckw|
|6.276978616060629 |-75.54003012817529|18/06/2024 21:26:29|2545       |9726       |654              |d8b9b417-b098-4344-b137-362894e4dckv|
|6.2140

In [18]:
# Local Parquet datasets to copy into the HDFS raw zone.
paths = [
    f'file:///workspace/base.data/{dataset}.parquet'
    for dataset in ('50001', 'customers', 'employees', 'medellin_neighborhoods')
]

hdfs_destination = "hdfs:///raw/"


Función para copiar archivos Parquet locales a HDFS (los archivos originales no se eliminan)

In [19]:
def move_to_hdfs(file_paths, hdfs_dest, spark_session=None):
    """Copy Parquet datasets into an HDFS directory.

    Despite the name, the sources are left in place: each dataset is read with
    Spark and re-written under ``hdfs_dest`` with the same file name,
    overwriting any previous copy.

    Args:
        file_paths: iterable of Parquet paths readable by Spark (e.g. ``file:///...``).
        hdfs_dest: destination directory URI; the trailing ``/`` is optional.
        spark_session: session used to read and write; defaults to the
            notebook's global ``spark``.
    """
    if spark_session is None:
        spark_session = spark
    # Ensure exactly one separator between the directory and the file name.
    dest_dir = hdfs_dest if hdfs_dest.endswith("/") else hdfs_dest + "/"
    for path in file_paths:
        # Read the local dataset
        df = spark_session.read.parquet(path)

        # Reuse the source file name; tolerate a trailing slash on directory-style datasets.
        file_name = path.rstrip("/").rsplit("/", 1)[-1]

        # Write to HDFS
        df.write.parquet(dest_dir + file_name, mode="overwrite")

        print(f"Archivo {file_name} movido a {hdfs_dest}")

# Copy every dataset listed in `paths` into the HDFS raw zone.
move_to_hdfs(file_paths=paths, hdfs_dest=hdfs_destination)


Archivo 50001.parquet movido a hdfs:///raw/
Archivo customers.parquet movido a hdfs:///raw/
Archivo employees.parquet movido a hdfs:///raw/
Archivo medellin_neighborhoods.parquet movido a hdfs:///raw/


Listar los archivos de HDFS desde Python

In [20]:
from subprocess import Popen, PIPE

# List the HDFS raw zone to confirm the datasets landed there.
command = ['hadoop', 'fs', '-ls', '/raw']

# Run the command and capture both streams.
process = Popen(command, stdout=PIPE, stderr=PIPE)
stdout, stderr = process.communicate()

output = stdout.decode()
print(output)
# Surface failures (e.g. missing directory, HDFS down) instead of printing an
# empty listing and discarding the error message.
if process.returncode != 0:
    print("Command failed with exit code %d: %s" % (process.returncode, " ".join(command)))
    print(stderr.decode())


Found 5 items
drwxr-xr-x   - root supergroup          0 2024-06-18 21:31 /raw/50001.parquet
drwxr-xr-x   - root supergroup          0 2024-06-18 21:31 /raw/customers.parquet
drwxr-xr-x   - root supergroup          0 2024-06-18 21:26 /raw/data_raw.parquet
drwxr-xr-x   - root supergroup          0 2024-06-18 21:31 /raw/employees.parquet
drwxr-xr-x   - root supergroup          0 2024-06-18 21:31 /raw/medellin_neighborhoods.parquet



In [21]:
# Base URI of the HDFS raw zone populated above.
hdfs_base_path = 'hdfs://localhost:9000/raw'

# Load each Parquet dataset from the raw zone.
df_50001 = spark.read.parquet(f"{hdfs_base_path}/50001.parquet")
df_customers = spark.read.parquet(f"{hdfs_base_path}/customers.parquet")
df_data_raw = spark.read.parquet(f"{hdfs_base_path}/data_raw.parquet")
df_employees = spark.read.parquet(f"{hdfs_base_path}/employees.parquet")
df_neighborhoods = spark.read.parquet(f"{hdfs_base_path}/medellin_neighborhoods.parquet")


In [22]:
df_data_raw.show(n=10)


+------------------+------------------+-------------------+-----------+-----------+-----------------+--------------------+
|          latitude|         longitude|               date|customer_id|employee_id|quantity_products|            order_id|
+------------------+------------------+-------------------+-----------+-----------+-----------------+--------------------+
| 6.199984569931321| -75.5726134212415|18/06/2024 21:23:26|       4419|       9726|              487|d8b9b417-b098-434...|
| 6.182487449108699|-75.56851534023963|18/06/2024 21:25:29|       4161|       9726|              251|d8b9b417-b098-434...|
| 6.208628146586892|-75.53739380286768|18/06/2024 21:25:52|       1669|       1679|              531|d8b9b417-b098-434...|
|   6.2076206902838|-75.53799454237576|18/06/2024 21:23:49|       4207|       1482|              266|d8b9b417-b098-434...|
| 6.304345061931938|-75.54549401566487|18/06/2024 21:22:55|       1999|       6337|              340|d8b9b417-b098-434...|
|  6.27925875563

In [23]:
df_50001.show(n=20)

+--------+----------+----------+----------+----------+--------------------+
|DPTOMPIO|DPTO_CCDGO|MPIO_CCDGO|MPIO_CNMBR|MPIO_CCNCT|            geometry|
+--------+----------+----------+----------+----------+--------------------+
|   05001|        05|       001|  MEDELLÍN|     05001|[01 03 00 00 00 0...|
+--------+----------+----------+----------+----------+--------------------+



In [24]:
df_neighborhoods.show(n=5)

+--------+------+----------+--------------+-----------------+---------------------------+--------------+----------------+------------------+--------------------+
|OBJECTID|CODIGO|    NOMBRE|IDENTIFICACION|LIMITEMUNICIPIOID|SUBTIPO_COMUNACORREGIMIENTO|LINK_DOCUMENTO|       SHAPEAREA|          SHAPELEN|            geometry|
+--------+------+----------+--------------+-----------------+---------------------------+--------------+----------------+------------------+--------------------+
|     321|    01|   POPULAR|      COMUNA 1|              001|                          1|          null|3098289.60257159| 9604.987826371042|[01 03 00 00 00 0...|
|     322|    02|SANTA CRUZ|      COMUNA 2|              001|                          1|          null|2195874.52580248| 8597.714448746181|[01 03 00 00 00 0...|
|     323|    03|  MANRIQUE|      COMUNA 3|              001|                          1|          null|5096746.29132065|  12078.2371083362|[01 03 00 00 00 0...|
|     324|    04|  ARANJUEZ|

In [25]:
df_employees.show(n=5)

+-----------+----------------+--------------+--------------------+--------------------+---------+
|employee_id|            name|         phone|               email|             address|comission|
+-----------+----------------+--------------+--------------------+--------------------+---------+
|       3830|Shaeleigh Turner|1-382-217-5724|pellentesque.ultr...| Ap #497-3659 Eu St.|     0.06|
|       8362|  Catherine King|1-721-878-1085|  sed@localstack.com|Ap #897-2636 Enim...|     0.07|
|       6696|    Patricia Cox|1-265-643-2312|imperdiet.erat.no...|Ap #775-1599 Sed Av.|     0.04|
|       1482|   Elijah Parker|1-960-392-6387|blandit.congue@lo...|P.O. Box 351, 382...|     0.13|
|       9435|    Ryan Nichols|1-746-416-6687|porttitor.tellus....|P.O. Box 829, 407...|     0.18|
+-----------+----------------+--------------+--------------------+--------------------+---------+
only showing top 5 rows



In [26]:
# Show the column layout of the raw events, then how many events were captured.
df_data_raw.printSchema()

event_count = df_data_raw.count()
print("La cantidad de eventos es: {}".format(event_count))

root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- quantity_products: integer (nullable = true)
 |-- order_id: string (nullable = true)

La cantidad de eventos es: 231


In [27]:
# Bronze layer: store the tables through Hive.
import warnings

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark SQL Hive Integration") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# getOrCreate() hands back the session already running in this kernel, and the
# catalog implementation / warehouse dir are static settings that cannot be
# changed on an existing session. Check the catalog actually in use and warn,
# rather than silently writing the bronze tables to a non-Hive catalog.
_catalog_impl = spark.sparkContext.getConf().get("spark.sql.catalogImplementation", "in-memory")
if _catalog_impl != "hive":
    warnings.warn(
        "Spark session is using the '%s' catalog, not Hive: enableHiveSupport() "
        "has no effect on a session that already exists. Enable Hive support where "
        "the session is first created (or restart the kernel) so that bronze "
        "tables are stored in the Hive metastore." % _catalog_impl
    )

In [28]:
# Raising the IOPub data rate limit (only needed for very large cell outputs).
#
# iopub_data_rate_limit is a Jupyter *server* option read once at server
# startup; it cannot be changed from inside a running kernel. The previous
# `ConfigManager().update('notebook', ...)` call (notebook.services.config)
# only updated the frontend nbconfig ("notebook" section), so it never changed
# the running server's limit. That module also no longer exists in Notebook 7.
#
# Set it when launching the server instead, e.g.:
#   jupyter notebook --ServerApp.iopub_data_rate_limit=10000000
#   (classic Notebook < 7: --NotebookApp.iopub_data_rate_limit=10000000)
# or persist it in jupyter_server_config.py:
#   c.ServerApp.iopub_data_rate_limit = 10000000
# Prefer keeping outputs small (.show(n=...)) over raising the limit.


In [29]:
# Make sure the bronze database exists before any table is written into it.
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS bronze")

DataFrame[]

In [30]:
# Persist the raw events as table almacenamiento_bronze in the bronze database (replacing any previous copy).
df_data_raw.write.saveAsTable('bronze.almacenamiento_bronze', mode='overwrite')

                                                                                

In [31]:
# Read the table back to confirm it was stored.
spark.sql(sqlQuery="SELECT * FROM bronze.almacenamiento_bronze").show(20, truncate=False)

+------------------+------------------+-------------------+-----------+-----------+-----------------+------------------------------------+
|latitude          |longitude         |date               |customer_id|employee_id|quantity_products|order_id                            |
+------------------+------------------+-------------------+-----------+-----------+-----------------+------------------------------------+
|6.287253619436304 |-75.59859567560713|18/06/2024 21:23:57|9723       |3830       |386              |d8b9b417-b098-4344-b137-362894e4dcel|
|6.203530513155968 |-75.63432984287904|18/06/2024 21:22:55|9059       |1561       |593              |d8b9b417-b098-4344-b137-362894e4dccl|
|6.225420074041011 |-75.65419144934765|18/06/2024 21:24:20|3770       |3455       |636              |d8b9b417-b098-4344-b137-362894e4dcfl|
|6.238686550450639 |-75.56815898930817|18/06/2024 21:26:22|9595       |1737       |514              |d8b9b417-b098-4344-b137-362894e4dcki|
|6.292585887939182 |-75.634

In [32]:
# Persist the municipality dataset (the Medellín row, DPTOMPIO 05001) as table bronze.municipio.
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS bronze")
df_50001.write.saveAsTable('bronze.municipio', mode='overwrite')


In [35]:
# Persist the neighborhoods dataset as table bronze.neighborhoods, then preview
# a few descriptive columns (the binary geometry column is left out).
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS bronze")
df_neighborhoods.write.saveAsTable('bronze.neighborhoods', mode='overwrite')
neighborhood_query = "SELECT CODIGO, NOMBRE, IDENTIFICACION, LIMITEMUNICIPIOID,SUBTIPO_COMUNACORREGIMIENTO  FROM bronze.neighborhoods"
spark.sql(neighborhood_query).show(n=5, truncate=False)


+------+----------+--------------+-----------------+---------------------------+
|CODIGO|NOMBRE    |IDENTIFICACION|LIMITEMUNICIPIOID|SUBTIPO_COMUNACORREGIMIENTO|
+------+----------+--------------+-----------------+---------------------------+
|01    |POPULAR   |COMUNA 1      |001              |1                          |
|02    |SANTA CRUZ|COMUNA 2      |001              |1                          |
|03    |MANRIQUE  |COMUNA 3      |001              |1                          |
|04    |ARANJUEZ  |COMUNA 4      |001              |1                          |
|05    |CASTILLA  |COMUNA 5      |001              |1                          |
+------+----------+--------------+-----------------+---------------------------+
only showing top 5 rows



In [36]:
# Persist the customers dataset as table bronze.customers and read it back.
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS bronze")
df_customers.write.saveAsTable('bronze.customers', mode='overwrite')
spark.sql(sqlQuery="SELECT * FROM bronze.customers").show(20, truncate=False)

+-----------+--------------------+--------------+------------------------------+-------------------------------+
|customer_id|name                |phone         |email                         |address                        |
+-----------+--------------------+--------------+------------------------------+-------------------------------+
|4758       |Callie Reyes        |1-765-410-5785|magnis.dis@protonmail.ca      |151-4553 Interdum Road         |
|5379       |Elizabeth Washington|1-955-634-5542|vel@google.edu                |4063 Nunc St.                  |
|8111       |Hasad Wright        |1-324-830-5595|sed.auctor@aol.org            |Ap #625-8512 Non Rd.           |
|9258       |Kirk Watts          |1-578-784-1146|laoreet.ipsum@protonmail.org  |211-2213 Pede St.              |
|9142       |Cally Robbins       |1-887-472-0478|at.augue.id@google.com        |Ap #287-6324 A, Av.            |
|5041       |Benedict Underwood  |1-138-146-9856|ante.ipsum@yahoo.ca           |2927 Velit Rd.  

In [37]:
# Persist the employees dataset as table bronze.employees and read it back.
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS bronze")
df_employees.write.saveAsTable('bronze.employees', mode='overwrite')
spark.sql(sqlQuery="SELECT * FROM bronze.employees").show(20, truncate=False)

+-----------+-----------------+--------------+--------------------------------------+-----------------------------+---------+
|employee_id|name             |phone         |email                                 |address                      |comission|
+-----------+-----------------+--------------+--------------------------------------+-----------------------------+---------+
|3830       |Shaeleigh Turner |1-382-217-5724|pellentesque.ultricies@localstack.com |Ap #497-3659 Eu St.          |0.06     |
|8362       |Catherine King   |1-721-878-1085|sed@localstack.com                    |Ap #897-2636 Enim Av.        |0.07     |
|6696       |Patricia Cox     |1-265-643-2312|imperdiet.erat.nonummy@localstack.com |Ap #775-1599 Sed Av.         |0.04     |
|1482       |Elijah Parker    |1-960-392-6387|blandit.congue@localstack.com         |P.O. Box 351, 3827 Dolor. Ave|0.13     |
|9435       |Ryan Nichols     |1-746-416-6687|porttitor.tellus.non@localstack.com   |P.O. Box 829, 4074 Et Rd.    |0.1

En la capa Bronze se guardaron las tablas en Hive, en la base de datos `bronze`: 1. `almacenamiento_bronze`, 2. `municipio`, 3. `neighborhoods`, 4. `customers` y 5. `employees`.

## Capa Silver

In [38]:
# List the tables in the bronze database (session temporary views are listed too).
tables_bronze = spark.sql(sqlQuery="SHOW TABLES IN bronze")
tables_bronze.show(n=20)

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|  bronze|almacenamiento_br...|      false|
|  bronze|           customers|      false|
|  bronze|           employees|      false|
|  bronze|           municipio|      false|
|  bronze|       neighborhoods|      false|
|        |          socketdata|       true|
+--------+--------------------+-----------+



# Definir la ruta base en HDFS para la capa Silver

In [None]:
# Make sure the silver database exists before any table is written into it.
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS silver")