In [None]:
!pip install pandas geopandas matplotlib faker pyspark pyarrow

In [None]:
#### Capa bronze data cruda
####
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, IntegerType, TimestampType

# Crear sesión de Spark
spark = SparkSession \
    .builder \
    .appName("SparkStreamingFromSocket") \
    .master("local[*]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# Definir el esquema para los datos JSON que se recibirán
schema = StructType() \
    .add("latitude", DoubleType()) \
    .add("longitude", DoubleType()) \
    .add("date", TimestampType()) \
    .add("customer_id", StringType()) \
    .add("employee_id", StringType()) \
    .add("quantity_products", IntegerType()) \
    .add("order_id", StringType()) \
    .add("commune_code", StringType()) \
    .add("commune_name", StringType()) \
    .add("customer_name", StringType()) \
    .add("employee_name", StringType()) \
    .add("employee_commission", DoubleType())

# Leer datos desde el socket
streaming_df = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 12345) \
    .load()

# Parsear los datos JSON utilizando el esquema definido
parsed_df = streaming_df \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select("parsed_value.*")

# Función para guardar los datos recibidos en bronze
def process_data(df, epoch_id):
    try:
        hdfs_path = "/user/root/bronze"
        df.write \
          .format("parquet") \
          .mode("append") \
          .save(hdfs_path)
        df.show(truncate=False)
    except Exception as e:
        print(f"Error al procesar los datos: {e}")
        
# Escribir los resultados en la consola
query = parsed_df \
    .writeStream \
    .foreachBatch(process_data) \
    .outputMode("append") \
    .start()

# Mantener el stream en ejecución
query.awaitTermination()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

+--------+---------+----+-----------+-----------+-----------------+--------+------------+------------+-------------+-------------+-------------------+
|latitude|longitude|date|customer_id|employee_id|quantity_products|order_id|commune_code|commune_name|customer_name|employee_name|employee_commission|
+--------+---------+----+-----------+-----------+-----------------+--------+------------+------------+-------------+-------------+-------------------+
+--------+---------+----+-----------+-----------+-----------------+--------+------------+------------+-------------+-------------+-------------------+



                                                                                

+------------------+------------------+-------------------+-----------+-----------+-----------------+----------+------------+------------------------------------------+------------------+-----------------+-------------------+
|latitude          |longitude         |date               |customer_id|employee_id|quantity_products|order_id  |commune_code|commune_name                              |customer_name     |employee_name    |employee_commission|
+------------------+------------------+-------------------+-----------+-----------+-----------------+----------+------------+------------------------------------------+------------------+-----------------+-------------------+
|6.250208574487932 |-75.5425665812484 |2023-07-13 08:41:51|7062       |1482       |99               |8946514217|08          |VILLA HERMOSA                             |Roth Coffey       |Elijah Parker    |0.13               |
|6.228071965734311 |-75.5768274632126 |2024-04-29 09:38:36|8194       |1561       |25           

In [8]:
#### Capa silver procesamiento
####
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, expr, round, lit, year, month, dayofmonth, trim, hour, minute, second, dayofweek
import os 

# Obtener la sesión de Spark existente si está activa
spark = SparkSession.builder.getOrCreate()

# Función para leer archivos Parquet desde HDFS
def leer_archivos_parquet(path: str) -> DataFrame:
    try:
        # Verificar la existencia del archivo antes de leerlo
        if os.system(f"hdfs dfs -test -e {path}") == 0:
            return spark.read.parquet(path)
        else:
            print(f"El archivo Parquet {path} no existe.")
            return None
    except Exception as e:
        print(f"Error al leer el archivo Parquet {path}: {e}")
        return None
    
# Ruta de bronze
bronze_path = "hdfs:///user/root/bronze"  

# Leer archivos Parquet desde el directorio en HDFS
df_bronze = leer_archivos_parquet(bronze_path)

# Función para agregar una columna con valor constante al precio y dividir la fecha
def transformar_df(df: DataFrame) -> DataFrame:
        df_transformado = df \
            .withColumn("price", lit(3500)) \
            .withColumn("sales", col("quantity_products") * col("price")) \
            .withColumn("commission_value", round(col("sales") * col("employee_commission"), 0)) \
            .withColumn("customer_name", trim(col("customer_name"))) \
            .withColumn("employee_name", trim(col("employee_name"))) \
            .withColumn("commune_name", trim(col("commune_name"))) \
            .withColumn("year", year(col("date"))) \
            .withColumn("month", month(col("date"))) \
            .withColumn("day", dayofmonth(col("date"))) \
            .withColumn("day_week", dayofweek(col("date"))) \
            .withColumn("hour", hour(col("date"))) \
            .withColumn("minute", minute(col("date"))) \
            .withColumn("second", second(col("date")))
        return df_transformado

