In [0]:
from pyspark.sql import functions as F, Window as W


# Bronze source table; the rest of the script derives silver from it.
b = spark.table("portfolio_jobs.bronze_linkedin_jobs")


def col_if(name):
    """Return ``name`` if bronze has a column with exactly that name, else None.

    The falsy ``None`` result lets callers chain several candidate names and
    keep the first one that is actually present.
    """
    if name not in b.columns:
        return None
    return name

# Each logical field lists its candidate bronze column names in priority
# order. The first candidate present in bronze is used; a field with no
# matching column resolves to None.
job_url = next(filter(col_if, ("job_url", "url")), None)
job_id = next(filter(col_if, ("job_id", "job_posting_id")), None)
title = next(filter(col_if, ("job_title", "title")), None)
company = next(filter(col_if, ("company_name", "company")), None)
location = next(filter(col_if, ("job_location", "location")), None)
posted_at = next(filter(col_if, ("posted_at", "date_posted", "posted_time")), None)

# Start silver as an alias of bronze. Every step below rebinds `s` to a new
# DataFrame, so `b` keeps pointing at the unmodified bronze data.
s = b

# Parse the posting-time column into a timestamp when one was resolved.
if posted_at:
    posted_raw = F.col(posted_at)
    s = s.withColumn("posted_ts", F.to_timestamp(posted_raw))

# Normalised text variants: lower-cased, whitespace runs collapsed to a single
# space, then trimmed. Fields whose source column was not found are skipped.
for source_col, target_col in (
    (title, "job_title_norm"),
    (company, "company_norm"),
    (location, "location_norm"),
):
    if not source_col:
        continue
    lowered = F.lower(F.col(source_col))
    collapsed = F.regexp_replace(lowered, r"\s+", " ")
    s = s.withColumn(target_col, F.trim(collapsed))

# Completeness filter: a row survives only if every resolved identifying
# column (url, id, title, company) is non-null. Unresolved fields are ignored.
keepers = list(filter(None, (job_url, job_id, title, company)))
if keepers:
    all_present = F.col(keepers[0]).isNotNull()
    for required_col in keepers[1:]:
        all_present = all_present & F.col(required_col).isNotNull()
    s = s.filter(all_present)

# Dedupe: keep exactly one row per job, preferring the latest ingest.
# The job id is the preferred key; the job URL is the fallback.
dedupe_key = job_id or job_url
if not dedupe_key:
    # Every earlier step tolerates missing columns, but deduping cannot:
    # partitionBy(None) would fail with an opaque argument-type error, so
    # stop here with a message that names the real problem.
    raise ValueError(
        "Cannot dedupe: no job id or job url column was found in bronze"
    )

# row_number() picks arbitrarily among rows that tie on the sort key, so
# duplicates sharing an _ingest_ts could resolve differently between runs.
# When a parsed posting timestamp is available, use it as a tie-breaker
# (newest first; Spark sorts nulls last for descending order).
latest_first = [F.col("_ingest_ts").desc()]
if "posted_ts" in s.columns:
    latest_first.append(F.col("posted_ts").desc())

w = W.partitionBy(dedupe_key).orderBy(*latest_first)
s = (
    s.withColumn("_rn", F.row_number().over(w))
    .filter(F.col("_rn") == 1)  # rank 1 == most recent row for the key
    .drop("_rn")  # helper column must not leak into silver
)

# Persist the result as the silver Delta table, replacing any existing data.
silver_writer = s.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("portfolio_jobs.silver_linkedin_jobs")
