# Importações

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Carregar dados da camada Bronze

In [0]:
# Read the Bronze-layer table "bronze_ooni" as a DataFrame.
df_bronze = spark.read.table("bronze_ooni")

In [0]:
%sql
-- Show the columns and data types of the Bronze table before transforming it.
DESCRIBE TABLE bronze_ooni

# Limpeza e tratamento dos dados

### Padronização e tipagem

In [0]:
# Standardize and type the raw Bronze columns:
#   * measurement_ts: measurement_start_time parsed into a timestamp
#   * probe_asn: leading "AS" prefix removed, then cast to int (values that
#     are not numeric after stripping the prefix become null)
#   * probe_cc: upper-cased
df = (
    df_bronze
    .withColumn("measurement_ts", F.to_timestamp("measurement_start_time"))
    # Anchor the pattern so only a leading "AS" is stripped; an unanchored
    # pattern would also remove "AS" from the middle of a malformed value and
    # let it cast to a plausible-looking integer instead of null.
    .withColumn("probe_asn", F.regexp_replace("probe_asn", "^AS", "").cast("int"))
    .withColumn("probe_cc", F.upper(F.col("probe_cc")))
)


### Nivelamento (flatten) dos dados

In [0]:
# Flatten the nested `scores` struct: keep the top-level measurement columns
# and promote selected nested fields to top-level columns named after the last
# segment of their path. Output column order follows the order listed here.
top_level_columns = [
    "measurement_uid",
    "measurement_ts",
    "probe_cc",
    "probe_asn",
    "test_name",
    "anomaly",
    "confirmed",
    "failure",
]

nested_score_paths = [
    # global blocking metrics
    "scores.blocking_global",
    "scores.blocking_isp",
    "scores.blocking_local",
    "scores.blocking_general",
    # blocking type (comes from the nested `analysis` struct)
    "scores.analysis.blocking_type",
    # technical signals that exist in the schema
    "scores.facebook_dns_blocking",
    "scores.facebook_tcp_blocking",
    "scores.analysis.whatsapp_endpoints_accessible",
]

df = df.select(
    *top_level_columns,
    *[F.col(path).alias(path.rsplit(".", 1)[-1]) for path in nested_score_paths],
)


# Derive flag columns that fall back to False when the source column is null.
# Keys are the new column names, values are the columns they are read from.
flag_sources = {
    "is_anomalous": "anomaly",
    "is_confirmed_blocking": "confirmed",
    "has_failure": "failure",
}

for flag_name, source_name in flag_sources.items():
    df = df.withColumn(flag_name, F.coalesce(F.col(source_name), F.lit(False)))


### Qualidade dos dados

In [0]:
# Completeness: flag whether each probe attribute is present (non-null).
probe_cc_present = F.col("probe_cc").isNotNull()
probe_asn_present = F.col("probe_asn").isNotNull()

df = df.withColumn("has_probe_cc", probe_cc_present).withColumn(
    "has_probe_asn", probe_asn_present
)

# Deduplication: keep a single row per measurement_uid, preferring the row
# with the latest measurement_ts.
# NOTE(review): rows tied on measurement_ts are picked arbitrarily, and rows
# with a null measurement_uid fall into one partition so only one of them
# survives — confirm this is intended.
window = Window.partitionBy("measurement_uid").orderBy(F.desc("measurement_ts"))

# "rn" is a temporary ranking column; it is dropped once the filter is applied.
df = df.withColumn("rn", F.row_number().over(window)).where(F.col("rn") == 1).drop("rn")


### Dimensões

In [0]:
# Time dimension: calendar parts extracted from measurement_ts.
measurement_ts_col = F.col("measurement_ts")

df = df.withColumn("year", F.year(measurement_ts_col))
df = df.withColumn("month", F.month(measurement_ts_col))
df = df.withColumn("day", F.dayofmonth(measurement_ts_col))


# Registro da tabela final

In [0]:
# Persist the result as the table "silver_ooni", overwriting existing data;
# the "overwriteSchema" option is set so the stored schema may be replaced too.
silver_writer = df.write.mode("overwrite").option("overwriteSchema", "true")
silver_writer.saveAsTable("silver_ooni")
