In [0]:
#Specify the database
username = "mariapastora"
dbutils.widgets.text("username", username)
spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy_{username}")
spark.sql(f"USE dbacademy_{username}")
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

In [0]:
#Configure the Number of Shuffle Partitions
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [0]:
%sh
#Download the data to the driver
#%sh permite usar comando de terminal
#wget para traer datos de paginas de internet

wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2_late.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_3.json

--2022-06-06 06:42:00--  https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json
Resolving hadoop-and-big-data.s3-us-west-2.amazonaws.com (hadoop-and-big-data.s3-us-west-2.amazonaws.com)... 52.218.184.17
Connecting to hadoop-and-big-data.s3-us-west-2.amazonaws.com (hadoop-and-big-data.s3-us-west-2.amazonaws.com)|52.218.184.17|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 310628 (303K) [application/json]
Saving to: ‘health_tracker_data_2020_1.json’

     0K .......... .......... .......... .......... .......... 16% 30.1M 0s
    50K .......... .......... .......... .......... .......... 32% 42.9M 0s
   100K .......... .......... .......... .......... .......... 49% 37.2M 0s
   150K .......... .......... .......... .......... .......... 65% 51.0M 0s
   200K .......... .......... .......... .......... .......... 82% 32.2M 0s
   250K .......... .......... .......... .......... .......... 98% 44.2M 0s
   300K ...     

In [0]:
%sh ls
##Verify the downloads

conf
eventlogs
ganglia
health_tracker_data_2020_1.json
health_tracker_data_2020_2.json
health_tracker_data_2020_2_late.json
health_tracker_data_2020_3.json
logs
metastore_db
preload_class.lst


In [0]:
#Move the data to the raw directory
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json", 
              health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2.json", 
              health_tracker + "raw/health_tracker_data_2020_2.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2_late.json", 
              health_tracker + "raw/health_tracker_data_2020_2_late.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_3.json", 
              health_tracker + "raw/health_tracker_data_2020_3.json")

Out[63]: True

In [0]:
#Load the data
#Load the data as a Spark DataFrame from the raw directory.
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"
 
health_tracker_data_2020_1_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
#Display the data
display(health_tracker_data_2020_1_df)


device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [0]:
#Create a Parquet Table
#Step 1: Remove files in the /dbacademy/DLRS/healthtracker/processed directory (This step will make the notebook idempotent. In other words, it could be run more than once without throwing errors or introducing extra files)
dbutils.fs.rm(health_tracker + "processed", recurse=True)

Out[66]: False

In [0]:
#Step 2: Transform the data 
#Use the from_unixtime Spark SQL function to transform the unix timestamp into a time string
#Cast the time column to type timestamp to replace the column time
#Cast the time column to type date to create the column dte
#Select the columns in the order in which we would like them to be written
from pyspark.sql.functions import col, from_unixtime
 
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)

In [0]:
#Step 3: Write the Files to the processed directory
(processedDF.write
 .mode("overwrite")
 .format("parquet")
 .partitionBy("p_device_id")
 .save(health_tracker + "processed"))

In [0]:
%sql
--Step 4: Register the table in the metastore
--use Spark SQL to register the table in the metastore. We specify the table format as parquet and we refer to the location where we wrote the parquet files

DROP TABLE IF EXISTS health_tracker_processed;
CREATE TABLE health_tracker_processed                        
USING PARQUET                
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"


In [0]:
#Step 5: Verify and repair the Parquet-based Data Lake table
#Step 5a: Count the records in the health_tracker_processed table
#Per best practice, we have created a partitioned table. However, if you create a partitioned table from existing data, Spark SQL does not automatically discover the partitions
#and register them in the Metastore. Note that the count does not return results
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

Out[69]: 0

In [0]:
%sql
--Step 5b: Register the partitions

MSCK REPAIR TABLE health_tracker_processed

In [0]:
#Step 5c: Count the records in the health_tracker_processed table
health_tracker_processed.count()

Out[70]: 3720

In [0]:
%sql
--Creating Delta table from a parquet table
--You can create a Delta table by either of the following methods:
--Convert parquet files using the Delta Lake API
--Write new files using the Spark DataFrame writer with .format("delta")
--Either of these will automatically create the Transaction Log in the same top-level directory as the data files. Optionally, you can also register the table in the Metastore.
--Describe the health_tracker_processed table to verifique its a parquet

DESCRIBE DETAIL health_tracker_processed


