In [0]:
from datetime import datetime
from pyspark.sql import Row

def log_etl(step: str, status: str, message: str):
    """Append a single audit record to the ``silver.log_etl`` Delta table.

    The record carries the step name, its status, the driver's current local
    time (``datetime.now()``) and a free-text message.

    Args:
        step: Name of the ETL step being reported (e.g. "Load Customer").
        status: Outcome label; this notebook passes "SUCCESS" or "ERROR".
        message: Human-readable detail stored with the status.

    Note:
        ``spark`` is not defined in this cell; presumably it is the
        notebook-provided SparkSession — confirm when running elsewhere.
    """
    entry = Row(step=step, status=status, timestamp=datetime.now(), message=message)
    (
        spark.createDataFrame([entry])
        .write
        .format("delta")
        .mode("append")
        .saveAsTable("silver.log_etl")
    )

In [0]:
# Build the silver customer frame from bronze:
#   * map the 'nan' / 'NaT' sentinel strings to NULL,
#   * derive a surrogate customer_id where the source has the 'nan' sentinel,
#   * keep a single row per (derived customer_id, phone).
df_customer = spark.sql("""
WITH normalized AS (
  SELECT
    -- Rows whose source customer_id is the 'nan' sentinel get a deterministic
    -- surrogate key: SHA-256 over name, phone and e-mail (NULLs -> '-1').
    -- NOTE(review): concat_ws() takes the SEPARATOR as its first argument, so
    -- first__name acts as the separator between the other three fields rather
    -- than as a field of its own. Left unchanged because the silver.vehicle
    -- query derives its key with the identical expression and any change here
    -- alters every generated key; change both queries together if fixing it.
    CASE
      WHEN customer_id = 'nan' THEN sha2(
        concat_ws(coalesce(first__name, '-1')
        ,coalesce(last__name, '-1')
        ,coalesce(cell_phone, '-1')
        ,coalesce(email, '-1') ), 256)
      ELSE customer_id
    END AS customer_id,
    first__name AS first_name,
    last__name  AS last_name,
    cell_phone  AS phone,
    arm_status,
    CAST(CASE
      WHEN LOWER(arm__last_bill_date) = 'nat' THEN NULL
      ELSE arm__last_bill_date
    END AS TIMESTAMP) AS arm_lastbilldate,
    CAST(CASE
      WHEN LOWER(arm__expires) = 'nat' THEN NULL
      ELSE arm__expires
    END AS TIMESTAMP) AS arm_expires,
    CASE
      WHEN payment__token = 'nan' THEN NULL
      ELSE payment__token
    END AS payment_token
  FROM bronze.cleaned_customer_data
),
ranked AS (
  SELECT
    *,
    -- Deduplicate on the DERIVED customer_id. In the same SELECT as the CASE
    -- above, "customer_id" would resolve to the raw source column, merging all
    -- 'nan'-ID customers that share a phone (or have none) into one row.
    -- Prefer the most recently billed / latest-expiring record; remaining ties
    -- are still resolved arbitrarily by Spark.
    ROW_NUMBER() OVER (
      PARTITION BY customer_id, phone
      ORDER BY arm_lastbilldate DESC NULLS LAST, arm_expires DESC NULLS LAST
    ) AS rn
  FROM normalized
)

SELECT * EXCEPT (rn)
FROM ranked
WHERE rn = 1
""")
df_customer.display()


In [0]:
%sql
-- Ensure the target schema exists before the silver.* tables below are written.
CREATE SCHEMA IF NOT EXISTS silver;

In [0]:
# Persist df_customer to silver.customer and record the outcome in silver.log_etl.
# A failed load is logged and then re-raised so the notebook/job run fails here.
try:
    (
        df_customer.write.format("delta")
        .mode("overwrite")
        # Matches the vehicle load: let the overwrite replace the table schema
        # when the query's columns/types change instead of failing the write.
        .option("overwriteSchema", True)
        .saveAsTable("silver.customer")
    )
except Exception as e:
    log_etl("Load Customer", "ERROR", f"Failed to load Customer data: {str(e)}")
    # Re-raise: swallowing the error would let the run continue and finish as
    # if the customer load had succeeded.
    raise
else:
    # Logged only after the write completed, so a failure in this logging call
    # is not mis-recorded as a failed customer load.
    log_etl("Load Customer", "SUCCESS", "Customer data successfully loaded into silver.customer")

In [0]:
# Build the silver vehicle frame: one row per distinct (customer_id, rfid) pair
# for bronze rows whose extra_number_rfid is neither NULL nor empty.
df_vehicle = spark.sql("""
WITH filtered AS (
  SELECT
    -- Same surrogate-key expression as the silver.customer query, presumably so
    -- vehicle rows can be joined back to customers; keep the two in sync.
    -- NOTE(review): concat_ws() treats its first argument (first__name) as the
    -- separator, not as a hashed field.
    CASE
      WHEN customer_id = 'nan' THEN sha2(
        concat_ws(coalesce(first__name, '-1')
        ,coalesce(last__name, '-1')
        ,coalesce(cell_phone, '-1')
        ,coalesce(email, '-1') ), 256)
      ELSE customer_id
    END AS customer_id,
    extra_number_rfid AS rfid
  FROM bronze.cleaned_customer_data
  -- NOTE(review): other bronze string columns use the literal 'nan' for missing
  -- values; confirm whether extra_number_rfid can too and exclude it if so.
  WHERE extra_number_rfid IS NOT NULL AND extra_number_rfid != ''
)

-- Both output columns form the dedup key, so DISTINCT keeps exactly one row per
-- (customer_id, rfid) without a ROW_NUMBER() over an arbitrary ORDER BY.
SELECT DISTINCT customer_id, rfid
FROM filtered""")
df_vehicle.display()

In [0]:
# Persist df_vehicle to silver.vehicle and record the outcome in silver.log_etl.
# A failed load is logged and then re-raised so the notebook/job run fails here.
try:
    (
        df_vehicle.write.format("delta")
        .mode("overwrite")
        # Allow the overwrite to replace the table schema if the query output changes.
        .option('overwriteSchema', True)
        .saveAsTable("silver.vehicle")
    )
except Exception as e:
    log_etl("Load Vehicle", "ERROR", f"Failed to load Vehicle data: {str(e)}")
    # Re-raise: swallowing the error would let the run finish as if the
    # vehicle load had succeeded.
    raise
else:
    # Logged only after the write completed, so a failure in this logging call
    # is not mis-recorded as a failed vehicle load.
    log_etl("Load Vehicle", "SUCCESS", "Vehicle data successfully loaded into silver.vehicle")

In [0]:
%sql
-- Customers without a payment token (the bronze 'nan' sentinel is mapped to NULL
-- when silver.customer is built).
SELECT c.*
FROM silver.customer AS c
WHERE c.payment_token IS NULL;

In [0]:
%sql
-- Customers with arm_status 'Active' whose expiry date or last bill date is before today.
-- NOTE(review): a last bill date in the past is presumably normal for an active
-- account; confirm whether this check was meant to test arm_expires only.
SELECT c.*
FROM silver.customer AS c
WHERE c.arm_status = 'Active'
  AND (c.arm_expires < current_date() OR c.arm_lastbilldate < current_date());

In [0]:
%sql
-- Customers whose arm_expires timestamp is later than today's date.
SELECT c.*
FROM silver.customer AS c
WHERE current_date() < c.arm_expires;