In [0]:
# Display the active SparkSession. `spark` is not defined anywhere in this
# notebook, so it is presumably injected by the runtime — TODO confirm.
spark

In [0]:
# Load the bronze test data and preview it (kept commented out)
# data = spark.table("poctesting.bronze_events")
# display(data.limit(5))
# print(f"El archivo tiene {data.count()} registros")

In [0]:
from pyspark.sql.functions import col

# Load the bronze events table.
df_bronze = spark.table("poctesting.bronze_events")

# Null predicates for the two location columns, named once and reused below.
falta_neighborhood = col("neighborhood").isNull()
falta_district = col("district").isNull()

# Complete records: both location columns are populated.
df_completos = df_bronze.where(~falta_neighborhood & ~falta_district)

# Incomplete records: at least one location column is missing.
df_incompletos = df_bronze.where(falta_neighborhood | falta_district)

In [0]:
from pyspark.sql.functions import col, row_number, count
from pyspark.sql.window import Window
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from sklearn.neighbors import BallTree
import numpy as np

def corregir_con_sjoin_y_balltree(df_spark, path_parquet_neigh, schema):
    """Fill ``neighborhood`` and ``district`` from each row's coordinates.

    Every row is first matched against the neighbourhood polygons with a
    point-in-polygon spatial join. Rows that fall inside no polygon are then
    assigned the neighbourhood whose centroid is closest by great-circle
    (haversine) distance.

    Args:
        df_spark: Spark DataFrame with ``latitude``, ``longitude``,
            ``neighborhood`` and ``district`` columns. It is collected to the
            driver with ``toPandas()``.
        path_parquet_neigh: Path to a GeoParquet file with ``NOMBRE``,
            ``IDENTIFICACION`` and ``geometry`` columns.
        schema: Spark schema used only when an empty DataFrame is returned.

    Returns:
        A Spark DataFrame with the input columns where ``neighborhood`` and
        ``district`` are both non-null. Rows that could not be resolved
        (including rows without coordinates) are dropped. When rows are
        returned, the schema is inferred from the pandas data, not ``schema``.
    """
    df = df_spark.toPandas()
    if df.empty:
        return spark.createDataFrame([], schema)

    # Rows without coordinates cannot be located; they are excluded from the
    # nearest-centroid fallback (BallTree rejects NaN) and dropped at the end.
    coords_validas = df[["latitude", "longitude"]].notna().all(axis=1)

    # Vectorised point construction: x = longitude, y = latitude.
    gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df["longitude"], df["latitude"]),
        crs="EPSG:4326",
    )

    gdf_neigh = gpd.read_parquet(path_parquet_neigh)[["NOMBRE", "IDENTIFICACION", "geometry"]]
    gdf_neigh = gdf_neigh.dropna(subset=["geometry"])
    # Empty geometries contain no points and have no usable centroid.
    gdf_neigh = gdf_neigh[~gdf_neigh.geometry.is_empty]
    if gdf_neigh.crs is None:
        # Only declare a CRS when the file has none; set_crs raises when the
        # file already declares a different one, and to_crs below handles it.
        gdf_neigh = gdf_neigh.set_crs("EPSG:4326")

    gdf = gdf.to_crs("EPSG:3857")
    gdf_neigh = gdf_neigh.to_crs("EPSG:3857")

    gdf_joined = gpd.sjoin(gdf, gdf_neigh, how="left", predicate="within")
    # Overlapping polygons yield several rows per point; keep the first match
    # so the index-aligned assignments below see a unique index.
    gdf_joined = gdf_joined[~gdf_joined.index.duplicated(keep="first")]
    gdf["neighborhood"] = gdf_joined["NOMBRE"]
    gdf["district"] = gdf_joined["IDENTIFICACION"]

    # One mask drives both assignments so the target rows and the looked-up
    # values always have the same length.
    sin_asignar = gdf["neighborhood"].isna() & coords_validas
    if sin_asignar.any() and not gdf_neigh.empty:
        # Centroids are computed in the projected CRS and converted back to
        # EPSG:4326, because the haversine metric needs [lat, lon] in radians.
        centroids = gdf_neigh.geometry.centroid.to_crs("EPSG:4326")
        neigh_coords = np.column_stack([centroids.y.to_numpy(), centroids.x.to_numpy()])
        # The original latitude/longitude columns are already in degrees.
        point_coords = gdf.loc[sin_asignar, ["latitude", "longitude"]].to_numpy(dtype=float)

        tree = BallTree(np.deg2rad(neigh_coords), metric="haversine")
        _, idx = tree.query(np.deg2rad(point_coords), k=1)

        nearest = gdf_neigh.iloc[idx.ravel()]
        gdf.loc[sin_asignar, "neighborhood"] = nearest["NOMBRE"].to_numpy()
        gdf.loc[sin_asignar, "district"] = nearest["IDENTIFICACION"].to_numpy()

    gdf = gdf.dropna(subset=["neighborhood", "district"])
    if gdf.empty:
        return spark.createDataFrame([], schema)

    # Hand Spark a plain pandas DataFrame without the shapely geometry column.
    return spark.createDataFrame(pd.DataFrame(gdf.drop(columns=["geometry"])))

