In [0]:
# Importando as bibliotecas necessárias
from delta.tables import *
# Lendo o DataFrame original do Delta Lake
despachantes_df = spark.table("despachantes")
despachantes_df.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [0]:
# Criando um novo DataFrame com os dados que serão inseridos ou atualizados
novo_despachante = spark.createDataFrame([(1, "João", "Ativo", "São Paulo", 10000, "2023-07-05"),
                                          (11, "Maria", "Inativo", "Rio de Janeiro", 5000, "2023-07-05")],
                                         ["id", "nome", "status", "cidade", "vendas", "data"])

# Especificando o caminho para o diretório Delta Lake onde os dados serão armazenados
delta_path = "/Volumes/workspace/default/data_files/despachantes"

# Salvando o DataFrame original no formato Delta Lake
despachantes_df.write.format("delta").mode("overwrite").save(delta_path)

# Carregando o DeltaTable a partir do caminho especificado
delta_table = DeltaTable.forPath(spark, delta_path)

# Definindo a condição de merge (como exemplo, vamos usar a coluna "id")
condition = "target.id = source.id"

# Executando o merge/upsert
delta_table.alias("target") \
    .merge(novo_despachante.alias("source"), condition) \
    .whenMatchedUpdate(set={"nome": "source.nome", "status": "source.status", "cidade": "source.cidade",
                            "vendas": "source.vendas", "data": "source.data"}) \
    .whenNotMatchedInsert(values={"id": "source.id", "nome": "source.nome", "status": "source.status",
                                  "cidade": "source.cidade", "vendas": "source.vendas", "data": "source.data"}) \
    .execute()

# Lendo o DataFrame resultante após o merge/upsert
despachantes_atualizados_df = spark.read.format("delta").load(delta_path)

# Exibindo o DataFrame resultante
despachantes_atualizados_df.orderBy("id").show()


+---+-------------------+-------+--------------+------+----------+
| id|               nome| status|        cidade|vendas|      data|
+---+-------------------+-------+--------------+------+----------+
|  1|               João|  Ativo|     São Paulo| 10000|2023-07-05|
|  2|    Deolinda Vilela|  Ativo| Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles|  Ativo|  Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles|  Ativo|  Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas|  Ativo|  Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças|  Ativo|  Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça|  Ativo|   Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez|  Ativo|  Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz|  Ativo|  Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira|  Ativo|  Porto Alegre|     0|2020-09-05|
| 11|              Maria|Inativo|Rio de Janeiro|  5000|2023-07-05|
+---+-------------------+-------+--------------+------+-------

In [0]:
dbutils.fs.rm("/Volumes/workspace/default/data_files/despachantes", recurse=True)

True