In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Make "mini_dolar_futuro" the session's current catalog, then load the raw
# bronze-layer table through its fully qualified three-part name.
spark.catalog.setCurrentCatalog("mini_dolar_futuro")
df_bronze = spark.table("mini_dolar_futuro.bronze.wdofut_raw")

# Column expressions reused below for both the renamed columns and the
# derived daily range.
_maxima = F.col("Maxima")
_minima = F.col("Minima")

# Project the bronze columns into the cleaned layout:
#   data_pregao      - "data" parsed as a date with the dd/MM/yyyy pattern
#   maxima / minima  - renamed from "Maxima" / "Minima"
#   fechamento       - renamed from "Fechamento"
#   amplitude_diaria - Maxima minus Minima for the row
# Rows whose parsed date is NULL are dropped.
df_cleaned = df_bronze.select(
    F.to_date(F.col("data"), "dd/MM/yyyy").alias("data_pregao"),
    _maxima.alias("maxima"),
    _minima.alias("minima"),
    F.col("Fechamento").alias("fechamento"),
    (_maxima - _minima).alias("amplitude_diaria"),
).filter("data_pregao IS NOT NULL")

# Standard deviation of the daily range (amplitude_diaria) over the whole
# cleaned dataset. A global aggregate (no groupBy) produces exactly one row,
# which is pulled to the driver so the value can be used as a plain Python
# number in later column expressions.
df_stats = (df_cleaned.agg(
    F.stddev("amplitude_diaria").alias("stddev_amplitude")
).collect()[0])

stddev_amplitude = df_stats["stddev_amplitude"]

# The aggregate comes back as NULL (None here) when there is nothing to
# measure, e.g. df_cleaned is empty or every amplitude is NULL. Fail fast
# with an explicit message instead of letting a later `None * 0.5` raise an
# opaque TypeError.
if stddev_amplitude is None:
    raise ValueError(
        "Cannot compute support/resistance levels: the standard deviation of "
        "'amplitude_diaria' is NULL (not enough valid rows in the cleaned data)."
    )

# The closing price is used as the pivot point; every level is an offset
# from it expressed as a multiple of the global amplitude standard deviation.
_fechamento = F.col("fechamento")

# (output column name, stddev multiplier) for each level, in output order.
_resistance_levels = (("R1", 0.5), ("R2", 1.0), ("R3", 1.5))
_support_levels = (("S1", 0.5), ("S2", 1.0), ("S3", 1.5))

df_final = df_cleaned.select(
    "data_pregao",
    _fechamento.alias("pivot_point"),
    # Resistances: close + k * stddev, rounded to 2 decimal places.
    *[
        F.round(_fechamento + stddev_amplitude * factor, 2).alias(level)
        for level, factor in _resistance_levels
    ],
    # Supports: close - k * stddev, rounded to 2 decimal places.
    *[
        F.round(_fechamento - stddev_amplitude * factor, 2).alias(level)
        for level, factor in _support_levels
    ],
)

# Persist the result as a Delta table in the silver schema, replacing any
# existing data; overwriteSchema lets the stored schema be replaced as well.
df_final.write.saveAsTable(
    "mini_dolar_futuro.silver.wdofut_tratado",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)