### Introdução ao Data Lake usando Delta Lake no Databricks
Neste notebook, vamos explorar o processo de criação e manipulação de um Data Lake utilizando a tecnologia Delta Lake no Databricks. O Delta Lake é uma camada de armazenamento que traz confiabilidade aos data lakes, oferecendo transações ACID, manuseio de metadados unificados e escalabilidade para dados estruturados e semiestruturados. Usando arquivos CSV como ponto de partida, vamos carregar dados em formato Delta, realizar operações de leitura, inserção, e upsert, além de executar consultas SQL para extrair insights a partir dos dados.

#### Carregando Arquivos CSV e Salvando como Delta
Primeiro, carregamos os arquivos CSV, que contêm dados de várias tabelas relacionadas a produtos, clientes, empregados, detalhes de pedidos, entre outros. Após a leitura, esses dados foram armazenados no formato Delta para garantir maior consistência e desempenho.


In [0]:
#criar delta a partir do csv
categories = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/categories.csv")
categories.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/categories.delta")

customers = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/customers.csv")
customers.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/customers.delta")

employees = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/employees.csv")
employees.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/employees.delta")

orderdetails = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/orderdetails.csv")
orderdetails.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/orderdetails.delta")

products = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/products.csv")
products.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/products.delta")

shippers = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/shippers.csv")
shippers.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/shippers.delta")

suppliers = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/suppliers.csv")
suppliers.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/suppliers.delta")

orders = spark.read.format("csv").option("header", "true").option("delimiter", ";").option("inferSchema", "true").load("dbfs:/FileStore/tables/datalake/orders.csv")
orders.write.format("delta").save("dbfs:/FileStore/tables/datalake/delta/orders.delta")

#### Leitura de Dados no Formato Delta
Com os dados armazenados no formato Delta, agora podemos ler e visualizar as tabelas. Isso nos permite aproveitar as vantagens do Delta Lake, como transações ACID e leitura rápida.

In [0]:
#ler delta
products = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/products.delta")
products.show()

+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|ProductID|         ProductName|SupplierID|CategoryID|     QuantityPerUnit|UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+--------------------+----------+----------+--------------------+---------+------------+------------+------------+------------+
|        1|                Chai|         1|         1|  10 boxes x 20 bags|     18.0|          39|           0|          10|           0|
|        2|               Chang|         1|         1|  24 - 12 oz bottles|     19.0|          17|          40|          25|           0|
|        3|       Aniseed Syrup|         1|         2| 12 - 550 ml bottles|     10.0|          13|          70|          25|           0|
|        4|Chef Anton's Caju...|         2|         2|      48 - 6 oz jars|     22.0|          53|           0|           0|           0|
|        5|Chef Anton's Gumb...|  

#### Inserção de Novos Registros no Delta Lake
Além de apenas ler os dados, também é possível inserir novos registros em uma tabela Delta. Por exemplo, aqui adicionamos uma nova categoria no dataset categories.

In [0]:
#inserir no delta
df = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/categories.delta")

novacategoria = spark.createDataFrame([(9, "coffee", "Moka pot, Aeropress, cappuccino")], df.schema)

novacategoria.write.format("delta").mode("append").save("dbfs:/FileStore/tables/datalake/delta/categories.delta")

In [0]:
categories = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/categories.delta")
categories.show()

+----------+--------------+--------------------+
|CategoryID|  CategoryName|         Description|
+----------+--------------+--------------------+
|         1|     Beverages|Soft drinks, coff...|
|         2|    Condiments|Sweet and savory ...|
|         3|   Confections|Desserts, candies...|
|         4|Dairy Products|             Cheeses|
|         5|Grains/Cereals|Breads, crackers,...|
|         6|  Meat/Poultry|      Prepared meats|
|         7|       Produce|Dried fruit and b...|
|         8|       Seafood|    Seaweed and fish|
|         9|        coffee|Moka pot, Aeropre...|
+----------+--------------+--------------------+



#### Operação de Upsert com Delta Lake
O Delta Lake também suporta operações de upsert (inserção ou atualização), que são fundamentais para garantir a integridade dos dados em cenários de atualização contínua. Por exemplo, realizamos uma operação de upsert para atualizar ou inserir novos registros nos datasets orders e order_details.

In [0]:
#upsert 
from delta.tables import *

# Carregar tabelas Delta como DeltaTable
deltaTable_orders = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/datalake/delta/orders.delta")
deltaTable_order_details = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/datalake/delta/orderdetails.delta")

# Criar os novos registros que queremos inserir
new_order = spark.createDataFrame([(11078, "ALFKI", 1, "2023-08-01")], ["OrderID", "CustomerID", "EmployeeID", "OrderDate"])
new_order_details = spark.createDataFrame([(11078, 1, 18, 3)], ["OrderID", "ProductID", "UnitPrice", "Quantity"])

deltaTable_orders.alias("orders").merge(
    new_order.alias("newOrder"),
    "orders.OrderID = newOrder.OrderID")\
    .whenMatchedUpdate(set = {"CustomerID" : "newOrder.CustomerID", "EmployeeID" : "newOrder.EmployeeID", "OrderDate" : "newOrder.OrderDate"})\
    .whenNotMatchedInsert(values = {"OrderID" : "newOrder.OrderID", "CustomerID" : "newOrder.CustomerID", "EmployeeID" : "newOrder.EmployeeID", "OrderDate" : "newOrder.OrderDate"})\
    .execute()

