In [0]:
from pyspark.sql.functions import when,current_date
from delta import *


In [0]:
# Caminho para o diretório Delta Lake onde os dados serão armazenados
delta_path = "/path/evolutivo"

In [0]:
# Criar um DataFrame de exemplo com informações de produtos
data = [
    (1, "Produto A", 100.0, "2022-01-01"),
    (2, "Produto B", 150.0, "2022-01-15"),
    (3, "Produto A", 110.0, "2022-02-01"),
    (4, "Produto C", 80.0, "2022-02-15"),
    (5, "Produto B", 160.0, "2022-03-01"),
]

schema = ["ID", "NomeProduto", "Preco", "DataVenda"]
df = spark.createDataFrame(data, schema)


In [0]:
# Exibir o DataFrame inicial
print("DataFrame Inicial:")
df.show()

DataFrame Inicial:
+---+-----------+-----+----------+
| ID|NomeProduto|Preco| DataVenda|
+---+-----------+-----+----------+
|  1|  Produto A|100.0|2022-01-01|
|  2|  Produto B|150.0|2022-01-15|
|  3|  Produto A|110.0|2022-02-01|
|  4|  Produto C| 80.0|2022-02-15|
|  5|  Produto B|160.0|2022-03-01|
+---+-----------+-----+----------+



In [0]:
# Salvando o DataFrame no formato Delta Lake
df.write.format("delta").mode("overwrite").save(delta_path)

In [0]:
# Criar coluna "CategoriaProduto" com base no nome do produto
df_novo = df.withColumn("CategoriaProduto", when(df["NomeProduto"] == "Produto A", "Categoria A")
                                   .when(df["NomeProduto"] == "Produto B", "Categoria B")
                                   .when(df["NomeProduto"] == "Produto C", "Categoria C")
                                   .otherwise("Outra Categoria"))

# Exibir o DataFrame após a inclusão da nova coluna "CategoriaProduto"
print("\nDataFrame com Nova Coluna 'CategoriaProduto':")
df_novo.show()


DataFrame com Nova Coluna 'CategoriaProduto':
+---+-----------+-----+----------+----------------+
| ID|NomeProduto|Preco| DataVenda|CategoriaProduto|
+---+-----------+-----+----------+----------------+
|  1|  Produto A|100.0|2022-01-01|     Categoria A|
|  2|  Produto B|150.0|2022-01-15|     Categoria B|
|  3|  Produto A|110.0|2022-02-01|     Categoria A|
|  4|  Produto C| 80.0|2022-02-15|     Categoria C|
|  5|  Produto B|160.0|2022-03-01|     Categoria B|
+---+-----------+-----+----------+----------------+



In [0]:
# Incluir uma nova coluna "DataCorrente" com a data atual
df_novo = df_novo.withColumn("DataCorrente", current_date())

In [0]:
# Salvando o DataFrame com o novo esquema no mesmo diretório Delta Lake
df_novo.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(delta_path)

In [0]:
# Leitura do Delta Lake (Path Evolutivo)
delta_table = DeltaTable.forPath(spark, delta_path)

In [0]:
# Lendo o DataFrame com o esquema evolutivo
df_atual = spark.read.format("delta").load(delta_path)
# Exibindo o DataFrame resultante
df_atual.show()

+---+-----------+-----+----------+----------------+------------+
| ID|NomeProduto|Preco| DataVenda|CategoriaProduto|DataCorrente|
+---+-----------+-----+----------+----------------+------------+
|  1|  Produto A|100.0|2022-01-01|     Categoria A|  2023-10-23|
|  2|  Produto B|150.0|2022-01-15|     Categoria B|  2023-10-23|
|  3|  Produto A|110.0|2022-02-01|     Categoria A|  2023-10-23|
|  4|  Produto C| 80.0|2022-02-15|     Categoria C|  2023-10-23|
|  5|  Produto B|160.0|2022-03-01|     Categoria B|  2023-10-23|
+---+-----------+-----+----------+----------------+------------+



In [0]:
# Carregando o DataFrame de uma versão anterior do Delta Lake
df_versao_0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
# Exibindo o DataFrame da versão 0
df_versao_0.show()

+---+-----------+-----+----------+
| ID|NomeProduto|Preco| DataVenda|
+---+-----------+-----+----------+
|  1|  Produto A|100.0|2022-01-01|
|  2|  Produto B|150.0|2022-01-15|
|  3|  Produto A|110.0|2022-02-01|
|  4|  Produto C| 80.0|2022-02-15|
|  5|  Produto B|160.0|2022-03-01|
+---+-----------+-----+----------+



In [0]:
dbutils.fs.rm("/path/evolutivo", recurse=True)

Out[160]: True