# Función para guardar DataFrame en un archivo Parquet en la capa Silver, siempre sobreescribe esto es compactar
def guardar_archivo_parquet(df: DataFrame, path: str) -> None:
    df.coalesce(1).write \
        .mode("overwrite") \
        .parquet(path)
        
# Función principal para unir archivos de Bronze a Silver
def unir_archivos_bronze_a_silver(bronze_path: str, silver_path: str) -> None:
    # Leer archivos Parquet desde la capa Bronze
    df_bronze = leer_archivos_parquet(bronze_path)

    # Transformar el DataFrame de Bronze
    df_transformado = transformar_df(df_bronze)

    # Guardar el DataFrame transformado en la capa Silver
    guardar_archivo_parquet(df_transformado, silver_path)

    # Leer archivos Parquet desde la capa Silver después de transformar
    df_silver_transformado = leer_archivos_parquet(silver_path)

    if df_silver_transformado is not None:
        # Contar la cantidad de registros en la capa Silver después de transformar
        records_processed = df_silver_transformado.count()
        print(f"Cantidad de registros procesados en silver después de transformar: {records_processed}")
    
    # Mostrar el DataFrame transformado (opcional)
    df_transformado.printSchema()
    df_transformado.show()

# Rutas en hdfs (distrbuido)
bronze_path = "hdfs:///user/root/bronze"  
silver_path = "hdfs:///user/root/silver/unificado.parquet"  

# Ejecutar el proceso de unión
unir_archivos_bronze_a_silver(bronze_path, silver_path)


                                                                                

Cantidad de registros procesados en silver después de transformar: 500
root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- employee_id: string (nullable = true)
 |-- quantity_products: integer (nullable = true)
 |-- order_id: string (nullable = true)
 |-- commune_code: string (nullable = true)
 |-- commune_name: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- employee_commission: double (nullable = true)
 |-- price: integer (nullable = false)
 |-- sales: integer (nullable = true)
 |-- commission_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_week: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- second: integer (nullable = true)

+-----

In [9]:
#### Capa gold
####
from pyspark.sql import SparkSession 
from pyspark.sql.functions import sum, max, min, avg, count, col

# Obtener la sesión de Spark existente si está activa
spark = SparkSession.builder.getOrCreate()

# Ruta del archivo Parquet en la capa Silver
silver_path = "hdfs:///user/root/silver/unificado.parquet"

# Leer los datos desde el archivo Parquet en la capa Silver
df = spark.read.parquet(silver_path)

# Crear la base de datos si no existe
spark.sql("CREATE DATABASE IF NOT EXISTS UNALwater")

# Establecer la base de datos en uso
spark.sql("USE UNALwater")

# Definir la ruta en HDFS donde se guardará la tabla en formato Parquet en la capa Gold
gold_path = "hdfs:///user/root/gold/UNALWater"

# Insertar el dataframe en una tabla externa UNALWater particionada por el campo 'date'
df.write.mode("append") \
  .partitionBy("date") \
  .format("parquet") \
  .option("path", gold_path) \
  .saveAsTable("UNALWater")

# Mostrar la estructura de la tabla
print("Estructura de la tabla UNALWater:")
spark.sql("DESCRIBE UNALWater").show()

# Construir y ejecutar consultas SQL para responder preguntas de negocio
query_ventas_por_comuna = """
    SELECT 
        CASE 
            WHEN commune_name LIKE '%CORREGIMIENTO DE SAN SEBAS%' THEN 'SAN SEBASTIAN DE PALMITAS'
            WHEN commune_name LIKE '%CORREGIMIENTO DE SAN CRIS%' THEN 'SAN CRISTOBAL'
            WHEN commune_name = 'CORREGIMIENTO DE ALTAVISTA' THEN 'ALTAVISTA'
            WHEN commune_name = 'CORREGIMIENTO DE SANTA ELENA' THEN 'SANTA ELENA'
            WHEN commune_name = 'CORREGIMIENTO DE SAN ANTONIO DE PRADO' THEN 'SAN ANTONIO DE PRADO'
        ELSE commune_name END AS Comuna_Corregimiento,
        SUM(quantity_products) AS Cantidad_Productos,
        SUM(sales) AS Total_Ventas
    FROM UNALWater
    GROUP BY Comuna_Corregimiento
    ORDER BY Total_Ventas DESC;
"""

query_ventas_por_vendedor = """
    SELECT 
        employee_name AS Vendedor,
        SUM(quantity_products) AS Cantidad_Productos,
        SUM(sales) AS Total_Ventas,
        SUM(commission_value) AS Valor_Comision
    FROM UNALWater
    GROUP BY employee_name
    ORDER BY Valor_Comision DESC
"""

