# Introduction

##### Delta Lake is an open-source storage framework that enables building a Lakehouse architecture. It provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing. It runs on top of your existing data lake and is compatible with processing engines such as Apache Spark, PrestoDB, Flink, Trino, and Hive, with APIs for Scala, Java, Rust, Ruby, and Python. Specifically, it provides the following features:
- __ACID guarantees__ <br>
     Delta Lake ensures that all data changes written to storage are committed durably and made visible to readers atomically. In other words, no more partial or corrupted files! We will discuss the ACID guarantees further as part of the transaction log later in this chapter.

- __Scalable data and metadata handling__<br>
    Since Delta Lake is built on data lakes, all reads and writes using Spark or other distributed processing engines are inherently scalable to petabyte scale. However, unlike most other storage formats and query engines, Delta Lake leverages Spark to scale out all metadata processing, so it can efficiently handle metadata for billions of files in petabyte-scale tables. We will discuss the transaction log later in this chapter.

- __Audit history and time travel__<br>
    The Delta Lake transaction log records details about every change made to the data, providing a full audit trail. Because earlier table versions remain available as snapshots, developers can access and revert to earlier versions of the data for audits, rollbacks, or to reproduce experiments. We will dive further into this topic in Chapter 3: Time Travel with Delta.

- __Schema enforcement and schema evolution__<br>
    Delta Lake automatically prevents the insertion of data with an incorrect schema, i.e., data that does not match the table schema. When needed, it allows the table schema to be explicitly and safely evolved to accommodate ever-changing data. We will dive further into this topic in Chapter 4, which focuses on schema enforcement and evolution.

- __Support for deletes, updates, and merge__<br>
    Most distributed processing frameworks do not support atomic data modification operations on data lakes. Delta Lake supports merge, update, and delete operations to enable complex use cases including, but not limited to, change data capture (CDC), slowly changing dimension (SCD) operations, and streaming upserts. We will dive further into this topic in Chapter 5: Data modifications in Delta.

- __Streaming and batch unification__<br>
    A Delta Lake table can serve both as a batch table and as a streaming source and sink. Workloads across a wide range of latencies, from streaming data ingest to batch historic backfill to interactive queries, all work out of the box. We will dive further into this topic in Chapter 6: Streaming Applications with Delta.
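The transaction log referred to under several of the features above is what makes atomic commits and time travel possible: each commit is a JSON file in the table's `_delta_log/` directory, and a reader reconstructs the table at version N by replaying the `add` and `remove` actions of commits 0 through N. The following is a minimal pure-Python sketch of that replay, not Delta Lake's implementation: the in-memory `toy_log` and its file names are hypothetical, and the sketch ignores checkpoints and other action types such as `metaData` and `commitInfo`.

```python
import json

# Hypothetical in-memory stand-in for a table's _delta_log/ directory:
# version -> list of JSON actions (one per line in a real commit file).
toy_log = {
    0: ['{"add": {"path": "part-a.parquet"}}',
        '{"add": {"path": "part-b.parquet"}}'],
    1: ['{"remove": {"path": "part-a.parquet"}}',
        '{"add": {"path": "part-c.parquet"}}'],
    2: ['{"remove": {"path": "part-b.parquet"}}'],
}


def commit_file_name(version: int) -> str:
    """Commit files are named by the version number zero-padded to 20 digits."""
    return f"{version:020d}.json"


def snapshot(log: dict, as_of_version: int) -> set:
    """Replay commits 0..as_of_version and return the set of live data files."""
    live = set()
    for version in range(as_of_version + 1):
        for line in log[version]:
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


print(commit_file_name(2))            # 00000000000000000002.json
print(sorted(snapshot(toy_log, 1)))   # ['part-b.parquet', 'part-c.parquet']
print(sorted(snapshot(toy_log, 2)))   # ['part-c.parquet']
```

Because a version becomes visible only once its commit file exists, readers see either all of a commit's changes or none of them, and reading an older version simply stops the replay earlier.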

### For additional details, please refer to the Delta Lake documentation:<br>
- __[Delta Lake Documentation](https://docs.delta.io/latest/index.html)__

## Prerequisites

To run the code in this notebook, you will need:
- An AWS account <br>

The following services should be created and configured:
- EMR Studio
- EMR Studio Workspace
- EMR on EKS Virtual Cluster
- EKS Cluster (EC2 based)
- Managed Endpoint
- IAM Policy
- Application Load Balancer
- VPC and Subnet

### Note: 
##### As of __EMR release 6.9.0__ for EMR on EKS, EMR Studio notebooks support cell magic for configuring Spark parameters, including passing the Delta Lake JAR files. At this time, only __%%configure__ is supported as a cell magic.
##### For more information, please refer to the __[Release notes](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks-6.9.0.html)__ <br>

#### With cell magic support to override Spark parameters within the notebook itself, there are now multiple ways to configure the parameters:
 - Defining the parameters within the Managed Endpoint, as shown previously for both __[Iceberg](https://github.com/techguruonline/EMR_Studio_with_EMR_on_EKS_and_Iceberg-Interactive-workloads)__ and __[Hudi](https://github.com/techguruonline/EMR_Studio_with_EMR_on_EKS_and_HUDI-Interactive-workloads)__
 - Defining the parameters within the EMR Studio notebook, either to override the parameters defined in the Managed Endpoint or to define them here instead. This notebook uses that approach for Delta Lake.
 
 
### Current Setup used for this notebook
- EMR version: __emr-6.9.0-latest__
- EKS version: __1.21__
- Instance Type for EKS cluster: __m5.xlarge__
- Number of instances: __3__
- Delta Lake version: __Delta 2.1.0__
- Spark Version: __3.3.0__

In [4]:
%%configure -f

{
  "driverMemory": "1G",
  "driverCores" : 1,
  "executorMemory" : "1G",
  "executorCores": 1,
  "conf": {
     "spark.dynamicAllocation.maxExecutors" : 10,
     "spark.dynamicAllocation.minExecutors": 1,
     "spark.sql.extensions" : "io.delta.sql.DeltaSparkSessionExtension",
     "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
     "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog",
     "spark.databricks.hive.metastore.glueCatalog.enabled":"true",
     "spark.jars.packages":"io.delta:delta-core_2.12:2.1.0",
     "spark.databricks.delta.schema.autoMerge.enabled" : "true"
  }
}

[I 2022-12-07 10:22:01,167.167 configure_magic] Magic cell payload received: {"driverMemory": "1G", "driverCores": 1, "executorMemory": "1G", "executorCores": 1, "conf": {"spark.dynamicAllocation.maxExecutors": 10, "spark.dynamicAllocation.minExecutors": 1, "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", "spark.serializer": "org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.databricks.hive.metastore.glueCatalog.enabled": "true", "spark.jars.packages": "io.delta:delta-core_2.12:2.1.0", "spark.databricks.delta.schema.autoMerge.enabled": "true"}, "proxyUser": "assumed-role_cf-emr-studio-1-StudioUserRole-1UMGXJ16SJMRN_emrstudio"}

[I 2022-12-07 10:22:01,168.168 configure_magic] Sending request to update kernel. Please wait while the kernel will be refreshed.


The kernel is successfully refreshed.
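The `%%configure` cell above sets `spark.databricks.delta.schema.autoMerge.enabled` to `true`, which enables automatic schema evolution (for example, in MERGE operations) for the whole session. The rules can be sketched in plain Python as a simplified model, not Delta Lake code, that ignores details such as case-insensitive column matching and type widening: an incoming column missing from the table is rejected unless auto-merge is on, in which case it is appended to the table schema; a column whose type differs is rejected; and table columns absent from the incoming data are allowed and read back as null. The helper `validate_append` and the `cust_email` column are hypothetical.

```python
from typing import List, Tuple

# A schema is modeled as an ordered list of (column_name, type_name) pairs.
Schema = List[Tuple[str, str]]


def validate_append(table: Schema, incoming: Schema, auto_merge: bool) -> Schema:
    """Return the table schema after an append, or raise ValueError (simplified model)."""
    table_types = dict(table)
    new_columns = []
    for name, dtype in incoming:
        if name not in table_types:
            new_columns.append((name, dtype))
        elif table_types[name] != dtype:
            raise ValueError(f"type mismatch for column {name!r}")
    if new_columns and not auto_merge:
        raise ValueError(f"columns not in table schema: {[n for n, _ in new_columns]}")
    # Table columns missing from the incoming data are kept (read back as null);
    # new columns are appended at the end only when auto_merge is enabled.
    return table + new_columns


table = [("cust_id", "int"), ("cust_name", "string"), ("cust_age", "int")]
incoming = [("cust_id", "int"), ("cust_name", "string"), ("cust_email", "string")]
print(validate_append(table, incoming, auto_merge=True))
# [('cust_id', 'int'), ('cust_name', 'string'), ('cust_age', 'int'), ('cust_email', 'string')]
```

With `auto_merge=False` the same call raises `ValueError`, which corresponds to schema enforcement rejecting the write.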

In [1]:
# Import the required libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from delta.tables import *

In [2]:
# Capture a single load timestamp, truncated to whole seconds
load_time = datetime.now().replace(microsecond=0)

InputData = [
    (1, 'Prasad Nadig', 25, 'NJ', '2022-01-01', load_time),
    (2, 'Ethereum', 80, 'NY', '2022-01-02', load_time),
    (3, 'Cosmos', 25, 'PA', '2022-01-03', load_time),
    (4, 'Solana', 55, 'MD', '2022-01-04', load_time),
    (5, 'Cardano', 15, 'TX', '2022-01-05', load_time),
    (6, 'Link', 45, 'NJ', '2022-01-06', load_time)
]

# Define schema for the source data
schema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("cust_name", StringType(), True),
    StructField("cust_age", IntegerType(), True),
    StructField("cust_loc", StringType(), True),
    StructField("create_date", StringType(), True),
    StructField("last_updated_time", TimestampType(), True)
])

# Create a DataFrame from the input data and the corresponding schema
inputDF = spark.createDataFrame(data=InputData, schema=schema)

In [3]:
inputDF.show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      25|      NJ| 2022-01-01|2022-12-07 10:22:25|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-12-07 10:22:25|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-12-07 10:22:25|
|      4|      Solana|      55|      MD| 2022-01-04|2022-12-07 10:22:25|
|      5|     Carnado|      15|      TX| 2022-01-05|2022-12-07 10:22:25|
|      6|        Link|      45|      NJ| 2022-01-06|2022-12-07 10:22:25|
+-------+------------+--------+--------+-----------+-------------------+



In [4]:
# Write DataFrame as a Delta table
inputDF.write.format("delta") \
       .mode("overwrite") \
       .option("overwriteSchema", "true") \
       .partitionBy("create_date") \
       .save("s3://emr-studio-emr-on-eks/tmp/delta/")
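`partitionBy("create_date")` lays the data out in Hive-style directories, one per distinct `create_date` value, next to the table's `_delta_log/` directory. Below is a small sketch of the resulting directory prefixes, assuming the table path used above; the helper `partition_dir` is hypothetical, and the Parquet file names Spark generates inside each directory are not modeled. Because every customer in this example has a distinct `create_date`, each partition holds a single tiny file; on real tables a lower-cardinality partition column is usually preferable.

```python
from collections import defaultdict

TABLE_PATH = "s3://emr-studio-emr-on-eks/tmp/delta/"


def partition_dir(table_path: str, column: str, value: str) -> str:
    """Hive-style partition directory: <table>/<column>=<value>/"""
    return f"{table_path.rstrip('/')}/{column}={value}/"


# (cust_id, create_date) pairs taken from the first three rows of InputData
rows = [(1, "2022-01-01"), (2, "2022-01-02"), (3, "2022-01-03")]

layout = defaultdict(list)
for cust_id, create_date in rows:
    layout[partition_dir(TABLE_PATH, "create_date", create_date)].append(cust_id)

for directory, ids in sorted(layout.items()):
    print(directory, ids)
# s3://emr-studio-emr-on-eks/tmp/delta/create_date=2022-01-01/ [1]
# s3://emr-studio-emr-on-eks/tmp/delta/create_date=2022-01-02/ [2]
# s3://emr-studio-emr-on-eks/tmp/delta/create_date=2022-01-03/ [3]
```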

In [5]:
# Set path to the Delta table, read data from Delta table
deltaPath = 's3://emr-studio-emr-on-eks/tmp/delta/'
df_delta = spark.read.format("delta").load(deltaPath)
df_delta.show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      25|      NJ| 2022-01-01|2022-12-07 10:22:25|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-12-07 10:22:25|
|      5|     Carnado|      15|      TX| 2022-01-05|2022-12-07 10:22:25|
|      4|      Solana|      55|      MD| 2022-01-04|2022-12-07 10:22:25|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-12-07 10:22:25|
|      6|        Link|      45|      NJ| 2022-01-06|2022-12-07 10:22:25|
+-------+------------+--------+--------+-----------+-------------------+



### UPSERT with Delta tables

In [6]:
# Update a couple of existing records: cust_age changes from 25 to 35 for cust_id 1,
# and cust_id 4 is resent unchanged apart from a new last_updated_time.
# Insert a new record with cust_id = 7.
update_time = datetime.now().replace(microsecond=0)

UpdateData = [
    (1, 'Prasad Nadig', 35, 'NJ', '2022-01-01', update_time),
    (4, 'Solana', 55, 'MD', '2022-01-04', update_time),
    (7, 'Algorand', 55, 'NC', '2022-01-07', update_time)
]

# Define schema for the source data (same as the initial load)
schema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("cust_name", StringType(), True),
    StructField("cust_age", IntegerType(), True),
    StructField("cust_loc", StringType(), True),
    StructField("create_date", StringType(), True),
    StructField("last_updated_time", TimestampType(), True)
])

# Create a DataFrame from the update data and the corresponding schema
updateDF = spark.createDataFrame(data=UpdateData, schema=schema)


In [7]:
# Merge source and target data: update the target record when the primary key (cust_id) matches, otherwise insert the source record.
# NOTE: If you run this cell, skip the next one. The two cells show different ways to perform an UPSERT with Delta Lake.

deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.alias('targetData') \
    .merge(
        updateDF.alias('updatedData'),
        'targetData.cust_id = updatedData.cust_id') \
    .whenMatchedUpdate( set = 
        {
            "cust_id": "updatedData.cust_id",
            "cust_name": "updatedData.cust_name",
            "cust_age": "updatedData.cust_age",
            "cust_loc": "updatedData.cust_loc",
            "create_date": "updatedData.create_date",
            "last_updated_time": "updatedData.last_updated_time"
        } 
                      ) \
    .whenNotMatchedInsert(values = 
        {
            "cust_id": "updatedData.cust_id",
            "cust_name": "updatedData.cust_name",
            "cust_age": "updatedData.cust_age",
            "cust_loc": "updatedData.cust_loc",
            "create_date": "updatedData.create_date",
            "last_updated_time": "updatedData.last_updated_time"
        } 
                         ) \
    .execute()
        
            

In [9]:
# This cell is optional and shows an alternative way to UPSERT data in Delta Lake. The previous cell lets you update/insert selected columns only; this one updates all columns on a match and inserts all columns otherwise.
# Running both cells commits an extra table version, which shifts the version numbers used in the Time Travel section below.
deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.alias('targetData') \
    .merge(
        updateDF.alias('updatedData'),
        'targetData.cust_id = updatedData.cust_id') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
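Both merge cells above follow the same row-level logic: a target row whose `cust_id` matches a source row is replaced, and an unmatched source row is inserted. A pure-Python model of that logic (not the Delta Lake API) may make the semantics easier to check by hand. It assumes `cust_id` is unique in the target, keeps only a couple of columns, and mirrors the fact that Delta Lake rejects a MERGE in which several source rows match the same target row for an update; the helper name `upsert` is hypothetical.

```python
def upsert(target: list, source: list, key: str = "cust_id") -> list:
    """Model of whenMatchedUpdateAll() + whenNotMatchedInsertAll() on rows as dicts."""
    by_key = {row[key]: dict(row) for row in target}  # assumes key is unique in target
    matched = [row[key] for row in source if row[key] in by_key]
    if len(matched) != len(set(matched)):
        raise ValueError("multiple source rows matched the same target row")
    inserted = []
    for row in source:
        if row[key] in by_key:
            by_key[row[key]] = dict(row)   # matched: replace all columns
        else:
            inserted.append(dict(row))     # not matched: insert the source row
    return list(by_key.values()) + inserted


target = [{"cust_id": 1, "cust_age": 25}, {"cust_id": 4, "cust_age": 55},
          {"cust_id": 6, "cust_age": 45}]
source = [{"cust_id": 1, "cust_age": 35}, {"cust_id": 4, "cust_age": 55},
          {"cust_id": 7, "cust_age": 55}]

for row in sorted(upsert(target, source), key=lambda r: r["cust_id"]):
    print(row)
# {'cust_id': 1, 'cust_age': 35}
# {'cust_id': 4, 'cust_age': 55}
# {'cust_id': 6, 'cust_age': 45}
# {'cust_id': 7, 'cust_age': 55}
```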


In [8]:
df_delta_updates = spark.read.format("delta").load(deltaPath)
df_delta_updates.sort('cust_id').show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      35|      NJ| 2022-01-01|2022-12-07 10:23:24|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-12-07 10:22:25|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-12-07 10:22:25|
|      4|      Solana|      55|      MD| 2022-01-04|2022-12-07 10:23:24|
|      5|     Carnado|      15|      TX| 2022-01-05|2022-12-07 10:22:25|
|      6|        Link|      45|      NJ| 2022-01-06|2022-12-07 10:22:25|
|      7|    Algorand|      55|      NC| 2022-01-07|2022-12-07 10:23:24|
+-------+------------+--------+--------+-----------+-------------------+



### Delete records with Delta tables

In [9]:
# Delete an existing record: any target record whose cust_id matches a source record will be deleted from the target table.
delete_time = datetime.now().replace(microsecond=0)

DeleteData = [
    (6, 'Link', 45, 'NJ', '2022-01-06', delete_time),
]

# Define schema for the source data (same as the initial load)
schema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("cust_name", StringType(), True),
    StructField("cust_age", IntegerType(), True),
    StructField("cust_loc", StringType(), True),
    StructField("create_date", StringType(), True),
    StructField("last_updated_time", TimestampType(), True)
])

# Create a DataFrame from the delete data and the corresponding schema
deleteDF = spark.createDataFrame(data=DeleteData, schema=schema)

In [10]:
# As with the upsert, we use the DeltaTable .merge() method; when a record matches between source and target, it is deleted from the target table.
deltaTable = DeltaTable.forPath(spark, deltaPath)

deltaTable.alias('targetData') \
    .merge(
        deleteDF.alias('deletedData'),
        'targetData.cust_id = deletedData.cust_id') \
    .whenMatchedDelete()\
    .execute()
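The delete cell uses MERGE with only a `whenMatchedDelete()` clause, so every target row whose `cust_id` appears in `deleteDF` is removed and nothing is inserted. A pure-Python model of that behavior follows (hypothetical helper `merge_delete`, not Delta code). When the rows to delete can be expressed as a predicate rather than a DataFrame, the `DeltaTable.delete()` method is a simpler alternative, for example `deltaTable.delete("cust_id = 6")`.

```python
def merge_delete(target: list, source: list, key: str = "cust_id") -> list:
    """Model of merge(...).whenMatchedDelete(): drop target rows whose key is in source."""
    keys_to_delete = {row[key] for row in source}
    return [row for row in target if row[key] not in keys_to_delete]


target = [{"cust_id": i} for i in (1, 2, 3, 4, 5, 6, 7)]
source = [{"cust_id": 6}]
print([row["cust_id"] for row in merge_delete(target, source)])
# [1, 2, 3, 4, 5, 7]
```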

In [11]:
df_delta_deletes = spark.read.format("delta").load(deltaPath)
df_delta_deletes.sort('cust_id').show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      35|      NJ| 2022-01-01|2022-12-07 10:23:24|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-12-07 10:22:25|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-12-07 10:22:25|
|      4|      Solana|      55|      MD| 2022-01-04|2022-12-07 10:23:24|
|      5|     Carnado|      15|      TX| 2022-01-05|2022-12-07 10:22:25|
|      7|    Algorand|      55|      NC| 2022-01-07|2022-12-07 10:23:24|
+-------+------------+--------+--------+-----------+-------------------+



### Time Travel with Delta Lake

In [12]:
# List different versions
deltaTable.history().show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      2|2022-12-07 10:24:11|  null|    null|    MERGE|{predicate -> (ta...|null|    null|     null|          1|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|      1|2022-12-07 10:23:43|  null|    null|    MERGE|{predicate -> (ta...|null|    null|     null|          0|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.3....|
|      0|2022-1

In [13]:
# Let's query the Delta table to fetch the data as it was before the upsert operation, i.e., the initially inserted data.
beforeUpsert = spark.read.format("delta").option("versionAsOf", 0).load(deltaPath)
beforeUpsert.sort('cust_id').show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      25|      NJ| 2022-01-01|2022-12-07 10:22:25|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-12-07 10:22:25|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-12-07 10:22:25|
|      4|      Solana|      55|      MD| 2022-01-04|2022-12-07 10:22:25|
|      5|     Carnado|      15|      TX| 2022-01-05|2022-12-07 10:22:25|
|      6|        Link|      45|      NJ| 2022-01-06|2022-12-07 10:22:25|
+-------+------------+--------+--------+-----------+-------------------+



In [14]:
# Now let's query the delta table to fetch the data before the record was deleted
beforeDelete = spark.read.format("delta").option("versionAsOf", 1).load(deltaPath)
beforeDelete.sort('cust_id').show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      35|      NJ| 2022-01-01|2022-12-07 10:23:24|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-12-07 10:22:25|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-12-07 10:22:25|
|      4|      Solana|      55|      MD| 2022-01-04|2022-12-07 10:23:24|
|      5|     Carnado|      15|      TX| 2022-01-05|2022-12-07 10:22:25|
|      6|        Link|      45|      NJ| 2022-01-06|2022-12-07 10:22:25|
|      7|    Algorand|      55|      NC| 2022-01-07|2022-12-07 10:23:24|
+-------+------------+--------+--------+-----------+-------------------+
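Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option, e.g. `spark.read.format("delta").option("timestampAsOf", "2022-12-07 10:23:50").load(deltaPath)`. As we understand it, the timestamp resolves to the latest version committed at or before that instant, and an error is raised if the timestamp falls outside the range of available commits. The sketch below models that resolution in plain Python using the version 1 and 2 commit timestamps from the `history()` output above (version 0 is omitted because its timestamp is truncated there); `version_as_of` is a hypothetical helper, not part of the Delta API.

```python
from datetime import datetime

# (version, commit timestamp) pairs taken from the history() output above
commits = [
    (1, datetime(2022, 12, 7, 10, 23, 43)),
    (2, datetime(2022, 12, 7, 10, 24, 11)),
]


def version_as_of(commits: list, ts: datetime) -> int:
    """Return the latest version committed at or before ts (simplified model)."""
    if ts > max(commit_ts for _, commit_ts in commits):
        raise ValueError("timestamp is after the latest commit")
    eligible = [version for version, commit_ts in commits if commit_ts <= ts]
    if not eligible:
        raise ValueError("timestamp is before the earliest available commit")
    return max(eligible)


print(version_as_of(commits, datetime(2022, 12, 7, 10, 23, 50)))  # 1
print(version_as_of(commits, datetime(2022, 12, 7, 10, 24, 11)))  # 2
```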

