In [1]:
import os
from pyspark.sql import SparkSession, Row
from delta.tables import DeltaTable
import importlib.metadata

print("Versão delta-spark (Python):", importlib.metadata.version("delta-spark"))

os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /usr/local/spark/jars/delta-spark_2.12-3.2.1.jar,"
    "/usr/local/spark/jars/delta-storage-3.2.1.jar pyspark-shell"
)

spark = (
    SparkSession.builder
    .appName("Delta-Bronze")
    .master("local[*]")
    .config("spark.executor.memory", "1g")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
print("Handler S3A inicializado com sucesso:", fs)

spark.sql("SELECT version()").show()

Versão delta-spark (Python): 3.2.1


25/11/07 13:26:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Handler S3A inicializado com sucesso: org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem@115694bf


                                                                                

+--------------------+
|           version()|
+--------------------+
|3.5.7 ed00d046951...|
+--------------------+



In [2]:
drivers = spark.read.option("header", True).csv("/home/jovyan/data/drivers.csv")
constructors = spark.read.option("header", True).csv("/home/jovyan/data/constructors.csv")
races = spark.read.option("header", True).csv("/home/jovyan/data/races.csv")
circuits = spark.read.option("header", True).csv("/home/jovyan/data/circuits.csv")
results = spark.read.option("header", True).csv("/home/jovyan/data/results.csv")

print(f"Drivers: {drivers.count()} | Results: {results.count()}")

Drivers: 857 | Results: 26080


In [3]:
drivers.createOrReplaceTempView("drivers")
constructors.createOrReplaceTempView("constructors")
races.createOrReplaceTempView("races")
circuits.createOrReplaceTempView("circuits")
results.createOrReplaceTempView("results")

query = """
SELECT
    ra.year,
    ra.name AS race_name,
    c.name AS circuit,
    d.forename || ' ' || d.surname AS driver,
    d.nationality AS driver_nationality,
    cs.name AS constructor,
    cs.nationality AS constructor_nationality,
    rs.position,
    rs.points
FROM results rs
JOIN races ra ON ra.raceId = rs.raceId
JOIN drivers d ON d.driverId = rs.driverId
JOIN constructors cs ON cs.constructorId = rs.constructorId
JOIN circuits c ON c.circuitId = ra.circuitId
WHERE ra.year = 2022
"""

df = spark.sql(query)
df.show(20, truncate=False)

+----+------------------+-----------------------------+----------------+------------------+--------------+-----------------------+--------+------+
|year|race_name         |circuit                      |driver          |driver_nationality|constructor   |constructor_nationality|position|points|
+----+------------------+-----------------------------+----------------+------------------+--------------+-----------------------+--------+------+
|2022|Bahrain Grand Prix|Bahrain International Circuit|Charles Leclerc |Monegasque        |Ferrari       |Italian                |1       |26    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Carlos Sainz    |Spanish           |Ferrari       |Italian                |2       |18    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Lewis Hamilton  |British           |Mercedes      |German                 |3       |15    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|George Russell  |British           |Mercedes      |German      

                                                                                

In [4]:
!/usr/local/bin/mc alias set local http://minio:9000 minioadmin minioadmin
!mc --version
!mc mb --ignore-existing local/datalake-bronze
!mc mb --ignore-existing local/datalake-silver
!mc mb --ignore-existing local/datalake-gold
!mc mb --ignore-existing local/datalake-meta
!mc ls local

]11;?\[6n[32;1mmc: [0m[32;1mConfiguration written to `/home/jovyan/.mc/config.json`. Please update your access credentials.[0;22m
[32;1mmc: [0m[32;1mSuccessfully created `/home/jovyan/.mc/share`.
[0m[32;1mmc: [0m[32;1mInitialized share uploads `/home/jovyan/.mc/share/uploads.json` file.
[0m[32;1mmc: [0m[32;1mInitialized share downloads `/home/jovyan/.mc/share/downloads.json` file.
[0m[m[32mAdded `local` successfully.[0m[m
]11;?\[6nmc version RELEASE.2025-08-13T08-35-41Z (commit-id=7394ce0dd2a80935aded936b09fa12cbb3cb8096)
Runtime: go1.24.6 linux/amd64
Copyright (c) 2015-2025 MinIO, Inc.
License GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>
]11;?\[6n[m[32;1mBucket created successfully `local/datalake-bronze`.[0;22m[m
]11;?\[6n[m[32;1mBucket created successfully `local/datalake-silver`.[0;22m[m
]11;?\[6n[m[32;1mBucket created successfully `local/datalake-gold`.[0;22m[m
]11;?\[6n[m[32;1mBucket created successfully `local/datala

In [6]:
delta_path = "s3a://datalake-bronze/f1_2022_results_delta"

df.write.format("delta").mode("overwrite").save(delta_path)
print(f"Tabela Delta gravada com sucesso em {delta_path}")

tabela = DeltaTable.forPath(spark, delta_path)
df_read = tabela.toDF()
print("Total de registros lidos do Delta:", df_read.count())
df_read.show(10, truncate=False)

                                                                                

Tabela Delta gravada com sucesso em s3a://datalake-bronze/f1_2022_results_delta


                                                                                

Total de registros lidos do Delta: 440


                                                                                

+----+------------------+-----------------------------+---------------+------------------+--------------+-----------------------+--------+------+
|year|race_name         |circuit                      |driver         |driver_nationality|constructor   |constructor_nationality|position|points|
+----+------------------+-----------------------------+---------------+------------------+--------------+-----------------------+--------+------+
|2022|Bahrain Grand Prix|Bahrain International Circuit|Charles Leclerc|Monegasque        |Ferrari       |Italian                |1       |26    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Carlos Sainz   |Spanish           |Ferrari       |Italian                |2       |18    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Lewis Hamilton |British           |Mercedes      |German                 |3       |15    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|George Russell |British           |Mercedes      |German             

In [7]:
!mc ls local/datalake-bronze/f1_2022_results_delta/

]11;?\[6n[m[32m[2025-11-07 13:30:20 UTC][0m[33m 5.6KiB[0m [34mSTANDARD[0m[1m part-00000-50fda618-111e-4779-92c8-fab56363fd7c-c000.snappy.parquet[22m[m
[m[32m[2025-11-07 13:30:49 UTC][0m[33m     0B[0m[36;1m _delta_log/[0;22m[m


In [8]:
spark.sql(f"""
UPDATE delta.`{delta_path}`
SET points = points + 1
WHERE driver = 'Lewis Hamilton'
""")

                                                                                

DataFrame[num_affected_rows: bigint]

In [9]:
spark.sql(f"""
DELETE FROM delta.`{delta_path}`
WHERE position IS NULL
""")

                                                                                

DataFrame[num_affected_rows: bigint]

In [10]:
updates = spark.createDataFrame([
    Row(year=2022, race_name="Brazil Grand Prix", driver="Lewis Hamilton", points=25),
    Row(year=2022, race_name="New Race", driver="New Driver", points=10)
])

In [11]:
updates.createOrReplaceTempView("updates")

In [12]:
spark.sql(f"""
MERGE INTO delta.`{delta_path}` AS target
USING updates AS source
ON target.driver = source.driver AND target.race_name = source.race_name
WHEN MATCHED THEN UPDATE SET target.points = source.points
WHEN NOT MATCHED THEN INSERT (year, race_name, driver, points)
VALUES (source.year, source.race_name, source.driver, source.points)
""")

                                                                                

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [13]:
tabela_delta = DeltaTable.forPath(spark, delta_path)
tabela_delta.history().select("version", "timestamp", "operation").show(truncate=False)

old_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
print("Versão inicial:")
old_df.show(5, truncate=False)

new_df = spark.read.format("delta").load(delta_path)
print("Versão atual:")
new_df.show(5, truncate=False)

+-------+-------------------+---------+
|version|timestamp          |operation|
+-------+-------------------+---------+
|2      |2025-11-07 13:33:52|MERGE    |
|1      |2025-11-07 13:32:58|UPDATE   |
|0      |2025-11-07 13:30:22|WRITE    |
+-------+-------------------+---------+

Versão inicial:


                                                                                

+----+------------------+-----------------------------+---------------+------------------+------------+-----------------------+--------+------+
|year|race_name         |circuit                      |driver         |driver_nationality|constructor |constructor_nationality|position|points|
+----+------------------+-----------------------------+---------------+------------------+------------+-----------------------+--------+------+
|2022|Bahrain Grand Prix|Bahrain International Circuit|Charles Leclerc|Monegasque        |Ferrari     |Italian                |1       |26    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Carlos Sainz   |Spanish           |Ferrari     |Italian                |2       |18    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Lewis Hamilton |British           |Mercedes    |German                 |3       |15    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|George Russell |British           |Mercedes    |German                 |4       |



+----+------------------+-----------------------------+---------------+------------------+------------+-----------------------+--------+------+
|year|race_name         |circuit                      |driver         |driver_nationality|constructor |constructor_nationality|position|points|
+----+------------------+-----------------------------+---------------+------------------+------------+-----------------------+--------+------+
|2022|Bahrain Grand Prix|Bahrain International Circuit|Charles Leclerc|Monegasque        |Ferrari     |Italian                |1       |26    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Carlos Sainz   |Spanish           |Ferrari     |Italian                |2       |18    |
|2022|Bahrain Grand Prix|Bahrain International Circuit|Lewis Hamilton |British           |Mercedes    |German                 |3       |16.0  |
|2022|Bahrain Grand Prix|Bahrain International Circuit|George Russell |British           |Mercedes    |German                 |4       |

                                                                                