query_top10_por_clientes = """
    SELECT 
        customer_name AS Cliente,
        SUM(quantity_products) AS Cantidad_Productos,
        SUM(sales) AS Total_Ventas
    FROM UNALWater
    GROUP BY customer_name
    ORDER BY Total_Ventas DESC
    LIMIT 10
"""

query_ventas_por_dia = """
    SELECT 
        CASE 
            WHEN day_week = 1 THEN 'Domingo' 
            WHEN day_week = 2 THEN 'Lunes' 
            WHEN day_week = 3 THEN 'Martes'
            WHEN day_week = 4 THEN 'Miércoles'
            WHEN day_week = 5 THEN 'Jueves'
            WHEN day_week = 6 THEN 'Viernes'
            WHEN day_week = 7 THEN 'Sábado'
        END AS Dia_Semana,
        SUM(quantity_products) AS Cantidad_Productos,
        SUM(sales) AS Total_Ventas
    FROM UNALWater
    GROUP BY day_week
    ORDER BY Total_Ventas DESC
"""

# Ejecutar las consultas SQL y mostrar los resultados
ventas_por_comuna = spark.sql(query_ventas_por_comuna)
print("Comportamiento de la cantidad de ventas por comuna:")
ventas_por_comuna.show()

ventas_por_vendedor = spark.sql(query_ventas_por_vendedor)
print("Comportamiento de la cantidad de ventas por vendedor:")
ventas_por_vendedor.show()

top10_por_clientes = spark.sql(query_top10_por_clientes)
print("Los 10 clientes que más nos han comprado botellas de agua:")
top10_por_clientes.show()

ventas_por_dia = spark.sql(query_ventas_por_dia)
print("Comportamiento de ventas de botellas de agua por día de la semana:")
ventas_por_dia.show()



                                                                                

Estructura de la tabla UNALWater:
+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|           latitude|   double|   null|
|          longitude|   double|   null|
|        customer_id|   string|   null|
|        employee_id|   string|   null|
|  quantity_products|      int|   null|
|           order_id|   string|   null|
|       commune_code|   string|   null|
|       commune_name|   string|   null|
|      customer_name|   string|   null|
|      employee_name|   string|   null|
|employee_commission|   double|   null|
|              price|      int|   null|
|              sales|      int|   null|
|   commission_value|   double|   null|
|               year|      int|   null|
|              month|      int|   null|
|                day|      int|   null|
|           day_week|      int|   null|
|               hour|      int|   null|
|             minute|      int|   null|
+-------------------+---------+-------+
only s

                                                                                

+--------------------+------------------+------------+
|Comuna_Corregimiento|Cantidad_Productos|Total_Ventas|
+--------------------+------------------+------------+
|         SANTA ELENA|              6232|    21812000|
|SAN SEBASTIAN DE ...|              4945|    17307500|
|SAN ANTONIO DE PRADO|              4330|    15155000|
|       SAN CRISTOBAL|              3888|    13608000|
|           ALTAVISTA|              1905|     6667500|
|          EL POBLADO|              1204|     4214000|
|             ROBLEDO|               926|     3241000|
|       VILLA HERMOSA|               869|     3041500|
|    LAURELES ESTADIO|               804|     2814000|
|        BUENOS AIRES|               721|     2523500|
|               BELÉN|               660|     2310000|
|            GUAYABAL|               531|     1858500|
|       LA CANDELARIA|               487|     1704500|
|            CASTILLA|               475|     1662500|
|            ARANJUEZ|               372|     1302000|
|         

                                                                                

+-----------------+------------------+------------+--------------+
|         Vendedor|Cantidad_Productos|Total_Ventas|Valor_Comision|
+-----------------+------------------+------------+--------------+
|Christen Hamilton|              2010|     7035000|     1195950.0|
|     Melanie Ball|              1618|     5663000|     1019340.0|
|     Sydnee Kirby|              1526|     5341000|      854560.0|
|   Howard Guthrie|              1273|     4455500|      712880.0|
|  Althea Mckenzie|              1540|     5390000|      700700.0|
|       Ori Tucker|              1786|     6251000|      625100.0|
|  Phyllis Hubbard|              1773|     6205500|      620550.0|
|    Elijah Parker|              1355|     4742500|      616525.0|
|    Bevis Sanford|              1111|     3888500|      544390.0|
|     Ryan Nichols|               838|     2933000|      527940.0|
|    Davis Jenkins|              1095|     3832500|      498225.0|
|   Catherine King|              1877|     6569500|      45986

                                                                                

