In [1]:
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, to_json, lit, collect_list, size, avg

import logging
import os

In [2]:
# configura os logs
LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

def setup_logger():
    """
    Configura o logger para o formato desejado.
    """
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    logger = logging.getLogger(__name__)
    
    return logger

In [3]:
# configura sessao spark
def sessao_spark(app_name):

    minio_endpoint = "http://minio:9000"
    minio_access_key = "lakehouse"
    minio_secret_key = "2fUDaiyqNFhoMmgYuXjO4d24fchviXQjM2TWTgUe"

    # Lista todos os JARs na pasta
    jars_dir = "./jars"
    jars = [os.path.join(jars_dir, jar) for jar in os.listdir(jars_dir) if jar.endswith(".jar")]
    jars_str = ",".join(jars)
    
    # Configuração do Spark
    spark = (
        SparkSession 
        .builder 
        .appName(f"{app_name}") 
        .master("spark://spark-master:7077") 
        .config("spark.jars", jars_str)
        .config("spark.executor.memory", "3g") 
        .config("spark.executor.cores", "2")
        .config("spark.sql.files.maxPartitionBytes", "134217728")
        .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) 
        .config("spark.hadoop.fs.s3a.access.key", minio_access_key) 
        .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key) 
        .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
        .config("spark.hadoop.fs.s3a.path.style.access", "true") 
        .config("spark.hadoop.fs.s3a.fast.upload", "true")
        .config("spark.hadoop.fs.s3a.connection.maximum", "100")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") 
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") 
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
        .getOrCreate()
    )

    return spark

In [4]:
# configs
spark = sessao_spark('delta-time-travel')

logger = setup_logger()
logger.info("Iniciando o script.")

logger.info(spark)
logger.info(f"Configs: {SparkConf().getAll()}")
spark.sparkContext.setLogLevel("ERROR")

minio_bucket = f"s3a://production/landing/dsa"