format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
PARQUET,,dbacademy_mariapastora.health_tracker_processed,,dbfs:/dbacademy/mariapastora/DLRS/healthtracker/processed,2022-06-06T06:42:50.000+0000,,List(p_device_id),,,Map(),,


In [0]:
#Step 1: Convert the files to Delta files
#The conversion creates a Delta Lake transaction log that tracks the files. Now, the directory is a directory of Delta files

from delta.tables import DeltaTable

parquet_table = f"parquet.`{health_tracker}processed`"
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)

Out[71]: <delta.tables.DeltaTable at 0x7fa44686bd90>

In [0]:
%sql
--Step 2: Register the Delta table
--the files containing our records have been converted to Delta files. The Metastore, however, has not been updated to reflect the change. To change this we re-register the table --in the Metastore. The Spark SQL command will automatically infer the data schema by reading the footers of the Delta files. 

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "/dbacademy/${username}/DLRS/healthtracker/processed"

In [0]:
%sql
--Step 3: Describe the health_tracker_processed table

DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,e107b958-b242-48ae-a594-8eb77f9ee520,dbacademy_mariapastora.health_tracker_processed,,dbfs:/dbacademy/mariapastora/DLRS/healthtracker/processed,2022-06-06T06:43:27.677+0000,2022-06-06T06:43:30.000+0000,List(p_device_id),5,58024,Map(),1,2


In [0]:
#Step 4: Count the records in the health_tracker_processed table
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

Out[72]: 3720

In [0]:
#Create a New Delta table
#We'll do this by creating an aggregate table from the data in the health_track_processed
#Step 1: Remove files in the health_tracker_user_analytics directory
dbutils.fs.rm(health_tracker + "gold/health_tracker_user_analytics",
              recurse=True)

Out[73]: True

In [0]:
#Step 2: Create an aggregate DataFrame
from pyspark.sql.functions import col, avg, max, stddev

health_tracker_gold_user_analytics = (
  health_tracker_processed
  .groupby("p_device_id")
  .agg(avg(col("heartrate")).alias("avg_heartrate"),
       max(col("heartrate")).alias("max_heartrate"),
       stddev(col("heartrate")).alias("stddev_heartrate"))
)

In [0]:
#Step 3: Write the Delta files
(health_tracker_gold_user_analytics.write
 .format("delta")
 .mode("overwrite")
 .save(health_tracker + "gold/health_tracker_user_analytics"))

In [0]:
%sql
--Step 4: Register the Delta table in the Metastore

DROP TABLE IF EXISTS health_tracker_gold_user_analytics;

CREATE TABLE health_tracker_gold_user_analytics
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/gold/health_tracker_user_analytics"

In [0]:
#Prepare a dashboard using the health_tracker_user_analytics table
display(spark.read.table("health_tracker_gold_user_analytics"))

p_device_id,avg_heartrate,max_heartrate,stddev_heartrate
1,78.5776567337699,168.114687819,31.61967903784856
0,81.21484441523789,186.4790827731,31.343789198032887
3,82.65419819635204,171.8435388833,30.92932874000444
4,83.08377376550952,173.5770785921,34.16032267669617
2,79.99574196662837,184.7433209566,31.408007741222


In [0]:
#Batch Write to Delta Tables
#we look at two patterns for modifying existing Delta tables:
##appending files to an existing directory of Delta files
##merging a set of updates and insertions
#Appending files to an existing Delta table:
#Step 1: Load the next month of data
#we append the next month of records. We begin by loading the data from the file
file_path = health_tracker + "raw/health_tracker_data_2020_2.json"
 
health_tracker_data_2020_2_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
#Step 2: Transform the data
#This is done using the process_health_tracker_data function we defined previously
processedDF = process_health_tracker_data(health_tracker_data_2020_2_df)

In [0]:
#Step 3: Append the data to the health_tracker_processed Delta table
#We do this using .mode("append"). Note that it is not necessary to perform any action on the Metastore
(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))

In [0]:
#View the commit using Time Travel
#Step 1: View the table as of version 0
(spark.read
 .option("versionAsOf", 0)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

Out[80]: 3720

In [0]:
#Step 2: Count the most recent version
#When we query the table without specifying a version, it shows the latest version of the table and includes the new records added.
health_tracker_processed.count()
#We are missing 72 records.

Out[81]: 7128

In [0]:
#Late-Arriving Data
#Delta Lake allows us to process data as it arrives and is prepared to handle the occurrence of late-arriving data
#Step 1: Count the number of records per device
from pyspark.sql.functions import count

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id")
  .agg(count("*"))
)
#It looks like device 4 is missing 72 records. 

