# Silver Fact Order Line

**Table type:** Fact table of sales order detail lines

**Source:** `orders_header`, `orders_detail`

**Target:** `fact_order_line`

## Reading Parquet data from Bronze

In [None]:
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os

load_dotenv("/home/jovyan/work/.env")

spark = SparkSession.builder.appName("silver_order_line").config("spark.driver.memory", "4g").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "8")  # or fewer, e.g. 4

bronze_path = os.getenv("BRONZE_PATH")
silver_path = os.getenv("SILVER_PATH")
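
`os.getenv` returns `None` when a variable is missing, and the later `os.path.join(bronze_path, ...)` calls would then fail with a less obvious `TypeError`. A minimal sketch of an early check follows; the helper `require_env` and the variable `DEMO_BRONZE_PATH` are hypothetical names used only for illustration and are not part of the notebook.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set; check the .env file")
    return value


# Demo value only; in the notebook the real values come from the .env file
os.environ["DEMO_BRONZE_PATH"] = "/tmp/bronze"

bronze_path = require_env("DEMO_BRONZE_PATH")
orders_header_dir = os.path.join(bronze_path, "orders_header")
print(orders_header_dir)
```

In the notebook this could wrap the `BRONZE_PATH` and `SILVER_PATH` lookups so that a misconfigured `.env` is reported before any Spark read is attempted.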

## orders_header

A new Spark DataFrame is created from the Bronze tables.

In [None]:
# Read the order headers from Bronze
directory_path = os.path.join(bronze_path, "orders_header")
bronze_order_header_df = spark.read.parquet(directory_path)

# Show the schema and the row count
bronze_order_header_df.printSchema()
bronze_order_header_df.count()


root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- canal: string (nullable = true)
 |-- tipo_pago: string (nullable = true)
 |-- flag_promo_pedido: integer (nullable = true)
 |-- order_net_amount: double (nullable = true)
 |-- order_gross_amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



2810000

In [4]:
# Create a temporary view for cleaning,
# so that SQL can be used in the standardization steps
bronze_order_header_df.createOrReplaceTempView("temp_orders_header")

## orders_detail

In [None]:
# Read the order detail lines from Bronze
directory_path = os.path.join(bronze_path, "orders_detail")
bronze_order_detail_df = spark.read.parquet(directory_path)

# Show the schema and the row count
bronze_order_detail_df.printSchema()
bronze_order_detail_df.count()

root
 |-- order_id: integer (nullable = true)
 |-- line_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- qty_units: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- promo_flag: integer (nullable = true)
 |-- line_net_amount: double (nullable = true)
 |-- order_net_amount_replicated: double (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_file: string (nullable = true)



11241634

In [6]:
# Create a temporary view for cleaning,
# so that SQL can be used in the standardization steps
bronze_order_detail_df.createOrReplaceTempView("temp_orders_detail")

## Additional dimension tables for later joins

In [None]:
# Dimension tables used to cross-reference the fact rows
customer_path = os.path.join(silver_path, "dim_customer.parquet")
spark.read.parquet(customer_path).createOrReplaceTempView("dim_customer")

product_path = os.path.join(silver_path, "dim_product.parquet")
spark.read.parquet(product_path).createOrReplaceTempView("dim_product")

date_path = os.path.join(silver_path, "dim_date.parquet")
spark.read.parquet(date_path).createOrReplaceTempView("dim_date")

## Silver cleaning
### Cleaning rules applied:
* Correct data types
* Duplicates removed by `order_id` (headers) and by `order_id` + `line_id` (lines)
* No `SELECT *`; columns are listed explicitly

### Transformation rules:
* A dummy value is added for consistency when a product does not exist
* Column names are standardized to a single language (for example, `canal` becomes `channel`)
In [12]:
# Order headers and order lines, cleaned and joined into the fact table
query = """
with cte_order_header AS (
   SELECT order_id
       , customer_id
       , order_date
       , channel
       , pay_type
       , flag_promo_pedido
       , order_net_amount
       , order_gross_amount
       , currency
   FROM (
       SELECT CAST(order_id AS INT) AS order_id
            , cast(customer_id AS INT) AS customer_id
            , CAST(order_date AS DATE) AS order_date
            , TRIM(CAST(canal AS VARCHAR(30))) AS channel
            , TRIM(CAST(tipo_pago AS VARCHAR(30))) AS pay_type
            , CAST(flag_promo_pedido AS INT) AS flag_promo_pedido
            , CAST(order_net_amount AS NUMERIC(18,2)) AS order_net_amount
            , CAST(order_gross_amount AS NUMERIC(18,2)) AS order_gross_amount
            , TRIM(CAST(currency AS VARCHAR(5))) AS currency
            , ROW_NUMBER() OVER(PARTITION BY order_id ORDER BY order_date DESC) AS RW
        FROM temp_orders_header
    ) AS A
    WHERE RW = 1
),
cte_order_line AS (
   SELECT order_id
       , line_id
       , product_id
       , qty_units
       , unit_price
       , discount_amount
       , promo_flag
       , line_net_amount
       , order_net_amount_replicated
   FROM (
       SELECT CAST(order_id AS INT) AS order_id
           , CAST(line_id AS INT) AS line_id
           , CAST(product_id AS INT) AS product_id
           , CAST(qty_units AS INT) AS qty_units
           , CAST(unit_price AS NUMERIC(18,2)) AS unit_price
           , CAST(discount_amount AS NUMERIC(18,2)) AS discount_amount
           , CAST(promo_flag AS INT) AS promo_flag
           , CAST(line_net_amount AS NUMERIC(18,2)) AS line_net_amount
           , CAST(order_net_amount_replicated AS NUMERIC(18,2)) AS order_net_amount_replicated
           , ROW_NUMBER() OVER(PARTITION BY order_id, line_id ORDER BY ingestion_timestamp DESC) AS RW
        FROM temp_orders_detail
    ) AS A
    WHERE RW = 1
)
SELECT MD5(concat_ws('||', COALESCE(head.order_id, line.order_id, 0), COALESCE(line.line_id, 0))) AS order_line_key
    , COALESCE(head.order_id, line.order_id, 0) AS order_id
    , COALESCE(line.line_id, 0) AS line_id
    , pro.product_id
    , cus.customer_id
    , dat.date_id AS order_date_id 
    , CAST(date_format(head.order_date, 'yyyyMM') AS INT) AS period
    , head.channel
    , head.pay_type
    , head.currency
    , line.promo_flag
    , line.qty_units
    , line.unit_price
    , line.line_net_amount
    , line.order_net_amount_replicated
FROM cte_order_line AS line
LEFT JOIN cte_order_header AS head
    ON line.order_id = head.order_id
LEFT JOIN dim_customer AS cus
    ON head.customer_id = cus.customer_id
LEFT JOIN dim_product AS pro
    ON line.product_id = pro.product_id
LEFT JOIN dim_date AS dat
    ON head.order_date = dat.date
WHERE
 ( head.order_net_amount > 0 OR line.qty_units > 0)
"""
# Row count: 11,241,634
# Execute the SQL query and get the result as a new DataFrame
lines_df = spark.sql(query)

# Display the results
lines_df.printSchema()
lines_df.count()

root
 |-- order_line_key: string (nullable = false)
 |-- order_id: integer (nullable = false)
 |-- line_id: integer (nullable = false)
 |-- product_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date_id: integer (nullable = true)
 |-- period: integer (nullable = true)
 |-- channel: string (nullable = true)
 |-- pay_type: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- promo_flag: integer (nullable = true)
 |-- qty_units: integer (nullable = true)
 |-- unit_price: decimal(18,2) (nullable = true)
 |-- line_net_amount: decimal(18,2) (nullable = true)
 |-- order_net_amount_replicated: decimal(18,2) (nullable = true)



11241634
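
The derived columns `order_line_key` and `period` from the query above can be reproduced outside Spark, which may help when spot-checking individual rows. This is a minimal sketch under two assumptions: that Spark renders the integer arguments of `concat_ws` as plain decimal strings, and that `MD5` returns a lowercase hexadecimal digest of the UTF-8 bytes. The function names are hypothetical.

```python
import hashlib
from datetime import date


def order_line_key(order_id, line_id):
    """Mimic MD5(concat_ws('||', COALESCE(order_id, 0), COALESCE(line_id, 0)))."""
    oid = 0 if order_id is None else order_id
    lid = 0 if line_id is None else line_id
    return hashlib.md5(f"{oid}||{lid}".encode("utf-8")).hexdigest()


def period_of(order_date):
    """Mimic CAST(date_format(order_date, 'yyyyMM') AS INT)."""
    return order_date.year * 100 + order_date.month


# The separator keeps (1, 12) and (11, 2) from producing the same input string
print(order_line_key(1, 12) != order_line_key(11, 2))
print(period_of(date(2024, 3, 15)))
```

The `'||'` separator matters: without it, the pairs `(1, 12)` and `(11, 2)` would both concatenate to `112` and share a key.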

## Writing the data
* Everything is written as Parquet, in Silver folders.
* The write is meant to behave as an upsert for later bulk ingestions; note that the cell below uses `overwrite` mode, which replaces the existing output.
* SCD (slowly changing dimensions) could be applied to preserve historical changes in dimensions.

**Improvement:** use Delta tables to control merges and keep a transaction log (ACID).
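
To make the intended upsert behaviour concrete, here is a small plain-Python sketch of its semantics: rows whose key already exists are replaced, and rows with new keys are inserted. It is only an illustration of the idea, not Spark or Delta code; the function `upsert` and the sample rows are hypothetical, and `order_line_key` is assumed to be the merge key.

```python
def upsert(existing, incoming, key="order_line_key"):
    """Merge incoming rows into existing ones: replace on key match, insert otherwise."""
    merged = {row[key]: row for row in existing}
    for row in incoming:
        merged[row[key]] = row  # same key replaces the old row, new key is added
    return list(merged.values())


# Illustrative rows keyed by a shortened order_line_key
existing = [
    {"order_line_key": "a", "qty_units": 2},
    {"order_line_key": "b", "qty_units": 3},
]
incoming = [
    {"order_line_key": "b", "qty_units": 5},  # update
    {"order_line_key": "c", "qty_units": 1},  # insert
]

merged = upsert(existing, incoming)
print(sorted(row["order_line_key"] for row in merged))
```

With plain Parquet files there is no row-level merge, which is why the Delta tables improvement is relevant: Delta Lake offers a `MERGE` operation with these semantics.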

In [None]:
# Write fact_order_line to Silver as Parquet
output_path = os.path.join(silver_path, "fact_order_line.parquet")

lines_df.write.mode("overwrite").format("parquet").option("maxRecordsPerFile", 500000).partitionBy("period").save(output_path)

## Validations

In [14]:
summary_stats = spark.read.parquet(output_path).describe()
summary_stats.show()

+-------+--------------------+-----------------+------------------+------------------+------------------+-------------------+-----------+--------+--------+-------------------+------------------+-----------------+------------------+---------------------------+------------------+
|summary|      order_line_key|         order_id|           line_id|        product_id|       customer_id|      order_date_id|    channel|pay_type|currency|         promo_flag|         qty_units|       unit_price|   line_net_amount|order_net_amount_replicated|            period|
+-------+--------------------+-----------------+------------------+------------------+------------------+-------------------+-----------+--------+--------+-------------------+------------------+-----------------+------------------+---------------------------+------------------+
|  count|            11241634|         11241634|          11241634|          11241634|          11241634|           11241634|   11241634|11241634|11241634|        