deltaTable_order_details.alias("order_details").merge(
    new_order_details.alias("newOrderDetails"),
    "order_details.OrderID = newOrderDetails.OrderID AND order_details.ProductID = newOrderDetails.ProductID")\
    .whenMatchedUpdate(set = {"UnitPrice" : "newOrderDetails.UnitPrice", "Quantity" : "newOrderDetails.Quantity"})\
    .whenNotMatchedInsert(values = {"OrderID" : "newOrderDetails.OrderID", "ProductID" : "newOrderDetails.ProductID", "UnitPrice" : "newOrderDetails.UnitPrice", "Quantity" : "newOrderDetails.Quantity"})\
    .execute()

In [0]:
# Ler a tabela Delta e filtrar por uma determinada condição
df_orders = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/orders.delta").filter("OrderID == 11078")
df_orders.show()

+-------+----------+----------+-------------------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+
|OrderID|CustomerID|EmployeeID|          OrderDate|RequiredDate|ShippedDate|ShipVia|Freight|ShipName|ShipAddress|ShipCity|ShipRegion|ShipPostalCode|ShipCountry|
+-------+----------+----------+-------------------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+
|  11078|     ALFKI|         1|2023-08-01 00:00:00|        null|       null|   null|   null|    null|       null|    null|      null|          null|       null|
+-------+----------+----------+-------------------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+



In [0]:
# Ler a tabela Delta e filtrar por uma determinada condição
df_order_details = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/orderdetails.delta").filter("OrderID == 11078").filter("ProductID == 1")
df_order_details.show()

+-------+---------+---------+--------+--------+
|OrderID|ProductID|UnitPrice|Quantity|Discount|
+-------+---------+---------+--------+--------+
|  11078|        1|     18.0|       3|    null|
+-------+---------+---------+--------+--------+



In [0]:
#criar tabela com finalidade especifica
# Carregar as tabelas Delta em DataFrames do Spark
df_categories = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/categories.delta")
df_categories.createOrReplaceTempView("categories")

df_products = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/products.delta")
df_products.createOrReplaceTempView("products")

df_suppliers = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/suppliers.delta")
df_suppliers.createOrReplaceTempView("suppliers")

df_employees = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/employees.delta")
df_employees.createOrReplaceTempView("employees")

df_order_details = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/orderdetails.delta")
df_order_details.createOrReplaceTempView("order_details")

df_orders = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/orders.delta")
df_orders.createOrReplaceTempView("orders")

df_shippers = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/shippers.delta")
df_shippers.createOrReplaceTempView("shippers")

df_customers = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/customers.delta")
df_customers.createOrReplaceTempView("customers")

join_query = """
SELECT order_details.OrderID AS OrderID, order_details.Quantity , order_details.UnitPrice as UnitPrice,
products.ProductID as ProductID,  products.ProductName as Product, suppliers.CompanyName AS Suppliers,  
employees.LastName as Employee, orders.OrderDate as Date, customers.CompanyName as Customer
FROM orders
JOIN order_details ON orders.OrderID = order_details.OrderID
JOIN products ON order_details.ProductID = products.ProductID
JOIN categories ON products.CategoryID = categories.CategoryID
JOIN suppliers ON products.SupplierID = suppliers.SupplierID
JOIN employees ON orders.EmployeeID = employees.EmployeeID
JOIN shippers ON orders.ShipVia = shippers.ShipperID
JOIN customers ON orders.CustomerID = customers.CustomerID
"""

df_result = spark.sql(join_query)

# Escrever o resultado em uma nova tabela Delta
df_result.write.format("delta").mode("overwrite").save("dbfs:/FileStore/tables/datalake/delta/join.delta")

#### Consultas SQL sobre Tabelas Delta
Para extrair informações específicas, é possível criar consultas SQL diretamente sobre as tabelas Delta. Isso permite um acesso rápido e eficiente aos dados para análise e geração de relatórios.

In [0]:
#consultar a tabela no delta com SQL
df = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/join.delta")

df.createOrReplaceTempView("OrdersJoin")

results = spark.sql("SELECT * FROM OrdersJoin WHERE OrderID = 10248 AND ProductID =11 ")

results.show()

+-------+--------+---------+---------+--------------+--------------------+--------+-------------------+--------------------+
|OrderID|Quantity|UnitPrice|ProductID|       Product|           Suppliers|Employee|               Date|            Customer|
+-------+--------+---------+---------+--------------+--------------------+--------+-------------------+--------------------+
|  10248|      12|     14.0|       11|Queso Cabrales|Cooperativa de Qu...|Buchanan|2020-07-04 00:00:00|Vins et alcools C...|
+-------+--------+---------+---------+--------------+--------------------+--------+-------------------+--------------------+



In [0]:
#dbutils.fs.rm("dbfs:/FileStore/tables/datalake/delta/", recurse=True)

#### Conclusão
Utilizando o Delta Lake no Databricks, conseguimos criar e manipular um Data Lake robusto, aproveitando a capacidade de realizar transações ACID, fazer upserts e consultas eficientes. Essa abordagem facilita o manuseio de grandes volumes de dados para análises avançadas e machine learning.