p_device_id,count(1)
1,1440
3,1440
2,1440
4,1368
0,1440


In [0]:
#Step 2: Plot the missing records
display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)
#It appears that we have no records for device 4 for the last few days of the month.

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,55.2272036665,Minh Nguyen,3
2020-01-01,2020-01-01T01:00:00.000+0000,56.035689123,Minh Nguyen,3
2020-01-01,2020-01-01T02:00:00.000+0000,55.6403282219,Minh Nguyen,3
2020-01-01,2020-01-01T03:00:00.000+0000,56.3692513843,Minh Nguyen,3
2020-01-01,2020-01-01T04:00:00.000+0000,56.5412281859,Minh Nguyen,3
2020-01-01,2020-01-01T05:00:00.000+0000,55.8311481148,Minh Nguyen,3
2020-01-01,2020-01-01T06:00:00.000+0000,54.9402513831,Minh Nguyen,3
2020-01-01,2020-01-01T07:00:00.000+0000,92.2205431894,Minh Nguyen,3
2020-01-01,2020-01-01T08:00:00.000+0000,93.8159033652,Minh Nguyen,3
2020-01-01,2020-01-01T09:00:00.000+0000,92.0210547557,Minh Nguyen,3


In [0]:
#Broken Readings in the Table
#Step 1: Create temporary view for broken readings
broken_readings = (
  health_tracker_processed
  .select(col("heartrate"), col("dte"))
  .where(col("heartrate") < 0)
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)
 
broken_readings.createOrReplaceTempView("broken_readings")

In [0]:
%sql 
--Step 2: Display broken_readings
--Note that most days have at least one broken reading and that some have more than one. 
SELECT * FROM broken_readings

dte,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [0]:
%sql
--Step 3: Sum the broken readings
SELECT SUM(`count(heartrate)`) FROM broken_readings   

sum(count(heartrate))
60


In [0]:
#Repair Records with an Upsert (updates and insertions)
#we will repair the table by modifying the health_tracker_processed table.
#There are two patterns for modifying existing Delta tables:
##appending files to an existing directory of Delta files
##merging a set of updates and insertions
#An upsert will update records where some criteria are met and otherwise will insert the record
#When upserting into an existing Delta table, use Spark SQL to perform the merge from another registered table or view. The Transaction Log records the transaction, and the Metastore immediately reflects the changes.
#The merge appends both the new/inserted files and the files containing the updates to the Delta file directory. The transaction log tells the Delta reader which file to use for each record

In [0]:
#A)Prepare updates DataFrame
#To repair the broken sensor readings (less than zero), we'll interpolate using the value recorded before and after for each device. The Spark SQL functions LAG and LEAD will make this a trivial calculation. 
#We'll write these values to a temporary view called updates. This view will be used later to upsert values into our health_tracker_processed Delta table.
#Step 1: Create a DataFrame interpolating broken values
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead
 
dteWindow = Window.partitionBy("p_device_id").orderBy("dte")
 
interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(col("dte"),
          col("time"),
          col("heartrate"),
          lag(col("heartrate")).over(dteWindow).alias("prev_amt"),
          lead(col("heartrate")).over(dteWindow).alias("next_amt"),
          col("name"),
          col("p_device_id"))
)

In [0]:
#Step 2: Create a DataFrame of updates
updatesDF = (
  interpolatedDF
  .where(col("heartrate") < 0)
  .select(col("dte"),
          col("time"),
          ((col("prev_amt") + col("next_amt"))/2).alias("heartrate"),
          col("name"),
          col("p_device_id"))
)

In [0]:
#Step 3: View the schemas of the updatesDF and health_tracker_processed table
health_tracker_processed.printSchema()
updatesDF.printSchema()

