# Layer: Silver (Refined)
**Project:** Lean Logistics Data Pipeline  
**Business Domain:** E-commerce (Olist Dataset)

---
## 📑 Notebook Information
| Version | Date | Author | Summary of Changes |
| :--- | :--- | :--- | :--- |
| v1.0 | 2026-02-20 | Tássia Marchito | Initial ingestion from Bronze, schema enforcement, and PK constraints. |
| v1.1 | 2026-02-20 | Tássia Marchito | Added `tb_order_items` with Decimal(10,2) for financial precision. |
| v1.2 | 2026-02-20 | Tássia Marchito | Implemented `ts_` (timestamp) and `dt_` (date) prefixes for time-based columns. |
| v2.0 | 2026-02-20 | Tássia Marchito | Implemented `try_cast` for fault tolerance and strict data quality filtering for `tb_order_reviews`. |

---
## 🎯 Objectives
The Silver layer represents the "Single Source of Truth". Our goal is to transform raw data into high-quality business entities.
* **Schema Enforcement:** Strict data typing using `cd_`, `ts_`, `dt_`, `vl_`, and `nm_`/`ds_` prefixes.
* **Fault Tolerance:** Usage of `try_cast` and `try_to_date` to handle malformed strings and column shifts from source APIs.
* **Data Cleansing:** Filtering corrupted rows in `tb_order_reviews` by validating `review_id` length and `review_score` range.
* **Governance:** Applying Unity Catalog constraints (PKs RELY) and removing redundant Bronze audit columns (`ts_ingestion`/`_ts_ingestion`).
* **Standardization:** Ensuring all 9 tables are correctly cataloged and deduplicated.

In [0]:
from pyspark.sql.functions import col, current_timestamp, trim, upper, to_timestamp, to_date, length, expr

In [0]:
# 1. Per-table Silver configuration.
# For each Bronze table:
#   pk -> primary-key columns (deduplication key and PRIMARY KEY constraint)
#   ts -> columns converted to timestamps
#   dt -> columns converted to dates
#   vl -> columns cast to decimal
# Columns not listed in ts/dt/vl are kept as strings (see get_col_info).
silver_config = {
    "tb_orders": dict(
        pk=["order_id"],
        ts=[
            "order_purchase_timestamp",
            "order_approved_at",
            "order_delivered_carrier_date",
            "order_delivered_customer_date",
        ],
        dt=["order_estimated_delivery_date"],
        vl=[],
    ),
    "tb_order_items": dict(
        pk=["order_id", "order_item_id"],
        ts=["shipping_limit_date"],
        dt=[],
        vl=["price", "freight_value"],
    ),
    # NOTE(review): confirm review_id is unique in the source; if one review
    # can appear for several orders, deduplicating on review_id alone drops rows.
    "tb_order_reviews": dict(
        pk=["review_id"],
        ts=["review_answer_timestamp"],
        dt=["review_creation_date"],
        vl=["review_score"],
    ),
    "tb_order_payments": dict(
        pk=["order_id", "payment_sequential"],
        ts=[],
        dt=[],
        vl=["payment_value"],
    ),
    "tb_products": dict(
        pk=["product_id"],
        ts=[],
        dt=[],
        vl=[
            "product_weight_g",
            "product_length_cm",
            "product_height_cm",
            "product_width_cm",
        ],
    ),
    "tb_customers": dict(pk=["customer_id"], ts=[], dt=[], vl=[]),
    "tb_sellers": dict(pk=["seller_id"], ts=[], dt=[], vl=[]),
    "tb_geolocation": dict(
        pk=["geolocation_zip_code_prefix", "geolocation_lat", "geolocation_lng"],
        ts=[],
        dt=[],
        vl=[],
    ),
    "tb_product_category_name_translation": dict(
        pk=["product_category_name"], ts=[], dt=[], vl=[]
    ),
}

def get_col_info(col_name, config):
    """Return the Silver column name and target type for a Bronze column.

    Columns listed in ``config["ts"]``, ``config["dt"]`` or ``config["vl"]``
    take priority and become ``ts_*`` (timestamp), ``dt_*`` (date) or
    ``vl_*`` (decimal). Every other column is typed as a string and named
    ``cd_*`` when one of its underscore-separated tokens marks it as a code
    or identifier, ``nm_*`` when a token marks it as a name/city/state, and
    ``ds_*`` otherwise.

    Args:
        col_name: Column name as it appears in the Bronze table.
        config: A ``silver_config`` entry with ``ts``, ``dt`` and ``vl`` lists.

    Returns:
        Tuple ``(new_name, type)``. ``type`` is one of ``"timestamp"``,
        ``"date"``, ``"decimal"`` or ``"string"``.
    """

    def _strip_suffix(text, suffix):
        # Remove only a trailing suffix. str.replace would also cut matches
        # in the middle of a name (e.g. "_at" inside "last_attempt").
        return text[: -len(suffix)] if text.endswith(suffix) else text

    c = col_name.lower()

    # Explicitly configured types take priority over name-based rules.
    if col_name in config["ts"]:
        base = _strip_suffix(_strip_suffix(c, "_timestamp"), "_at")
        return f"ts_{base}", "timestamp"
    if col_name in config["dt"]:
        return f"dt_{_strip_suffix(c, '_date')}", "date"
    if col_name in config["vl"]:
        return f"vl_{c}", "decimal"

    # Match whole "_"-separated tokens rather than substrings: a substring
    # test for "id" or "lat" also matched words such as "width" or
    # "translation".
    tokens = set(c.split("_"))
    if tokens & {"id", "code", "prefix", "sequential", "lat", "lng"}:
        # Avoid doubling the prefix when the source name already has it.
        base = c[len("cd_"):] if c.startswith("cd_") else c
        return f"cd_{base}", "string"

    prefix = "nm_" if tokens & {"name", "city", "state"} else "ds_"
    return f"{prefix}{c}", "string"

