In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, lit, monotonically_increasing_id, expr, row_number
from pyspark.sql.types import TimestampType, StructType, StructField, IntegerType, FloatType, DateType

## Transform Product table

In [0]:
product_path = '/mnt/bronze/SalesLT/Product/Product.parquet'
categories_path = '/mnt/bronze/SalesLT/ProductCategory/ProductCategory.parquet'
output_path = '/mnt/silver/SalesLT/STG_Products/'

products_df = spark.read.parquet(product_path)
categories_df = spark.read.parquet(categories_path)

# Rename the category's 'Name' so it cannot collide with the product's own 'Name'.
categories_df = categories_df.withColumnRenamed("Name", "category_name")

# Bring only the join key and the category name into the join. Joining on the
# column *name* (instead of an equality expression) keeps a single
# ProductCategoryID column rather than one from each side.
category_lookup_df = categories_df.select("ProductCategoryID", "category_name")

stg_products_df = (
    products_df
    .join(category_lookup_df, on="ProductCategoryID", how="left")
    .select(
        col("ProductID"),
        col("Name").alias("ProductName"),
        col("Color"),
        # Bucket colors into warm / cool; anything else (including a null Color)
        # falls through to 'Neutral'.
        expr("""
            CASE
                WHEN Color IN ('Red', 'Orange', 'Yellow') THEN 'Warm'
                WHEN Color IN ('Blue', 'Green', 'Purple') THEN 'Cool'
                ELSE 'Neutral'
            END
        """).alias("ColorCategory"),
        col("StandardCost"),
        col("ListPrice"),
        (col("ListPrice") - col("StandardCost")).alias("ProductMaxGrossProfit"),
        col("category_name").alias("CategoryName"),
    )
    # Keep only products with a positive ListPrice (drops 0, negative and null).
    .filter(col("ListPrice") > 0)
    # Deduplicate BEFORE assigning the surrogate key: a key that is unique per
    # row makes every row distinct, which would turn dropDuplicates() into a no-op.
    .dropDuplicates()
)

# Consecutive, deterministic surrogate key (1, 2, 3, ...) ordered by ProductID,
# generated the same way as CustomerKey for STG_Customers. row_number() is used
# instead of monotonically_increasing_id(), whose values are unique but neither
# consecutive nor independent of partitioning.
product_key_window = Window.orderBy(col("ProductID"))

stg_products_df = (
    stg_products_df
    .withColumn("ProductKey", row_number().over(product_key_window))
    .select(
        "ProductKey",
        "ProductID",
        "ProductName",
        "Color",
        "ColorCategory",
        "StandardCost",
        "ListPrice",
        "ProductMaxGrossProfit",
        "CategoryName",
    )
)

stg_products_df.write.format("delta").mode("overwrite").save(output_path)

## Transform Customer table

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

path = '/mnt/bronze/SalesLT/Customer/Customer.parquet'
output_path = '/mnt/silver/SalesLT/STG_Customers/'

df = spark.read.parquet(path)

# Columns published to STG_Customers (CustomerKey is prepended below).
customer_columns = ["CustomerID", "FirstName", "LastName", "EmailAddress", "Phone", "CompanyName"]

# Keep only customers that have an email address, a first name and a last name.
# ModifiedDate is deliberately not selected: it is not part of the output, and
# keeping it in the de-duplication key would let rows that differ only in
# ModifiedDate survive dropDuplicates() as duplicate output rows.
filtered_df = (
    df.select(*[col(name) for name in customer_columns])
    .filter(
        col("EmailAddress").isNotNull()
        & col("FirstName").isNotNull()
        & col("LastName").isNotNull()
    )
    .dropDuplicates()
)

# Window used to generate a sequential surrogate key (1, 2, 3, ...) ordered by CustomerID.
# NOTE(review): the window has no partitionBy, so Spark evaluates it on a single
# partition; fine for a small dimension table, revisit if Customer grows large.
window_spec = Window.orderBy(col("CustomerID"))

# Add the auto-generated CustomerKey column.
filtered_df_with_key = filtered_df.withColumn(
    "CustomerKey",
    row_number().over(window_spec)
)

filtered_df = filtered_df_with_key.select(
    col("CustomerKey"),
    *[col(name) for name in customer_columns]
)

filtered_df.write.format("delta").mode("overwrite").save(output_path)

## Transform SalesOrderDetail & SalesOrderHeader tables

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, FloatType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window


# Bronze parquet sources and their silver Delta destinations.
path_sales_order_detail = '/mnt/bronze/SalesLT/SalesOrderDetail/SalesOrderDetail.parquet'
path_sales_order_header = '/mnt/bronze/SalesLT/SalesOrderHeader/SalesOrderHeader.parquet'
output_path_sales_order_detail = '/mnt/silver/SalesLT/STG_SalesOrderDetail/'
output_path_sales_order_header = '/mnt/silver/SalesLT/STG_SalesOrderHeader/'


def _write_deduplicated(source_df, target_path):
    """Drop exact-duplicate rows, overwrite the Delta table at target_path, and return the result.

    Args:
        source_df: DataFrame read from the bronze layer.
        target_path: Destination path of the silver Delta table.

    Returns:
        The de-duplicated DataFrame that was written.
    """
    deduplicated = source_df.dropDuplicates()
    deduplicated.write.format("delta").mode("overwrite").save(target_path)
    return deduplicated


# Detail first, then header -- each table is read, de-duplicated and written in turn.
sales_order_detail_df = spark.read.parquet(path_sales_order_detail)
clean_sales_order_detail_df = _write_deduplicated(sales_order_detail_df, output_path_sales_order_detail)

sales_order_header_df = spark.read.parquet(path_sales_order_header)
clean_sales_order_header_df = _write_deduplicated(sales_order_header_df, output_path_sales_order_header)