2025-05-11 18:21:55 - __main__ - INFO - Iniciando o script.
2025-05-11 18:21:55 - __main__ - INFO - <pyspark.sql.session.SparkSession object at 0x747ad637bad0>
2025-05-11 18:21:55 - __main__ - INFO - Configs: [('spark.hadoop.fs.s3a.multipart.size', '104857600'), ('spark.executor.memory', '3g'), ('spark.hadoop.fs.s3a.path.style.access', 'true'), ('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'), ('spark.jars', 'file:///home/jovyan/notebooks/jars/hive-exec-3.1.2.jar,file:///home/jovyan/notebooks/jars/delta-spark_2.12-3.2.0.jar,file:///home/jovyan/notebooks/jars/spark-measure_2.12-0.24.jar,file:///home/jovyan/notebooks/jars/hive-metastore-3.1.2.jar,file:///home/jovyan/notebooks/jars/hadoop-aws-3.3.4.jar,file:///home/jovyan/notebooks/jars/delta-storage-3.2.0.jar,file:///home/jovyan/notebooks/jars/postgresql-42.5.1.jar,file:///home/jovyan/notebooks/jars/aws-java-sdk-bundle-1.11.1026.jar,file:///home/jovyan/notebooks/jars/antlr4-runtime

In [5]:
# Leitura dos dados
dsa_dados = spark.read.csv(
    f"{minio_bucket}/csv/dados_iniciais.csv",
    header = True,
    inferSchema = True
    )

In [6]:
# Gravando os dados em formato delta
(
    dsa_dados
    .write
    .format("delta")
    .mode("overwrite")
    .save(f"{minio_bucket}/lakehouse/dados_iniciais")
)

## Manipulando os Dados no Data Lakehouse

In [7]:
logger.info("Dados iniciais:")
delta_table_path = f"{minio_bucket}/lakehouse/dados_iniciais"
spark.read.format("delta").load(delta_table_path).show()

2025-05-11 18:23:09 - __main__ - INFO - Dados iniciais:


+---+--------+-----+-------------------+
| id|    nome|idade|             funcao|
+---+--------+-----+-------------------+
|  1|   Lucas|   30| Cientista de Dados|
|  2|   Bruno|   18|  Analista de Dados|
|  3| Mariana|   35| Arquiteto de Dados|
|  4|Fernando|   40|Engenheiro de Dados|
|  5| Gabriel|   28|   Engenheiro de IA|
|  6|  Camila|   50|   Engenheiro de ML|
|  7|  Amanda|   29| Engenheiro DataOps|
|  8| Juliano|   43| Arquiteto de Dados|
|  9| Gustavo|   56| Arquiteto de Dados|
| 10| Vanessa|   31|  Analista de Dados|
+---+--------+-----+-------------------+



### Update

In [8]:
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
    condition = "nome = 'Lucas'",
    set = {"idade": "32", "funcao": "'Gerente de Data Science'"}
)
print("Após atualização de Lucas:")
delta_table.toDF().show(truncate=False)

Após atualização de Lucas:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|2  |Bruno   |18   |Analista de Dados      |
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|5  |Gabriel |28   |Engenheiro de IA       |
|6  |Camila  |50   |Engenheiro de ML       |
|7  |Amanda  |29   |Engenheiro DataOps     |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



### Delete

In [9]:
delta_table.delete(condition = "idade <= 30")
print("Após remoção de funcionários com idade <= 30:")
delta_table.toDF().show(truncate=False)

Após remoção de funcionários com idade <= 30:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



### Insert

In [10]:
new_employees = spark.createDataFrame([
    (11, "Leonardo", 27, "Analytics Engineer"),
    (12, "Felipe", 31, "Analytics Engineer"),
    (13, "Paula", 26, "Engenheiro de Dados"),
    (14, "Melissa", 26, "Cientista de Dados")
], ["id", "nome", "idade", "funcao"])

delta_table.alias("existingData").merge(
    new_employees.alias("newData"),
    "existingData.id = newData.id"
).whenNotMatchedInsertAll().execute()

print("Após inserção de novos registros:")
delta_table.toDF().show(truncate=False)

2025-05-11 18:25:08 - numexpr.utils - INFO - NumExpr defaulting to 8 threads.


Após inserção de novos registros:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
|11 |Leonardo|27   |Analytics Engineer     |
|14 |Melissa |26   |Cientista de Dados     |
|12 |Felipe  |31   |Analytics Engineer     |
|13 |Paula   |26   |Engenheiro de Dados    |
+---+--------+-----+-----------------------+



### Upsert

In [11]:
upsert_data = spark.createDataFrame([
    (3, "Mariana", 36, "Arquiteto de Dados"),  # Atualizar idade da Mariana
    (15, "Tales", 24, "Arquiteto RPA")         # Novo registro
], ["id", "nome", "idade", "funcao"])

delta_table.alias("oldData").merge(
    upsert_data.alias("upsertData"),
    "oldData.id = upsertData.id"
).whenMatchedUpdate(set={
    "idade": "upsertData.idade",
    "funcao": "upsertData.funcao"
}).whenNotMatchedInsertAll().execute()

print("Após upsert (atualização/inserção):")
delta_table.toDF().show(truncate=False)

Após upsert (atualização/inserção):
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |36   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
|15 |Tales   |24   |Arquiteto RPA          |
|11 |Leonardo|27   |Analytics Engineer     |
|14 |Melissa |26   |Cientista de Dados     |
|12 |Felipe  |31   |Analytics Engineer     |
|13 |Paula   |26   |Engenheiro de Dados    |
+---+--------+-----+-----------------------+



In [12]:
## Histórico de Alterações (Time Travel)

In [13]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Obter o histórico completo
history_df = delta_table.history()

# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

A tabela tem 5 versões.


In [14]:
# Acessar a versão mais antiga da tabela (versão 0)
print("Versão inicial da tabela:")
old_version = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
old_version.show(truncate=False)

Versão inicial da tabela:
+---+--------+-----+-------------------+
|id |nome    |idade|funcao             |
+---+--------+-----+-------------------+
|1  |Lucas   |30   |Cientista de Dados |
|2  |Bruno   |18   |Analista de Dados  |
|3  |Mariana |35   |Arquiteto de Dados |
|4  |Fernando|40   |Engenheiro de Dados|
|5  |Gabriel |28   |Engenheiro de IA   |
|6  |Camila  |50   |Engenheiro de ML   |
|7  |Amanda  |29   |Engenheiro DataOps |
|8  |Juliano |43   |Arquiteto de Dados |
|9  |Gustavo |56   |Arquiteto de Dados |
|10 |Vanessa |31   |Analista de Dados  |
+---+--------+-----+-------------------+



In [15]:
print("Versão 0 da tabela:")
version_0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
version_0.show(truncate=False)

print("Versão 1 da tabela:")
version_1 = spark.read.format("delta").option("versionAsOf", 1).load(delta_table_path)
version_1.show(truncate=False)

print("Versão 4 da tabela:")
version_4 = spark.read.format("delta").option("versionAsOf", 4).load(delta_table_path)
version_4.show(truncate=False)

Versão 0 da tabela:
+---+--------+-----+-------------------+
|id |nome    |idade|funcao             |
+---+--------+-----+-------------------+
|1  |Lucas   |30   |Cientista de Dados |
|2  |Bruno   |18   |Analista de Dados  |
|3  |Mariana |35   |Arquiteto de Dados |
|4  |Fernando|40   |Engenheiro de Dados|
|5  |Gabriel |28   |Engenheiro de IA   |
|6  |Camila  |50   |Engenheiro de ML   |
|7  |Amanda  |29   |Engenheiro DataOps |
|8  |Juliano |43   |Arquiteto de Dados |
|9  |Gustavo |56   |Arquiteto de Dados |
|10 |Vanessa |31   |Analista de Dados  |
+---+--------+-----+-------------------+

Versão 1 da tabela:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|2  |Bruno   |18   |Analista de Dados      |
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|5  |Gabriel |28   |Engenheiro de IA       |
|6  |Camila  |50   |Engen

In [16]:
# Adicionar uma coluna que identifica a versão
version_0 = version_0.withColumn("versao", lit(0))
version_4 = version_4.withColumn("versao", lit(4))

# Unir as duas versões
changes = version_0.union(version_4)

In [17]:
# Mostrar as diferenças em relação ao nome
print("Alterações entre versões:")
changes.groupBy("id", "nome") \
       .agg(collect_list("versao").alias("versoes")) \
       .filter(size("versoes") == 1) \
       .show(truncate=False)

Alterações entre versões:
+---+--------+-------+
|id |nome    |versoes|
+---+--------+-------+
|15 |Tales   |[4]    |
|12 |Felipe  |[4]    |
|11 |Leonardo|[4]    |
|13 |Paula   |[4]    |
|7  |Amanda  |[0]    |
|5  |Gabriel |[0]    |
|2  |Bruno   |[0]    |
|14 |Melissa |[4]    |
+---+--------+-------+



In [18]:
# Mostrar as diferenças em relação a idade
print("Alterações entre versões:")
changes.groupBy("id", "idade") \
       .agg(collect_list("versao").alias("versoes")) \
       .filter(size("versoes") == 1) \
       .show(truncate=False)

Alterações entre versões:
+---+-----+-------+
|id |idade|versoes|
+---+-----+-------+
|15 |24   |[4]    |
|1  |32   |[4]    |
|3  |36   |[4]    |
|12 |31   |[4]    |
|11 |27   |[4]    |
|13 |26   |[4]    |
|5  |28   |[0]    |
|3  |35   |[0]    |
|2  |18   |[0]    |
|1  |30   |[0]    |
|7  |29   |[0]    |
|14 |26   |[4]    |
+---+-----+-------+



In [19]:
# Calcular a média de idade por versão
avg_ages = version_0.union(version_4) \
    .groupBy("versao") \
    .agg(avg("idade").alias("media_idade"))

# Mostrar a diferença de média de idade entre as versões
print("Diferença de média de idade entre versões:")
avg_ages.show()

Diferença de média de idade entre versões:
+------+------------------+
|versao|       media_idade|
+------+------------------+
|     4|35.166666666666664|
|     0|              36.0|
+------+------------------+



In [20]:
# Carrega a tabela delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Carregar o histórico de alterações da tabela Delta
history = delta_table.history()

# Selecionar apenas as colunas relevantes
formatted_history = history.select(
    col("version").alias("Versão"),
    col("operation").alias("Operação"),
    col("operationMetrics").alias("Métricas"),
    col("userMetadata").alias("Metadados do Usuário")
)

# Mostrar as alterações 
print("Histórico de alterações da tabela Delta (formatado):")
formatted_history.show(truncate=False)

Histórico de alterações da tabela Delta (formatado):
+------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Versão|Operação|Métricas                                                                                                                                                                                                                                                           

In [21]:
# Se você quiser exibir apenas operações específicas (por exemplo, UPDATE), pode usar .filter():
filtered_history = formatted_history.filter(col("Operação") == "UPDATE")
filtered_history.show(truncate=False)

+------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Versão|Operação|Métricas                                                                                                                                                                                                                                                                                                                       |Metadados do Usuário|
+------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
# Carregar o histórico de alterações da tabela Delta
history = delta_table.history()

# Selecionar e formatar as colunas
formatted_history = history.select(
    col("version").alias("Versão"),
    col("operation").alias("Operação"),
    to_json(col("operationMetrics")).alias("Métricas"),  # Converter MAP para JSON, para conseguir salvar em CSV
    col("userMetadata").alias("Metadados do Usuário")
)

# Salvar o histórico formatado em CSV
output_path = f"{minio_bucket}/csv/output"
formatted_history.write.format("csv").option("header", "true").save(output_path)

print(f"Histórico salvo em: {output_path}")

Histórico salvo em: s3a://production/landing/dsa/csv/output


Embora não haja um comando direto de rollback no Delta Lake, você pode sobrescrever uma nova versão com os dados de uma versão anterior 
sem perder todo o histórico:

In [23]:
# Consultar uma versão antiga (versão 2)
spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path).show(truncate=False)