# 2. Processing loop.
# Each table in `silver_config` is refined independently. A failure in one
# table is reported and the loop moves on to the next table. The failures are
# re-raised at the end so the notebook run is not reported as successful.

# Bronze audit columns that are not carried into Silver.
_AUDIT_COLS = {"ts_ingestion", "_ts_ingestion", "_source_file"}


def _quote_ident(name):
    """Wrap a column name in backticks for safe use inside Spark SQL text."""
    return "`" + name.replace("`", "``") + "`"


def _apply_quality_rules(table, df):
    """Return ``df`` with the table-specific data-quality filters applied."""
    if table == "tb_order_reviews":
        # try_cast returns NULL for values that are not valid integers (e.g. a
        # date shifted into this column), so isin() discards the row instead
        # of the job failing on malformed input.
        df = (
            df.filter(length(col("review_id")) == 32)
              .filter(expr("try_cast(review_score as int)").isin(1, 2, 3, 4, 5))
        )
        print("   ⚠️ Qualidade: Linhas malformadas neutralizadas.")
    return df


def _build_silver_columns(df, cfg):
    """Build the renamed/typed select expressions for the business columns.

    Returns ``(select_exprs, pk_columns)``. ``pk_columns`` holds the renamed
    primary-key columns in the order declared in ``cfg["pk"]``.

    Raises:
        ValueError: if a configured primary-key column is absent from ``df``.
    """
    business_cols = [c for c in df.columns if c not in _AUDIT_COLS]

    # Without this check, a missing key column would silently produce a
    # partial (or empty) key for dropDuplicates and for the PRIMARY KEY.
    missing_pk = [p for p in cfg["pk"] if p not in business_cols]
    if missing_pk:
        raise ValueError(f"primary-key column(s) missing from source: {missing_pk}")

    select_exprs = []
    for c in business_cols:
        new_name, d_type = get_col_info(c, cfg)
        ref = _quote_ident(c)
        # The try_* conversions return NULL on unparsable input instead of failing.
        if d_type == "timestamp":
            c_expr = expr(f"try_to_timestamp({ref})")
        elif d_type == "date":
            c_expr = expr(f"try_to_date({ref})")
        elif d_type == "decimal":
            c_expr = expr(f"try_cast({ref} as decimal(10,2))")
        else:
            c_expr = upper(trim(col(ref)))
        select_exprs.append(c_expr.alias(new_name))

    pk_columns = [get_col_info(p, cfg)[0] for p in cfg["pk"]]
    return select_exprs, pk_columns


def _apply_constraints(target, table, pk_columns):
    """Mark the PK columns NOT NULL and (re)create the informational PK."""
    for c_pk in pk_columns:
        spark.sql(f"ALTER TABLE {target} ALTER COLUMN {_quote_ident(c_pk)} SET NOT NULL")
    constraint = f"pk_{table}_slv"
    # Drop any constraint with this name first (e.g. one kept from a previous
    # run) so ADD CONSTRAINT cannot fail with an "already exists" error.
    spark.sql(f"ALTER TABLE {target} DROP CONSTRAINT IF EXISTS {constraint}")
    pk_sql = ", ".join(_quote_ident(c) for c in pk_columns)
    spark.sql(f"ALTER TABLE {target} ADD CONSTRAINT {constraint} PRIMARY KEY({pk_sql}) RELY")


failed_tables = []
for table, cfg in silver_config.items():
    source = f"cat_tm_services_bronze.db_logistics.{table}"
    target = f"cat_tm_services_silver.db_logistics.{table}"

    print(f"💎 Refining Silver: {table}")
    try:
        df = _apply_quality_rules(table, spark.read.table(source))
        select_exprs, pk_columns = _build_silver_columns(df, cfg)

        # Drop rows with a NULL key, because they would violate the
        # SET NOT NULL applied below. Then keep one row per key; Spark keeps
        # an arbitrary row among duplicates.
        df_silver = (
            df.select(*select_exprs)
              .dropna(subset=pk_columns)
              .dropDuplicates(pk_columns)
              .withColumn("ts_silver_at", current_timestamp())
        )

        (
            df_silver.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(target)
        )

        _apply_constraints(target, table, pk_columns)
        print("   ✅ Sucesso!")
    except Exception as e:
        # Per-table best effort: record the failure and continue with the
        # next table.
        failed_tables.append(table)
        print(f"   ❌ Erro em {table}: {e}")

if failed_tables:
    raise RuntimeError(f"Silver refinement failed for: {', '.join(failed_tables)}")