# Build a house on the lake - Lab 3

---
Today, as organizations collect large amounts of data of every type (structured, unstructured, and semi-structured), the vast majority of enterprise data lands in **Data Lakes**. Software systems and analytics applications consume data directly from the Data Lake. In our environment, the data that resides in the Data Lake is managed by HPE Ezmeral Data Fabric.

To support both data analytics workloads (such as AI/ML) and Business Intelligence (BI) use cases, organizations typically build a **Data Lakehouse** on top of the Data Lake, which serves as the storage layer. A Data Lakehouse is an open architecture that combines the best elements of data lakes and data warehouses. It overcomes critical Data Lake limitations such as poor performance, lack of support for transactions, and no enforcement of data quality.

In this part of the lab, you will explore how to build a Data Lakehouse with **Delta Lake** and how to access its data.

Delta Lake is a key technology for implementing a Data Lakehouse: it lets you store any type of data once in a data lake and run both AI and BI workloads directly on that data.

Delta Lake implements key functions such as:
* ACID properties for atomic transactions
* time travel (access any earlier version of the data through the transaction log)
* open format (Delta Lake stores the data in the open ***Apache Parquet*** format)
* governance (access controls, audit logging)
* data access (Spark SQL can be used to query the data)

Delta Lake is primarily based on two technologies:
* Apache Spark (an analytics engine for large-scale data processing in both batch and streaming modes)
* Apache Parquet (a columnar storage format well suited to storing and reading data for analytics; engines such as Spark SQL can query it directly). Delta Lake stores the table data as Parquet files and records every change in a transaction log (the `_delta_log` directory, made of JSON commit files that are periodically summarized into Parquet checkpoint files). This log is what provides ACID transactions, versioning, and auditing.

---

<img src="Pictures/build-the-lakehouse.png" height="60%" width="60%" align="right">

### Here we are at the edge of our lake and our foundation has been poured using HPE Ezmeral as the concrete
In the diagram you see that you are using one of the Jupyter Notebook applications managed by HPE Ezmeral Runtime Enterprise in a Kubernetes cluster. In that cluster is the tenant or Kubernetes namespace that your studentID gives you access to. Other students have access to the same tenant, but they are running their own "Jupyter Notebook with ML Toolkits" applications.

Next, you can see on the right that the data in our Data Lake is managed by HPE Ezmeral Data Fabric. We have configured HPE Ezmeral Data Fabric to provide all of our tenant storage. In this lab we are using a sandbox storage space called "TenantStorage", which is in fact a default DataTap set up automatically by HPE Ezmeral Runtime Enterprise when the cluster and tenant were created.

We can also create DataTaps to other existing HDFS or MAPRFS Data Lakes if we want to. For this lab we will just use the default TenantStorage sandbox DataTap. Each student using this workshop gets their own folder inside the shared TenantStorage, and a copy of the source data, a Parquet file, is placed in that student folder. That Parquet file will be used to create a Delta Lake table.

In [1]:
%kubeRefresh

please enter your password


 ············


kubeconfig set for user student900


In [2]:
%%bash
kubectl config current-context

k8scluster2-DeltaLake-student900


In [3]:
%%bash
kubectl get pods

NAME                                                  READY   STATUS    RESTARTS   AGE
hivemeta-0                                            1/1     Running   0          28d
jupyter-notebook-test-student900-controller-lffx6-0   2/2     Running   0          31m
livy-0                                                1/1     Running   0          28d
ml-pipeline-ui-artifact-5d68fc6566-rknch              1/1     Running   0          28d
ml-pipeline-visualizationserver-6b46b8fbd9-rnhfh      1/1     Running   0          28d
tenantcli-0                                           1/1     Running   0          28d


# Create your studentID folder under the default shared DataTap storage and copy the Parquet data file
The default TenantStorage DataTap is shared among all the workshop participants. Each participant (studentID) gets their own folder (/studentID) under the shared DataTap storage (dtap://TenantStorage/studentID/). In the cell below, set `userID` to your own studentID.
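Since every path in this lab follows this layout, a small helper can derive them from a student ID instead of hard-coding `student900` in each cell. A minimal sketch, assuming the `dtap://TenantStorage/<studentID>/` layout described above and the file and folder names used later in this notebook; the helper `student_paths` and its dictionary keys are hypothetical.

```python
def student_paths(student_id: str) -> dict:
    """Build the DataTap paths used in this lab for one student (hypothetical helper)."""
    if not student_id or "/" in student_id:
        raise ValueError("student_id must be a non-empty name without '/'")
    root = f"dtap://TenantStorage/{student_id}"
    workshop = f"{root}/deltalake-workshop"
    return {
        "root": root,
        "source_parquet": f"{root}/SAISEU19-loan-risks.snappy.parquet",
        "parquet_table": f"{workshop}/",
        "delta_table": f"{workshop}/loans-delta",
        "delta_small_table": f"{workshop}/loans-delta-small",
    }


paths = student_paths("student900")
for name, path in paths.items():
    print(f"{name:18} {path}")
```

With such a helper, a cell could use `delta_path = paths["delta_table"]` rather than repeating the literal path.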

In [5]:
%%bash
userID="student900"   # replace with your own studentID
# Start from a clean student folder, then copy the source Parquet file into it
hadoop fs -rm -f -r dtap://TenantStorage/${userID}
hadoop fs -mkdir dtap://TenantStorage/${userID}
hadoop fs -put SAISEU19-loan-risks.snappy.parquet dtap://TenantStorage/${userID}/.
hadoop fs -ls dtap://TenantStorage/${userID}

Found 1 items
-rw-r--r--   3 student900 users     164631 2022-08-23 08:28 dtap://TenantStorage/student900/SAISEU19-loan-risks.snappy.parquet


# Truly Magic
<img src="Pictures/its-magic.png" height="60%" width="60%" align="right">

### Great things in small packages
The `%%spark` cell in this notebook does some really helpful things for you. Jupyter Notebooks have many standard "magic" commands that integrate your code with other environments, for example by running inline OS or Bash commands within a given kernel. HPE Ezmeral ML Ops includes the `%%spark` magic command to quickly connect you to the Livy server that is already deployed in your tenant. When you run the `%%spark` magic command below, you get a Livy-managed Spark session running in your tenant, with a Livy Spark driver pod and two executor pods.

The rest of the diagram shows the environment in which this Kubernetes-based "Jupyter Notebook with ML Toolkits" application runs. You are interacting with a Jupyter Notebook that has access to HPE Ezmeral Runtime Enterprise DataTaps, including our default "sandbox TenantStorage". The diagram also illustrates tapping into other data sources worldwide; however, this lab only uses the local TenantStorage sandbox.

# Run the %%spark cell magic
When you run this cell magic, custom HPE Ezmeral Runtime Enterprise logic scans the tenant for any deployed Livy endpoints and prints their URLs in an interactive prompt below the cell. Select the URL with your mouse, copy it to your clipboard (Ctrl-C or Command-C, depending on your client OS), and paste it into the provided edit box. Next, ***because Livy authenticates users with LDAP credentials***, you will be asked to enter your student password. After you enter your password and press Enter, it takes a little under two minutes to start a dynamic Spark cluster ***(1 Spark driver pod and 2 Spark executor pods for the duration of the Livy Spark session)*** through REST API calls to the Livy endpoint.
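Livy exposes a REST API in which a new interactive session is created with `POST /sessions` and a JSON body whose `kind` field selects the language (here `pyspark`); the session's state can then be polled with `GET /sessions/{id}/state` until it is `idle`. The exact requests issued by the HPE `%%spark` magic are not documented here, so treat the following as a minimal sketch that only builds such a request with Python's standard library. The function name and the endpoint URL are hypothetical, and authentication (LDAP credentials in this lab) is deliberately left out.

```python
import json
import urllib.request
from typing import Optional


def build_create_session_request(livy_url: str, conf: Optional[dict] = None) -> urllib.request.Request:
    """Prepare, but do not send, a Livy 'create session' request for a PySpark session."""
    body = {"kind": "pyspark"}
    if conf:
        body["conf"] = conf  # extra Spark properties for the new session
    return urllib.request.Request(
        url=livy_url.rstrip("/") + "/sessions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical endpoint; in the lab, use the URL printed by the %%spark prompt.
req = build_create_session_request("https://livy.example.com:8998/")
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

Sending it would be a matter of passing `req` to `urllib.request.urlopen` with the right credentials, which the magic handles for you.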

In [6]:
%%spark

Available Livy endpoints for livy-svc: 
https://gateway2.etc.fr.comm.hpecorp.net:13023


Enter Livy endpoint (e.g., http://<internal-livy-session-url>:<port>) : https://gateway2.etc.fr.comm.hpecorp.net:13023
Enter your password:  ············


INFO: Please wait a few minutes for the spark session to be configured.
Starting Spark application


| Spark Session ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|
| 15 | pyspark | idle | | | student900 | ✔ |


SparkSession available as 'spark'.
Time taken to configure Spark Session: 106.13636946678162 seconds


# Customize our Livy Spark session to include the Delta Lake libraries
We now have a default Spark session running, and we could start running Spark commands right away. However, we also want to use the Delta Lake extensions to PySpark, so we need to customize our Spark environment further. The next cell uses another Jupyter magic command, `%%configure -f`, to re-create our Spark session with a configuration that adds the Delta Lake SQL extension and catalog, plus the DataTap (`dtap://`) filesystem settings. Simply run the cell; after another minute or two, a new session will be ready to accept PySpark and Delta Lake commands.
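The `-f` flag forces the magic to drop the current session and start a new one with the given configuration, which is why the session ID changes from 15 to 16 in the output of the cell. Expressed as Livy REST calls, that restart amounts to deleting the old session and creating a new one whose body carries the `conf` map. The sketch below is a hypothetical illustration of that sequence: the helper name is invented, only the two Delta Lake properties from the cell are included, and the real magic may issue additional requests (for example, to poll the new session's state).

```python
import json
from typing import List, Optional, Tuple

# Delta Lake settings taken from the %%configure cell.
DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def plan_session_restart(session_id: int, conf: dict) -> List[Tuple[str, str, Optional[str]]]:
    """List the Livy REST calls that replace a running session with a reconfigured one."""
    return [
        ("DELETE", f"/sessions/{session_id}", None),  # drop the current session
        ("POST", "/sessions", json.dumps({"kind": "pyspark", "conf": conf})),  # start a new one
    ]


for method, path, body in plan_session_restart(15, DELTA_CONF):
    print(method, path, body or "")
```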

In [7]:
%%configure -f
{
  "conf": {
            "spark.ssl.enabled": false,
            "spark.hadoop.fs.dtap.impl": "com.bluedata.hadoop.bdfs.Bdfs",
            "spark.hadoop.fs.AbstractFileSystem.dtap.impl": "com.bluedata.hadoop.bdfs.BdAbstractFS",
            "spark.hadoop.fs.dtap.impl.disable.cache": "false",
            "spark.driver.extraClassPath": "local:///opt/bdfs/bluedata-dtap.jar",
            "spark.executor.extraClassPath": "local:///opt/bdfs/bluedata-dtap.jar",
            "spark.kubernetes.driver.label.hpecp.hpe.com/dtap": "hadoop2",
            "spark.kubernetes.executor.label.hpecp.hpe.com/dtap": "hadoop2",
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}

Starting Spark application


| Spark Session ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|
| 16 | pyspark | idle | | | student900 | ✔ |


SparkSession available as 'spark'.
Time taken to configure Spark Session: 108.04321527481079 seconds


| Spark Session ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|
| 16 | pyspark | idle | | | student900 | ✔ |


In [8]:
%%bash
kubectl get pods

NAME                                                  READY   STATUS    RESTARTS   AGE
hivemeta-0                                            1/1     Running   0          28d
jupyter-notebook-test-student900-controller-lffx6-0   2/2     Running   0          52m
livy-0                                                1/1     Running   0          28d
livy-session-16-2b661d82c9d6e2bc-exec-1               2/2     Running   0          15m
livy-session-16-2b661d82c9d6e2bc-exec-2               2/2     Running   0          15m
livy-session-16-f871cf82c9d60a98-driver               2/2     Running   0          16m
ml-pipeline-ui-artifact-5d68fc6566-rknch              1/1     Running   0          28d
ml-pipeline-visualizationserver-6b46b8fbd9-rnhfh      1/1     Running   0          28d
tenantcli-0                                           1/1     Running   0          28d


# Set up our source file
Use an HDFS command from the local notebook kernel to check that our Parquet file is now in our tenant storage.

In [9]:
%%local
import os
import subprocess

home = os.path.expanduser("~")
fpath = 'dtap://TenantStorage/student900/SAISEU19-loan-risks.snappy.parquet'
print(f'Home [{home}] fpath [{fpath}] ')
try:
    # `hadoop fs -test -e` exits with status 0 if the path exists, non-zero otherwise
    subprocess.check_output(['hadoop', 'fs', '-test', '-e', fpath])
    print(f"{fpath} exists")
except subprocess.CalledProcessError:
    print(f"{fpath} not found")


Home [/home/student900] fpath [dtap://TenantStorage/student900/SAISEU19-loan-risks.snappy.parquet] 
dtap://TenantStorage/student900/SAISEU19-loan-risks.snappy.parquet exists
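The check above relies on the exit status of `hadoop fs -test -e`, which is 0 when the path exists. A reusable version of that check, written as a hedged sketch, could look like this; the function names are hypothetical, and the optional `run` parameter is only there so the logic can be tried without a Hadoop client.

```python
import subprocess
from typing import Callable, List, Optional


def _run(cmd: List[str]) -> int:
    """Run a command, discard its output, and return the exit status."""
    return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode


def hadoop_path_exists(path: str, run: Optional[Callable[[List[str]], int]] = None) -> bool:
    """Return True if `hadoop fs -test -e <path>` exits with status 0."""
    runner = run if run is not None else _run
    return runner(["hadoop", "fs", "-test", "-e", path]) == 0
```

In the `%%local` kernel, `hadoop_path_exists(fpath)` performs the same check as the cell above. If `hadoop` is not on the `PATH`, the default runner raises `FileNotFoundError` instead of returning `False`.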


## 1. Load our source Parquet file and create a Parquet table view from it

In [10]:
parquet_path = "dtap://TenantStorage/student900/deltalake-workshop/"

spark.read.format("parquet").load("dtap://TenantStorage/student900/SAISEU19-loan-risks.snappy.parquet") \
  .write.format("parquet").save(parquet_path)

print("Created a Parquet table at " + parquet_path)

# Create a view on the table called loans_parquet
spark.read.format("parquet").load(parquet_path).createOrReplaceTempView("loans_parquet")
print("Defined view 'loans_parquet'")

Created a Parquet table at dtap://TenantStorage/student900/deltalake-workshop/
Defined view 'loans_parquet'

In [11]:
# ##### Let's explore this Parquet table.
# *Schema of the table*
# - loan_id - unique ID for each loan
# - funded_amnt - principal amount of the loan funded to the borrower
# - paid_amnt - amount of the principal that has been paid back (ignoring interest)
# - addr_state - state where this loan was funded

spark.sql("select * from loans_parquet").show()

spark.sql("select count(*) from loans_parquet").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
|      5|       1000|    408.6|        CA|
|      6|       1000|   1000.0|        MD|
|      7|       1000|   168.81|        OH|
|      8|       1000|   193.64|        TX|
|      9|       1000|   218.83|        CT|
|     10|       1000|   322.37|        NJ|
|     11|       1000|   400.61|        NY|
|     12|       1000|   1000.0|        FL|
|     13|       1000|   165.88|        NJ|
|     14|       1000|    190.6|        TX|
|     15|       1000|   1000.0|        OH|
|     16|       1000|   213.72|        MI|
|     17|       1000|   188.89|        MI|
|     18|       1000|   237.41|        CA|
|     19|       1000|   203.85|        CA|
+-------+--

### **Let's start appending some new data to it using Structured Streaming.**
- We will generate a stream of data with randomly generated loan IDs and amounts.
- In addition, we are going to define a few more useful utility functions.
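To see what the streaming query will actually produce, here is a plain-Python model of one generated row, following the column expressions in `generate_and_append_data_stream`. It is a sketch for illustration only: the helper name is hypothetical, and Python's `random` stands in for Spark's `rand()`. Note that the `rate` source itself contributes `timestamp` and `value` columns, which are kept unless a query selects them away.

```python
import random
from datetime import datetime, timedelta

STATES = ["CA", "TX", "NY", "IA"]


def make_loan_row(value: int, timestamp: datetime, rng: random.Random) -> dict:
    """Mimic one streamed row: the two `rate` source columns plus the derived loan columns."""
    funded = int(rng.random() * 5000 + 5000)  # like (rand() * 5000 + 5000).cast("integer")
    return {
        "timestamp": timestamp,                 # emitted by the `rate` source
        "value": value,                         # emitted by the `rate` source (0, 1, 2, ...)
        "loan_id": 10000 + value,
        "funded_amnt": funded,
        "paid_amnt": funded - rng.random() * 2000,
        "addr_state": rng.choice(STATES),
    }


rng = random.Random(42)
start = datetime(2022, 8, 23, 8, 56)
# Spread 5 rows per second, roughly as with rowsPerSecond=5.
rows = [make_loan_row(v, start + timedelta(seconds=v // 5), rng) for v in range(10)]
print(list(rows[0]))  # column names of one generated row
```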

In [12]:
import random
from pyspark.sql.functions import *
from pyspark.sql.types import *

def random_checkpoint_dir(): 
    return "dtap://TenantStorage/student900/deltalake-workshop/chkpt/%s" % str(random.randint(0, 10000))

# User-defined function to generate random state

states = ["CA", "TX", "NY", "IA"]

@udf(returnType=StringType())
def random_state():
    return str(random.choice(states))

# Function to start a streaming query with a stream of randomly generated loan data and append it to the given table
def generate_and_append_data_stream(table_format, table_path):
  
    stream_data = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
                .withColumn("loan_id", 10000 + col("value")) \
                .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
                .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
                .withColumn("addr_state", random_state())
    query = stream_data.writeStream.format(table_format) \
                .option("checkpointLocation", random_checkpoint_dir()) \
                .trigger(processingTime = "10 seconds") \
                .start(table_path)
    return query

# Function to stop all streaming queries
# (checkpoint directories under .../deltalake-workshop/chkpt are left in place)
def stop_all_streams():
    print("Stopping all streams")
    for s in spark.streams.active:
        s.stop()
    print("Stopped all streams")

In [13]:
# Let's start a new stream to append data to the Parquet table
from time import sleep

stream_query = generate_and_append_data_stream(
    table_format = "parquet", 
    table_path = parquet_path)
sleep(15)

In [14]:
# Let's see if the data is being added to the table or not
spark.read.format("parquet").load(parquet_path).count()

905

In [15]:
# Where did our existing 14705 rows go? Let's see the data once again
spark.read.format("parquet").load(parquet_path).show()

+--------------------+-----+-------+-----------+------------------+----------+
|           timestamp|value|loan_id|funded_amnt|         paid_amnt|addr_state|
+--------------------+-----+-------+-----------+------------------+----------+
|2022-08-23 08:56:...|  455|  10455|       6168|4748.3256342271125|        NY|
|2022-08-23 08:56:...|  457|  10457|       6580| 4864.665204292984|        TX|
|2022-08-23 08:56:...|  459|  10459|       7446| 6360.294551888913|        IA|
|2022-08-23 08:56:...|  461|  10461|       6336| 4742.231639439925|        NY|
|2022-08-23 08:56:...|  463|  10463|       8786|7158.5525662866485|        NY|
|2022-08-23 08:56:...|  465|  10465|       9595| 7992.805563037219|        TX|
|2022-08-23 08:56:...|  467|  10467|       5982| 5373.129775468111|        NY|
|2022-08-23 08:56:...|  469|  10469|       5891| 5326.444508909475|        TX|
|2022-08-23 08:56:...|  471|  10471|       6120|5466.5448950871805|        IA|
|2022-08-23 08:56:...|  473|  10473|       8989| 787

Where did the two new columns `timestamp` and `value` come from? What happened here?

Two things went wrong. First, the `rate` source that drives the stream emits its own `timestamp` and `value` columns, and the streaming query wrote them out along with the loan columns; plain Parquet does nothing to stop files with a different schema from being added to the table directory. Second, a streaming file sink records the files it commits in a metadata log (a `_spark_metadata` directory inside the output path), and when Spark reads a directory that contains this log, it only reads the files listed there. That is why the 14705 original rows seem to have disappeared: the batch read now sees only the streamed rows, with the stream's schema.
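One Spark behavior that produces exactly these symptoms is a reader that trusts the file list recorded by a streaming sink instead of listing the whole directory. The toy model below contrasts the two ways of picking files; it is not Spark's implementation, and the file names and row counts are hypothetical. Restricting the read to the sink's log yields a much smaller count and the stream's extra columns.

```python
from typing import Dict, List, Optional, Set, Tuple

ORIGINAL_COLS = ["loan_id", "funded_amnt", "paid_amnt", "addr_state"]
STREAM_COLS = ["timestamp", "value"] + ORIGINAL_COLS

# Hypothetical directory contents: one original file plus two files written by the stream.
FILES: Dict[str, dict] = {
    "part-original.snappy.parquet": {"columns": ORIGINAL_COLS, "rows": 14705},
    "part-stream-batch0.snappy.parquet": {"columns": STREAM_COLS, "rows": 50},
    "part-stream-batch1.snappy.parquet": {"columns": STREAM_COLS, "rows": 50},
}
# Files recorded in the streaming sink's metadata log (toy stand-in for `_spark_metadata`).
SINK_LOG: Set[str] = {"part-stream-batch0.snappy.parquet", "part-stream-batch1.snappy.parquet"}


def batch_read(files: Dict[str, dict], sink_log: Optional[Set[str]]) -> Tuple[int, List[str]]:
    """Return (row count, column names) seen by a batch read of the directory."""
    visible = [name for name in files if sink_log is None or name in sink_log]
    columns: List[str] = []
    for name in visible:
        for col in files[name]["columns"]:
            if col not in columns:
                columns.append(col)
    return sum(files[name]["rows"] for name in visible), columns


print(batch_read(FILES, sink_log=None))      # plain directory listing
print(batch_read(FILES, sink_log=SINK_LOG))  # listing restricted by the sink's log
```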


In [16]:
stop_all_streams()

Stopping all streams
Stopped all streams

## 2. Batch + stream processing and schema enforcement with Delta Lake
- Let's see how Delta Lake solves these particular problems (among many others). We will start by creating a Delta table from the original data.
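Schema enforcement means that Delta Lake validates every write against the table's schema and rejects the write rather than silently changing the table. The function below is a simplified, hypothetical model of that check; the real validation also covers nested fields, nullability, and case sensitivity, and the column types here are assumptions. In this model, extra columns and changed types cause a rejection, while columns missing from the new data are allowed.

```python
from typing import Dict, List

# Assumed table schema (column -> Spark SQL type); the real types come from the Parquet file.
TABLE_SCHEMA: Dict[str, str] = {
    "loan_id": "bigint", "funded_amnt": "int", "paid_amnt": "double", "addr_state": "string",
}
# Columns produced by the first streaming query: the `rate` source adds `timestamp` and `value`.
RATE_STREAM_SCHEMA: Dict[str, str] = {"timestamp": "timestamp", "value": "bigint", **TABLE_SCHEMA}


def schema_violations(table: Dict[str, str], incoming: Dict[str, str]) -> List[str]:
    """List the reasons an append would be rejected (empty list means the append is allowed)."""
    problems = []
    for col, typ in incoming.items():
        if col not in table:
            problems.append(f"extra column: {col}")
        elif table[col] != typ:
            problems.append(f"type mismatch on {col}: {typ} vs {table[col]}")
    # Columns missing from the incoming data are tolerated; they would be read back as null.
    return problems


print(schema_violations(TABLE_SCHEMA, RATE_STREAM_SCHEMA))
print(schema_violations(TABLE_SCHEMA, TABLE_SCHEMA))
```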

In [17]:
# Configure Delta Lake Silver Path
delta_path = "dtap://TenantStorage/student900/deltalake-workshop/loans-delta"

spark.sql("set spark.sql.shuffle.partitions = 1")
spark.sql("set spark.databricks.delta.snapshotPartitions = 1")

# Create the Delta table with the same loans data
spark.read.format("parquet").load("dtap://TenantStorage/student900/SAISEU19-loan-risks.snappy.parquet").write.format("delta").save(delta_path)
print("Created a Delta table at " + delta_path)

spark.read.format("delta").load(delta_path).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")

spark.sql("select count(*) from loans_delta").show()

spark.sql("select * from loans_delta").show()

Created a Delta table at dtap://TenantStorage/student900/deltalake-workshop/loans-delta
Defined view 'loans_delta'
+--------+
|count(1)|
+--------+
|   14705|
+--------+

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
|      5|       1000|    408.6|        CA|
|      6|       1000|   1000.0|        MD|
|      7|       1000|   168.81|        OH|
|      8|       1000|   193.64|        TX|
|      9|       1000|   218.83|        CT|
|     10|       1000|   322.37|        NJ|
|     11|       1000|   400.61|        NY|
|     12|       1000|   1000.0|        FL|
|     13|       1000|   165.88|        NJ|
|     14|       1000|    190.6|        TX|
|     15|       1000|   1000.0|        OH|
|     16|   

In [19]:
# Define a streaming count(*) over the table (in notebook front ends that can display streaming DataFrames, this count updates automatically; here it is only defined)
spark.readStream.format("delta").load(delta_path).createOrReplaceTempView("loans_delta_stream")
spark.sql("select count(*) from loans_delta_stream")

# Now let's try writing the streaming appends once again
stream_query_2 = generate_and_append_data_stream(table_format = "delta", table_path = delta_path)

In [20]:
# The writes were blocked because the schema of the new data did not match the schema of the table.
# **Now, let's fix the streaming query by selecting the columns we want to write.**

from pyspark.sql.functions import *
# Generate a stream of randomly generated loan data and append it to the table, keeping only the table's columns
def generate_and_append_data_stream_fixed(table_format, table_path):
    
    stream_data = spark.readStream.format("rate").option("rowsPerSecond", 50).load() \
            .withColumn("loan_id", 10000 + col("value")) \
            .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
            .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
            .withColumn("addr_state", random_state()) \
            .select("loan_id", "funded_amnt", "paid_amnt", "addr_state")   # *********** FIXED THE SCHEMA OF THE GENERATED DATA *************

    query = stream_data.writeStream.format(table_format) \
            .option("checkpointLocation", random_checkpoint_dir()) \
            .trigger(processingTime="10 seconds") \
            .start(table_path)

    return query

In [21]:
# Now we can successfully write to the table; the batch count further below confirms that new rows arrive.
stream_query_2 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)
sleep(10)

# We can even run multiple concurrent streams writing to the same table; Delta Lake's transaction log lets their commits work together.
stream_query_3 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)

sleep(10)

In [22]:
# As a sanity check, let's query the table as a batch
spark.sql("select count(*) from loans_delta").show()

stop_all_streams()

+--------+
|count(1)|
+--------+
|   15855|
+--------+

Stopping all streams
Stopped all streams

In [23]:
# ## Schema Evolution
# - Let's evolve the schema of the table.
# - We will run a batch query that will:
#   - append some new loans
#   - add a boolean column 'closed' that signifies whether the loan has been closed and paid off or not.
# - We are going to set the option `mergeSchema` to `true` to force the evolution of the Delta table's schema.
# 
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']

items = [
  (1111111, 1000, 1000.0, 'TX', True), 
  (2222222, 2000, 0.0, 'CA', False)
]

loan_updates = spark.createDataFrame(items, cols) \
.withColumn("funded_amnt", col("funded_amnt").cast("int"))
  
loan_updates.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(delta_path)

spark.read.format("delta").load(delta_path).show()

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  null|
|      1|       1000|   361.19|        WA|  null|
|      2|       1000|   176.26|        TX|  null|
|      3|       1000|   1000.0|        OK|  null|
|      4|       1000|   249.98|        PA|  null|
|      5|       1000|    408.6|        CA|  null|
|      6|       1000|   1000.0|        MD|  null|
|      7|       1000|   168.81|        OH|  null|
|      8|       1000|   193.64|        TX|  null|
|      9|       1000|   218.83|        CT|  null|
|     10|       1000|   322.37|        NJ|  null|
|     11|       1000|   400.61|        NY|  null|
|     12|       1000|   1000.0|        FL|  null|
|     13|       1000|   165.88|        NJ|  null|
|     14|       1000|    190.6|        TX|  null|
|     15|       1000|   1000.0|        OH|  null|
|     16|       1000|   213.72|        MI|  null|


## 3. Delete from Delta Lake table

In [24]:
# Let's see the number of fully paid loans.
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|    5134|
+--------+

In [25]:
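sc = spark.sparkContext

# The delta-core JAR also bundles the `delta` Python package, so adding it with addPyFile
# makes `delta.tables` importable in this PySpark session.
#sc.addPyFile("/opt/spark/jars/delta-core_2.12-1.0.0.jar")  # alternative JAR location
sc.addPyFile("/opt/mapr/spark/spark-3.1.2/jars/delta-core_2.12-1.0.0.jar")

from delta.tables import DeltaTable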

deltaTable = DeltaTable.forPath(spark, delta_path)
deltaTable.delete("funded_amnt = paid_amnt")

# Let's check the number of fully paid loans once again.
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

In [26]:
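# Peek at version 0 of the table: the original loans data, before the streaming appends and the delete
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)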
df.show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
|      5|       1000|    408.6|        CA|
|      6|       1000|   1000.0|        MD|
|      7|       1000|   168.81|        OH|
|      8|       1000|   193.64|        TX|
|      9|       1000|   218.83|        CT|
|     10|       1000|   322.37|        NJ|
|     11|       1000|   400.61|        NY|
|     12|       1000|   1000.0|        FL|
|     13|       1000|   165.88|        NJ|
|     14|       1000|    190.6|        TX|
|     15|       1000|   1000.0|        OH|
|     16|       1000|   213.72|        MI|
|     17|       1000|   188.89|        MI|
|     18|       1000|   237.41|        CA|
|     19|       1000|   203.85|        CA|
+-------+--

## 4. Audit Delta Lake Table History
- All changes to the Delta table are recorded as commits in the table's transaction log. As you write into a Delta table or directory, every operation is automatically versioned. You can use the HISTORY command to view the table's history. For more information, check out the [docs](https://docs.delta.io/latest/delta-utility.html#history).
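The transaction log is what makes this history possible: each commit is a numbered JSON file in the table's `_delta_log` directory, listing actions such as `add` and `remove` for data files, and the state of any version is obtained by replaying the commits up to it. The toy model below illustrates that replay in plain Python. It is a simplification (real commits also carry metadata, protocol, and commit-info actions, and long logs are summarized by checkpoints), and all file names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass
class Commit:
    """One toy commit: the data files it adds and removes, plus the operation name."""
    version: int
    operation: str
    add: List[str] = field(default_factory=list)
    remove: List[str] = field(default_factory=list)


def snapshot(log: List[Commit], version: int) -> Set[str]:
    """Replay commits 0..version and return the data files that make up that version."""
    files: Set[str] = set()
    for commit in sorted(log, key=lambda c: c.version):
        if commit.version > version:
            break
        files -= set(commit.remove)
        files |= set(commit.add)
    return files


def history(log: List[Commit]) -> List[Tuple[int, str]]:
    """Newest-first (version, operation) pairs, like the first columns of history()."""
    return [(c.version, c.operation) for c in sorted(log, key=lambda c: c.version, reverse=True)]


# Hypothetical file names; a DELETE rewrites the files that contain matching rows.
log = [
    Commit(0, "WRITE", add=["part-a.parquet"]),
    Commit(1, "STREAMING UPDATE", add=["part-b.parquet"]),
    Commit(2, "DELETE", add=["part-a-rewritten.parquet"], remove=["part-a.parquet"]),
]
print(history(log))
print(sorted(snapshot(log, 1)), sorted(snapshot(log, 2)))
```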


In [27]:
deltaTable = DeltaTable.forPath(spark, delta_path)
deltaTable.history().show()

+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|       operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      7|2022-08-23 09:00:36|  null|    null|          DELETE|{predicate -> ["(...|null|    null|     null|          6|          null|        false|{numRemovedFiles ...|        null|
|      6|2022-08-23 09:00:21|  null|    null|           WRITE|{mode -> Append, ...|null|    null|     null|          5|          null|         true|{numFiles -> 2, n...|        null|
|      5|2022-08-23 09:00:15|  null|    null|STREAMING UPDATE|{outputMode -> Ap...|nu

## 5. Travel back in time
- Delta Lake’s time travel feature allows you to access previous versions of the table. Here are some possible uses of this feature:
    - Auditing Data Changes
    - Reproducing experiments & reports
    - Rollbacks
 
You can query by using either a timestamp or a version number, in Python, Scala, or SQL syntax. In this example we will query a specific version using the Python syntax.
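A version number can be used directly, while a timestamp has to be resolved to a version first; roughly, Delta Lake picks the latest version committed at or before the given time. The sketch below applies that rule to the three most recent commits listed by `history()` in section 4. It is an illustrative model, not Delta Lake's own code, and it ignores the checks Delta Lake performs for timestamps outside the table's history.

```python
from datetime import datetime
from typing import List, Tuple

# (version, commit timestamp) pairs copied from the history() output in section 4.
COMMITS: List[Tuple[int, datetime]] = [
    (5, datetime(2022, 8, 23, 9, 0, 15)),
    (6, datetime(2022, 8, 23, 9, 0, 21)),
    (7, datetime(2022, 8, 23, 9, 0, 36)),
]


def version_as_of(commits: List[Tuple[int, datetime]], ts: datetime) -> int:
    """Return the latest version whose commit timestamp is at or before `ts`."""
    eligible = [version for version, committed in commits if committed <= ts]
    if not eligible:
        raise ValueError(f"no commit at or before {ts}")
    return max(eligible)


latest = max(version for version, _ in COMMITS)
print("previous version:", latest - 1)
print("version as of 09:00:30:", version_as_of(COMMITS, datetime(2022, 8, 23, 9, 0, 30)))
```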



In [28]:
# Let's query the table's state before we deleted the data, which still contains the fully paid loans.
previousVersion = deltaTable.history(1).select("version").collect()[0][0] - 1

spark.read.format("delta") \
    .option("versionAsOf", previousVersion) \
    .load(delta_path) \
    .createOrReplaceTempView("loans_delta_pre_delete")

# We see the same number of fully paid loans that we had seen before delete.
spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|    5134|
+--------+

## 6. Vacuum old versions of Delta Lake tables 

- While it's nice to be able to time travel to any previous version, sometimes you actually want to delete the data from storage completely, to reduce storage costs or for compliance reasons (for example, GDPR).
- The vacuum operation deletes data files that were removed from the table longer ago than a retention period. For more information, check out the [docs](https://docs.delta.io/latest/delta-utility.html#vacuum).
- By default, `vacuum()` retains all the data needed for the last 7 days. Since this table does not have 7 days' worth of history, we will retain 0 hours, which means only the latest state of the table is kept. Because Delta Lake normally rejects a retention period shorter than the default as a safety check, the cell first sets `spark.databricks.delta.retentionDurationCheck.enabled` to `false`.
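The retention rule can be modeled in a few lines: a data file that the log marks as removed becomes eligible for physical deletion once its removal is older than the retention period. After such a file is gone, any older version that still references it can no longer be read. This is a simplified, hypothetical sketch with invented file names and times; among other things, the real `vacuum()` also deletes data files in the table directory that the log does not reference at all.

```python
from datetime import datetime, timedelta
from typing import Dict, List

DEFAULT_RETENTION_HOURS = 7 * 24  # the 7-day default


def files_to_vacuum(removed_at: Dict[str, datetime], now: datetime,
                    retention_hours: float = DEFAULT_RETENTION_HOURS) -> List[str]:
    """Return files that were removed from the table before the retention cutoff."""
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(path for path, when in removed_at.items() if when < cutoff)


# Hypothetical: a file logically removed by a DELETE commit, checked a few minutes later.
removed = {"part-a.parquet": datetime(2022, 8, 23, 9, 0, 36)}
now = datetime(2022, 8, 23, 9, 5, 0)
print(files_to_vacuum(removed, now))                     # default retention: nothing to delete
print(files_to_vacuum(removed, now, retention_hours=0))  # retention 0: the file is deleted
```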


In [29]:
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = false")
deltaTable.vacuum(retentionHours = 0)

print(f"previousVersion: {previousVersion}")

spark.read.format("delta").option("versionAsOf", previousVersion).load(delta_path).createOrReplaceTempView("loans_delta_pre_delete")

# Same query as before, but it now fails: VACUUM has deleted the data files this version needs
try:
    spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show()
except Exception as e:
    print(str(e))

previousVersion: 6
An error occurred while calling o321.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 179.0 failed 4 times, most recent failure: Lost task 0.3 in stage 179.0 (TID 20240) (10.192.0.209 executor 1): java.io.FileNotFoundException: dtap://TenantStorage/student900/deltalake-workshop/loans-delta/part-00000-8ec6b42f-946d-47ea-b9d5-39fb641e6a87-c000.snappy.parquet::Failed to execute the command:'bdfs_get_filestatus' index:'0' response:'2' ('cmd_file_or_directory_not_found') posix-error:'0'
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD

## 7. Upsert into Delta Lake table using Merge

You can upsert data from an Apache Spark DataFrame into a Delta Lake table using the merge operation. This operation is similar to the SQL `MERGE` command but has additional support for deletes and extra conditions in updates, inserts, and deletes. For more information, check out the [docs](https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge).
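
The clause semantics can be modeled in a few lines of plain Python. This is a toy sketch, not Delta Lake code: tables are lists of dicts, `merge_rows` and the `fully_paid` condition are hypothetical names, and it assumes each key appears at most once in the source (Delta Lake raises an error when several source rows match the same target row for an update or delete). It mimics a conditional delete clause evaluated before an unconditional update, which in the Delta Lake Python API would correspond to chaining `whenMatchedDelete(condition=...)`, `whenMatchedUpdateAll()`, and `whenNotMatchedInsertAll()`.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, float]


def merge_rows(
    target: List[Row],
    source: List[Row],
    key: str,
    delete_when: Optional[Callable[[Row, Row], bool]] = None,
) -> List[Row]:
    """Toy MERGE: for each source row, delete or update the matching target row,
    or insert the source row when there is no match."""
    result = {row[key]: dict(row) for row in target}
    for s in source:
        t = result.get(s[key])
        if t is None:
            # WHEN NOT MATCHED THEN INSERT *
            result[s[key]] = dict(s)
        elif delete_when is not None and delete_when(t, s):
            # WHEN MATCHED AND <condition> THEN DELETE (checked first)
            del result[s[key]]
        else:
            # WHEN MATCHED THEN UPDATE SET * (last, unconditional matched clause)
            result[s[key]] = dict(s)
    return sorted(result.values(), key=lambda r: r[key])


def fully_paid(t: Row, s: Row) -> bool:
    """Hypothetical delete condition: the incoming row shows the loan paid in full."""
    return s["paid_amnt"] >= s["funded_amnt"]


loans = [
    {"loan_id": 0, "funded_amnt": 1000, "paid_amnt": 182.22},
    {"loan_id": 1, "funded_amnt": 1000, "paid_amnt": 361.19},
    {"loan_id": 2, "funded_amnt": 1000, "paid_amnt": 176.26},
]
changes = [
    {"loan_id": 1, "funded_amnt": 1000, "paid_amnt": 500.0},   # matched: updated
    {"loan_id": 2, "funded_amnt": 1000, "paid_amnt": 1000.0},  # matched: deleted
    {"loan_id": 3, "funded_amnt": 2000, "paid_amnt": 0.0},     # not matched: inserted
]
result = merge_rows(loans, changes, "loan_id", delete_when=fully_paid)
print([r["loan_id"] for r in result])  # [0, 1, 3]
```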

With a legacy data pipeline, to insert or update a table, you must:
1. Identify the new rows to be inserted
2. Identify the rows that will be replaced (i.e. updated)
3. Identify all of the rows that are not impacted by the insert or update
4. Create a new temp table from the rows identified in the three steps above
5. Delete the original table (and all of its associated files)
6. "Rename" the temp table back to the original table name
7. Drop the temp table
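
The legacy steps above can be sketched in plain Python to show how much work a single merge replaces. This is a toy model on lists of dicts keyed by `loan_id`; `legacy_upsert` is a hypothetical name, and steps 4 to 7 (temp table, delete, rename, drop) collapse into building and returning a new list.

```python
from typing import Dict, List

Row = Dict[str, float]


def legacy_upsert(table: List[Row], updates: List[Row], key: str = "loan_id") -> List[Row]:
    """Rebuild a table the 'legacy' way instead of merging in place."""
    existing_keys = {row[key] for row in table}
    update_keys = {row[key] for row in updates}

    # 1. Rows to insert: keys not yet in the table
    new_rows = [row for row in updates if row[key] not in existing_keys]
    # 2. Rows that replace existing ones
    replaced_rows = [row for row in updates if row[key] in existing_keys]
    # 3. Rows untouched by the insert or update
    unaffected_rows = [row for row in table if row[key] not in update_keys]

    # 4-7. Build the "temp table" from all three sets; returning it stands in for
    # deleting the original, renaming the temp table, and dropping the temp copy
    return sorted(unaffected_rows + replaced_rows + new_rows, key=lambda r: r[key])


loans = [
    {"loan_id": 0, "paid_amnt": 182.22},
    {"loan_id": 1, "paid_amnt": 361.19},
    {"loan_id": 2, "paid_amnt": 176.26},
]
updates = [
    {"loan_id": 2, "paid_amnt": 1000.0},  # existing loan, paid in full
    {"loan_id": 3, "paid_amnt": 0.0},     # new loan
]
rebuilt = legacy_upsert(loans, updates)
print([(r["loan_id"], r["paid_amnt"]) for r in rebuilt])
# [(0, 182.22), (1, 361.19), (2, 1000.0), (3, 0.0)]
```

Every row of the table is rewritten even though only two changed; a Delta Lake merge expresses the same result as one atomic operation.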

In [29]:
# Configure Delta Lake Silver Path
delta_small_path = "dtap://TenantStorage/student900/deltalake-workshop/loans-delta-small"

# Create the Delta table with the same loans data
spark.read.format("parquet").load("dtap://TenantStorage/student900/SAISEU19-loan-risks.snappy.parquet") \
    .where("loan_id < 3") \
    .write.format("delta").save(delta_small_path)
print("Created a Delta table at " + delta_small_path)

spark.read.format("delta").load(delta_small_path).createOrReplaceTempView("loans_delta_small")
print("Defined view 'loans_delta_small'")

# Let's focus only on a part of the loans_delta table
spark.sql("select * from loans_delta_small order by loan_id").show()

Created a Delta table at dtap://TenantStorage/student900/deltalake-workshop/loans-delta-small
Defined view 'loans_delta_small'
+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
+-------+-----------+---------+----------+

In [30]:
# **Now, let's say we got some new loan information**
# 1. Existing loan_id = 2 has been fully repaid. The corresponding row needs to be updated.
# 2. New loan_id = 3 has been funded in CA. This needs to be inserted as a new row.

cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state']
items = [
  (2, 1000, 1000.0, 'TX'), # existing loan's paid_amnt updated, loan paid in full
  (3, 2000, 0.0, 'CA')     # new loan's details
]

loan_updates = spark.createDataFrame(items, cols)

loan_updates.show()


# **Merge can upsert this in a single atomic operation.** 
#  
# SQL `MERGE` command can do both `UPDATE` and `INSERT`.
# ``` 
# MERGE INTO target t
# USING source s
# WHEN MATCHED THEN UPDATE SET ...
# WHEN NOT MATCHED THEN INSERT ....
# ```


from delta.tables import *

delta_table = DeltaTable.forPath(spark, delta_small_path)

delta_table.alias("t").merge(
    loan_updates.alias("s"), 
    "s.loan_id = t.loan_id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

spark.sql("select * from loans_delta_small order by loan_id").show()

# **Note the changes in the table**
# - Existing loan_id = 2 has been updated with paid_amnt set to 1000.
# - New loan_id = 3 has been inserted.

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      2|       1000|   1000.0|        TX|
|      3|       2000|      0.0|        CA|
+-------+-----------+---------+----------+

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   1000.0|        TX|
|      3|       2000|      0.0|        CA|
+-------+-----------+---------+----------+

## 8. Advanced uses of Merge

### 8.1 Streaming upserts into a Delta Lake table using merge and foreachBatch
- You can continuously upsert from a streaming query output using merge inside `foreachBatch` operation of Structured Streaming.
- Let's say we want to maintain a count of loans per state in a table and update the counts as new loans arrive.
- To do this, we will first initialize the table and a few associated UDFs and configurations.
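
The reason `whenMatchedUpdateAll()` (overwrite, not add) is enough here is that, in `update` output mode, each micro-batch of a streaming aggregation carries the *running* count for every state whose count changed in that trigger. The toy simulation below models that in plain Python; the micro-batches are hard-coded lists of state codes, and `run_stream` is a hypothetical name, not a Spark API.

```python
from collections import Counter
from typing import Dict, List


def run_stream(batches: List[List[str]], table: Dict[str, int]) -> Dict[str, int]:
    """Simulate an update-mode streaming count feeding a foreachBatch merge.

    `table` plays the role of the Delta table and is updated in place.
    """
    running: Counter = Counter()  # state kept by the streaming aggregation
    for batch in batches:
        running.update(batch)
        # Update mode emits the running total only for states seen in this batch
        updated_counts = {state: running[state] for state in set(batch)}
        # The merge overwrites matched states and inserts new ones
        table.update(updated_counts)
    return table


counts = run_stream([["CA", "TX", "CA"], ["NY", "TX"], ["CA"]], {"CA": 0})
print(sorted(counts.items()))  # [('CA', 3), ('NY', 1), ('TX', 2)]
```

If each micro-batch carried only the increment instead of the running total, the merge would have to add to the existing count rather than replace it.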


In [31]:
# Generate a stream of random loan data and upsert per-state counts into a Delta table
import random
from time import sleep
from delta.tables import *
from pyspark.sql.functions import * 
from pyspark.sql.types import *

loan_counts_by_states_path = "dtap://TenantStorage/student900/deltalake-workshop/loans-by-states"
chkpt_path = "dtap://TenantStorage/student900/deltalake-workshop/chkpt/%s" % str(random.randint(0, 10000))


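# Initialize the table; count is an integer so it matches the bigint produced by count(*)
spark.createDataFrame([('CA', 0)], ["addr_state", "count"]) \
    .write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(loan_counts_by_states_path)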


# User-defined function to generate random state
states = ["CA", "TX", "NY", "IA"]
@udf(returnType=StringType())
def random_state():
    return str(random.choice(states))

# Define the function to be called on the output of each micro-batch. 
# This function will use merge to upsert into the Delta table.

loan_counts_by_states_table = DeltaTable.forPath(spark, loan_counts_by_states_path)

# Function to upsert per-state counts generated in each microbatch of a streaming query
# - updated_counts_df = the updated counts generated from a microbatch
def upsert_state_counts_into_delta_table(updated_counts_df, batch_id):
    loan_counts_by_states_table.alias("t").merge(
      updated_counts_df.alias("s"), 
      "s.addr_state = t.addr_state") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Define and run the streaming query using this function with `foreachBatch`.
# The rate source emits rows; each row gets a random loan_id, amounts, and state
spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
        .withColumn("loan_id", rand() * 100) \
        .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
        .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
        .withColumn("addr_state", random_state()) \
        .createOrReplaceTempView("generated_loans")

# Use foreachBatch to define what to do with each output micro-batch DataFrame.
# The upsert function performs the Delta write, so no sink format or path is needed.
query = spark.sql("select addr_state, count(*) as count from generated_loans group by addr_state") \
      .writeStream.foreachBatch(upsert_state_counts_into_delta_table) \
      .option("checkpointLocation", chkpt_path) \
      .trigger(processingTime = '3 seconds') \
      .outputMode("update") \
      .start()

# Let the stream process several micro-batches
sleep(30)
# Query the table to see the per-state counts. While the stream is still running,
# re-running this query shows the counts growing.

spark.read.format("delta").load(loan_counts_by_states_path).orderBy("addr_state").show()

stop_all_streams()

+----------+-----+
|addr_state|count|
+----------+-----+
|        CA|   26|
|        IA|   30|
|        NY|   33|
|        TX|   36|
+----------+-----+

Stopping all streams
Stopped all streams

### 8.2 Deduplication using `insert-only` merge

In [32]:
from delta.tables import *

delta_path = "dtap://TenantStorage/student900/deltalake-workshop/loans-delta2"

# Define new loans table data
data = [
  (0, 1000, 1000.0, 'TX'), 
  (1, 2000, 0.0, 'CA')
]
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state']

# Write a new Delta Lake table with the loans data
spark.createDataFrame(data, cols).write.format("delta").save(delta_path)

# Define DeltaTable object
dt = DeltaTable.forPath(spark, delta_path)
dt.toDF().show()

# Define a DataFrame containing new data, some of which is already present in the table
new_data = [  
  (1, 2000, 0.0, 'CA'),    # duplicate: loan_id = 1 is already in the table and should not be updated
  (2, 5000, 1010.0, 'NY')  # new data, not present in table
]
new_data_df = spark.createDataFrame(new_data, cols)

new_data_df.show()

# Run an "insert-only" merge query (i.e., no update clause)
dt = DeltaTable.forPath(spark, delta_path)

dt.alias("t").merge(
    new_data_df.alias("s"), 
    "s.loan_id = t.loan_id") \
  .whenNotMatchedInsertAll() \
  .execute()

dt.toDF().show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   1000.0|        TX|
|      1|       2000|      0.0|        CA|
+-------+-----------+---------+----------+

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      1|       2000|      0.0|        CA|
|      2|       5000|   1010.0|        NY|
+-------+-----------+---------+----------+

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   1000.0|        TX|
|      2|       5000|   1010.0|        NY|
|      1|       2000|      0.0|        CA|
+-------+-----------+---------+----------+
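
An insert-only merge only guards against rows that already exist in the target. If the new batch itself contains the same `loan_id` twice, both copies are "not matched" and both are inserted, so the Delta Lake documentation recommends deduplicating the new data within itself first. The plain-Python sketch below models both steps; `insert_only_merge` is a hypothetical helper, not part of the Delta Lake API.

```python
from typing import Dict, List

Row = Dict[str, object]


def insert_only_merge(target: List[Row], source: List[Row], key: str, dedupe_source: bool = True) -> List[Row]:
    """Append source rows whose key is absent from the target (no update clause)."""
    if dedupe_source:
        # Keep the first occurrence of each key (Spark's dropDuplicates keeps an arbitrary one)
        seen = set()
        unique = []
        for row in source:
            if row[key] not in seen:
                seen.add(row[key])
                unique.append(row)
        source = unique
    existing = {row[key] for row in target}
    # WHEN NOT MATCHED THEN INSERT *: matched rows are simply skipped
    return target + [row for row in source if row[key] not in existing]


loans = [{"loan_id": 0, "addr_state": "TX"}, {"loan_id": 1, "addr_state": "CA"}]
new_batch = [
    {"loan_id": 1, "addr_state": "CA"},  # already in the table: skipped
    {"loan_id": 2, "addr_state": "NY"},  # new
    {"loan_id": 2, "addr_state": "NY"},  # duplicate inside the batch itself
]
print([r["loan_id"] for r in insert_only_merge(loans, new_batch, "loan_id")])                       # [0, 1, 2]
print([r["loan_id"] for r in insert_only_merge(loans, new_batch, "loan_id", dedupe_source=False)])  # [0, 1, 2, 2]
```

In the lab's Spark code, the equivalent precaution would be calling `new_data_df.dropDuplicates(["loan_id"])` before running the merge.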

## 9. Tutorial Summary
Full support for batch and streaming workloads
- Delta Lake allows batch and streaming workloads to concurrently read and write to Delta Lake tables with full ACID transactional guarantees.

Schema enforcement and schema evolution
- Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.

Table History and Time Travel
- The Delta Lake transaction log records details about every change made to the data, providing a full audit trail of the changes.
- You can query previous snapshots of the data, enabling developers to access and revert to earlier versions for audits, rollbacks, or reproducing experiments.

Delete data and Vacuum old versions
- Delete data from tables using a predicate.
- Fully remove data from previous versions using Vacuum to save storage and satisfy compliance requirements.

Upsert data using Merge
- Upsert data into tables from batch and streaming workloads.
- Use extended merge syntax for advanced use cases like data deduplication, change data capture, SCD type 2 operations, etc.



In [33]:
%%info

| Spark Session ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|
| 16 | pyspark | idle |  |  | student900 | ✔ |


In [34]:
%%cleanup -f

In [35]:
%%info

## (Added by Denis) Verify that the Spark cluster pods for your Spark Livy session are terminating

In [39]:
%%bash
kubectl get pods

NAME                                                  READY   STATUS    RESTARTS   AGE
hivemeta-0                                            1/1     Running   0          28d
jupyter-notebook-test-student900-controller-lffx6-0   2/2     Running   0          66m
livy-0                                                1/1     Running   0          28d
ml-pipeline-ui-artifact-5d68fc6566-rknch              1/1     Running   0          28d
ml-pipeline-visualizationserver-6b46b8fbd9-rnhfh      1/1     Running   0          28d
tenantcli-0                                           1/1     Running   0          28d


## (Added by Denis) Delete the participant's folder in the default DataTap storage

In [38]:
%%bash
userID="student900"
hadoop fs -rm -r dtap://TenantStorage/${userID}
hadoop fs -ls dtap://TenantStorage/

Deleted dtap://TenantStorage/student900
