In [0]:
from pyspark.sql import functions as f
from delta.tables import *
from pyspark.sql.catalog import Catalog

In [0]:
# Inspect the contents of the QA volume (the source CSV below is read from this folder).
qa_volume_path = '/Volumes/workspace/default/qa'
dbutils.fs.ls(qa_volume_path)

In [0]:
# Load votacao_secao_2022_ES.csv: semicolon-separated, Latin-1 encoded, with a
# header row; column types are inferred by Spark from the data.
sessao = '/Volumes/workspace/default/qa/votacao_secao_2022_ES.csv'

csv_options = {
    'header': True,
    'inferSchema': True,
    'sep': ';',
    'encoding': 'latin1',
}

df = spark.read.format('csv').options(**csv_options).load(sessao)

df.limit(10).display()

In [0]:
# Total number of rows read from the CSV (triggers a Spark action).
row_count = df.count()
row_count

In [0]:
# Location dimension: one row per distinct combination of the selected columns,
# plus an MD5 surrogate key (SQ_LOCAL_VOTACAO) derived from the natural-key columns.
LOC_VOTACAO_KEY_COLS = ['SG_UF', 'CD_MUNICIPIO', 'NR_ZONA', 'NR_SECAO', 'NR_LOCAL_VOTACAO']

df_dim_loc_votacao = (
    df
    .select(
        ['SG_UF', 'NM_UE', 'CD_MUNICIPIO', 'NM_MUNICIPIO', 'NR_ZONA', 'NR_SECAO', 'NR_LOCAL_VOTACAO', 'NM_LOCAL_VOTACAO']
    )
    .distinct()
    .withColumn(
        'SQ_LOCAL_VOTACAO',
        f.md5(
            # Join the key parts with a separator so distinct combinations cannot
            # collapse into the same string (e.g. zona=1/secao=23 vs zona=12/secao=3),
            # and coalesce NULL parts to '' so one missing value does not make the
            # whole key NULL (plain concat returns NULL if any input is NULL).
            # NOTE(review): these hashes differ from the earlier concat-based keys;
            # rebuild dim_loc_votacao if it was already populated with the old ones.
            f.concat_ws(
                '|',
                *[f.coalesce(f.trim(f.col(c).cast('string')), f.lit('')) for c in LOC_VOTACAO_KEY_COLS]
            )
        )
    )
)

display(df_dim_loc_votacao.limit(10))

In [0]:
%sql
-- List the catalogs visible to the current session.
show catalogs

In [0]:
# Ensure the target schema exists before the dimension table is written below.
spark.sql('create database if not exists workspace.default')

In [0]:
# Upsert the location dimension into a Delta table: MERGE into the table when it
# already exists, otherwise create it from df_dim_loc_votacao.
catalog = spark.catalog
DIM_LOC_VOTACAO_TABLE = "workspace.default.dim_loc_votacao"

# Check whether the table exists in the given catalog and schema.
if catalog.tableExists(DIM_LOC_VOTACAO_TABLE):

    dim_loc_votacao = DeltaTable.forName(spark, DIM_LOC_VOTACAO_TABLE)

    # Match on the surrogate key: update all columns of matched rows and insert
    # rows whose key is not in the table yet.
    # NOTE(review): Delta MERGE fails when several source rows match the same
    # target row, so SQ_LOCAL_VOTACAO must be unique in df_dim_loc_votacao — confirm.
    (
        dim_loc_votacao
        .alias('desttable')
        .merge(
            df_dim_loc_votacao.alias('updates'),
            'updates.SQ_LOCAL_VOTACAO = desttable.SQ_LOCAL_VOTACAO'
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

else:
    # First run: the DeltaTable handle is only created in the branch above, so the
    # table is created from the source DataFrame itself.
    (
        df_dim_loc_votacao
        .write
        .format('delta')
        .mode('overwrite')
        .saveAsTable(DIM_LOC_VOTACAO_TABLE)
    )

In [0]:
%sql
-- Preview the dimension table using the same fully qualified name the Python
-- cell writes to, so the result does not depend on the session's current catalog.
SELECT
  *
FROM
  workspace.default.dim_loc_votacao
LIMIT 10