## [Integration of lakeFS with Delta Lake](https://docs.lakefs.io/integrations/delta.html)

### Use Cases:
#### 1. Isolating an ETL job and atomically promoting it to production
#### 2. Atomic rollback of Multi-Table Transactions

### Prerequisites

###### This notebook requires a connection to a lakeFS server.
###### To spin up lakeFS quickly, use [lakeFS Cloud](https://demo.lakefs.io), which provides a lakeFS server on demand with a single click.
###### Alternatively, refer to the [lakeFS Quickstart doc](https://docs.lakefs.io/quickstart/installing.html).

#### Setup Task: Change your lakeFS credentials

In [0]:
lakefsEndPoint = '<lakeFS Endpoint URL>'
lakefsAccessKey = '<lakeFS Access Key>'
lakefsSecretKey = '<lakeFS Secret Key>'
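
Keeping keys out of the notebook source avoids leaking them when the notebook is shared. A minimal sketch, assuming the values are exported as environment variables on the cluster; the variable names `LAKEFS_ENDPOINT`, `LAKEFS_ACCESS_KEY`, `LAKEFS_SECRET_KEY` and the helpers `read_setting` and `unfilled` are hypothetical, not part of lakeFS or this notebook:

```python
import os

def read_setting(name, placeholder):
    """Return environment variable `name`, or `placeholder` when it is unset or blank."""
    value = os.environ.get(name, "").strip()
    return value or placeholder

def unfilled(settings):
    """List the setting names whose value is still a '<...>' placeholder."""
    return [key for key, value in settings.items()
            if value.startswith("<") and value.endswith(">")]

# Hypothetical variable names (assumption); fall back to the notebook's placeholders.
lakefsEndPoint = read_setting("LAKEFS_ENDPOINT", "<lakeFS Endpoint URL>")
lakefsAccessKey = read_setting("LAKEFS_ACCESS_KEY", "<lakeFS Access Key>")
lakefsSecretKey = read_setting("LAKEFS_SECRET_KEY", "<lakeFS Secret Key>")

missing = unfilled({
    "lakefsEndPoint": lakefsEndPoint,
    "lakefsAccessKey": lakefsAccessKey,
    "lakefsSecretKey": lakefsSecretKey,
})
if missing:
    print("Still using placeholders for:", ", ".join(missing))
```

Checking for leftover placeholders before the first API call tends to give a clearer error than a failed authentication later in the notebook.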

#### Setup Task: Optionally change the lakeFS repository name (use an existing repository or provide a new name)

In [0]:
repositoryName = "delta-repo"

#### Setup Task: Versioning Information

In [0]:
mainBranch = "main"
deltaLakeETLBranch = "delta-lake-etl-branch"
customersTable = "customers"
ordersTable = "orders"
orderUpdatesTable = "order_updates"

#### Setup Task: Storage Information - Optional on Playground
###### Change the Storage Namespace to a location in the bucket you’ve configured. The storage namespace is a location in the underlying storage where data for this repository will be stored.

In [0]:
storageNamespace = "https://storage-account-name.blob.core.windows.net/storage-container-name/"  + repositoryName # Unique per repository
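
The concatenation above yields a valid path only because the base URI ends with a slash. A small sketch of a join that tolerates either form; `join_namespace` is a hypothetical helper, and the base URI is the placeholder from the cell above:

```python
def join_namespace(base_uri, repository_name):
    """Join a storage location and a repository name with exactly one '/'."""
    return base_uri.rstrip("/") + "/" + repository_name.strip("/")

base = "https://storage-account-name.blob.core.windows.net/storage-container-name"
print(join_namespace(base, "delta-repo"))
print(join_namespace(base + "/", "delta-repo"))  # same result with a trailing slash
```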

#### Setup Task: This notebook uses a few sample CSV data files. Download all CSV files from the [lakeFS samples data set](https://github.com/treeverse/lakeFS-samples/tree/main/data/OrionStar) and upload them to the [Databricks Filestore](https://docs.databricks.com/dbfs/filestore.html) in the '/FileStore/tables/data/OrionStar' folder. If you use a different folder, change the folder name here.

In [0]:
sampleDataFolder = '/FileStore/tables/data/OrionStar'

#### Setup Task: Run the additional setup tasks in the `deltaLakeSetup` notebook

In [0]:
%run ./deltaLakeSetup

#### Create Repository - Optional on Playground or if repository exists

In [0]:
repo = lakefs.Repository(
  repositoryName,
  client=clt).create(
    storage_namespace=storageNamespace,
    default_branch=mainBranch,
    exist_ok=True)
branchMain = repo.branch(mainBranch)
print(repo)

#### Create Customers delta table in the main branch (using CUSTOMER.csv file)

In [0]:
customersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{customersTable}"
df = spark.read.csv(sampleDataFolder+'/CUSTOMER.csv',header=True,schema=customersSchema)
df.write.format("delta").mode("overwrite").save(customersTablePath)
df.limit(10).display()  # show the first 10 rows
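
The table location follows the pattern `lakefs://<repository>/<branch>/<table>`, and the same f-string recurs in most of the cells that follow. A sketch of a helper that builds it in one place; `lakefs_path` is a hypothetical name, not part of the lakeFS SDK:

```python
def lakefs_path(repository, ref, *parts):
    """Build a lakefs:// URI from a repository, a branch (or other ref), and path parts."""
    segments = [segment.strip("/") for segment in (repository, ref, *parts)]
    if not all(segments):
        raise ValueError("repository, ref, and path parts must be non-empty")
    return "lakefs://" + "/".join(segments)

print(lakefs_path("delta-repo", "main", "customers"))
print(lakefs_path("delta-repo", "delta-lake-etl-branch", "orders"))
```

Switching a read from one branch to another then only means changing the `ref` argument.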

#### Create Orders delta table in the main branch (using ORDER_FACT.csv file)

In [0]:
ordersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{ordersTable}"
df = spark.read.csv(sampleDataFolder+'/ORDER_FACT.csv',header=True,schema=ordersSchema)
df.write.format("delta").mode("overwrite").save(ordersTablePath)
df.limit(10).display()  # show the first 10 rows

#### Commit changes and attach some metadata

In [0]:
branchMain.commit(message='Added customers and orders Delta tables!', 
        metadata={'using': 'python_api'})

### ETL Job Starts

#### Create a new branch

In [0]:
branchDeltaLakeETL = repo.branch(deltaLakeETLBranch).create(source_reference=mainBranch, exist_ok=True)
print(f"{deltaLakeETLBranch} ref:", branchDeltaLakeETL.get_commit().id)

#### Apply POS (Point of Sale) Transactions to Delta Lake: delete data for a customer on the new branch

In [0]:
from delta.tables import DeltaTable

ordersTablePath = f"lakefs://{repositoryName}/{deltaLakeETLBranch}/{ordersTable}"
deltaTable = DeltaTable.forPath(spark, ordersTablePath)
deltaTable.delete("Customer_ID = 19444")

In [0]:
customersTablePath = f"lakefs://{repositoryName}/{deltaLakeETLBranch}/{customersTable}"
deltaTable = DeltaTable.forPath(spark, customersTablePath)
deltaTable.delete("Customer_ID = 19444")

#### Apply POS Transactions to Delta Lake: update data for a customer on the new branch

In [0]:
from pyspark.sql.functions import expr  # needed for the update condition below

customersTablePath = f"lakefs://{repositoryName}/{deltaLakeETLBranch}/{customersTable}"
deltaTable = DeltaTable.forPath(spark, customersTablePath)
deltaTable.update(
  condition = expr("Customer_ID = 63"),
  set = { "Customer_FirstName": "'Jim'",
          "Customer_Name": "'Jim Klisurich'"})

#### Apply POS Transactions to Delta Lake: batch upsert (5 updated and 10 new orders in the ORDER_FACT_UPDATES.csv file)

In [0]:
ordersTablePath = f"lakefs://{repositoryName}/{deltaLakeETLBranch}/{ordersTable}"
deltaTableOrders = DeltaTable.forPath(spark, ordersTablePath)

orderUpdatesTablePath = f"lakefs://{repositoryName}/{deltaLakeETLBranch}/{orderUpdatesTable}"
dfOrderUpdates = spark.read.csv(sampleDataFolder+'/ORDER_FACT_UPDATES.csv',header=True,schema=ordersSchema)
dfOrderUpdates.write.format("delta").mode("overwrite").save(orderUpdatesTablePath)

deltaTableOrders.alias('orders') \
  .merge(
    dfOrderUpdates.alias('orderUpdates'),
    'orders.Order_ID = orderUpdates.Order_ID AND orders.Product_ID = orderUpdates.Product_ID'
  ) \
  .whenMatchedUpdate(set =
    {
      "Customer_ID": "orderUpdates.Customer_ID",
      "Employee_ID": "orderUpdates.Employee_ID",
      "Street_ID": "orderUpdates.Street_ID",
      "Order_Date": "orderUpdates.Order_Date",
      "Delivery_Date": "orderUpdates.Delivery_Date",
      "Order_ID": "orderUpdates.Order_ID",
      "Order_Type": "orderUpdates.Order_Type",
      "Product_ID": "orderUpdates.Product_ID",
      "Quantity": "orderUpdates.Quantity",
      "Total_Retail_Price": "orderUpdates.Total_Retail_Price",
      "CostPrice_Per_Unit": "orderUpdates.CostPrice_Per_Unit",
      "Discount": "orderUpdates.Discount"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "Customer_ID": "orderUpdates.Customer_ID",
      "Employee_ID": "orderUpdates.Employee_ID",
      "Street_ID": "orderUpdates.Street_ID",
      "Order_Date": "orderUpdates.Order_Date",
      "Delivery_Date": "orderUpdates.Delivery_Date",
      "Order_ID": "orderUpdates.Order_ID",
      "Order_Type": "orderUpdates.Order_Type",
      "Product_ID": "orderUpdates.Product_ID",
      "Quantity": "orderUpdates.Quantity",
      "Total_Retail_Price": "orderUpdates.Total_Retail_Price",
      "CostPrice_Per_Unit": "orderUpdates.CostPrice_Per_Unit",
      "Discount": "orderUpdates.Discount"
    }
  ) \
  .execute()
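
The `set` and `values` dictionaries above are identical: each maps a target column to the same column in the `orderUpdates` alias. A sketch of generating that mapping from the column list instead of writing it twice; `merge_mapping` and `ORDER_COLUMNS` are hypothetical names, and the column list is copied from the cell above:

```python
ORDER_COLUMNS = [
    "Customer_ID", "Employee_ID", "Street_ID", "Order_Date", "Delivery_Date",
    "Order_ID", "Order_Type", "Product_ID", "Quantity", "Total_Retail_Price",
    "CostPrice_Per_Unit", "Discount",
]

def merge_mapping(columns, source_alias):
    """Map each target column to the same-named column in the source alias."""
    return {column: f"{source_alias}.{column}" for column in columns}

order_mapping = merge_mapping(ORDER_COLUMNS, "orderUpdates")
print(order_mapping["Order_ID"])
```

The result could be passed as both `set=order_mapping` and `values=order_mapping`. When every column is copied unchanged, Delta Lake's `whenMatchedUpdateAll()` and `whenNotMatchedInsertAll()` may be a shorter alternative, provided the source and target column names match.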

#### Data Validation: Compare Customers delta table in the main and new branch

In [0]:
customersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{customersTable}"
spark.read.format("delta").load(customersTablePath).where("Customer_ID = 19444 OR Customer_ID = 63").display()

In [0]:
customersTablePath = f"lakefs://{repositoryName}/{deltaLakeETLBranch}/{customersTable}"
spark.read.format("delta").load(customersTablePath).where("Customer_ID = 19444 OR Customer_ID = 63").display()

#### Data Validation: Compare Customers count in the main and new branch

In [0]:
refs = [mainBranch, deltaLakeETLBranch]

delta_table_compare_branches(customersTable, refs)
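
`delta_table_compare_branches` is defined in the setup notebook and is not shown here. As a rough idea of what a row-count comparison across refs involves, here is a sketch with the counting step injected as a callable so that it runs without Spark; `compare_counts`, `counts_differ`, and the numbers in `stand_in_counts` are hypothetical stand-ins, not results from the sample data:

```python
def compare_counts(table, refs, count_rows):
    """Return {ref: row_count}; count_rows(ref, table) is supplied by the caller."""
    return {ref: count_rows(ref, table) for ref in refs}

def counts_differ(counts):
    """True when at least two refs report different row counts."""
    return len(set(counts.values())) > 1

# In the notebook, count_rows could wrap a Spark read (assumption, not run here):
#   lambda ref, table: spark.read.format("delta")
#       .load(f"lakefs://{repositoryName}/{ref}/{table}").count()
stand_in_counts = {("main", "customers"): 100,
                   ("delta-lake-etl-branch", "customers"): 99}  # made-up values

counts = compare_counts("customers", ["main", "delta-lake-etl-branch"],
                        lambda ref, table: stand_in_counts[(ref, table)])
print(counts, "differ" if counts_differ(counts) else "match")
```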

#### Data Validation: Compare Orders delta table in the main and new branch

In [0]:
ordersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{ordersTable}"
spark.read.format("delta").load(ordersTablePath).where("Customer_ID = 19444").display()

In [0]:
ordersTablePath = f"lakefs://{repositoryName}/{deltaLakeETLBranch}/{ordersTable}"
spark.read.format("delta").load(ordersTablePath).where("Customer_ID = 19444").display()

#### Data Validation: Compare Orders count in the main and new branch

In [0]:
refs = [mainBranch, deltaLakeETLBranch]

delta_table_compare_branches(ordersTable, refs)

#### Commit changes and attach some metadata

In [0]:
branchDeltaLakeETL.commit(message='Deleted and updated customers. Deleted and upserted orders.', 
        metadata={'using': 'python_api'})

#### Diff between the new branch and the source branch

In [0]:
diff = branchMain.diff(other_ref=branchDeltaLakeETL)
print_diff(diff)
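
`print_diff` also comes from the setup notebook and is not shown here. As a rough illustration of what such a helper might do, the sketch below tallies changes by type and lists the paths under one table. It assumes each diff entry exposes `type` and `path` attributes; `summarize_diff`, `paths_under`, and the sample entries are hypothetical stand-ins, not real diff output:

```python
from collections import Counter
from types import SimpleNamespace

def summarize_diff(changes):
    """Count diff entries by their change type."""
    return Counter(change.type for change in changes)

def paths_under(changes, prefix):
    """Paths of the diff entries located under the given prefix."""
    return [change.path for change in changes if change.path.startswith(prefix)]

# Made-up entries standing in for a real diff result.
sample = [
    SimpleNamespace(type="added", path="order_updates/part-a.parquet"),
    SimpleNamespace(type="added", path="orders/part-b.parquet"),
    SimpleNamespace(type="removed", path="customers/part-c.parquet"),
]

print(dict(summarize_diff(sample)))
print(paths_under(sample, "orders/"))
```

If the diff call returns a generator, materialize it once with `list(diff)` before passing it to more than one function.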

### ETL Job Completes

#### Delete the new branch if the ETL job fails, or merge it into the main branch if the ETL job succeeds

#### Delete the new branch if the ETL job fails

In [0]:
# Uncomment if you want to run this

#branchDeltaLakeETL.delete()

#### Or merge the new branch into the main branch if the ETL job succeeds (atomic promotion to production)

In [0]:
res = branchDeltaLakeETL.merge_into(branchMain)
print(res)
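
The two outcomes above (delete on failure, merge on success) can be folded into one step driven by the validation result. A minimal sketch, assuming the branch objects offer the `merge_into` and `delete` methods used in this notebook; `finish_etl` and the `checks_passed` flag are hypothetical:

```python
def finish_etl(etl_branch, main_branch, checks_passed):
    """Merge the ETL branch into main when checks passed; otherwise delete it."""
    if checks_passed:
        # Promote every table changed on the ETL branch in a single merge.
        return etl_branch.merge_into(main_branch)
    # Discard the isolated changes; main is never touched.
    etl_branch.delete()
    return None
```

In this notebook the call might look like `finish_etl(branchDeltaLakeETL, branchMain, checks_passed=True)`, with the flag derived from the data validation cells.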

#### Data Validation: Read data from the main branch

In [0]:
customersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{customersTable}"
spark.read.format("delta").load(customersTablePath).where("Customer_ID = 19444 OR Customer_ID = 63").display()

In [0]:
ordersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{ordersTable}"
spark.read.format("delta").load(ordersTablePath).where("Customer_ID = 19444").display()

#### Data Validation: Compare Customers count in the main and new branch

In [0]:
refs = [mainBranch, deltaLakeETLBranch]

delta_table_compare_branches(customersTable, refs)

#### Data Validation: Compare Orders count in the main and new branch

In [0]:
refs = [mainBranch, deltaLakeETLBranch]

delta_table_compare_branches(ordersTable, refs)

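#### If you merged the new branch into the main branch, you can atomically roll back the Multi-Table Transactions

##### The cell below reverts the latest commit on the main branch, which is the merge commit if nothing else has been committed since. To revert a specific commit instead, go to the lakeFS UI and get the commit ID, or copy the 'reference' from the previous merge statement, and pass it as `reference`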

In [0]:
branchMain.revert(parent_number=1, reference=mainBranch)
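
A revert adds a new commit that undoes an earlier one rather than erasing history, and because the merge touched several tables in one commit, they all roll back together. The toy model below illustrates that idea only; `ToyBranch` and its version labels are hypothetical and say nothing about how lakeFS stores commits:

```python
class ToyBranch:
    """Toy stand-in for a branch: a list of commits, each a full snapshot of table versions."""

    def __init__(self, snapshot):
        self.history = [dict(snapshot)]

    def head(self):
        return self.history[-1]

    def merge(self, changes):
        """Record one commit that changes several tables at once."""
        self.history.append({**self.head(), **changes})

    def revert_head(self):
        """Record a new commit that restores the snapshot from before the head commit."""
        if len(self.history) < 2:
            raise ValueError("nothing to revert")
        self.history.append(dict(self.history[-2]))

main = ToyBranch({"customers": "v1", "orders": "v1"})
main.merge({"customers": "v2", "orders": "v2"})  # the ETL merge
main.revert_head()                               # the rollback
print(main.head(), len(main.history))
```

Both tables return to their earlier versions in one step, and the merge commit stays in the history for auditing.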

#### Data Validation: Read data again from the main branch

In [0]:
customersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{customersTable}"
spark.read.format("delta").load(customersTablePath).where("Customer_ID = 19444 OR Customer_ID = 63").display()

In [0]:
ordersTablePath = f"lakefs://{repositoryName}/{mainBranch}/{ordersTable}"
spark.read.format("delta").load(ordersTablePath).where("Customer_ID = 19444").display()

#### Data Validation: Compare Customers count in the main and new branch

In [0]:
refs = [mainBranch, deltaLakeETLBranch]

delta_table_compare_branches(customersTable, refs)

#### Data Validation: Compare Orders count in the main and new branch

In [0]:
refs = [mainBranch, deltaLakeETLBranch]

delta_table_compare_branches(ordersTable, refs)

#### More Questions?

###### Join the [lakeFS Slack group](https://lakefs.io/slack)