def _deduplicar_por_order_id(df):
    """Keep one row per ``order_id``: the one with the highest ``quantity_products``."""
    ventana = Window.partitionBy("order_id").orderBy(col("quantity_products").desc())
    return df.withColumn("rn", row_number().over(ventana)).filter("rn = 1").drop("rn")


def actualizar_silver_eventos(df_completos, df_incompletos, path_parquet_neigh):
    """Rebuild ``poctesting.silver_events`` from complete and corrected events.

    Incomplete rows are passed to ``corregir_con_sjoin_y_balltree``; the rows
    it returns are unioned with ``df_completos`` and, if the silver table
    already exists, with its current contents. The result is de-duplicated by
    ``order_id`` (highest ``quantity_products`` wins), written with
    ``overwrite``, and checked for remaining duplicates.

    Args:
        df_completos: Spark DataFrame of rows that already have location data.
        df_incompletos: Spark DataFrame of rows missing location data.
        path_parquet_neigh: Path to the neighbourhood GeoParquet file,
            forwarded to ``corregir_con_sjoin_y_balltree``.

    Returns:
        None. Prints progress messages and writes the silver table.
    """
    nombre_tabla_silver = "poctesting.silver_events"
    schema = df_incompletos.schema

    # Try to fill in the incomplete rows.
    df_corregido = corregir_con_sjoin_y_balltree(df_incompletos, path_parquet_neigh, schema)
    cantidad_corregidos = df_corregido.count()
    cantidad_completos = df_completos.count()

    # Reuse the count computed above instead of launching another Spark job.
    if cantidad_corregidos == 0:
        print(f"⚠️ No se corrigieron registros incompletos. Se usará solo df_completos ({cantidad_completos}).")
        df_union = df_completos
    else:
        print(f"✅ Se corrigieron {cantidad_corregidos} registros. Total con completos: {cantidad_completos + cantidad_corregidos}")
        df_union = df_completos.unionByName(df_corregido)

    # Merge with the rows already in silver, if the table exists. One
    # de-duplication over the merged set keeps the same row per order_id
    # (highest quantity_products) as de-duplicating the new rows first.
    if spark.catalog.tableExists(nombre_tabla_silver):
        df_todo = spark.table(nombre_tabla_silver).unionByName(df_union)
    else:
        df_todo = df_union
    df_final = _deduplicar_por_order_id(df_todo)

    # The table is fully rewritten whether or not it already existed.
    # NOTE(review): when the table exists, df_final reads from the table it is
    # overwriting; this presumably relies on the table format allowing that
    # (e.g. Delta) — confirm.
    df_final.write.mode("overwrite").saveAsTable(nombre_tabla_silver)

    # Final check: no order_id may appear more than once in the written table.
    df_verif = spark.table(nombre_tabla_silver)
    df_check = df_verif.groupBy("order_id").agg(count("*").alias("cantidad")).filter("cantidad > 1")
    cantidad_duplicados = df_check.count()

    if cantidad_duplicados > 0:
        print(f"❌ Duplicados encontrados en 'order_id': {cantidad_duplicados}")
        display(df_check)
    else:
        # Count what was actually persisted instead of recomputing df_final.
        print(f"✅ Tabla '{nombre_tabla_silver}' actualizada correctamente con {df_verif.count()} registros únicos por orden.")


In [0]:
# Location of the neighbourhood polygons (GeoParquet) in the workspace.
path_parquet_neigh = "/Workspace/Users/danielale22rojas@gmail.com//medellin-bigdata-poc/data/raw/medellin_neighborhoods.parquet"

# Build or refresh the silver table from the bronze splits.
actualizar_silver_eventos(
    df_completos=df_completos,
    df_incompletos=df_incompletos,
    path_parquet_neigh=path_parquet_neigh,
)

In [0]:
df_silver = spark.table("poctesting.silver_events")
# df_silver.write.format("delta").mode("overwrite").saveAsTable("poctesting.silver_events")

# Row counts of both layers, named before reporting the difference.
# NOTE(review): silver is also de-duplicated by order_id and merged with its
# previous contents, so this difference is not only rows with missing data.
total_bronze = df_bronze.count()
total_silver = df_silver.count()
print(f"Descartamos {total_bronze - total_silver} registros con información faltante")

In [0]:
%sql
-- What is the average number of products sold?
-- SELECT avg(quantity_products) FROM poctesting.silver_events;
-- SELECT avg(quantity_products) FROM poctesting.bronze_events;