In [0]:
from pyspark.sql.functions import col, lit, when, coalesce, upper

# Source and destination tables for this cell: TABLE_BRONZE is read with
# spark.table(), TABLE_SILVER is (re)created by the Delta write at the end.
# NOTE(review): the three-part names look like catalog.schema.table — confirm.
TABLE_BRONZE = "saude_global.bronze.owid_covid_data_raw"
TABLE_SILVER = "saude_global.silver.daily_health_metrics"

def _non_negative(source_column):
    """Return a Column where nulls and negatives of `source_column` become 0.

    Args:
        source_column: Name of the bronze column to sanitise.

    Returns:
        A Column expression: the source value when it is present and
        non-negative, otherwise 0.
    """
    filled = coalesce(col(source_column), lit(0))
    return when(filled < 0, lit(0)).otherwise(filled)


# Read the raw bronze table.
df_bronze = spark.table(TABLE_BRONZE)

# Project the bronze columns into the silver layout: upper-cased continent,
# renamed country/date columns (date cast to a real DATE), and the three daily
# metrics. All three metrics go through the same null-fill and clamp-at-zero
# rule; previously only new_cases was clamped, so negative new_deaths or
# new_vaccinations_smoothed values leaked into the silver table.
df_silver = df_bronze.select(
    upper(col("continent")).alias("continent_clean"),
    col("iso_code"),
    col("location").alias("country_name"),
    col("date").cast("date").alias("report_date"),
    col("population"),
    _non_negative("new_cases").alias("daily_new_cases"),
    _non_negative("new_deaths").alias("daily_new_deaths"),
    _non_negative("new_vaccinations_smoothed").alias("daily_vaccinations"),
)

# Row-quality gate: keep only rows that have a continent and a strictly
# positive, non-null population.
# NOTE(review): rows without a continent are presumably aggregate rows in the
# source data — confirm against the bronze table.
has_continent = col("continent_clean").isNotNull()
has_population = col("population").isNotNull()
population_is_positive = col("population") > 0

df_silver = df_silver.where(
    has_continent & has_population & population_is_positive
)

# Derive daily new cases per 100,000 inhabitants. The population filter
# applied just before this step keeps the divisor strictly positive.
cases_per_person = col("daily_new_cases") / col("population")
df_silver = df_silver.withColumn(
    "case_incidence_per_100k", cases_per_person * 100000
)

# Persist the silver DataFrame as a Delta table partitioned by continent.
# Every run fully replaces the existing table; overwriteSchema allows the
# overwrite to replace the table schema as well as the data.
(
    df_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .partitionBy("continent_clean")
    .saveAsTable(TABLE_SILVER)
)
