In [0]:
#Importamos las librerias necesarias
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [0]:
# Environment selector exposed as a notebook text widget (default "dev").
dbutils.widgets.text("ENV", "dev")
_env_widget_value = dbutils.widgets.get("ENV")

# Normalise the value so inputs such as " PROD " and "prod" are treated alike.
ENV = _env_widget_value.strip().lower()

# Only two environments are supported; stop the run early for anything else.
_is_supported_env = ENV in {"dev", "prod"}
if not _is_supported_env:
    raise ValueError("ENV debe ser 'dev' o 'prod'")

In [0]:
# Resolve the catalog name and the storage account used for this environment.
# Any value other than "dev" selects the prod settings (ENV is validated to be
# "dev" or "prod" when the widget is read).
CATALOG, ADLS_ACCOUNT = (
    ("dev-adventureworks", "adsldevadventureworks")
    if ENV == "dev"
    else ("prod-adventureworks", "adslprodadventureworks")
)

# Echo the resolved configuration so the run output shows which environment was used.
print("ENV:", ENV)
print("CATALOG:", CATALOG)
print("ADLS_ACCOUNT:", ADLS_ACCOUNT)

In [0]:
# Point the session at this environment's catalog and its bronze schema.
# The catalog name contains a hyphen, so it has to be back-quoted in SQL.
spark.sql(f"USE CATALOG `{CATALOG}`")
spark.sql("USE SCHEMA bronze_schema")

In [0]:
# Build one DataFrame per source entity from the raw CSV files in the bronze
# container, so each table can be processed independently.
bronze_base_path = f"abfss://bronze@{ADLS_ACCOUNT}.dfs.core.windows.net"


def leer_csv_bronze(entity):
    """Read ``<entity>/<entity>.csv`` from the bronze container.

    The first row is used as the header and column types are inferred by
    Spark (``inferSchema``), which costs an extra pass over each file.

    Args:
        entity: folder name and file stem of the entity, e.g. "Product".

    Returns:
        A Spark DataFrame with the CSV contents.
    """
    return (spark.read.format("csv")
            .option("header", True)
            .option("inferSchema", True)
            .load(f"{bronze_base_path}/{entity}/{entity}.csv"))


df_product           = leer_csv_bronze("Product")
df_region            = leer_csv_bronze("Region")
df_reseller          = leer_csv_bronze("Reseller")
df_sales             = leer_csv_bronze("Sales")
df_salesperson       = leer_csv_bronze("Salesperson")
df_SalespersonRegion = leer_csv_bronze("SalespersonRegion")
df_targets           = leer_csv_bronze("Targets")

       