+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



In [24]:
# Carregar a versão 2
old_version = spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path)

In [25]:
# Sobrescrever a tabela principal com a versão 2
# Isso vai gerar uma cópia da versão 2 que será agora a versão principal. 
old_version.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(delta_table_path)

In [26]:
# Verificar os dados sobrescritos
spark.read.format("delta").load(delta_table_path).show(truncate=False)

+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



In [27]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Obter o histórico completo
history_df = delta_table.history()

# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

A tabela tem 6 versões.


Considerações:

- A operação de sobrescrita cria uma nova versão na tabela Delta. Dados atuais ainda estarão no histórico, mas os dados sobrescritos substituem a visão principal da tabela.

- Certifique-se de que o esquema da versão antiga é compatível com o esquema atual. Caso contrário, pode ser necessário habilitar a opção overwriteSchema.

- Em ambientes críticos, prefira corrigir os dados com operações como MERGE ou UPDATE em vez de sobrescrever diretamente.

## Aplicando o VACUUM

Por padrão, o Delta Lake impõe uma retenção mínima de 7 dias para garantir que operações como time travel ainda sejam possíveis e para evitar exclusão acidental de dados necessários para transações. Se você quiser reduzir esse período, será necessário modificar a configuração de retenção mínima.

Você não poderá acessar versões anteriores além do período de retenção configurado.

