<p align="center">
  <a href="" rel="noopener">
 <img width=500px height=100px src="https://docs.delta.io/latest/_static/delta-lake-logo.png" alt="Project logo"></a>
</p>

<h6 align="center">Delta Lake is an open-source project that enables building a Lakehouse architecture. It provides features such as ACID transactions, scalable metadata handling, and unified streaming and batch data processing on top of existing data lakes such as S3, ADLS, GCS, and HDFS.</h6>

<div align="center">
</div>

<h6 align="center">
Delta Lake is a storage layer that stores data in the Parquet format and brings ACID-compliant transactions and other benefits to data lakes. It can help solve a number of problems, including:
</h6>
    
- Difficulty appending data
- Very expensive jobs that fail midway through execution
- Difficulty modifying stored data
- Real-time operations
- High cost of keeping historical versions of data
- Difficulty handling large metadata
- The “too many files” problem
- Difficulty achieving good performance
- Data quality problems
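On disk, a Delta table is a directory of Parquet data files plus a `_delta_log` folder of numbered JSON commit files. Each commit records `add` and `remove` actions, and replaying them in order gives the set of files that make up a given version. The pure-Python sketch below illustrates that idea; it is a simplified model, not Delta's actual reader. It assumes every JSON commit is still present (no checkpoint-only history) and ignores all actions other than `add` and `remove`. The function name `snapshot_files` is hypothetical.

```python
import json
import os


def snapshot_files(table_path, version=None):
    """Replay the JSON commits in _delta_log and return the active data files.

    Simplified model: assumes all JSON commits are still present (no
    checkpoint-only history) and only looks at add/remove actions.
    """
    log_dir = os.path.join(table_path, "_delta_log")
    # Commit files use a zero-padded version as the name, so a plain sort is chronological.
    commits = sorted(
        name for name in os.listdir(log_dir)
        if name.endswith(".json") and name[:-5].isdigit()
    )
    active = set()
    for name in commits:
        commit_version = int(name[:-5])
        if version is not None and commit_version > version:
            break  # stop replaying once the requested version is reached
        with open(os.path.join(log_dir, name)) as handle:
            for line in handle:
                if not line.strip():
                    continue
                action = json.loads(line)  # one JSON action per line
                if "add" in action:
                    active.add(action["add"]["path"])
                elif "remove" in action:
                    active.discard(action["remove"]["path"])
    return sorted(active)
```

For example, `snapshot_files("../datalake/bronze/user", version=0)` would list the Parquet files behind version 0 of the table used in this notebook, provided its early commit files have not been cleaned up.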

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = (
    SparkSession
    .builder
    # Delta Lake 2.1.0 built for Scala 2.12 (targets Spark 3.3.x)
    .config('spark.jars.packages', 'io.delta:delta-core_2.12:2.1.0')
    # enable Delta's SQL extensions and make the default catalog Delta-aware
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    .getOrCreate()
)


In [3]:
df_user = (
    spark
    .read
    .format("json")
    .load("../datalake/landing-zone/user")
    .select("user_id", "email")
)

df_user.show(n=5, truncate=False)

+-------+-----------------------+
|user_id|email                  |
+-------+-----------------------+
|1703   |daron.bailey@email.com |
|3650   |jonah.barrows@email.com|
|8809   |carla.hansen@email.com |
|4606   |tomas.ledner@email.com |
|1      |alyse.ortiz@email.com  |
+-------+-----------------------+
only showing top 5 rows



In [4]:
(
    df_user
    .write
    .format("delta")
    .mode("overwrite") # overwrite | append
    .save("../datalake/bronze/user")
)
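At the transaction-log level, the two save modes mentioned in the comment differ only in which actions get committed: `append` adds the new files, while `overwrite` also logically removes every file of the current version. The old Parquet files stay on disk until `VACUUM` deletes them after the retention period, which is what makes time travel possible later in this notebook. The sketch below is a simplified, hypothetical model (`commit_actions` is not a Delta API) covering only those two modes.

```python
def commit_actions(current_files, new_files, mode):
    """Return the add/remove actions a write would commit (simplified model).

    append:    only adds the newly written files.
    overwrite: removes every file of the current version, then adds the new ones.
    Removed files are only tombstoned in the log; they remain on disk until VACUUM.
    """
    if mode not in ("append", "overwrite"):
        raise ValueError(f"unsupported mode in this model: {mode}")
    actions = []
    if mode == "overwrite":
        actions += [{"remove": {"path": path}} for path in sorted(current_files)]
    actions += [{"add": {"path": path}} for path in sorted(new_files)]
    return actions
```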

                                                                                

In [5]:
from delta.tables import DeltaTable

In [6]:
deltaTable = DeltaTable.forPath(spark, "../datalake/bronze/user")
deltaTable

<delta.tables.DeltaTable at 0x7fb08ff335b0>

In [7]:
deltaTable.history().show(vertical=True, truncate=False)

-RECORD 0-----------------------------------
 version             | 6
 timestamp           | 2022-09-24 06:32:48.958
 userId
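The rows returned by `history()` are derived from the `commitInfo` action that each commit file carries (version, timestamp, operation, and more). The sketch below rebuilds a reduced `(version, timestamp_ms, operation)` view straight from the JSON files, newest first like `history()`. It is a hypothetical helper, not part of the `delta` package, and assumes all JSON commits are still present; Delta may also adjust the displayed timestamps, which this model does not do.

```python
import json
import os


def table_history(table_path):
    """Rebuild (version, timestamp_ms, operation) rows from commitInfo actions.

    Simplified model: reads every JSON commit in _delta_log and returns the
    rows sorted newest version first, as history() displays them.
    """
    log_dir = os.path.join(table_path, "_delta_log")
    rows = []
    for name in os.listdir(log_dir):
        if not (name.endswith(".json") and name[:-5].isdigit()):
            continue  # skip checkpoints, .crc files and _last_checkpoint
        with open(os.path.join(log_dir, name)) as handle:
            for line in handle:
                if not line.strip():
                    continue
                action = json.loads(line)
                if "commitInfo" in action:
                    info = action["commitInfo"]
                    rows.append((int(name[:-5]), info.get("timestamp"), info.get("operation")))
    return sorted(rows, reverse=True)
```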

In [8]:
print("BEFORE UPDATE")
df_current = deltaTable.toDF()  # current snapshot of the table (not its history)
df_current.where("email == 'marcos.collier@email.com'").show(truncate=False)

print("AFTER UPDATE")
deltaTable.update(
    condition="email = 'marcos.collier@email.com'",
    set={"email": "'michaeljackson@gmail.com'"},  # values are SQL expressions, hence the inner quotes
)

df_current = deltaTable.toDF()
df_current.where("email == 'michaeljackson@gmail.com'").show(truncate=False)

BEFORE UPDATE
+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|3395   |marcos.collier@email.com|
+-------+------------------------+

AFTER UPDATE
+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|3395   |michaeljackson@gmail.com|
+-------+------------------------+
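An `update` does not edit Parquet files in place. Assuming copy-on-write behaviour (no deletion vectors), every data file that contains at least one matching row is rewritten in full and the old file is marked as removed in the log, while untouched files are kept. The pure-Python model below illustrates that; the function `copy_on_write_update` and the `{file_name: rows}` layout are hypothetical.

```python
def copy_on_write_update(files, predicate, updater, next_file_id):
    """Simplified model of how UPDATE rewrites data files.

    files: {file_name: [row_dict, ...]} for the current version.
    Files with no matching row are kept; each file with a match is removed
    and rewritten in full as a new file. Returns (new_files, actions).
    """
    new_files = {}
    actions = []
    for name, rows in sorted(files.items()):
        if not any(predicate(row) for row in rows):
            new_files[name] = rows  # no match: the file is reused as-is
            continue
        rewritten = [updater(row) if predicate(row) else row for row in rows]
        new_name = f"part-{next_file_id:05d}.parquet"
        next_file_id += 1
        actions.append({"remove": {"path": name}})
        actions.append({"add": {"path": new_name}})
        new_files[new_name] = rewritten
    return new_files, actions
```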



In [9]:
df_old = (
    spark.read.format("delta").load("../datalake/bronze/user")
)

df_old.where("user_id in (1703, 3650)").show(truncate=False)

+-------+-----------------------+
|user_id|email                  |
+-------+-----------------------+
|1703   |daron.bailey@email.com |
|3650   |jonah.barrows@email.com|
+-------+-----------------------+



In [10]:
items = [("1703","harrypotter@gmail.com"), ("3650", "brucelee@gmail.com")]
cols = ["user_id", "email"]
df_new = spark.createDataFrame(items, cols)
df_new.show()

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   1703|harrypotter@gmail...|
|   3650|  brucelee@gmail.com|
+-------+--------------------+



In [11]:
(
    deltaTable.alias("old")
    .merge(
        df_new.alias("new"),
        "old.user_id = new.user_id",
    )
    # rows matched by the merge condition: overwrite every column with the new values
    .whenMatchedUpdateAll()
    # rows that exist only in df_new: insert them
    .whenNotMatchedInsertAll()
    .execute()
)
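The merge above is an upsert: target rows whose `user_id` matches a source row take all of the source row's columns (`whenMatchedUpdateAll`), and source rows with no match are appended (`whenNotMatchedInsertAll`). The pure-Python model below mirrors those semantics on lists of dicts; `upsert` is a hypothetical name. As a simplification it rejects any duplicate key in the source, whereas Delta only fails when several source rows match the same target row.

```python
def upsert(target_rows, source_rows, key):
    """Pure-Python model of merge + whenMatchedUpdateAll + whenNotMatchedInsertAll."""
    source_by_key = {}
    for row in source_rows:
        if row[key] in source_by_key:
            raise ValueError(f"duplicate source key: {row[key]!r}")
        source_by_key[row[key]] = row

    matched = set()
    result = []
    for row in target_rows:
        if row[key] in source_by_key:
            # matched: every column takes the source value (UpdateAll)
            matched.add(row[key])
            result.append(dict(source_by_key[row[key]]))
        else:
            result.append(dict(row))
    # not matched: remaining source rows are inserted (InsertAll)
    result.extend(dict(row) for k, row in source_by_key.items() if k not in matched)
    return result
```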

In [12]:
final_delta = deltaTable.toDF()
final_delta.where("user_id in (1703, 3650)").show(truncate=False)

print(f"AMOUNT: {final_delta.count()}")

+-------+---------------------+
|user_id|email                |
+-------+---------------------+
|1703   |harrypotter@gmail.com|
|3650   |brucelee@gmail.com   |
+-------+---------------------+

AMOUNT: 600


In [13]:
history_delta = deltaTable.history()
history_delta.show(vertical=True, n=10, truncate=False)

-RECORD 0-----------------------------------
 version             | 8
 timestamp           | 2022-09-24 06:32:54.008
 userId

In [14]:
(
    history_delta
    .select("version","operation","timestamp")
    .where("operation == 'WRITE'")
    .show(truncate=False)
)

+-------+---------+-----------------------+
|version|operation|timestamp              |
+-------+---------+-----------------------+
|6      |WRITE    |2022-09-24 06:32:48.958|
|5      |WRITE    |2022-09-24 06:30:37.715|
|4      |WRITE    |2022-09-24 06:29:08.533|
|3      |WRITE    |2022-09-24 06:28:20.691|
|0      |WRITE    |2022-09-24 06:20:20.756|
+-------+---------+-----------------------+



In [15]:
time_travel_version_0 = (
    spark
    .read
    .format("delta")
    .option("versionAsOf", "0")
    .load("../datalake/bronze/user")
)

(
    time_travel_version_0
    .where("user_id in (1703, 3650)")
    .show(truncate=False)
)

+-------+-----------------------+
|user_id|email                  |
+-------+-----------------------+
|1703   |daron.bailey@email.com |
|3650   |jonah.barrows@email.com|
+-------+-----------------------+



In [16]:
(
    history_delta
    .select("version","operation","timestamp")
    .where("operation == 'MERGE'")
    .show(truncate=False)
)

+-------+---------+-----------------------+
|version|operation|timestamp              |
+-------+---------+-----------------------+
|8      |MERGE    |2022-09-24 06:32:54.008|
|2      |MERGE    |2022-09-24 06:26:26.777|
+-------+---------+-----------------------+



In [17]:
time_travel_timestamp = (
    spark
    .read
    .format("delta")
    .option("timestampAsOf", "2022-09-24 06:26:26.777")  # commit timestamp of version 2 (MERGE) in the history above
    .load("../datalake/bronze/user")
)

time_travel_timestamp.where("user_id in (1703, 3650)").show(truncate=False)

+-------+---------------------+
|user_id|email                |
+-------+---------------------+
|1703   |harrypotter@gmail.com|
|3650   |brucelee@gmail.com   |
+-------+---------------------+
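With `timestampAsOf`, Delta reads the latest version committed at or before the given timestamp, and it refuses timestamps earlier than the oldest available commit or later than the newest one. The sketch below models that resolution using the WRITE and MERGE commits listed in the history outputs above (versions 1 and 7 do not appear there, so they are left out). `version_as_of` and `parse_ts` are hypothetical helpers, and this is a simplified model rather than Delta's implementation.

```python
from datetime import datetime


def parse_ts(text):
    """Parse timestamps in the format shown by history(), e.g. 2022-09-24 06:26:26.777."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


def version_as_of(commits, timestamp):
    """Return the version a timestampAsOf read would resolve to (simplified model).

    commits: iterable of (version, commit_datetime) pairs.
    """
    commits = list(commits)
    if timestamp > max(committed for _, committed in commits):
        raise ValueError("timestamp is after the latest commit")
    eligible = [version for version, committed in commits if committed <= timestamp]
    if not eligible:
        raise ValueError("timestamp is before the earliest available commit")
    return max(eligible)


# WRITE and MERGE commits taken from the history outputs above (versions 1 and 7 not listed there).
commits = [
    (0, parse_ts("2022-09-24 06:20:20.756")),
    (2, parse_ts("2022-09-24 06:26:26.777")),
    (3, parse_ts("2022-09-24 06:28:20.691")),
    (4, parse_ts("2022-09-24 06:29:08.533")),
    (5, parse_ts("2022-09-24 06:30:37.715")),
    (6, parse_ts("2022-09-24 06:32:48.958")),
    (8, parse_ts("2022-09-24 06:32:54.008")),
]

print(version_as_of(commits, parse_ts("2022-09-24 06:26:26.777")))  # 2
```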



In [18]:
time_travel_timestamp.exceptAll(time_travel_version_0).show()

+-------+--------------------+
|user_id|               email|
+-------+--------------------+
|   3650|  brucelee@gmail.com|
|   1703|harrypotter@gmail...|
|   3395|     a3lab@gmail.com|
+-------+--------------------+
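`exceptAll` is a multiset difference (SQL `EXCEPT ALL`): a row that appears n times on the left and m times on the right survives max(n - m, 0) times, unlike `subtract`, which behaves like `EXCEPT DISTINCT`. The small pure-Python model below shows that counting rule with `collections.Counter`; `except_all` is a hypothetical name, and Spark itself does not guarantee the output order.

```python
from collections import Counter


def except_all(left, right):
    """Multiset difference with EXCEPT ALL semantics on hashable rows (e.g. tuples)."""
    remaining = Counter(right)  # how many copies of each row can still be cancelled
    result = []
    for row in left:
        if remaining[row] > 0:
            remaining[row] -= 1  # cancel one matching copy from the right side
        else:
            result.append(row)
    return result
```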



In [19]:
df_user = (
    spark
    .read
    .format("json")
    .load("../datalake/landing-zone/user")
    .select("user_id", "email")  # keep only user_id and email (drops gender)
)

(
    df_user
    .write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .save("../datalake/bronze/user")
)
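According to the Delta Lake documentation, `mergeSchema` only adds columns: columns present in the DataFrame but missing from the table are appended, while table columns missing from the DataFrame are kept and filled with nulls. Actually dropping a column (such as `gender`) needs `overwriteSchema` together with `mode("overwrite")`. The sketch below models that decision on schemas written as lists of `(column, type)` pairs. It is a simplified, hypothetical helper that assumes flat schemas, exact type matches (no type widening), and case-sensitive names.

```python
def resolve_write_schema(table_schema, df_schema, mode, merge_schema=False, overwrite_schema=False):
    """Simplified model of the table schema after a Delta write."""
    if mode == "overwrite" and overwrite_schema:
        return list(df_schema)  # the table adopts the DataFrame schema as-is
    table_types = dict(table_schema)
    for name, dtype in df_schema:
        if name in table_types and table_types[name] != dtype:
            raise ValueError(f"type mismatch for column {name!r}")
    new_columns = [(name, dtype) for name, dtype in df_schema if name not in table_types]
    if new_columns and not merge_schema:
        raise ValueError(f"schema mismatch: unexpected columns {[n for n, _ in new_columns]}")
    # mergeSchema keeps every existing column and appends the new ones
    return list(table_schema) + new_columns
```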

In [20]:
df_merge_schema = (
    spark
    .read
    .format("delta")
    .load("../datalake/bronze/user")
)

df_merge_schema.show(n=5, truncate=False)  # show() returns None, so call it separately

+-------+---------------------------+
|user_id|email                      |
+-------+---------------------------+
|3395   |marcos.collier@email.com   |
|1556   |elina.hills@email.com      |
|1879   |enedina.schroeder@email.com|
|7805   |colin.ryan@email.com       |
|3982   |dallas.boyle@email.com     |
+-------+---------------------------+
only showing top 5 rows



In [21]:
df_user = (
    spark
    .read
    .format("json")
    .load("../datalake/landing-zone/user")
    .select("user_id", "email")  # keep only user_id and email (drops gender)
)

df_user.show(truncate=False, n=5)

(
    df_user
    .repartition(10)
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .save("../datalake/bronze/user")
)

+-------+-----------------------+
|user_id|email                  |
+-------+-----------------------+
|1703   |daron.bailey@email.com |
|3650   |jonah.barrows@email.com|
|8809   |carla.hansen@email.com |
|4606   |tomas.ledner@email.com |
|1      |alyse.ortiz@email.com  |
+-------+-----------------------+
only showing top 5 rows



In [22]:
df_overwrite_schema = (
    spark
    .read
    .format("delta")
    .load("../datalake/bronze/user")
)

df_overwrite_schema.show(n=5, truncate=False)

+-------+------------------------+
|user_id|email                   |
+-------+------------------------+
|9725   |garfield.kunze@email.com|
|7110   |aubrey.blanda@email.com |
|2017   |ursula.howell@email.com |
|720    |taneka.mayer@email.com  |
|9266   |pablo.tromp@email.com   |
+-------+------------------------+
only showing top 5 rows



In [23]:
deltaTable = DeltaTable.forPath(spark, "../datalake/bronze/user")
deltaTable.generate("symlink_format_manifest")
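`generate("symlink_format_manifest")` writes manifest files listing the data files of the current version, so engines such as Presto, Trino, or Athena can read the table without understanding the Delta log. For a non-partitioned table like this one, the manifest is a single text file at `_symlink_format_manifest/manifest` with one data-file path per line. The hypothetical helper below reads it back, assuming a non-partitioned table (partitioned tables get one manifest per partition directory).

```python
import os


def read_symlink_manifest(table_path):
    """Return the data-file paths listed in a non-partitioned table's manifest."""
    manifest = os.path.join(table_path, "_symlink_format_manifest", "manifest")
    with open(manifest) as handle:
        # one fully qualified data-file path per line; skip blank lines
        return [line.strip() for line in handle if line.strip()]
```

For example, `read_symlink_manifest("../datalake/bronze/user")` would list the Parquet files an external engine would see for the table above.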

In [24]:
# deltaTable = DeltaTable.convertToDelta(spark, "parquet.`../datalake-in-parquet/user`")
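`convertToDelta` turns a plain Parquet directory into a Delta table by writing a `_delta_log` next to the existing files. Before running the commented-out conversion, one might check whether the path already has a transaction log. The supported check is `DeltaTable.isDeltaTable(spark, path)`; the hypothetical helper below is only a rough local-filesystem approximation that looks for at least one JSON commit in `_delta_log` and does not validate the log contents.

```python
import os


def looks_like_delta_table(path):
    """Rough local check: True if _delta_log exists and holds at least one JSON commit."""
    log_dir = os.path.join(path, "_delta_log")
    if not os.path.isdir(log_dir):
        return False
    return any(name.endswith(".json") and name[:-5].isdigit() for name in os.listdir(log_dir))
```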