In [0]:
def escribir_to_bronze_table(df, table_name):
    """Save ``df`` as the Delta table ``<CATALOG>.bronze_schema.<table_name>``.

    Every run replaces the table contents (``mode("overwrite")``) and, through
    ``overwriteSchema``, its schema as well, so column changes in the source
    CSV files are accepted.
    """
    full_name = f"`{CATALOG}`.bronze_schema.{table_name}"
    writer = (
        df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    writer.saveAsTable(full_name)


# Persist each raw DataFrame as its own bronze table.
for _bronze_df, _bronze_name in (
    (df_product, "Product"),
    (df_region, "Region"),
    (df_reseller, "Reseller"),
    (df_sales, "Sales"),
    (df_salesperson, "Salesperson"),
    (df_SalespersonRegion, "SalespersonRegion"),
    (df_targets, "Targets"),
):
    escribir_to_bronze_table(_bronze_df, _bronze_name)

In [0]:
# Switch the session to the silver schema of this environment's catalog.
# The catalog is selected explicitly so this cell does not rely on session
# state left behind by an earlier cell (e.g. when it is run on its own).
spark.sql(f"USE CATALOG `{CATALOG}`")
spark.sql("USE SCHEMA silver_schema")

In [0]:
# Reload every entity from the bronze Delta tables (not from the CSV files),
# so the silver transformations below start from the persisted bronze data.
def leer_bronze_table(table_name):
    """Return the table ``<CATALOG>.bronze_schema.<table_name>`` as a DataFrame."""
    return spark.table(f"`{CATALOG}`.bronze_schema.{table_name}")


df_product           = leer_bronze_table("Product")
df_region            = leer_bronze_table("Region")
df_reseller          = leer_bronze_table("Reseller")
df_sales             = leer_bronze_table("Sales")
df_salesperson       = leer_bronze_table("Salesperson")
df_SalespersonRegion = leer_bronze_table("SalespersonRegion")
df_targets           = leer_bronze_table("Targets")

**Limpieza Product**

In [0]:
# Product cleaning: drop unused columns, rename to snake_case, trim text,
# enforce the key type, then remove incomplete rows and duplicate keys.

# The *_Color_Format columns are not used downstream.
df_product = df_product.drop("Background_Color_Format", "Font_Color_Format")

# Rename source columns to the silver naming convention.
for _old_name, _new_name in [
    ("ProductKey", "product_id"),
    ("Product", "product_name"),
    ("Standard_Cost", "cost"),
    ("Color", "color"),
    ("Subcategory", "subcategory"),
    ("Category", "category"),
]:
    df_product = df_product.withColumnRenamed(_old_name, _new_name)

# Trim surrounding whitespace from the text columns. F.trim already returns a
# string column, so no extra cast to StringType is needed.
for _text_col in ("product_name", "color", "subcategory", "category"):
    df_product = df_product.withColumn(_text_col, F.trim(F.col(_text_col)))

# Enforce an integer key. With ANSI mode off, values that cannot be cast
# become null and are removed by the null filter below.
df_product = df_product.withColumn("product_id", F.col("product_id").cast(T.IntegerType()))

# Remove incomplete rows BEFORE de-duplicating. De-duplicating first may keep
# an incomplete copy of a product and then discard that product entirely.
# NOTE(review): na.drop() without a subset drops a product if ANY column is
# null (e.g. a missing color or cost), unlike the key-only filters used for the
# other tables. Confirm this is intended: sales rows for such products will
# have no matching product.
df_product = df_product.na.drop()
df_product = df_product.dropDuplicates(["product_id"])


**Limpieza Reseller**

In [0]:

# Reseller cleaning: rename to snake_case, trim text, then remove rows with
# missing key fields and duplicate reseller ids.

# Rename source columns to the silver naming convention.
for _old_name, _new_name in [
    ("ResellerKey", "reseller_id"),
    ("BusinessType", "business_type"),
    ("Reseller", "reseller_name"),
    ("City", "city"),
    ("StateProvince", "state_province"),
    ("CountryRegion", "country_region"),
]:
    df_reseller = df_reseller.withColumnRenamed(_old_name, _new_name)

# Trim surrounding whitespace from the text columns.
for _text_col in ("business_type", "reseller_name", "city", "state_province", "country_region"):
    df_reseller = df_reseller.withColumn(_text_col, F.trim(F.col(_text_col)))

# Drop rows whose key fields are null BEFORE de-duplicating. De-duplicating
# first may keep a copy with a null reseller_name and then lose the reseller.
df_reseller = df_reseller.na.drop(subset=["reseller_id", "reseller_name"])
df_reseller = df_reseller.dropDuplicates(["reseller_id"])


**Limpieza Sales**

In [0]:
# Sales cleaning: rename to snake_case, parse the order date, then remove rows
# with missing key fields and duplicate (order, product) lines.

# Rename source columns to the silver naming convention.
for _old_name, _new_name in [
    ("SalesOrderNumber", "sales_order_number"),
    ("OrderDate", "order_date"),
    ("ProductKey", "product_id"),
    ("ResellerKey", "reseller_id"),
    ("EmployeeKey", "employee_id"),
    ("SalesTerritoryKey", "salesterritory_id"),
    ("Quantity", "quantity"),
    ("Unit_Price", "unit_price"),
    ("Sales", "sales"),
    ("Cost", "cost"),
]:
    df_sales = df_sales.withColumnRenamed(_old_name, _new_name)

# Convert order_date to a DATE column.
# NOTE(review): F.to_date without a format uses Spark's default date parsing.
# If the CSV stores dates in another format (e.g. M/d/yyyy), values become null
# (or fail under ANSI mode) and the null filter below would drop those rows.
# Confirm the source format.
df_sales = df_sales.withColumn("order_date", F.to_date(F.col("order_date")))

# Drop rows whose key fields are null BEFORE de-duplicating. De-duplicating
# first may keep a copy with a null order_date and then lose the sales line.
df_sales = df_sales.na.drop(subset=["sales_order_number", "product_id", "order_date"])
df_sales = df_sales.dropDuplicates(["sales_order_number", "product_id"])


**Limpieza Salesperson**

In [0]:
# Salesperson cleaning: rename to snake_case, trim text, then remove rows with
# missing key fields and duplicate salesperson keys.

# Rename source columns to the silver naming convention (UPN is exposed as email).
for _old_name, _new_name in [
    ("EmployeeKey", "salesperson_key"),
    ("EmployeeID", "salesperson_id"),
    ("Salesperson", "salesperson"),
    ("Title", "title"),
    ("UPN", "email"),
]:
    df_salesperson = df_salesperson.withColumnRenamed(_old_name, _new_name)

# Trim surrounding whitespace from the text columns.
for _text_col in ("salesperson", "title", "email"):
    df_salesperson = df_salesperson.withColumn(_text_col, F.trim(F.col(_text_col)))

# Drop rows whose key fields are null BEFORE de-duplicating. De-duplicating
# first may keep a copy with a null name and then lose the salesperson.
df_salesperson = df_salesperson.na.drop(subset=["salesperson_key", "salesperson"])
df_salesperson = df_salesperson.dropDuplicates(["salesperson_key"])


**Limpieza Targets**

In [0]:
# Targets cleaning: parse TargetMonth into a DATE column and rename it to
# snake_case. Uses the F alias like the rest of the notebook instead of the
# star-imported to_date.
# NOTE(review): F.to_date without a format uses Spark's default date parsing.
# Confirm the CSV format, otherwise values become null (or fail under ANSI mode).
df_targets = (df_targets
      .withColumn("TargetMonth", F.to_date(F.col("TargetMonth")))
      .withColumnRenamed("TargetMonth", "target_month")
)


In [0]:
# Select this environment's catalog (back-quoted: its name contains a hyphen)
# and make silver_schema the default schema for the remaining cells.
spark.sql(f"USE CATALOG `{CATALOG}`")
spark.sql("USE SCHEMA silver_schema")

**Escribir en Silver**

In [0]:
def escribir_to_silver_table(df, table_name):
    """Save ``df`` as the Delta table ``<CATALOG>.silver_schema.<table_name>``.

    The table contents are fully replaced on every run, and ``overwriteSchema``
    allows the schema to change along with them.
    """
    full_name = f"`{CATALOG}`.silver_schema.{table_name}"
    writer = (
        df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    writer.saveAsTable(full_name)


# Persist each cleaned DataFrame as its silver table.
for _silver_df, _silver_name in (
    (df_product, "product_silver"),
    (df_region, "region_silver"),
    (df_reseller, "reseller_silver"),
    (df_sales, "sales_silver"),
    (df_salesperson, "salesperson_silver"),
    (df_SalespersonRegion, "salespersonRegion_silver"),
    (df_targets, "targets_silver"),
):
    escribir_to_silver_table(_silver_df, _silver_name)



**Escribir en silver ADLS**

In [0]:
# Also write each silver DataFrame as a Delta table at a path in the silver
# container of the environment's storage account.
silver_base_path = f"abfss://silver@{ADLS_ACCOUNT}.dfs.core.windows.net"


def escribir_to_silver_path(df, folder):
    """Overwrite ``<silver_base_path>/<folder>`` with ``df`` in Delta format.

    Contents and schema are replaced on every run (``mode("overwrite")`` plus
    ``overwriteSchema``), mirroring the managed silver tables.
    """
    (df.write.format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")
       .save(f"{silver_base_path}/{folder}"))


for _silver_df, _silver_folder in (
    (df_product, "product_silver"),
    (df_region, "region_silver"),
    (df_reseller, "reseller_silver"),
    (df_sales, "sales_silver"),
    (df_salesperson, "salesperson_silver"),
    (df_SalespersonRegion, "salespersonregion_silver"),
    (df_targets, "targets_silver"),
):
    escribir_to_silver_path(_silver_df, _silver_folder)
