In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS  silver")


In [0]:
df = spark.table("workspace.bronze.weather_results")
display(df)

In [0]:
from pyspark.sql.functions import col, try_to_date, round

# FLATTENIZA O JSON EM COLUNAS
df_flat = df.select(
    try_to_date(col("date"), 'yyyy-MM-dd').alias("date"),
    round(100-col("cloud_cover.afternoon"),0).alias("cloud_cover"),
    round(col("humidity.afternoon"),0).alias("humidity"),
    round(col("precipitation.total"),0).alias("precipitation"),
    round(col("temperature.min") - 272, 2).alias("temperature_min"),
    round(col("temperature.max") - 272, 2).alias("temperature_max"),
    round(col("temperature.afternoon") - 272, 2).alias("temperature_afternoon"),
    round(col("temperature.night") - 272, 2).alias("temperature_night"),
    round(col("temperature.evening") - 272, 2).alias("temperature_evening"),
    round(col("temperature.morning") - 272, 2).alias("temperature_morning"),
    round(col("pressure.afternoon"),0).alias("pressure"),
    col("wind.max.speed").alias("wind_max_speed"),
    round(col("wind.max.direction"),1).alias("wind_max_direction")
)
display(df_flat)

In [0]:
from pyspark.sql.functions import col, when, month, round

# CRIA NOVAS COLUNAS E TABELA SILVER 
df_new_columns = df_flat.select(
    "*",
    when(
        (col("wind_max_direction") >= 337.5) | (col("wind_max_direction") < 22.5), "Norte"
    ).when(
        (col("wind_max_direction") >= 22.5) & (col("wind_max_direction") < 67.5), "Nordeste"
    ).when(
        (col("wind_max_direction") >= 67.5) & (col("wind_max_direction") < 112.5), "Leste"
    ).when(
        (col("wind_max_direction") >= 112.5) & (col("wind_max_direction") < 157.5), "Sudeste"
    ).when(
        (col("wind_max_direction") >= 157.5) & (col("wind_max_direction") < 202.5), "Sul"
    ).when(
        (col("wind_max_direction") >= 202.5) & (col("wind_max_direction") < 247.5), "Sudoeste"
    ).when(
        (col("wind_max_direction") >= 247.5) & (col("wind_max_direction") < 292.5), "Oeste"
    ).when(
        (col("wind_max_direction") >= 292.5) & (col("wind_max_direction") < 337.5), "Noroeste"
    ).otherwise("Desconhecido")
    .alias("wind_max_direction_compass"),
    when(
        month(col("date")).isin(12, 1, 2), "Verão"
    ).when(
        month(col("date")).isin(3, 4, 5), "Outono"
    ).when(
        month(col("date")).isin(6, 7, 8), "Inverno"
    ).when(
        month(col("date")).isin(9, 10, 11), "Primavera"
    ).otherwise("Desconhecido")
    .alias("estacao"),
    round(((col("temperature_max") + col("temperature_min")) / 2),2).alias("temperature_avg")
)

df_new_columns.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("silver.fct_daily_weather")
    
display(df_new_columns)