In [0]:
# Seleciona o catálogo e o schema silver
spark.sql("USE CATALOG mvp")
spark.sql("USE SCHEMA silver")

In [0]:
# Carrega a tabela bronze 'dados_postos_anp' do catálogo 'mvp'
df = spark.table("mvp.bronze.dados_postos_anp")

# Salva o DataFrame como uma nova tabela 'dados_postos_anp_s' sobrescrevendo se já existir
df.write.mode("overwrite").saveAsTable("dados_postos_anp_s")

# Exibe uma amostra dos dados
display(df.limit(10))

In [0]:
# Carrega a tabela silver
df_silver = spark.table("mvp.silver.dados_postos_anp")
# Carrega a tabela bronze
df_bronze = spark.table("mvp.bronze.dados_postos_anp_s")

# Compara quantidade de colunas e linhas entre silver e bronze
comparacao = {
    "silver": {
        "colunas": len(df_silver.columns),
        "linhas": df_silver.count()
    },
    "bronze": {
        "colunas": len(df_bronze.columns),
        "linhas": df_bronze.count()
    }
}

# Exibe a comparação
display(comparacao)

In [0]:
import pyspark.sql.functions as F
import unicodedata

# Função para remover acentos
def remove_accents_udf(col):
    return F.udf(lambda x: unicodedata.normalize('NFKD', x).encode('ASCII', 'ignore').decode('utf-8') if x else None)


In [0]:

# Padronizar tipos
df = df.withColumn("CNPJ", F.col("CNPJ").cast("string")) \
       .withColumn("CEP", F.col("CEP").cast("string")) \
       .withColumn("Código Instalação i-Simp", F.col("Código Instalação i-Simp").cast("string")) \
       .withColumn("Tancagem (m³)", F.col("Tancagem (m³)").cast("float")) \
       .withColumn("Qtde de Bico", F.col("Qtde de Bico").cast("int")) \
       .withColumn("Delivery", F.when(F.upper(F.col("Delivery")) == "SIM", F.lit(True))
                              .when(F.upper(F.col("Delivery")) == "NÃO", F.lit(False))
                              .otherwise(None))


In [0]:

# Padronizar datas
df = df.withColumn("Data Publicação DOU - Autorização", 
                   F.to_date(F.col("Data Publicação DOU - Autorização"), "dd/MM/yyyy")) \
       .withColumn("Data de Vinculação a Distribuidor", 
                   F.to_date(F.col("Data de Vinculação a Distribuidor"), "dd/MM/yyyy")) \
       .withColumn("Data Autorização Delivery", 
                   F.to_date(F.when(F.col("Data Autorização Delivery").rlike("^\d{2}-\d{2}-\d{4}$"),
                                   F.col("Data Autorização Delivery"))
                             .otherwise(F.col("Data Autorização Delivery")),
                             "dd-MM-yyyy"))


In [0]:

# Função para normalizar texto
def normalize_text(col):
    return F.trim(F.regexp_replace(F.regexp_replace(col, r"\s+", " "), r",+", ","))



In [0]:
# Padronizar campos de texto e criar versão sem acentos
text_cols = ["Endereço", "COMPLEMENTO", "BAIRRO", "MUNICÍPIO", "Razão Social"]
for col in text_cols:
    df = df.withColumn(col, normalize_text(F.col(col)))
    df = df.withColumn(f"{col}_sem_acentos", remove_accents_udf(col)(F.col(col)))


In [0]:
import pyspark.sql.functions as F

# Análise de incidência de valores nulos e vazios por coluna na tabela silver
df_silver = spark.table("mvp.silver.dados_postos_anp_s")
total_rows = df_silver.count()

null_analysis = []
for col in df_silver.columns:
    null_count = df_silver.filter(F.col(col).isNull()).count()  # valores explicitamente null
    empty_count = df_silver.filter(F.col(col) == "").count() if dict(df_silver.dtypes)[col] == 'string' else None  # ausência de dado (string vazia)
    percent_null = (null_count / total_rows) * 100 if total_rows > 0 else 0
    percent_empty = (empty_count / total_rows) * 100 if (empty_count is not None and total_rows > 0) else None
    null_analysis.append((col, null_count, f"{percent_null:.2f}%", empty_count, f"{percent_empty:.2f}%" if percent_empty is not None else None))

result_df = spark.createDataFrame(null_analysis, ["coluna", "qtd_null", "percentual_null", "qtd_vazio", "percentual_vazio"])
display(result_df)