+-----------------+------------------+------------+
|          Cliente|Cantidad_Productos|Total_Ventas|
+-----------------+------------------+------------+
|        Knox Best|               350|     1225000|
|    Amery Shepard|               270|      945000|
|      Roth Coffey|               264|      924000|
|    Heather Oneil|               263|      920500|
|     Elvis Bolton|               260|      910000|
|   Zenia Cardenas|               255|      892500|
|       Xyla Kelly|               245|      857500|
|   Galena Fleming|               233|      815500|
|  Colorado Torres|               232|      812000|
|Lillian Mcfarland|               232|      812000|
+-----------------+------------------+------------+

Comportamiento de ventas de botellas de agua por día de la semana:




+----------+------------------+------------+
|Dia_Semana|Cantidad_Productos|Total_Ventas|
+----------+------------------+------------+
|    Jueves|              5012|    17542000|
|    Martes|              4784|    16744000|
|   Domingo|              4494|    15729000|
|    Sábado|              4342|    15197000|
|     Lunes|              3916|    13706000|
| Miércoles|              3663|    12820500|
|   Viernes|              3443|    12050500|
+----------+------------------+------------+



                                                                                

In [None]:
#!hdfs dfs -ls /user/root/silver/unificado.parquet
#!hdfs dfs -mkdir -p /user/root/gold
#!hdfs dfs -ls /user/root/bronze
#!hdfs dfs -ls /user/root
#!hdfs dfs -copyToLocal /user/root/gold /Analitica/BigData/Final/gold
#!hdfs dfs -copyFromLocal medellin_neighborhoods.parquet /user/root/bronze
#!hdfs dfs -ls /user/root/silver
!hdfs dfs -ls /user/root/gold
#!hdfs dfs -ls /user/root/bronze
#!hdfs dfs -ls /user/root/bronze | grep medellin_neighborhoods.parquet

Found 1 items
drwxr-xr-x   - root supergroup          0 2024-06-20 04:13 /user/root/gold/UNALWater


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from shapely.geometry import Point
import geopandas as gpd
import matplotlib.pyplot as plt

# Crear la sesión de Spark con configuración ajustada
spark = SparkSession.builder \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Definir la ruta del archivo Parquet con datos de ventas en la capa Silver
cargue_inicial_path = 'hdfs:///user/root/silver/unificado.parquet'

try:
    # Cargar el archivo Parquet de cargue inicial como un DataFrame de Spark
    df_cargue = spark.read.parquet(cargue_inicial_path).limit(10)  # Limitar a 10 registros para pruebas

    # Definir la función Python para crear puntos a partir de latitud y longitud
    def create_point(longitude, latitude):
        return Point(float(longitude), float(latitude))

    # Registrar la función como una UDF (User Defined Function) en Spark
    create_point_udf = udf(create_point, returnType=FloatType())

    # Convertir las columnas de longitud y latitud a tipo Float y crear columna 'geom'
    df_cargue = df_cargue.withColumn('longitude', col('longitude').cast(FloatType())) \
                         .withColumn('latitude', col('latitude').cast(FloatType())) \
                         .withColumn('geom', create_point_udf(col('longitude'), col('latitude')))
    df_cargue.show()
#     # Convertir el DataFrame de Spark en un GeoDataFrame de GeoPandas
#     gdf_cargue = gpd.GeoDataFrame(df_cargue.toPandas(), geometry='geom')
    
#     # Imprimir las primeras filas para verificar la conversión
#     print("Primeras filas de GeoDataFrame:")
#     print(gdf_cargue.head())

#     # Definir la ruta del archivo Parquet con geometrías de Medellín en la capa Bronze
#     medellin_neighborhoods_path = 'hdfs:///user/root/bronze/medellin_neighborhoods.parquet'
    
#     # Cargar el archivo Parquet de geometrías de Medellín como un GeoDataFrame de GeoPandas
#     medellin_neighborhoods = gpd.read_parquet(medellin_neighborhoods_path)
    
#     # Imprimir las primeras filas para verificar la carga de datos
#     print("\nPrimeras filas de medellin_neighborhoods GeoDataFrame:")
#     print(medellin_neighborhoods.head())

# #     # Crear el gráfico con las geometrías de Medellín y los puntos del cargue inicial
# #     fig, ax = plt.subplots(figsize=(20, 20))
# #     medellin_neighborhoods.plot(ax=ax, color='lightgrey', edgecolor='darkblue')
# #     gdf_cargue.plot(ax=ax, color='blue', markersize=10, alpha=0.6)

# #     plt.title('Datos simulados de ventas en Medellín y ubicaciones de clientes')
# #     plt.xlabel('Longitud')
# #     plt.ylabel('Latitud')
# #     plt.grid(True)
# #     plt.show()
    
# #     # Guardar la figura en un archivo si la visualización es correcta
# #     plt.savefig('./data/medellin_neighborhoods_simulacion.png')
# #     plt.close()

except Exception as e:
    print(f"Error al convertir o visualizar los datos con GeoPandas: {str(e)}")