In [28]:
# Desativar temporariamente a proteção para retenção mínima
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = false")

DataFrame[key: string, value: string]

In [None]:
# Executar VACUUM com retenção de 1 dia
print("Executando vacuum com retenção de 5 minutos...")
delta_table.vacuum(retentionHours=0.0833) # 5 minutos = 0.0833 horas

In [30]:
# Reativar a proteção para retenção mínima
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = true")

DataFrame[key: string, value: string]

Se o VACUUM foi executado com um período curto de retenção, versões mais antigas podem ter sido excluídas e não estarão disponíveis no histórico.

In [33]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Obter o histórico completo
history_df = delta_table.history()

# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

A tabela tem 6 versões.


In [34]:
# Carregar a versão 2
old_version = spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path)

In [35]:
old_version.show()

Py4JJavaError: An error occurred while calling o308.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 267.0 failed 4 times, most recent failure: Lost task 0.3 in stage 267.0 (TID 12230) (172.18.0.12 executor 1): org.apache.spark.SparkFileNotFoundException: No such file or directory: s3a://production/landing/dsa/lakehouse/dados_iniciais/part-00000-0492ee30-7f4a-418f-9d38-d488359ba69e-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:220)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkFileNotFoundException: No such file or directory: s3a://production/landing/dsa/lakehouse/dados_iniciais/part-00000-0492ee30-7f4a-418f-9d38-d488359ba69e-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:780)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:220)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [42]:
spark.stop()