root
 |-- dte: date (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- heartrate: double (nullable = true)
 |-- name: string (nullable = true)
 |-- p_device_id: integer (nullable = true)

root
 |-- dte: date (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- heartrate: double (nullable = true)
 |-- name: string (nullable = true)
 |-- p_device_id: integer (nullable = true)



In [0]:
#Step 4: Verify UpdatesDF
#It should have the same number of records as the SUM performed on the broken_readings view.
updatesDF.count()

Out[88]: 60

In [0]:
#B)Prepare inserts DataFrame
#Step 1: Load the late-arriving data
#It turns out that our expectation of receiving the missing records late was correct. These records have subsequently been made available to us as the file health_tracker_data_2020_02_01.json.
file_path = health_tracker + "raw/health_tracker_data_2020_2_late.json"
 
health_tracker_data_2020_2_late_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
#Step 2: Transform the data
insertsDF = process_health_tracker_data(health_tracker_data_2020_2_late_df)

In [0]:
#Step 3: View the schema of the inserts DataFrame
insertsDF.printSchema()

root
 |-- dte: date (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- heartrate: double (nullable = true)
 |-- name: string (nullable = true)
 |-- p_device_id: integer (nullable = true)



In [0]:
#C)Prepare Upserts DataFrame
#Step 1: Create the union DataFrame
#Finally, we prepare the upsertsDF that consists of all the records in both the updatesDF and the insertsDF. We use the DataFrame .union() command to create the view.
upsertsDF = updatesDF.union(insertsDF)

In [0]:
#Step 2: View the schema
upsertsDF.printSchema()

root
 |-- dte: date (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- heartrate: double (nullable = true)
 |-- name: string (nullable = true)
 |-- p_device_id: integer (nullable = true)



In [0]:
#D)Perform Upsert into the health_tracker_processed table
#You can upsert data into a Delta table using the merge operation. This operation is similar to the SQL MERGE command but has added support for deletes and other conditions in updates, inserts, and deletes
#Perform the Upsert

processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""

update = { "heartrate" : "upserts.heartrate" }

insert = {
  "p_device_id" : "upserts.p_device_id",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "time" : "upserts.time",
  "dte" : "upserts.dte"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [0]:
#View the commit using time travel
#Step 1: View the table as of version 1
(spark.read
 .option("versionAsOf", 1)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

Out[95]: 7128

In [0]:
#Step 2: Count the most recent version
health_tracker_processed.count()

Out[96]: 7200

In [0]:
#Step 3: Describe the history of the health_tracker_processed table
#The .history() Delta table command provides provenance information, including the operation, user, and so on, for each action performed on a table. 
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2022-06-06T06:46:24.000+0000,3279574748515926,mariapastora.alvarez@bosonit.com,MERGE,"Map(predicate -> ((health_tracker.time = upserts.time) AND (health_tracker.p_device_id = upserts.p_device_id)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(4192180070529184),0606-060456-wpn3v13q,1,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 5, executionTimeMs -> 12518, numTargetRowsInserted -> 72, scanTimeMs -> 4618, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numTargetChangeFilesAdded -> 0, numSourceRows -> 132, numTargetFilesRemoved -> 10, rewriteTimeMs -> 7833)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-06-06T06:44:39.000+0000,3279574748515926,mariapastora.alvarez@bosonit.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4192180070529184),0606-060456-wpn3v13q,0,WriteSerializable,True,"Map(numFiles -> 5, numOutputRows -> 3408, numOutputBytes -> 53803)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-06-06T06:43:30.000+0000,3279574748515926,mariapastora.alvarez@bosonit.com,CONVERT,"Map(numFiles -> 5, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(4192180070529184),0606-060456-wpn3v13q,-1,Serializable,False,Map(numConvertedFiles -> 5),,Databricks-Runtime/10.4.x-scala2.12


In [0]:
%sql 

--Perform a Second Upsert
--Step 1: Sum the broken readings

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
1


In [0]:
%sql 

--Step 2: Verify that these are new broken readings
--Let’s query the broken_readings with a WHERE clause to verify that these are indeed new broken readings introduced by inserting the late-arriving data. 

SELECT SUM(`count(heartrate)`) FROM broken_readings WHERE dte < '2020-02-25'

sum(count(heartrate))
""


In [0]:
#Step 3: Verify updates
#It should have the same number of records as the SUM performed on the broken_readings view.
updatesDF.count()

Out[98]: 1

In [0]:
#Step 4: Perform Upsert into the health_tracker_processed table
upsertsDF = updatesDF

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [0]:
%sql
--Step 5: Sum the broken readings

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
""


In [0]:
#Appending files to an existing Delta table
#Step 1: Load the next month of data
file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

health_tracker_data_2020_3_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
#Step 2: Transform the data
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "device_type", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)

In [0]:
#Step 3: Append the data to the health_tracker_processed Delta table with schema evolution
#It’s used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.

(processedDF.write
 .mode("append")
 .option("mergeSchema", True)
 .format("delta")
 .save(health_tracker + "processed"))

In [0]:
#Verify the commit 
health_tracker_processed.count()

Out[105]: 10920

In [0]:
#Delete Data and Recover Lost Data
#Step 1: Delete all records for device 4
processedDeltaTable.delete("p_device_id = 4")

In [0]:
#Recover lost data
#we use the Time Travel capability of Delta Lake to recover everything but the user’s name
#Step 1: Prepare new Upserts view
from pyspark.sql.functions import lit

upsertsDF = (
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
)

In [0]:
#Step 2: Perform Upsert into the health_tracker_processed table
#Note that it is necessary to define 1) the reference to the Delta table and 2) the insert logic because the schema has changed.
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

insert = {
  "dte" : "upserts.dte",
  "time" : "upserts.time",
  "device_type" : "upserts.device_type",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "p_device_id" : "upserts.p_device_id"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [0]:
#Step 3: Count the most recent version
#When we look at the current version, we expect to see three months of data, five device measurements, 24 hours a day for (31 + 29 + 31) days, or 10920 records
health_tracker_processed.count()

Out[110]: 10920

In [0]:
#Step 4: Query device 4 to demonstrate compliance
display(health_tracker_processed.where("p_device_id = 4"))

dte,time,heartrate,name,p_device_id
2020-03-01,2020-03-01T00:00:00.000+0000,97.8678768636,,4
2020-03-01,2020-03-01T01:00:00.000+0000,97.586595396,,4
2020-03-01,2020-03-01T02:00:00.000+0000,97.188151848,,4
2020-03-01,2020-03-01T03:00:00.000+0000,97.4361573672,,4
2020-03-01,2020-03-01T04:00:00.000+0000,95.8997954454,,4
2020-03-01,2020-03-01T05:00:00.000+0000,96.5277339825,,4
2020-03-01,2020-03-01T06:00:00.000+0000,98.1774838993,,4
2020-03-01,2020-03-01T07:00:00.000+0000,95.8929343311,,4
2020-03-01,2020-03-01T08:00:00.000+0000,95.545442375,,4
2020-03-01,2020-03-01T09:00:00.000+0000,95.3578614286,,4


In [0]:
#Maintain Compliance with a Vacuum Operation
#Step 1: Query an earlier table version 
#We query the health_tracker_processed table against an earlier version to demonstrate that it is still possible to retrieve the name associated with device 4.
display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id,device_type
2020-01-01,2020-01-01T00:00:00.000+0000,60.7236962271,James Hou,4,
2020-01-01,2020-01-01T01:00:00.000+0000,59.7518357438,James Hou,4,
2020-01-01,2020-01-01T02:00:00.000+0000,59.7552762926,James Hou,4,
2020-01-01,2020-01-01T03:00:00.000+0000,61.8018342845,James Hou,4,
2020-01-01,2020-01-01T04:00:00.000+0000,60.3112488045,James Hou,4,
2020-01-01,2020-01-01T05:00:00.000+0000,60.0099058887,James Hou,4,
2020-01-01,2020-01-01T06:00:00.000+0000,59.8323375338,James Hou,4,
2020-01-01,2020-01-01T07:00:00.000+0000,59.9795666159,James Hou,4,
2020-01-01,2020-01-01T08:00:00.000+0000,100.6013295271,James Hou,4,
2020-01-01,2020-01-01T09:00:00.000+0000,100.1857471896,James Hou,4,


In [0]:
#Step 2: Vacuum table to remove old files
#The VACUUM command recursively vacuums directories associated with the Delta table and removes files that are no longer in the latest state of the transaction log for that table and that are older than a retention threshold. The default threshold is 7 days. 
processedDeltaTable.vacuum(0)
#da error, tenemos q configurar

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
[0;32m<command-4138986985048147>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m#Step 2: Vacuum table to remove old files[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;31m#The VACUUM command recursively vacuums directories associated with the Delta table and removes files that are no longer in the latest state of the transaction log for that table and that are older than a retention threshold. The default threshold is 7 days.[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 3[0;31m [0mprocessedDeltaTable[0m[0;34m.[0m[0mvacuum[0m[0;34m([0m[0;36m0[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/delta/tables.py[0m in [0;36mvacuum[0;34m(self, retentionHours)[0m
[1;32m    245[0m         [0;32melse[0m[0;34m:[0m[0;34m[0m[0;34m[0m[

In [0]:
#Step 3: Set Delta to allow the operation
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

In [0]:
#Step 4: Vacuum table to remove old files
processedDeltaTable.vacuum(0)

Out[115]: DataFrame[]

In [0]:
#Step 5: Attempt to query an earlier version
#This error indicates that we are not able to query data from this earlier version because the files have been expunged from the system.
display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)