## Configure

In [0]:
username = "sparsha"
health_tracker = f"/dbacademy/{username}/lakehouse-with-delta-lake-dd/health-tracker/"
spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy_{username}")
spark.sql(f"USE dbacademy_{username}")

In [0]:
from pyspark.sql.session import SparkSession
from urllib.request import urlretrieve
import time

BASE_URL = "https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/"


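def retrieve_data(year: int, month: int, raw_path: str, is_late: bool = False) -> bool:
    """Download one month of health tracker data and move it into raw_path on DBFS."""
    file, dbfsPath, driverPath = _generate_file_handles(year, month, raw_path, is_late)
    uri = BASE_URL + file

    # Download into the driver's working directory (assumed to be /databricks/driver,
    # matching driverPath), then move the file into DBFS.
    urlretrieve(uri, file)
    dbutils.fs.mv(driverPath, dbfsPath)
    return True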


def _generate_file_handles(year: int, month: int, raw_path: str, is_late: bool):
    late = ""
    if is_late:
        late = "_late"
    file = f"health_tracker_data_{year}_{month}{late}.json"

    dbfsPath = raw_path
    if is_late:
        dbfsPath += "late/"
    dbfsPath += file

    driverPath = "file:/databricks/driver/" + file

    return file, dbfsPath, driverPath


def stop_all_streams() -> bool:
    stopped = False
    for stream in spark.streams.active:
        stopped = True
        stream.stop()
    return stopped


def stop_named_stream(spark: SparkSession, namedStream: str) -> bool:
    stopped = False
    for stream in spark.streams.active:
        if stream.name == namedStream:
            stopped = True
            stream.stop()
    return stopped


def untilStreamIsReady(namedStream: str, progressions: int = 3) -> bool:
    queries = list(filter(lambda query: query.name == namedStream, spark.streams.active))
    while len(queries) == 0 or len(queries[0].recentProgress) < progressions:
        time.sleep(5)
        queries = list(filter(lambda query: query.name == namedStream, spark.streams.active))
    print("The stream {} is active and ready.".format(namedStream))
    return True


## Retrieve Raw Data

We will ingest data from a remote source into our source directory, `raw`.

In [0]:
dbutils.fs.rm(health_tracker, recurse=True)

spark.sql(f"""
DROP TABLE IF EXISTS health_tracker_processed
""")

spark.sql(f"""
DROP TABLE IF EXISTS health_tracker_gold_aggregate_heartrate
""")

Retrieve the Raw Data Files

In [0]:
retrieve_data(2020, 1, health_tracker + "raw/")
retrieve_data(2020, 2, health_tracker + "raw/")
retrieve_data(2020, 2, health_tracker + "raw/", is_late=True)
retrieve_data(2020, 3, health_tracker + "raw/")

The expected file has the following name:

In [0]:
file_2020_1 = "health_tracker_data_2020_1.json"

Display the Files in the Raw Path

In [0]:
display(dbutils.fs.ls(health_tracker + "raw/"))

| path | name | size | modificationTime |
|---|---|---|---|
| dbfs:/dbacademy/sparsha/lakehouse-with-delta-lake-dd/health-tracker/raw/health_tracker_data_2020_1.json | health_tracker_data_2020_1.json | 310628 | 1655632625000 |
| dbfs:/dbacademy/sparsha/lakehouse-with-delta-lake-dd/health-tracker/raw/health_tracker_data_2020_2.json | health_tracker_data_2020_2.json | 284670 | 1655632626000 |
| dbfs:/dbacademy/sparsha/lakehouse-with-delta-lake-dd/health-tracker/raw/health_tracker_data_2020_3.json | health_tracker_data_2020_3.json | 402785 | 1655632627000 |
| dbfs:/dbacademy/sparsha/lakehouse-with-delta-lake-dd/health-tracker/raw/late/ | late/ | 0 | 1655632640825 |


Write an Assertion Statement to Verify File Ingestion

In [0]:
assert file_2020_1 in [
    item.name for item in dbutils.fs.ls(health_tracker + "raw/")
], "File not present in Raw Path"
print("Assertion passed.")

## Reviewing and Visualizing data

One common use case for working with Delta Lake is to collect and process Internet of Things (IoT) Data. Here, we provide a mock IoT sensor dataset for demonstration purposes. The data simulates heart rate data measured by a health tracker device.

**Health tracker data sample**

```
{"device_id":0,"heartrate":52.8139067501,"name":"Deborah Powell","time":1.5778368E9}
{"device_id":0,"heartrate":53.9078900098,"name":"Deborah Powell","time":1.5778404E9}
{"device_id":0,"heartrate":52.7129593616,"name":"Deborah Powell","time":1.577844E9}
{"device_id":0,"heartrate":52.2880422685,"name":"Deborah Powell","time":1.5778476E9}
{"device_id":0,"heartrate":52.5156095386,"name":"Deborah Powell","time":1.5778512E9}
{"device_id":0,"heartrate":53.6280743846,"name":"Deborah Powell","time":1.5778548E9}
```
This shows a sample of the health tracker data we will be using. Note that each line is a valid JSON object.
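Because each line is a standalone JSON object (the one-object-per-line layout that Spark's JSON reader expects by default), every line can be parsed on its own. The plain-Python sketch below uses only the standard library and the first three sample rows above; it is an illustration rather than how Spark reads the file, and it shows that `time` arrives as a floating-point number of Unix seconds, one hour apart.

```python
import json

# The first three sample records shown above, one JSON object per line.
SAMPLE = """\
{"device_id":0,"heartrate":52.8139067501,"name":"Deborah Powell","time":1.5778368E9}
{"device_id":0,"heartrate":53.9078900098,"name":"Deborah Powell","time":1.5778404E9}
{"device_id":0,"heartrate":52.7129593616,"name":"Deborah Powell","time":1.577844E9}
"""


def parse_json_lines(text: str) -> list[dict]:
    """Parse one JSON object per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


records = parse_json_lines(SAMPLE)
for r in records:
    # 1.5778368E9 is a JSON number with an exponent, so it is parsed as a float.
    print(r["device_id"], r["name"], r["time"], type(r["time"]).__name__)
```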

**Health tracker data schema**

The data has the following schema:

| Column    | Type      |
|-----------|-----------|
| name      | string    |
| heartrate | double    |
| device_id | int       |
| time      | long      |

Load the data from the raw directory as a Spark DataFrame.
This is done with the `.format("json")` option
and by passing a path to the `.load()` method.

In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"

health_tracker_data_2020_1_df = spark.read.format("json").load(file_path)

In [0]:
display(health_tracker_data_2020_1_df)

| device_id | heartrate | name | time |
|---|---|---|---|
| 0 | 52.8139067501 | Deborah Powell | 1577836800.0 |
| 0 | 53.9078900098 | Deborah Powell | 1577840400.0 |
| 0 | 52.7129593616 | Deborah Powell | 1577844000.0 |
| 0 | 52.2880422685 | Deborah Powell | 1577847600.0 |
| 0 | 52.5156095386 | Deborah Powell | 1577851200.0 |
| 0 | 53.6280743846 | Deborah Powell | 1577854800.0 |
| 0 | 52.1760037066 | Deborah Powell | 1577858400.0 |
| 0 | 90.0456721836 | Deborah Powell | 1577862000.0 |
| 0 | 89.4695644522 | Deborah Powell | 1577865600.0 |
| 0 | 88.1490304138 | Deborah Powell | 1577869200.0 |


## Create a Parquet Table

We perform transformations by selecting columns in the following ways:
- use `from_unixtime` to transform `"time"`, cast as a `date`, and aliased to `dte`
- use `from_unixtime` to transform `"time"`, cast as a `timestamp`, and aliased to `time`
- `heartrate` is selected as is
- `name` is selected as is
- cast `"device_id"` as an integer aliased to `p_device_id`

In [0]:
from pyspark.sql.functions import col, from_unixtime


def process_health_tracker_data(dataframe):
    return dataframe.select(
        from_unixtime("time").cast("date").alias("dte"),
        from_unixtime("time").cast("timestamp").alias("time"),
        "heartrate",
        "name",
        col("device_id").cast("integer").alias("p_device_id"),
    )


processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)
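`from_unixtime` turns a number of seconds since the Unix epoch into a timestamp string in the Spark session time zone, and the casts then produce the `timestamp` and `date` columns. The outputs later in this notebook show times with a `+0000` offset, so the sketch below assumes a UTC session time zone. It mirrors the conversion with Python's `datetime` module rather than Spark, and the helper name `to_dte_and_time` is hypothetical.

```python
from datetime import date, datetime, timezone


def to_dte_and_time(unix_seconds: float) -> tuple[date, datetime]:
    """Hypothetical helper mirroring the dte/time columns, assuming a UTC session time zone."""
    # from_unixtime works at whole-second precision, so drop any fractional part.
    ts = datetime.fromtimestamp(int(unix_seconds), tz=timezone.utc)
    return ts.date(), ts


dte, time_value = to_dte_and_time(1.5778368E9)
print(dte, time_value)  # 2020-01-01 2020-01-01 00:00:00+00:00
```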

Note that we are partitioning the data by device id.

1. use `.format("parquet")`
2. partition by `"p_device_id"`

In [0]:
(
    processedDF.write.mode("overwrite")
    .format("parquet")
    .partitionBy("p_device_id")
    .save(health_tracker + "processed")
)
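`partitionBy("p_device_id")` writes one subdirectory per distinct device ID, named Hive-style as `column=value`, and the partition column itself is not stored inside the Parquet files. The plain-Python sketch below only models that directory layout for a few hypothetical rows; the base path and the `partition_paths` helper are illustrative, and the part-file names Spark generates are not reproduced.

```python
from collections import defaultdict


def partition_paths(rows: list[dict], base: str, column: str) -> dict[str, list[dict]]:
    """Group rows into Hive-style partition directories such as <base>/p_device_id=0/."""
    partitions: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        directory = f"{base.rstrip('/')}/{column}={row[column]}/"
        # The partition value is encoded in the path, so it is dropped from the stored row.
        partitions[directory].append({k: v for k, v in row.items() if k != column})
    return dict(partitions)


# Hypothetical rows; only p_device_id matters for the layout.
rows = [
    {"heartrate": 52.8, "name": "Deborah Powell", "p_device_id": 0},
    {"heartrate": 53.9, "name": "Deborah Powell", "p_device_id": 0},
    {"heartrate": 58.6, "name": "Minh Nguyen", "p_device_id": 3},
]
layout = partition_paths(rows, "/health-tracker/processed", "p_device_id")
for path, stored in layout.items():
    print(path, len(stored))
```

Because the partition values live only in directory names, a metastore table created over this directory is unaware of them until they are registered, which is the job of `MSCK REPAIR TABLE` in this section.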

Next, use Spark SQL to register the table in the metastore. When creating the table, we specify the format as Parquet and set its location to the directory where the Parquet files were written.

In [0]:
spark.sql(
    f"""
DROP TABLE IF EXISTS health_tracker_processed
"""
)

spark.sql(
    f"""
CREATE TABLE health_tracker_processed
USING PARQUET
LOCATION "{health_tracker}processed"
"""
)

Verify the Parquet-based data lake table by counting the records in the `health_tracker_processed` table

In [0]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

Note that the count returns 0: the query finds no records.

Per best practice, we have created a partitioned table. However, if you create a partitioned table from existing data,
Spark SQL does not automatically discover the partitions and register them in the Metastore.

`MSCK REPAIR TABLE` will register the partitions in the Hive Metastore. Learn more about this command in <a href="https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-repair-table.html" target="_blank">
the docs</a>.

In [0]:
spark.sql("MSCK REPAIR TABLE health_tracker_processed")

Count the records in the `health_tracker_processed` table.

With the table repaired and the partitions registered, we now have results.
We expect there to be 3720 records: five device measurements, 24 hours a day for 31 days.

In [0]:
health_tracker_processed.count()
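The expected count follows from one reading per device per hour. The small sketch below uses Python's `calendar` module; the function name `expected_records` and its defaults are assumptions that encode the five devices and hourly readings described above. It reproduces the January figure and, because `calendar` knows 2020 is a leap year, the February figure as well.

```python
import calendar


def expected_records(year: int, month: int, devices: int = 5, readings_per_day: int = 24) -> int:
    """One reading per device per hour for every day of the month."""
    days_in_month = calendar.monthrange(year, month)[1]
    return devices * readings_per_day * days_in_month


january = expected_records(2020, 1)
february = expected_records(2020, 2)  # 2020 is a leap year, so February has 29 days
print(january, february, january + february)  # 3720 3480 7200
```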

## Create Delta Tables
 
Objective: Convert a Parquet-based table to a Delta table. 

Recall that a Delta table consists of three things:
- the data files kept in object storage (e.g. AWS S3, Azure Data Lake Storage)
- the Delta Transaction Log saved with the data files in object storage
- a table registered in the Metastore. This step is optional, but usually recommended. 

With Delta Lake, you create tables:
* When ingesting new files into a Delta Table for the first time
* By transforming an existing Parquet-based data lake table to a Delta table

Before we convert the `health_tracker_processed` table, let's use the Spark SQL `DESCRIBE` command, with the optional parameter `EXTENDED`, to display the attributes of the table.
Note that the table has the "provider" listed as `PARQUET`.

In [0]:
%sql

DESCRIBE EXTENDED health_tracker_processed

| col_name | data_type | comment |
|---|---|---|
| dte | date | |
| time | timestamp | |
| heartrate | double | |
| name | string | |
| p_device_id | int | |
| # Partition Information | | |
| # col_name | data_type | comment |
| p_device_id | int | |
| | | |
| # Detailed Table Information | | |


First, we'll convert the files in-place to Delta files. The conversion creates a Delta Lake transaction log that tracks associated files.

In [0]:
from delta.tables import DeltaTable

parquet_table = f"parquet.`{health_tracker}processed`"
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)

At this point, the files containing our records have been converted to Delta files. The Metastore, however, has not been updated to reflect the change. To fix this, we re-register the table in the Metastore. Because the location now holds a Delta table, the `CREATE TABLE ... USING DELTA` command takes the table's schema and partitioning from the Delta transaction log, so we do not need to declare them.

In [0]:
spark.sql(f"""
DROP TABLE IF EXISTS health_tracker_processed
""")

spark.sql(f"""
CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "{health_tracker}processed"
""")

Comments can make your tables easier to read and maintain. We use an `ALTER TABLE` command to add column comments to the existing Delta table.

In [0]:
%sql
ALTER TABLE
  health_tracker_processed
REPLACE COLUMNS
  (dte DATE COMMENT "Format: YYYY/mm/dd", 
  time TIMESTAMP, 
  heartrate DOUBLE,
  name STRING COMMENT "Format: First Last",
  p_device_id INT COMMENT "range 0 - 4")

We can verify that the comments have been added by using the Spark SQL `DESCRIBE` command with the optional parameter `EXTENDED`. You can see the column comments we added as well as some additional information. Scroll down to confirm that the table now lists Delta as the provider.

In [0]:
%sql

DESCRIBE EXTENDED health_tracker_processed

| col_name | data_type | comment |
|---|---|---|
| dte | date | Format: YYYY/mm/dd |
| time | timestamp | |
| heartrate | double | |
| name | string | Format: First Last |
| p_device_id | int | range 0 - 4 |
| | | |
| # Partitioning | | |
| Part 0 | p_device_id | |
| | | |
| # Detailed Table Information | | |


We count the records in `health_tracker_processed` with Apache Spark.
With Delta Lake, the Delta table requires no repair and is immediately ready for use.

In [0]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

Next, we'll create a new Delta table by building an aggregate table from the data in the `health_tracker_processed` Delta table we just created. Within the context of our EDSS, this is a downstream aggregate table, or data mart.

In [0]:
dbutils.fs.rm(health_tracker + "gold/health_tracker_user_analytics",
              recurse=True)

In [0]:
from pyspark.sql.functions import col, avg, max, stddev

health_tracker_gold_user_analytics = (
  health_tracker_processed
  .groupby("p_device_id")
  .agg(avg(col("heartrate")).alias("avg_heartrate"),
       max(col("heartrate")).alias("max_heartrate"),
       stddev(col("heartrate")).alias("stddev_heartrate"))
)
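The aggregation can be checked by hand on a few readings. The plain-Python sketch below (the `user_analytics` helper and the readings are hypothetical) computes the same three statistics per device. Spark's `stddev` is the sample standard deviation (`stddev_samp`), so the sketch uses `statistics.stdev` rather than the population version `statistics.pstdev`.

```python
import statistics
from collections import defaultdict


def user_analytics(readings: list[tuple[int, float]]) -> dict[int, dict[str, float]]:
    """Per-device mean, max, and sample standard deviation of heart rate."""
    by_device: dict[int, list[float]] = defaultdict(list)
    for device_id, heartrate in readings:
        by_device[device_id].append(heartrate)
    return {
        device_id: {
            "avg_heartrate": statistics.mean(values),
            "max_heartrate": max(values),
            # Sample standard deviation; statistics.stdev needs at least two values.
            "stddev_heartrate": statistics.stdev(values),
        }
        for device_id, values in by_device.items()
    }


# Hypothetical (p_device_id, heartrate) readings.
readings = [(0, 50.0), (0, 60.0), (0, 70.0), (1, 80.0), (1, 90.0)]
result = user_analytics(readings)
print(result)
```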

Write the Delta Files

In [0]:
(health_tracker_gold_user_analytics.write
 .format("delta")
 .mode("overwrite")
 .save(health_tracker + "gold/health_tracker_user_analytics"))

Finally, register this table in the Metastore.

In [0]:
spark.sql(f"""
DROP TABLE IF EXISTS health_tracker_gold_user_analytics
""")

spark.sql(f"""
CREATE TABLE health_tracker_gold_user_analytics
USING DELTA
LOCATION "{health_tracker}gold/health_tracker_user_analytics"
""")

In [0]:
display(health_tracker_gold_user_analytics)

| p_device_id | avg_heartrate | max_heartrate | stddev_heartrate |
|---|---|---|---|
| 1 | 78.57765673376989 | 168.114687819 | 31.61967903784856 |
| 3 | 82.65419819635214 | 171.8435388833 | 30.929328740004404 |
| 0 | 81.21484441523789 | 186.4790827731 | 31.343789198032898 |
| 4 | 83.08377376550958 | 173.5770785921 | 34.160322676696175 |
| 2 | 79.99574196662844 | 184.7433209566 | 31.40800774122196 |


## Batch Write to Delta Tables

Within the context of our data ingestion pipeline, this is the addition of new raw files to our Single Source of Truth.

We begin by loading the data from the file `health_tracker_data_2020_2.json`, using the `.format("json")` option as before.

In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_2.json"

health_tracker_data_2020_2_df = (
  spark.read
  .format("json")
  .load(file_path)
)

We perform the same data engineering on the data:
- Use the `from_unixtime` Spark SQL function to transform the Unix time into a time string
- Cast the `time` column to type `timestamp` to replace the column `time`
- Cast the `time` column to type `date` to create the column `dte`
- Rename `device_id` to `p_device_id` and cast it to an integer

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, from_unixtime

def process_health_tracker_data(spark: SparkSession, df: DataFrame) -> DataFrame:
  return (
    df
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "heartrate", "name", "p_device_id")
    )


In [0]:
processedDF = process_health_tracker_data(spark, health_tracker_data_2020_2_df)

Append the Data to the `health_tracker_processed` Delta table

We do this using `.mode("append")`. Note that it is not necessary to perform any action on the Metastore.

In [0]:
(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))

## View the Commit Using Time Travel

Delta Lake can query an earlier version of a Delta table using a feature known as time travel. Here, we query the data as of version 0, that is, the initial conversion of the table from Parquet.

View the table as of Version 0

This is done by specifying the option `"versionAsOf"` as 0. When we time travel to Version 0, we see **only** the first month of data, five device measurements, 24 hours a day for 31 days.

In [0]:
(spark.read
 .option("versionAsOf", 0)
 .format("delta")
 .load(health_tracker + "processed")
 .count())
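Time travel works because each commit in the table's `_delta_log` directory records which data files were added and which were removed, so the table at version N consists of the files that survive after replaying commits 0 through N. The sketch below is a heavily simplified, plain-Python model of that replay: the `Commit` class and file names are hypothetical, and the real log also carries metadata, protocol and commit-info actions plus periodic checkpoints. It is not Delta Lake's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class Commit:
    """Simplified commit: files added (with row counts) and files removed."""
    added: dict[str, int] = field(default_factory=dict)
    removed: set[str] = field(default_factory=set)


def snapshot(log: list[Commit], version: int) -> dict[str, int]:
    """Replay commits 0..version to get the live files (file name -> row count)."""
    live: dict[str, int] = {}
    for commit in log[: version + 1]:
        for name in commit.removed:
            live.pop(name, None)
        live.update(commit.added)
    return live


def count_as_of(log: list[Commit], version: int) -> int:
    return sum(snapshot(log, version).values())


# Hypothetical log: version 0 converts one file, version 1 only changes metadata,
# version 2 appends a file, version 3 rewrites the first file.
log = [
    Commit(added={"part-0.parquet": 3720}),
    Commit(),
    Commit(added={"part-1.parquet": 3408}),
    Commit(added={"part-2.parquet": 3720}, removed={"part-0.parquet"}),
]
print([count_as_of(log, v) for v in range(len(log))])  # [3720, 3720, 7128, 7128]
```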

Count the Most Recent Version

When we query the table without specifying a version, it shows the latest version of the table and includes the new records added.
When we look at the current version, we expect to see two months of data: January 2020 and February 2020. 

The data should include the following records: 

```
5 devices * 60 days * 24 hours = 7200 records
```

Note that the range of data includes the month of February during a leap year. 29 days in Feb plus 31 in January gives us 60 days total.

In [0]:
(spark.read
 .format("delta")
 .load(health_tracker + "processed")
 .count())

Note that we do not have a correct count. We are missing 72 records.

## Identify Late-Arriving data and bad data

**Count the Number of Records Per Device**

Let’s run a query to count the number of records per device.
We tell Spark that the data is a Delta table with the `.format("delta")` method,
and we pass the path of the processed files, built from the `health_tracker` variable, to `.load()`.
Finally, we do a `groupby` on the `p_device_id` column and aggregate with `count`.

In [0]:
from pyspark.sql.functions import count

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id")
  .agg(count("*"))
)

| p_device_id | count(1) |
|---|---|
| 1 | 1440 |
| 3 | 1440 |
| 4 | 1368 |
| 2 | 1440 |
| 0 | 1440 |
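Comparing each device's count with the expected number of readings localizes the gap. The short sketch below hard-codes the per-device counts from the output above and assumes one reading per device per hour; it shows that device 4 alone accounts for the 72 missing records, the equivalent of three full days of hourly readings.

```python
# Per-device counts as displayed above after appending February.
counts = {0: 1440, 1: 1440, 2: 1440, 3: 1440, 4: 1368}

days, readings_per_day = 31 + 29, 24  # January plus leap-year February, hourly readings
expected_per_device = days * readings_per_day  # 1440

missing = {device: expected_per_device - n for device, n in counts.items() if n < expected_per_device}
print(missing)  # {4: 72}
print(missing[4] / readings_per_day)  # 3.0
```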


**Plot the Missing Records**

Let’s run a query to discover the timing of the missing records. We use a Databricks visualization to display the number of records per day. It appears that we have no records for device 4 for the last few days of the month.

In [0]:
from pyspark.sql.functions import col

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)

| dte | time | heartrate | name | p_device_id |
|---|---|---|---|---|
| 2020-01-31 | 2020-01-31T23:00:00.000+0000 | 58.5711262443 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T22:00:00.000+0000 | 58.4275904455 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T21:00:00.000+0000 | 59.7123859631 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T20:00:00.000+0000 | 98.3999326321 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T19:00:00.000+0000 | 99.1570224705 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T18:00:00.000+0000 | 97.2759909741 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T17:00:00.000+0000 | 99.1726578721 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T16:00:00.000+0000 | 97.8016886935 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T15:00:00.000+0000 | 100.2292769633 | Minh Nguyen | 3 |
| 2020-01-31 | 2020-01-31T14:00:00.000+0000 | 97.0769486256 | Minh Nguyen | 3 |


**Broken Readings in the Table**

Upon our initial load of data into the `health_tracker_processed` table, we noted that the data contains broken records. In particular, several readings were negative, even though a negative heart rate is impossible.

Let’s assess the extent of these broken readings in our table.

First, we create a temporary view of the broken readings in the `health_tracker_processed` table.
Here, we want to find the rows where `heartrate` is less than 0 and count them per day.

In [0]:
broken_readings = (
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .select(col("heartrate"), col("dte"))
  .where(col("heartrate") < 0)
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)
broken_readings.createOrReplaceTempView("broken_readings")

Display the records in the `broken_readings` view, again using a Databricks visualization.
Note that most days have at least one broken reading and that some have more than one.

In [0]:
%sql
SELECT * FROM broken_readings

| dte | count(heartrate) |
|---|---|
| 2020-01-01 | 1 |
| 2020-01-02 | 1 |
| 2020-01-04 | 1 |
| 2020-01-06 | 1 |
| 2020-01-07 | 1 |
| 2020-01-09 | 2 |
| 2020-01-12 | 3 |
| 2020-01-13 | 2 |
| 2020-01-14 | 1 |
| 2020-01-16 | 2 |


Next, we sum the daily counts in the view to get the total number of broken readings.

In [0]:
%sql
SELECT SUM(`count(heartrate)`) FROM broken_readings

| sum(count(heartrate)) |
|---|
| 60 |


## Upsert Into a Delta Table

**Objective:**  Repair records with an upsert

In the previous section, we identified two issues with the `health_tracker_processed` table:
- There were 72 missing records
- There were 60 records with broken readings

In this section, we will repair the table by modifying the `health_tracker_processed` table.

To repair the broken sensor readings (less than zero), we'll interpolate using the values recorded immediately before and after each broken reading for the same device. The Spark SQL functions LAG and LEAD make this a trivial calculation.
We'll collect these values in a DataFrame called `updatesDF`, which we'll later use to upsert values into our `health_tracker_processed` Delta table.
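The interpolation rule can be sketched without Spark. The plain-Python function below (a hypothetical helper, `interpolate_negative`, operating on `(hour, heartrate)` pairs for a single device) sorts the readings by hour so that the previous and next elements play the roles of `lag()` and `lead()`, then replaces each negative value with the mean of its neighbours. As with `(prev_amt + next_amt)/2` in Spark, a reading at either end of the series has a missing neighbour and becomes null, and a neighbour that is itself negative is averaged in unchanged.

```python
from typing import List, Optional, Tuple


def interpolate_negative(readings: List[Tuple[int, float]]) -> List[Tuple[int, Optional[float]]]:
    """Replace negative readings with the mean of the readings just before and after."""
    ordered = sorted(readings)  # like ordering the window by time within one device
    result: List[Tuple[int, Optional[float]]] = []
    for i, (hour, value) in enumerate(ordered):
        if value >= 0:
            result.append((hour, value))
            continue
        prev_amt = ordered[i - 1][1] if i > 0 else None  # plays the role of lag()
        next_amt = ordered[i + 1][1] if i + 1 < len(ordered) else None  # lead()
        if prev_amt is None or next_amt is None:
            # Spark's (prev_amt + next_amt) / 2 is null when either side is null.
            result.append((hour, None))
        else:
            result.append((hour, (prev_amt + next_amt) / 2))
    return result


print(interpolate_negative([(2, 62.0), (0, 60.0), (1, -1.0)]))
# [(0, 60.0), (1, 61.0), (2, 62.0)]
```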

Create a DataFrame Interpolating Broken Values

In [0]:
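from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead

# Order each device's readings by time so that lag() and lead() return the
# readings from the hour before and the hour after. Ordering by dte alone
# leaves all 24 readings of a day tied, which makes the neighbours arbitrary.
timeWindow = Window.partitionBy("p_device_id").orderBy("time")

interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(col("dte"),
          col("time"),
          col("heartrate"),
          lag(col("heartrate")).over(timeWindow).alias("prev_amt"),
          lead(col("heartrate")).over(timeWindow).alias("next_amt"),
          col("name"),
          col("p_device_id"))
)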

Create a DataFrame of Updates

In [0]:
updatesDF = (
  interpolatedDF
  .where(col("heartrate") < 0)
  .select(col("dte"),
          col("time"),
          ((col("prev_amt") + col("next_amt"))/2).alias("heartrate"),
          col("name"),
          col("p_device_id"))
)

View the schemas of the `updatesDF` and `health_tracker_processed` table

In [0]:
(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .printSchema()
)
updatesDF.printSchema()

Perform a `.count()` on `updatesDF`. It should return the same number of records as the `SUM` performed on the `broken_readings` view.

In [0]:
updatesDF.count()

It turns out that our expectation of receiving the missing records late was correct. These records have subsequently been made available to us as the file `health_tracker_data_2020_2_late.json`.

Load the Late-Arriving Data

In [0]:
file_path = health_tracker + "raw/late/health_tracker_data_2020_2_late.json"

health_tracker_data_2020_2_late_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
health_tracker_data_2020_2_late_df.count()

Transform the Data

In addition to updating the broken records, we wish to add this late-arriving data. We begin by preparing another DataFrame, `insertsDF`, with the same transformations:
* Use the `from_unixtime` Spark SQL function to transform the Unix time into a time string
* Cast the `time` column to type `timestamp` to replace the column `time`
* Cast the `time` column to type `date` to create the column `dte`
* Rename `device_id` to `p_device_id` and cast it to an integer

In [0]:
insertsDF = process_health_tracker_data(spark, health_tracker_data_2020_2_late_df)

View the Schema of the Inserts DataFrame

In [0]:
insertsDF.printSchema()

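Finally, we prepare `upsertsDF`, which consists of all the records in both `updatesDF` and `insertsDF`. We combine them with the DataFrame `.union()` method. Note that `.union()` matches columns by position rather than by name, so it relies on both DataFrames having the same column order, which you can confirm from the schemas printed above; `.unionByName()` matches columns by name instead.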

In [0]:
upsertsDF = updatesDF.union(insertsDF)

View the Schema

In [0]:
upsertsDF.printSchema()

You can upsert data into a Delta table using the merge operation. This operation is similar to the SQL `MERGE` command but has added support for deletes and other conditions in updates, inserts, and deletes. In other words, using the DeltaTable command `.merge()` provides full support for an upsert operation.

In [0]:
from delta.tables import DeltaTable

processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

update_match = """health_tracker.time = upserts.time
                  AND
                  health_tracker.p_device_id = upserts.p_device_id"""
update = {"heartrate" : "upserts.heartrate"}

insert = {
    "p_device_id" : "upserts.p_device_id",
    "heartrate" : "upserts.heartrate",
    "name" : "upserts.name",
    "time" : "upserts.time",
    "dte" : "upserts.dte"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())
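The merge condition and the two clauses above amount to a keyed upsert. The plain-Python sketch below models them on an in-memory dictionary; the `upsert` helper, the `Key` alias and the example rows are hypothetical. Rows are matched on `(time, p_device_id)`, a match updates only `heartrate`, and a non-match inserts the whole row. Unlike Delta Lake, the sketch does not reject a source in which several rows match the same target row.

```python
# Rows are keyed by the merge condition's columns: (time, p_device_id).
Key = tuple[str, int]


def upsert(target: dict[Key, dict], source: list[dict]) -> tuple[int, int]:
    """Apply whenMatchedUpdate / whenNotMatchedInsert semantics; return (updated, inserted)."""
    updated = inserted = 0
    for row in source:
        key = (row["time"], row["p_device_id"])
        if key in target:
            # Matched: update only the heartrate column, like set={"heartrate": ...}.
            target[key]["heartrate"] = row["heartrate"]
            updated += 1
        else:
            # Not matched: insert the whole row, like values={...all columns...}.
            target[key] = dict(row)
            inserted += 1
    return updated, inserted


# Hypothetical table with one broken reading, and two upsert rows.
table = {
    ("2020-02-01 00:00:00", 0): {"time": "2020-02-01 00:00:00", "p_device_id": 0, "heartrate": -1.0},
}
upserts = [
    {"time": "2020-02-01 00:00:00", "p_device_id": 0, "heartrate": 61.0},  # repairs a broken reading
    {"time": "2020-02-27 00:00:00", "p_device_id": 4, "heartrate": 70.0},  # a late-arriving record
]
print(upsert(table, upserts))  # (1, 1)
```

In the notebook's first merge, the same logic accounts for the 60 updated and 72 inserted rows that the table history reports for version 3 (`numTargetRowsUpdated -> 60`, `numTargetRowsInserted -> 72`).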

View the table as of Version 2

This is done by specifying the option `"versionAsOf"` as 2. When we time travel to Version 0, we see only the first month of data. In version 1, we see the table after we added comments. 
When we time travel to Version 2, we see the first two months of data, minus the 72 missing records.

In [0]:
(spark.read
 .option("versionAsOf", 2)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

When we query the table without specifying a version, it shows the latest version of the table and includes the full two months of data. Note that the range of data includes the month of February during a leap year. That is why there are 29 days in the month.

In [0]:
spark.read.table("health_tracker_processed").count()

The `.history()` Delta Table command provides provenance information, including the operation, user, and so on, for each write to a table.
Note that each operation performed on the table is given a version number. These are the numbers we have been using when performing a time travel query on the table, e.g., `SELECT COUNT(*) FROM health_tracker_processed VERSION AS OF 1`.

In [0]:
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2022-06-19T10:52:41.000+0000,319612229898062,recodata01@gmail.com,MERGE,"Map(predicate -> ((health_tracker.time = upserts.time) AND (health_tracker.p_device_id = upserts.p_device_id)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(2676673923515373),0619-095022-r1korpa,2,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 5, executionTimeMs -> 6963, numTargetRowsInserted -> 72, scanTimeMs -> 3445, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numTargetChangeFilesAdded -> 0, numSourceRows -> 132, numTargetFilesRemoved -> 10, rewriteTimeMs -> 3429)",,Databricks-Runtime/10.4.x-photon-scala2.12
2,2022-06-19T10:39:12.000+0000,319612229898062,recodata01@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(2676673923515373),0619-095022-r1korpa,1,WriteSerializable,True,"Map(numFiles -> 5, numOutputRows -> 3408, numOutputBytes -> 48396)",,Databricks-Runtime/10.4.x-photon-scala2.12
1,2022-06-19T10:17:57.000+0000,319612229898062,recodata01@gmail.com,REPLACE COLUMNS,"Map(columns -> [{""name"":""dte"",""type"":""date"",""nullable"":true,""metadata"":{""comment"":""Format: YYYY/mm/dd""}},{""name"":""time"",""type"":""timestamp"",""nullable"":true,""metadata"":{}},{""name"":""heartrate"",""type"":""double"",""nullable"":true,""metadata"":{}},{""name"":""name"",""type"":""string"",""nullable"":true,""metadata"":{""comment"":""Format: First Last""}},{""name"":""p_device_id"",""type"":""integer"",""nullable"":true,""metadata"":{""comment"":""range 0 - 4""}}])",,List(2676673923515373),0619-095022-r1korpa,0,WriteSerializable,True,Map(),,Databricks-Runtime/10.4.x-photon-scala2.12
0,2022-06-19T10:16:56.000+0000,319612229898062,recodata01@gmail.com,CONVERT,"Map(numFiles -> 5, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(2676673923515373),0619-095022-r1korpa,-1,Serializable,False,Map(numConvertedFiles -> 5),,Databricks-Runtime/10.4.x-photon-scala2.12


## Perform a Second Upsert
In the previous section, we performed an upsert to the `health_tracker_processed` table, which updated records containing broken readings. When we inserted the late-arriving data, we inadvertently added more broken readings!

Sum the Broken Readings

In [0]:
from pyspark.sql.functions import col, count

broken_readings = (
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .select(col("heartrate"), col("dte"))
  .where(col("heartrate") < 0)
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)
broken_readings.createOrReplaceTempView("broken_readings")

Verify That These are New Broken Readings

Let’s query the `broken_readings` view with a `WHERE` clause to verify that these are indeed new broken readings introduced by inserting the late-arriving data.
Note that there are no broken readings before ‘2020-02-25’.

In [0]:
%sql
SELECT SUM(`count(heartrate)`) FROM broken_readings WHERE dte < '2020-02-25'

| sum(count(heartrate)) |
|---|
| null |


Verify Updates

Perform a `.count()` on `updatesDF`. It should have the same number of records as the `SUM` of the counts in the `broken_readings` view.

**Note:** It is not necessary to redefine the DataFrame. A Spark DataFrame is lazily evaluated, so it picks up the current broken readings when an action such as `.count()` is triggered.
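The idea that a DataFrame is a recipe rather than a stored result can be illustrated with a toy class. `LazyFrame` below is hypothetical and far simpler than Spark, which builds and optimizes a query plan; it only shows that a filter defined once is re-applied to whatever the source holds each time an action such as `count()` runs.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: stores a recipe and runs it only on an action."""

    def __init__(self, source, predicate=lambda row: True):
        self._source = source  # a callable returning the current rows
        self._predicate = predicate

    def where(self, predicate):
        # A "transformation": compose the filter, but read nothing yet.
        parent = self._predicate
        return LazyFrame(self._source, lambda row: parent(row) and predicate(row))

    def count(self):
        # An "action": read the source now and apply the recipe.
        return sum(1 for row in self._source() if self._predicate(row))


table = [{"heartrate": 60.0}, {"heartrate": -1.0}]
broken = LazyFrame(lambda: table).where(lambda r: r["heartrate"] < 0)
print(broken.count())  # 1
table.append({"heartrate": -2.0})  # new broken reading arrives after the definition
print(broken.count())  # 2
```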

In [0]:
updatesDF.count()

Once more, we upsert into the `health_tracker_processed` table using the DeltaTable command `.merge()`.

In [0]:
upsertsDF = updatesDF

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

Let’s sum the records in the `broken_readings` view one last time.

In [0]:
%sql
SELECT SUM(`count(heartrate)`) FROM broken_readings

| sum(count(heartrate)) |
|---|
| null |


## Work with evolving schema

**Health tracker data sample**

```
{"device_id":0,"heartrate":57.6447293596,"name":"Deborah Powell","time":1.5830208E9,"device_type":"version 2"}
{"device_id":0,"heartrate":57.6175546013,"name":"Deborah Powell","time":1.5830244E9,"device_type":"version 2"}
{"device_id":0,"heartrate":57.8486376876,"name":"Deborah Powell","time":1.583028E9,"device_type":"version 2"}
{"device_id":0,"heartrate":57.8821378637,"name":"Deborah Powell","time":1.5830316E9,"device_type":"version 2"}
{"device_id":0,"heartrate":59.0531490807,"name":"Deborah Powell","time":1.5830352E9,"device_type":"version 2"}
```
This shows a sample of the health tracker data we will be using. Note that each line is a valid JSON object.

**Health tracker data schema**

The data has the following schema:

| Column     | Type      |
|------------|-----------|
| name       | string    |
| heartrate  | double    |
| device_id  | int       |
| time       | long      |
| device_type| string    |

**Load the Next Month of Data**

We begin by loading the data from the file `health_tracker_data_2020_3.json`, using the `.format("json")` option as before.

In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

health_tracker_data_2020_3_df = (
  spark.read
  .format("json")
  .load(file_path)
)

**Transform the Data**

We perform the same data engineering on the data:
- Use the `from_unixtime` Spark SQL function to transform the unixtime into a time string
- Cast the time column to type `timestamp` to replace the column `time`
- Cast the time column to type `date` to create the column `dte`

In [0]:
from pyspark.sql.functions import col, from_unixtime

def process_health_tracker_data(dataframe):
    return (
     dataframe
     .select(
         from_unixtime("time").cast("date").alias("dte"),
         from_unixtime("time").cast("timestamp").alias("time"),
         "heartrate",
         "name",
         col("device_id").cast("integer").alias("p_device_id"),
         "device_type"
       )
     )
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)

**Append the Data to the `health_tracker_processed` Delta table**

We do this using `.mode("append")`.

In [0]:
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import lit

try:
  (
    processedDF.write
    .mode("append")
    .format("delta")
    .save(health_tracker + "processed")
  )
except AnalysisException as error:
  print("Analysis Exception:")
  print(error)

**Schema Mismatch**

The command above produces the error: 
```
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: ...)
```

To enable schema migration using DataFrameWriter or DataStreamWriter, set `.option("mergeSchema", "true")`.

For other operations, set the session configuration `spark.databricks.delta.schema.autoMerge.enabled` to `"true"`. See the documentation specific to the operation for details; the Databricks blog post [Diving Into Delta Lake: Schema Enforcement & Evolution](https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html) gives an overview.

*What Is Schema Enforcement?*

Schema enforcement, also known as schema validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes to a table that do not match the table’s schema. 

*What Is Schema Evolution?*

Schema evolution is a feature that allows users to easily change a table’s current schema to accommodate data that is changing over time. Most commonly, it’s used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.
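The difference between enforcement and evolution can be modeled on plain Python lists. The sketch below is hypothetical (the `append_rows` helper and the example rows are illustrative, loosely based on the samples above) and checks column names only, not types. Without `merge_schema` it rejects a write that brings an unknown column; with it, the column list grows and existing rows read the new column as null.

```python
def append_rows(table_columns: list[str], table_rows: list[dict],
                new_rows: list[dict], merge_schema: bool = False) -> list[str]:
    """Append new_rows, enforcing or evolving the column list; return the resulting columns."""
    incoming = [c for r in new_rows for c in r if c not in table_columns]
    extra = list(dict.fromkeys(incoming))  # new columns, first-seen order, no duplicates
    if extra and not merge_schema:
        # Schema enforcement: reject the whole write before changing anything.
        raise ValueError(f"schema mismatch: unexpected columns {extra}")
    columns = table_columns + extra
    for row in table_rows:
        for c in extra:
            row.setdefault(c, None)  # existing rows read the new column as null
    for row in new_rows:
        table_rows.append({c: row.get(c) for c in columns})
    return columns


columns = ["dte", "time", "heartrate", "name", "p_device_id"]
rows = [{"dte": "2020-02-29", "time": "2020-02-29 23:00:00", "heartrate": 60.0,
         "name": "Deborah Powell", "p_device_id": 0}]
march = [{"dte": "2020-03-01", "time": "2020-03-01 00:00:00", "heartrate": 57.6,
          "name": "Deborah Powell", "p_device_id": 0, "device_type": "version 2"}]

try:
    append_rows(columns, rows, march)
except ValueError as error:
    print(error)  # schema mismatch: unexpected columns ['device_type']

columns = append_rows(columns, rows, march, merge_schema=True)
print(columns[-1], rows[0]["device_type"], rows[1]["device_type"])  # device_type None version 2
```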

**Append the Data with Schema Evolution to the `health_tracker_processed` Delta table**

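We do this using `.mode("append")` together with `.option("mergeSchema", True)`, which adds the new `device_type` column to the table schema.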

In [0]:
(processedDF.write
 .mode("append")
 .option("mergeSchema", True)
 .format("delta")
 .save(health_tracker + "processed"))

**Count the Most Recent Version**

In [0]:
spark.read.table("health_tracker_processed").count()

## Delete user records

Under the European Union General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA),
a user of the health tracker device has the right to request that their data be expunged from the system.
We might simply do this by deleting all records associated with that user's device id.

We use the Delta Lake `DeltaTable.delete()` method, the Python counterpart of the `DELETE` Spark SQL command, to remove all records from the `health_tracker_processed`
table that match the given predicate.

In [0]:
from delta.tables import DeltaTable

processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")
processedDeltaTable.delete("p_device_id = 4")
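
It helps to recall that a Delta Lake delete does not modify data in place: it records a new table version, while the files behind earlier versions stay on storage until they are vacuumed. The toy model below sketches that behavior with a list of immutable snapshots. `ToyVersionedTable` is a hypothetical name and the sample rows are made up; real Delta Lake records added and removed files in a transaction log rather than copying rows.

```python
class ToyVersionedTable:
    """Hypothetical model of a versioned table: each write adds a snapshot."""

    def __init__(self, rows):
        self.versions = [tuple(rows)]  # version 0

    def delete(self, predicate):
        latest = self.versions[-1]
        # A new version without the matching rows; older versions are untouched.
        self.versions.append(tuple(r for r in latest if not predicate(r)))

    def read(self, version_as_of=None):
        # Analogue of .option("versionAsOf", n); defaults to the latest version.
        if version_as_of is None:
            version_as_of = len(self.versions) - 1
        return list(self.versions[version_as_of])


table = ToyVersionedTable([
    {"p_device_id": 3, "heartrate": 61.2},
    {"p_device_id": 4, "heartrate": 58.9},
])
table.delete(lambda row: row["p_device_id"] == 4)

print(len(table.read()))                 # latest version: device 4 is gone
print(len(table.read(version_as_of=0)))  # version 0 still holds device 4
```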

## Recover the Lost Data

In the previous section, we deleted all records from the `health_tracker_processed` table
for the health tracker device with id 4.

Suppose that the user did not wish to remove all of their data,
but merely to have their name scrubbed from the system.

In this section, we use the Time Travel capability of Delta Lake to recover everything but the user’s name.

**Prepare a New Upserts DataFrame**

We prepare a DataFrame of upserts, using Time Travel to recover the missing records.
Note that we have replaced the entire name column with the value `NULL`.
The `.where()` clause keeps only the records whose `p_device_id` equals 4.

In [0]:
from pyspark.sql.functions import lit

upsertsDF = (
  spark.read
  .option("versionAsOf", 5)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time",
          "heartrate", lit(None).alias("name"), "p_device_id")
)
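
The upserts DataFrame above amounts to: read the table as of an earlier version, keep device 4, and null out the name. A plain-Python sketch of that step follows. `history` is a hypothetical dictionary standing in for the table's versions, its version numbers and rows are made up, and `build_upserts` is an invented helper.

```python
# Hypothetical version history: version number -> rows in that snapshot.
history = {
    0: [
        {"dte": "2020-01-01", "time": "2020-01-01T00:00", "heartrate": 60.7,
         "name": "Example User", "p_device_id": 4, "device_type": None},
        {"dte": "2020-01-01", "time": "2020-01-01T00:00", "heartrate": 61.2,
         "name": "Another User", "p_device_id": 3, "device_type": None},
    ],
    1: [  # after the delete: device 4 is gone
        {"dte": "2020-01-01", "time": "2020-01-01T00:00", "heartrate": 61.2,
         "name": "Another User", "p_device_id": 3, "device_type": None},
    ],
}


def build_upserts(history, version, device_id):
    """Select one device's rows from an old version and scrub the name."""
    return [
        {
            "dte": row["dte"],
            "time": row["time"],
            "heartrate": row["heartrate"],
            "name": None,  # analogue of lit(None).alias("name")
            "p_device_id": row["p_device_id"],
        }
        for row in history[version]
        if row["p_device_id"] == device_id  # analogue of .where("p_device_id = 4")
    ]


upserts = build_upserts(history, version=0, device_id=4)
print(upserts)
```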

**Perform Upsert Into the `health_tracker_processed` Table**

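Once more, we upsert into the `health_tracker_processed` table using the `DeltaTable` method `.merge()`.
Note that it is necessary to define:
1. The reference to the Delta table
1. The insert logic, because the schema has changed: the table now has a `device_type` column that `upsertsDF` lacks, so we list the columns to insert explicitly.

In the `insert` dictionary, the keys are the target column names and the values are the corresponding source columns, written as
`"upserts.<columnName>"`.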

In [0]:
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

update_match = """health_tracker.time = upserts.time
                  AND
                  health_tracker.p_device_id = upserts.p_device_id"""
update = {"heartrate" : "upserts.heartrate"}

insert = {
      "p_device_id" : "upserts.p_device_id",
      "heartrate" : "upserts.heartrate",
      "name" : "upserts.name",
      "time" : "upserts.time",
      "dte" : "upserts.dte"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())
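
The semantics of this `merge` can be sketched in plain Python: rows are matched on the pair (`time`, `p_device_id`); a matched target row gets its `heartrate` from the source (`whenMatchedUpdate`), and an unmatched source row is inserted with the listed columns (`whenNotMatchedInsert`), leaving any unlisted target column such as `device_type` null. This is a simplified, hypothetical model (`toy_merge` and the sample rows are invented); it ignores details such as the error Delta Lake raises when several source rows match the same target row.

```python
def toy_merge(target, source, key, update_cols, insert_cols, target_columns):
    """Hypothetical in-memory analogue of DeltaTable.merge(...).execute()."""
    # Index source rows by their match key, e.g. (time, p_device_id).
    source_by_key = {tuple(s[k] for k in key): s for s in source}
    matched_keys = set()
    result = []
    for row in target:
        k = tuple(row[c] for c in key)
        if k in source_by_key:
            # whenMatchedUpdate: overwrite only the columns in update_cols.
            row = {**row, **{c: source_by_key[k][c] for c in update_cols}}
            matched_keys.add(k)
        result.append(row)
    for k, s in source_by_key.items():
        if k not in matched_keys:
            # whenNotMatchedInsert: listed columns from the source, others null.
            result.append({c: (s[c] if c in insert_cols else None)
                           for c in target_columns})
    return result


columns = ["dte", "time", "heartrate", "name", "p_device_id", "device_type"]
target = [{"dte": "2020-01-01", "time": "2020-01-01T00:00", "heartrate": 61.2,
           "name": "Another User", "p_device_id": 3, "device_type": None}]
upserts = [
    # Matches the device 3 row: only heartrate is updated.
    {"dte": "2020-01-01", "time": "2020-01-01T00:00", "heartrate": 62.0,
     "name": None, "p_device_id": 3},
    # No match: inserted, with device_type left as None.
    {"dte": "2020-01-01", "time": "2020-01-01T00:00", "heartrate": 60.7,
     "name": None, "p_device_id": 4},
]
merged = toy_merge(target, upserts, key=("time", "p_device_id"),
                   update_cols=["heartrate"],
                   insert_cols=["p_device_id", "heartrate", "name", "time", "dte"],
                   target_columns=columns)
for row in merged:
    print(row)
```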

**Count the Most Recent Version**

When we look at the current version, we expect to see: 

$$5\ \text{devices} \times 24\ \text{hours per day} \times (31 + 29 + 31)\ \text{days}$$

That should give us 10920 records.
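
The expected count can be double-checked in Python. The only assumption is the one already built into the formula, namely one reading per device per hour; the standard-library `calendar` module confirms that February 2020 has 29 days because 2020 is a leap year.

```python
import calendar

devices = 5
hours_per_day = 24
# Days in January, February, and March 2020 (2020 is a leap year).
days = sum(calendar.monthrange(2020, month)[1] for month in (1, 2, 3))

expected_records = devices * hours_per_day * days
print(days, expected_records)  # 91 10920
```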

In [0]:
(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .count()
)

We query the `health_tracker_processed` table to demonstrate that the name associated with device 4 has indeed been removed.

In [0]:
display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id,device_type
2020-03-31,2020-03-31T23:00:00.000+0000,58.8721829425,,4,
2020-03-31,2020-03-31T22:00:00.000+0000,58.5802898547,,4,
2020-03-31,2020-03-31T21:00:00.000+0000,58.9508096208,,4,
2020-03-31,2020-03-31T20:00:00.000+0000,96.1922913515,,4,
2020-03-31,2020-03-31T19:00:00.000+0000,97.8421748359,,4,
2020-03-31,2020-03-31T18:00:00.000+0000,96.2252476794,,4,
2020-03-31,2020-03-31T17:00:00.000+0000,95.6192221978,,4,
2020-03-31,2020-03-31T16:00:00.000+0000,97.1935749485,,4,
2020-03-31,2020-03-31T15:00:00.000+0000,95.3640425703,,4,
2020-03-31,2020-03-31T14:00:00.000+0000,96.202848279,,4,


## Maintaining Compliance with a Vacuum Operation
Unfortunately, because of Delta Lake's Time Travel feature, we are still out of compliance: anyone could query an earlier version of the table to identify the name of the user associated with device 4.

**Query an Earlier Table Version**

We query the `health_tracker_processed` table against an earlier version to demonstrate that it is still possible to retrieve the name associated with device 4.

In [0]:
display(
  spark.read
  .option("versionAsOf", 2)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id
2020-01-31,2020-01-31T23:00:00.000+0000,60.8260801166,James Hou,4
2020-01-31,2020-01-31T22:00:00.000+0000,59.9985046572,James Hou,4
2020-01-31,2020-01-31T21:00:00.000+0000,60.4589919915,James Hou,4
2020-01-31,2020-01-31T20:00:00.000+0000,100.4140296197,James Hou,4
2020-01-31,2020-01-31T19:00:00.000+0000,99.1926406258,James Hou,4
2020-01-31,2020-01-31T18:00:00.000+0000,99.6327598989,James Hou,4
2020-01-31,2020-01-31T17:00:00.000+0000,101.9077302315,James Hou,4
2020-01-31,2020-01-31T16:00:00.000+0000,99.1530432935,James Hou,4
2020-01-31,2020-01-31T15:00:00.000+0000,98.8177306896,James Hou,4
2020-01-31,2020-01-31T14:00:00.000+0000,101.2310828282,James Hou,4


**Vacuum Table to Remove Old Files**

The `VACUUM` command, available in Python as `DeltaTable.vacuum()`, can be used to solve this problem. It recursively vacuums the directories associated with the Delta table and removes files that are no longer in the latest state of the transaction log for that table and that are older than a retention threshold. The default threshold is 7 days (168 hours); the argument passed to `vacuum()` is the retention period in hours.
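
The file-selection rule described above can be sketched as a small function. This is a hypothetical model, not Delta Lake's implementation: each data file is represented by a name and the time it stopped being referenced, and `files_to_vacuum` and the file names are invented. A file is selected only if the latest table version no longer references it and it has been unreferenced for longer than the retention period.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION_HOURS = 168  # 7 days


def files_to_vacuum(all_files, live_files, removed_at, now,
                    retention_hours=DEFAULT_RETENTION_HOURS):
    """Return the files a VACUUM-like cleanup would delete.

    all_files: every data file under the table directory.
    live_files: files referenced by the latest table version.
    removed_at: when each non-live file stopped being referenced (assumption:
    every non-live file has an entry).
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        f for f in all_files
        if f not in live_files and removed_at[f] < cutoff
    )


now = datetime(2020, 4, 1, 12, 0)
all_files = {"part-0.parquet", "part-1.parquet", "part-2.parquet"}
live = {"part-2.parquet"}  # the file written by the most recent change
removed = {
    "part-0.parquet": now - timedelta(days=10),
    "part-1.parquet": now - timedelta(hours=1),
}

print(files_to_vacuum(all_files, live, removed, now))
# ['part-0.parquet']
print(files_to_vacuum(all_files, live, removed, now, retention_hours=0))
# ['part-0.parquet', 'part-1.parquet']
```

With the default threshold, the recently superseded `part-1.parquet` survives, which is exactly why an earlier version can still be read; a zero-hour retention removes it as well.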

In [0]:
from pyspark.sql.utils import IllegalArgumentException

try:
  processedDeltaTable.vacuum(0)
except IllegalArgumentException as error:
  print(error)

When we run this command, we receive the error below. The default threshold is in place
to prevent corruption of the Delta table.
```
IllegalArgumentException: requirement failed: Are you sure you would like
to vacuum files with such a low retention period?
If you have writers that are currently writing to this table, there is a risk
that you may corrupt the state of your Delta table.

If you are certain that there are no operations being performed on this table, such as insert/upsert/delete/optimize, then you may turn off this check by setting: spark.databricks.delta.retentionDurationCheck.enabled = false

If you are not sure, please use a value not less than "168 hours".
```
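
The safety check behind this error can be modelled as a guard that runs before any files are touched. The sketch below is hypothetical: `check_retention` is an invented name, `check_enabled` plays the role of `spark.databricks.delta.retentionDurationCheck.enabled`, and the fixed 168-hour floor is a simplification (Delta Lake compares against the table's configured file-retention duration, which defaults to 7 days).

```python
MIN_SAFE_RETENTION_HOURS = 168  # "a value not less than 168 hours"


def check_retention(retention_hours, check_enabled=True):
    """Raise if retention is below the safe floor while the check is enabled."""
    if check_enabled and retention_hours < MIN_SAFE_RETENTION_HOURS:
        raise ValueError(
            "requirement failed: Are you sure you would like to vacuum files "
            "with such a low retention period?"
        )


try:
    check_retention(0)
except ValueError as error:
    print("blocked:", error)

check_retention(0, check_enabled=False)  # allowed once the check is disabled
check_retention(168)                     # allowed: meets the floor
```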

To demonstrate the `VACUUM` command, we set our retention period to 0 hours
so that we can remove the files that still contain the user's name right away. This is typically not a best practice,
and, as the error above shows, there are safeguards in place to prevent this operation from being performed.
For demonstration purposes, we disable that safeguard.

In [0]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

**Vacuum Table with a Zero-Hour Retention Period**

In [0]:
processedDeltaTable.vacuum(0)

Now, when we attempt to query an earlier version, an error is expected.
It indicates that we can no longer query data from this earlier version because its files have been expunged from the system.

In [0]:
display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,60.7236962271,James Hou,4
2020-01-01,2020-01-01T01:00:00.000+0000,59.7518357438,James Hou,4
2020-01-01,2020-01-01T02:00:00.000+0000,59.7552762926,James Hou,4
2020-01-01,2020-01-01T03:00:00.000+0000,61.8018342845,James Hou,4
2020-01-01,2020-01-01T04:00:00.000+0000,60.3112488045,James Hou,4
2020-01-01,2020-01-01T05:00:00.000+0000,60.0099058887,James Hou,4
2020-01-01,2020-01-01T06:00:00.000+0000,59.8323375338,James Hou,4
2020-01-01,2020-01-01T07:00:00.000+0000,59.9795666159,James Hou,4
2020-01-01,2020-01-01T08:00:00.000+0000,100.6013295271,James Hou,4
2020-01-01,2020-01-01T09:00:00.000+0000,100.1857471896,James Hou,4


## Summary

We used Spark SQL and Delta Lake to create a Single Source of Truth in our EDSS, the `health_tracker_processed` Delta table, through the following steps:
- We converted an existing Parquet-based data lake table to a Delta table, `health_tracker_processed`.
- We performed a batch upload of new data to this table.
- We used Apache Spark to identify broken and missing records in this table.
- We used Delta Lake’s ability to do an upsert, where we updated broken records and inserted missing records.
- We evolved the schema of the Delta table.
- We used Delta Lake’s Time Travel feature to scrub a user's name while keeping the rest of their records, and `VACUUM` to remove the older files that still contained it.

Additionally, we used Delta Lake to create an aggregate table, `health_tracker_user_analytics`, downstream from the `health_tracker_processed` table.