In [1]:
username = 'jiaxi'
dbutils.widgets.text("username", username)
spark.sql(f"CREATE DATABASE IF NOT EXISTS db_{username}")
spark.sql(f"USE db_{username}")
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

In [2]:
# Configure spark shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [3]:
%sh
mkdir -p ./data/health_tracker/
cd ./data/health_tracker/
pwd
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2_late.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_3.json

In [4]:
%sh
ls data/health_tracker/

In [5]:
health_tracker

In [6]:
dbutils.fs.mv("file:/databricks/driver/data/health_tracker/health_tracker_data_2020_1.json", 
              health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.mv("file:/databricks/driver/data/health_tracker/health_tracker_data_2020_2.json", 
              health_tracker + "raw/health_tracker_data_2020_2.json")
dbutils.fs.mv("file:/databricks/driver/data/health_tracker/health_tracker_data_2020_2_late.json", 
              health_tracker + "raw/health_tracker_data_2020_2_late.json")
dbutils.fs.mv("file:/databricks/driver/data/health_tracker/health_tracker_data_2020_3.json", 
              health_tracker + "raw/health_tracker_data_2020_3.json")

In [7]:
dbutils.fs.help()

In [8]:
# List dbfs
# display(dbutils.fs.ls(health_tracker))
dbutils.fs.ls('dbfs:/')

In [9]:
%fs ls f'/dbacademy/jiaxi/DLRS/healthtracker/raw/

In [10]:
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"
 
health_tracker_data_2020_1_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [11]:
display(health_tracker_data_2020_1_df)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [12]:
# remove existing folder 
dbutils.fs.rm(health_tracker + "processed", recurse=True)

In [13]:
from pyspark.sql.functions import col, from_unixtime

def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "heartrate", "name", "p_device_id")
    )

processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)

In [14]:
display(processedDF)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,52.8139067501,Deborah Powell,0
2020-01-01,2020-01-01T01:00:00.000+0000,53.9078900098,Deborah Powell,0
2020-01-01,2020-01-01T02:00:00.000+0000,52.7129593616,Deborah Powell,0
2020-01-01,2020-01-01T03:00:00.000+0000,52.2880422685,Deborah Powell,0
2020-01-01,2020-01-01T04:00:00.000+0000,52.5156095386,Deborah Powell,0
2020-01-01,2020-01-01T05:00:00.000+0000,53.6280743846,Deborah Powell,0
2020-01-01,2020-01-01T06:00:00.000+0000,52.1760037066,Deborah Powell,0
2020-01-01,2020-01-01T07:00:00.000+0000,90.0456721836,Deborah Powell,0
2020-01-01,2020-01-01T08:00:00.000+0000,89.4695644522,Deborah Powell,0
2020-01-01,2020-01-01T09:00:00.000+0000,88.1490304138,Deborah Powell,0


In [15]:
# Write to processed path
processedDF.write \
  .mode("overwrite") \
  .format("parquet") \
  .partitionBy("p_device_id") \
  .save(health_tracker + "processed")

In [16]:
%fs ls /dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/

path,name,size
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_SUCCESS,_SUCCESS,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_committed_2146857267709725675,_committed_2146857267709725675,124
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_committed_7566592121338257719,_committed_7566592121338257719,124
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_started_2146857267709725675,_started_2146857267709725675,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_started_7566592121338257719,_started_7566592121338257719,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-32df48db-4e4b-40c9-b99e-e9583745f2e2.c000.snappy.parquet,part-00000-32df48db-4e4b-40c9-b99e-e9583745f2e2.c000.snappy.parquet,11411
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-581a45e6-f0df-4036-8a84-75a88530ae32.c000.snappy.parquet,part-00000-581a45e6-f0df-4036-8a84-75a88530ae32.c000.snappy.parquet,10762
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-tid-2146857267709725675-20aa07c6-6bcc-4ec6-ac3e-053531452f17-743-1.c000.snappy.parquet,part-00000-tid-2146857267709725675-20aa07c6-6bcc-4ec6-ac3e-053531452f17-743-1.c000.snappy.parquet,11411
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-tid-7566592121338257719-ee9acf60-9ad2-4fa3-bfe3-5d951e6d6491-748-1.c000.snappy.parquet,part-00000-tid-7566592121338257719-ee9acf60-9ad2-4fa3-bfe3-5d951e6d6491-748-1.c000.snappy.parquet,11411


In [17]:
%sql 

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed                        
USING PARQUET                
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [18]:
%sql 
-- MSCK REPAIR TABLE health_tracker_processed;
SELECT * FROM health_tracker_processed where p_device_id = '0' and time = TIMESTAMP('2020-01-24T07:00:00')

dte,time,heartrate,name,p_device_id
2020-01-24,2020-01-24T07:00:00.000+0000,186.4790827731,Deborah Powell,0


In [19]:
%sql

DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,f91281db-5b2b-4f58-9a17-7314349f8f21,db_jiaxi.health_tracker_processed,,dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed,2020-07-27T05:29:54.106+0000,2020-07-27T06:02:17.000+0000,List(p_device_id),5,56938,Map(),1,2


In [20]:
from delta.tables import DeltaTable

parquet_table = f"parquet.`{health_tracker}processed`"
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)

In [21]:
%sql

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [22]:
%sql DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,f91281db-5b2b-4f58-9a17-7314349f8f21,db_jiaxi.health_tracker_processed,,dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed,2020-07-27T05:29:54.106+0000,2020-07-27T05:29:57.000+0000,List(p_device_id),10,113876,Map(),1,2


In [23]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [24]:
dbutils.fs.rm(health_tracker + "gold/health_tracker_user_analytics",
              recurse=True)

In [25]:
from pyspark.sql.functions import col, avg, max, stddev

health_tracker_gold_user_analytics = (
  health_tracker_processed
  .groupby("p_device_id")
  .agg(avg(col("heartrate")).alias("avg_heartrate"),
       max(col("heartrate")).alias("max_heartrate"),
       stddev(col("heartrate")).alias("stddev_heartrate"))
)

In [26]:
# display(health_tracker_gold_user_analytics)

In [27]:
(health_tracker_gold_user_analytics.write
 .format("delta")
 .mode("overwrite")
 .save(health_tracker + "gold/health_tracker_user_analytics"))

In [28]:
display(dbutils.fs.ls(health_tracker + "gold/health_tracker_user_analytics/"))

path,name,size
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/gold/health_tracker_user_analytics/_delta_log/,_delta_log/,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/gold/health_tracker_user_analytics/part-00000-58ae8ea2-5f44-4b41-aa60-0c819ca1f841-c000.snappy.parquet,part-00000-58ae8ea2-5f44-4b41-aa60-0c819ca1f841-c000.snappy.parquet,586
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/gold/health_tracker_user_analytics/part-00003-ec5626d2-9cf7-4526-a9d9-fa60e04a31b8-c000.snappy.parquet,part-00003-ec5626d2-9cf7-4526-a9d9-fa60e04a31b8-c000.snappy.parquet,1297
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/gold/health_tracker_user_analytics/part-00006-779d6ae2-2068-4e98-9118-596aeb2343f4-c000.snappy.parquet,part-00006-779d6ae2-2068-4e98-9118-596aeb2343f4-c000.snappy.parquet,1297
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/gold/health_tracker_user_analytics/part-00007-eaee8d41-bdad-4b1a-8239-225d9ee7b560-c000.snappy.parquet,part-00007-eaee8d41-bdad-4b1a-8239-225d9ee7b560-c000.snappy.parquet,1267


In [29]:
%sql

DROP TABLE IF EXISTS health_tracker_gold_user_analytics;

CREATE TABLE health_tracker_gold_user_analytics
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/gold/health_tracker_user_analytics"

At Delta table creation, the Delta files in Object Storage define the schema, partitioning, and table properties. For this reason, it is not necessary to specify any of these when registering the table with the Metastore. Furthermore, NO TABLE REPAIR IS REQUIRED. The transaction log stored with the Delta files contains all the metadata needed for an immediate query.

In [31]:
display(spark.read.table("health_tracker_gold_user_analytics"))

p_device_id,avg_heartrate,max_heartrate,stddev_heartrate
1,78.5776567337699,168.114687819,31.61967903784856
3,82.65419819635204,171.8435388833,30.92932874000444
4,83.08377376550952,173.5770785921,34.16032267669617
2,79.99574196662837,184.7433209566,31.408007741222
0,81.21484441523789,186.4790827731,31.343789198032887


In [32]:
file_path = health_tracker + "raw/health_tracker_data_2020_2.json"
 
health_tracker_data_2020_2_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [33]:
processedDF = process_health_tracker_data(health_tracker_data_2020_2_df)

In [34]:
(processedDF.write
 .mode("append")
 .format("delta")
 # did not even specify partition cols
 .save(health_tracker + "processed"))

In [35]:
display(dbutils.fs.ls(health_tracker + "processed/p_device_id=0/"))

path,name,size
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_SUCCESS,_SUCCESS,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_committed_2146857267709725675,_committed_2146857267709725675,124
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_committed_7566592121338257719,_committed_7566592121338257719,124
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_started_2146857267709725675,_started_2146857267709725675,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/_started_7566592121338257719,_started_7566592121338257719,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-32df48db-4e4b-40c9-b99e-e9583745f2e2.c000.snappy.parquet,part-00000-32df48db-4e4b-40c9-b99e-e9583745f2e2.c000.snappy.parquet,11411
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-581a45e6-f0df-4036-8a84-75a88530ae32.c000.snappy.parquet,part-00000-581a45e6-f0df-4036-8a84-75a88530ae32.c000.snappy.parquet,10762
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-81d2ae7d-6f34-4e7a-b167-cdfe9a51e465.c000.snappy.parquet,part-00000-81d2ae7d-6f34-4e7a-b167-cdfe9a51e465.c000.snappy.parquet,10762
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-tid-2146857267709725675-20aa07c6-6bcc-4ec6-ac3e-053531452f17-743-1.c000.snappy.parquet,part-00000-tid-2146857267709725675-20aa07c6-6bcc-4ec6-ac3e-053531452f17-743-1.c000.snappy.parquet,11411
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=0/part-00000-tid-7566592121338257719-ee9acf60-9ad2-4fa3-bfe3-5d951e6d6491-748-1.c000.snappy.parquet,part-00000-tid-7566592121338257719-ee9acf60-9ad2-4fa3-bfe3-5d951e6d6491-748-1.c000.snappy.parquet,11411


In [36]:
%sql DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,f91281db-5b2b-4f58-9a17-7314349f8f21,db_jiaxi.health_tracker_processed,,dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed,2020-07-27T05:29:54.106+0000,2020-07-27T06:05:35.000+0000,List(p_device_id),10,109655,Map(),1,2


In [37]:
(spark.read
 .option("versionAsOf", 2)
 .format("delta")
 .load(health_tracker + "processed")
 .count())


In [38]:
health_tracker_processed.count()

In [39]:
from pyspark.sql.functions import count

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id").count()
#   .agg(count("*"))
)

p_device_id,count
3,1440
1,1440
2,1440
4,1368
0,1440


In [40]:
display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,55.2272036665,Minh Nguyen,3
2020-01-01,2020-01-01T01:00:00.000+0000,56.035689123,Minh Nguyen,3
2020-01-01,2020-01-01T02:00:00.000+0000,55.6403282219,Minh Nguyen,3
2020-01-01,2020-01-01T03:00:00.000+0000,56.3692513843,Minh Nguyen,3
2020-01-01,2020-01-01T04:00:00.000+0000,56.5412281859,Minh Nguyen,3
2020-01-01,2020-01-01T05:00:00.000+0000,55.8311481148,Minh Nguyen,3
2020-01-01,2020-01-01T06:00:00.000+0000,54.9402513831,Minh Nguyen,3
2020-01-01,2020-01-01T07:00:00.000+0000,92.2205431894,Minh Nguyen,3
2020-01-01,2020-01-01T08:00:00.000+0000,93.8159033652,Minh Nguyen,3
2020-01-01,2020-01-01T09:00:00.000+0000,92.0210547557,Minh Nguyen,3


In [41]:
broken_readings = (
  health_tracker_processed
  .select(col("heartrate"), col("dte"))
  .where(col("heartrate") < 0)
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)
 
broken_readings.createOrReplaceTempView("broken_readings")

In [42]:
display(spark.read.table('broken_readings'))

dte,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [43]:
%sql
SELECT sum(`count(heartrate)`) from broken_readings

sum(count(heartrate))
60


We identified two issues with the health_tracker_processed table:

1. There were 72 missing records
2. There were 67 records with broken readings

There are two patterns for modifying existing Delta tables. 

1. appending files to an existing directory of Delta files
2. merging a set of updates and insertions (*)

In [45]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead
 
dteWindow = Window.partitionBy("p_device_id").orderBy("dte")
 
interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(col("dte"),
          col("time"),
          col("heartrate"),
          lag(col("heartrate")).over(dteWindow).alias("prev_amt"),
          lead(col("heartrate")).over(dteWindow).alias("next_amt"),
          col("name"),
          col("p_device_id"))
)

In [46]:
display(interpolatedDF)

dte,time,heartrate,prev_amt,next_amt,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,47.5378557652,,48.3496970512,Kristin Vasser,1
2020-01-01,2020-01-01T01:00:00.000+0000,48.3496970512,47.5378557652,49.1212033115,Kristin Vasser,1
2020-01-01,2020-01-01T02:00:00.000+0000,49.1212033115,48.3496970512,47.9982802854,Kristin Vasser,1
2020-01-01,2020-01-01T03:00:00.000+0000,47.9982802854,49.1212033115,47.841083408,Kristin Vasser,1
2020-01-01,2020-01-01T04:00:00.000+0000,47.841083408,47.9982802854,47.7225468025,Kristin Vasser,1
2020-01-01,2020-01-01T05:00:00.000+0000,47.7225468025,47.841083408,47.3041211781,Kristin Vasser,1
2020-01-01,2020-01-01T06:00:00.000+0000,47.3041211781,47.7225468025,49.1694935562,Kristin Vasser,1
2020-01-01,2020-01-01T07:00:00.000+0000,49.1694935562,47.3041211781,80.1944436044,Kristin Vasser,1
2020-01-01,2020-01-01T08:00:00.000+0000,80.1944436044,49.1694935562,80.0663193135,Kristin Vasser,1
2020-01-01,2020-01-01T09:00:00.000+0000,80.0663193135,80.1944436044,80.4780281723,Kristin Vasser,1


In [47]:
updatesDF = (
  interpolatedDF
  .where(col("heartrate") < 0)
  .select(col("dte"),
          col("time"),
#           col('heartrate').alias('old_heartrate_value'),
          ((col("prev_amt") + col("next_amt"))/2).alias("heartrate"),
          col("name"),
          col("p_device_id"))
)

In [48]:
display(updatesDF)

dte,time,heartrate,name,p_device_id
2020-01-02,2020-01-02T11:00:00.000+0000,79.4468974614,Kristin Vasser,1
2020-01-04,2020-01-04T20:00:00.000+0000,92.10732860255,Kristin Vasser,1
2020-01-06,2020-01-06T23:00:00.000+0000,55.435854391700005,Kristin Vasser,1
2020-01-07,2020-01-07T22:00:00.000+0000,78.55314577125,Kristin Vasser,1
2020-01-13,2020-01-13T04:00:00.000+0000,53.246325668,Kristin Vasser,1
2020-01-13,2020-01-13T18:00:00.000+0000,87.8613741164,Kristin Vasser,1
2020-01-22,2020-01-22T05:00:00.000+0000,55.02781121305,Kristin Vasser,1
2020-01-27,2020-01-27T18:00:00.000+0000,96.05970136825,Kristin Vasser,1
2020-02-01,2020-02-01T10:00:00.000+0000,88.26517343345,Kristin Vasser,1
2020-02-13,2020-02-13T14:00:00.000+0000,78.5661485906,Kristin Vasser,1


In [49]:
health_tracker_processed.printSchema()
updatesDF.printSchema()

In [50]:
updatesDF.count()

In [51]:
file_path = health_tracker + "raw/health_tracker_data_2020_2_late.json"
 
health_tracker_data_2020_2_late_df = (
  spark.read
  .format("json")
  .load(file_path)
)


In [52]:
health_tracker_data_2020_2_late_df.count()

In [53]:
insertsDF = process_health_tracker_data(health_tracker_data_2020_2_late_df)

In [54]:
insertsDF.printSchema()

In [55]:
upsertsDF = updatesDF.union(insertsDF)

In [56]:
display(upsertsDF.printSchema())
upsertsDF.count()

In [57]:
# from delta.tables import *
# processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed") # path-based tables, or
processedDeltaTable = DeltaTable.forName(spark, 'health_tracker_processed')    # Hive metastore-based tables

update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""

update = { "heartrate" : "upserts.heartrate" }

insert = {
  "p_device_id" : "upserts.p_device_id",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "time" : "upserts.time",
  "dte" : "upserts.dte"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [58]:
(spark.read
 .option("versionAsOf", 3)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

In [59]:
health_tracker_processed.count()

In [60]:
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
4,2020-07-27T06:48:07.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numFiles -> 40, numTargetFilesAfterSkipping -> 10, numTargetFilesAdded -> 40, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numParts -> 40, numOutputBytes -> 166790, numSourceRows -> 132, numTargetFilesRemoved -> 10, numTargetFilesBeforeSkipping -> 10)"
3,2020-07-27T06:05:35.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,2.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
2,2020-07-27T06:02:17.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Overwrite, partitionBy -> [""p_device_id""])",,List(4769),0727-035638-billy1,1.0,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 56937, numOutputRows -> 3720, numParts -> 5)"
1,2020-07-27T05:54:55.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,0.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
0,2020-07-27T05:29:57.000+0000,100083,jiaxi.li@intellify.com.au,CONVERT,"Map(numFiles -> 10, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(4769),0727-035638-billy1,,,,Map(numConvertedFiles -> 10)


In [61]:
from pyspark.sql.functions import count

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id").count()
#   .agg(count("*"))
)

p_device_id,count
1,1440
3,1440
4,1440
2,1440
0,1440


In [62]:
%sql 

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
1


In [63]:
%sql 

SELECT * FROM broken_readings

dte,count(heartrate)
2020-02-27,1


In [64]:
updatesDF.count()

In [65]:
display(updatesDF)

dte,time,heartrate,name,p_device_id
2020-02-27,2020-02-27T05:00:00.000+0000,98.2417546888,James Hou,4


In [66]:
upsertsDF = updatesDF

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [67]:
%sql

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
""


In [68]:
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
5,2020-07-27T07:00:52.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 204, numTargetRowsDeleted -> 0, numFiles -> 1, numTargetFilesAfterSkipping -> 40, numTargetFilesAdded -> 1, numTargetRowsInserted -> 0, numTargetRowsUpdated -> 1, numOutputRows -> 205, numParts -> 1, numOutputBytes -> 4504, numSourceRows -> 1, numTargetFilesRemoved -> 1, numTargetFilesBeforeSkipping -> 40)"
4,2020-07-27T06:48:07.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numFiles -> 40, numTargetFilesAfterSkipping -> 10, numTargetFilesAdded -> 40, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numParts -> 40, numOutputBytes -> 166790, numSourceRows -> 132, numTargetFilesRemoved -> 10, numTargetFilesBeforeSkipping -> 10)"
3,2020-07-27T06:05:35.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,2.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
2,2020-07-27T06:02:17.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Overwrite, partitionBy -> [""p_device_id""])",,List(4769),0727-035638-billy1,1.0,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 56937, numOutputRows -> 3720, numParts -> 5)"
1,2020-07-27T05:54:55.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,0.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
0,2020-07-27T05:29:57.000+0000,100083,jiaxi.li@intellify.com.au,CONVERT,"Map(numFiles -> 10, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(4769),0727-035638-billy1,,,,Map(numConvertedFiles -> 10)


In [69]:
file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

health_tracker_data_2020_3_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [70]:
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "device_type", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)

In [71]:
(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))

What Is schema enforcement?
Schema enforcement, also known as schema validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes to a table that do not match the table’s schema. Like the front desk manager at a busy restaurant that only accepts reservations, it checks to see whether each column in data inserted into the table is on its list of expected columns (in other words, whether each one has a “reservation”), and rejects any writes with columns that aren’t on the list or with data type mismatches.

In [73]:
display(processedDF)

dte,time,device_type,heartrate,name,p_device_id
2020-03-01,2020-03-01T00:00:00.000+0000,version 2,57.6447293596,Deborah Powell,0
2020-03-01,2020-03-01T01:00:00.000+0000,version 2,57.6175546013,Deborah Powell,0
2020-03-01,2020-03-01T02:00:00.000+0000,version 2,57.8486376876,Deborah Powell,0
2020-03-01,2020-03-01T03:00:00.000+0000,version 2,57.8821378637,Deborah Powell,0
2020-03-01,2020-03-01T04:00:00.000+0000,version 2,59.0531490807,Deborah Powell,0
2020-03-01,2020-03-01T05:00:00.000+0000,version 2,58.8875470047,Deborah Powell,0
2020-03-01,2020-03-01T06:00:00.000+0000,version 2,98.5773130991,Deborah Powell,0
2020-03-01,2020-03-01T07:00:00.000+0000,version 2,97.2872071492,Deborah Powell,0
2020-03-01,2020-03-01T08:00:00.000+0000,version 2,96.6472644341,Deborah Powell,0
2020-03-01,2020-03-01T09:00:00.000+0000,version 2,98.1131919274,Deborah Powell,0


In [74]:
(processedDF.write
 .mode("append")
 .option("mergeSchema", True)
 .format("delta")
 .save(health_tracker + "processed"))

In [75]:
health_tracker_processed.count()

In [76]:
processedDF.printSchema()

In [77]:
health_tracker_processed.printSchema()

In [78]:
display(spark.read.table("health_tracker_processed"))

dte,time,heartrate,name,p_device_id,device_type
2020-01-01,2020-01-01T02:00:00.000+0000,52.7129593616,Deborah Powell,0,
2020-01-01,2020-01-01T12:00:00.000+0000,89.2044154758,Deborah Powell,0,
2020-01-02,2020-01-02T01:00:00.000+0000,59.4214354849,Deborah Powell,0,
2020-01-02,2020-01-02T02:00:00.000+0000,59.3844675677,Deborah Powell,0,
2020-01-02,2020-01-02T03:00:00.000+0000,60.2974234204,Deborah Powell,0,
2020-01-02,2020-01-02T13:00:00.000+0000,100.0287629659,Deborah Powell,0,
2020-01-02,2020-01-02T23:00:00.000+0000,59.4297495073,Deborah Powell,0,
2020-01-03,2020-01-03T04:00:00.000+0000,56.005265286,Deborah Powell,0,
2020-01-03,2020-01-03T08:00:00.000+0000,93.7377200318,Deborah Powell,0,
2020-01-03,2020-01-03T10:00:00.000+0000,93.5772314868,Deborah Powell,0,


In [79]:
from pyspark.sql.functions import lit
(spark.read
  .option("versionAsOf", 6)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id"))

In [80]:
processedDeltaTable.delete("p_device_id = 4")

In [81]:
%sql

select * from health_tracker_processed where p_device_id = 4

dte,time,heartrate,name,p_device_id,device_type


In [82]:
from pyspark.sql.functions import lit

upsertsDF = (
  spark.read
  .option("versionAsOf", 6)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
)

In [83]:
display(upsertsDF)

dte,time,device_type,heartrate,name,p_device_id
2020-01-01,2020-01-01T08:00:00.000+0000,,100.6013295271,,4
2020-01-01,2020-01-01T19:00:00.000+0000,,101.2125469924,,4
2020-01-01,2020-01-01T23:00:00.000+0000,,60.4378692462,,4
2020-01-02,2020-01-02T05:00:00.000+0000,,49.3500741525,,4
2020-01-02,2020-01-02T20:00:00.000+0000,,82.7006382938,,4
2020-01-03,2020-01-03T08:00:00.000+0000,,98.0384225058,,4
2020-01-03,2020-01-03T19:00:00.000+0000,,99.3962322003,,4
2020-01-03,2020-01-03T22:00:00.000+0000,,99.2013860104,,4
2020-01-04,2020-01-04T02:00:00.000+0000,,48.1513680372,,4
2020-01-04,2020-01-04T03:00:00.000+0000,,48.8240925017,,4


In [84]:
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

insert = {
  "dte" : "upserts.dte",
  "time" : "upserts.time",
  "device_type" : "upserts.device_type",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "p_device_id" : "upserts.p_device_id"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [85]:
health_tracker_processed.count()

In [86]:
display(health_tracker_processed.where("p_device_id = 4"))

dte,time,heartrate,name,p_device_id
2020-03-01,2020-03-01T00:00:00.000+0000,97.8678768636,,4
2020-03-01,2020-03-01T01:00:00.000+0000,97.586595396,,4
2020-03-01,2020-03-01T02:00:00.000+0000,97.188151848,,4
2020-03-01,2020-03-01T03:00:00.000+0000,97.4361573672,,4
2020-03-01,2020-03-01T04:00:00.000+0000,95.8997954454,,4
2020-03-01,2020-03-01T05:00:00.000+0000,96.5277339825,,4
2020-03-01,2020-03-01T06:00:00.000+0000,98.1774838993,,4
2020-03-01,2020-03-01T07:00:00.000+0000,95.8929343311,,4
2020-03-01,2020-03-01T08:00:00.000+0000,95.545442375,,4
2020-03-01,2020-03-01T09:00:00.000+0000,95.3578614286,,4


d ## Maintain compliance with a vacuum operation

In [88]:
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
8,2020-07-27T07:23:26.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,7.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numFiles -> 2, numTargetFilesAfterSkipping -> 36, numTargetFilesAdded -> 2, numTargetRowsInserted -> 2184, numTargetRowsUpdated -> 0, numOutputRows -> 2184, numParts -> 2, numOutputBytes -> 34407, numSourceRows -> 2184, numTargetFilesRemoved -> 0, numTargetFilesBeforeSkipping -> 36)"
7,2020-07-27T07:21:58.000+0000,100083,jiaxi.li@intellify.com.au,DELETE,"Map(predicate -> [""(db_jiaxi.health_tracker_processed.`p_device_id` = 4)""])",,List(4769),0727-035638-billy1,6.0,WriteSerializable,False,"Map(numTotalRows -> 0, numRemovedFiles -> 9, numAddedFiles -> 0)"
6,2020-07-27T07:12:42.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,5.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 58508, numOutputRows -> 3720, numParts -> 5)"
5,2020-07-27T07:00:52.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 204, numTargetRowsDeleted -> 0, numFiles -> 1, numTargetFilesAfterSkipping -> 40, numTargetFilesAdded -> 1, numTargetRowsInserted -> 0, numTargetRowsUpdated -> 1, numOutputRows -> 205, numParts -> 1, numOutputBytes -> 4504, numSourceRows -> 1, numTargetFilesRemoved -> 1, numTargetFilesBeforeSkipping -> 40)"
4,2020-07-27T06:48:07.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numFiles -> 40, numTargetFilesAfterSkipping -> 10, numTargetFilesAdded -> 40, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numParts -> 40, numOutputBytes -> 166790, numSourceRows -> 132, numTargetFilesRemoved -> 10, numTargetFilesBeforeSkipping -> 10)"
3,2020-07-27T06:05:35.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,2.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
2,2020-07-27T06:02:17.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Overwrite, partitionBy -> [""p_device_id""])",,List(4769),0727-035638-billy1,1.0,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 56937, numOutputRows -> 3720, numParts -> 5)"
1,2020-07-27T05:54:55.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,0.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
0,2020-07-27T05:29:57.000+0000,100083,jiaxi.li@intellify.com.au,CONVERT,"Map(numFiles -> 10, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(4769),0727-035638-billy1,,,,Map(numConvertedFiles -> 10)


In [89]:
display(
  spark.read
  .option("versionAsOf", 8)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id,device_type
2020-03-01,2020-03-01T00:00:00.000+0000,97.8678768636,,4,version 2
2020-03-01,2020-03-01T01:00:00.000+0000,97.586595396,,4,version 2
2020-03-01,2020-03-01T02:00:00.000+0000,97.188151848,,4,version 2
2020-03-01,2020-03-01T03:00:00.000+0000,97.4361573672,,4,version 2
2020-03-01,2020-03-01T04:00:00.000+0000,95.8997954454,,4,version 2
2020-03-01,2020-03-01T05:00:00.000+0000,96.5277339825,,4,version 2
2020-03-01,2020-03-01T06:00:00.000+0000,98.1774838993,,4,version 2
2020-03-01,2020-03-01T07:00:00.000+0000,95.8929343311,,4,version 2
2020-03-01,2020-03-01T08:00:00.000+0000,95.545442375,,4,version 2
2020-03-01,2020-03-01T09:00:00.000+0000,95.3578614286,,4,version 2


In [90]:
%fs ls /dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/

path,name,size
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/_SUCCESS,_SUCCESS,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/_committed_2146857267709725675,_committed_2146857267709725675,124
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/_committed_7566592121338257719,_committed_7566592121338257719,124
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/_started_2146857267709725675,_started_2146857267709725675,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/_started_7566592121338257719,_started_7566592121338257719,0
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/part-00000-c2da3b34-4904-4492-9167-dccba6a35d64.c000.snappy.parquet,part-00000-c2da3b34-4904-4492-9167-dccba6a35d64.c000.snappy.parquet,23226
dbfs:/dbacademy/jiaxi/DLRS/healthtracker/processed/p_device_id=4/part-00001-52f824a6-3b8d-44dd-b80c-78bfddad5506.c000.snappy.parquet,part-00001-52f824a6-3b8d-44dd-b80c-78bfddad5506.c000.snappy.parquet,11182


In [91]:
processedDeltaTable.vacuum(0)

In [92]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)
processedDeltaTable.vacuum(0)

In [93]:
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
8,2020-07-27T07:23:26.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,7.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numFiles -> 2, numTargetFilesAfterSkipping -> 36, numTargetFilesAdded -> 2, numTargetRowsInserted -> 2184, numTargetRowsUpdated -> 0, numOutputRows -> 2184, numParts -> 2, numOutputBytes -> 34407, numSourceRows -> 2184, numTargetFilesRemoved -> 0, numTargetFilesBeforeSkipping -> 36)"
7,2020-07-27T07:21:58.000+0000,100083,jiaxi.li@intellify.com.au,DELETE,"Map(predicate -> [""(db_jiaxi.health_tracker_processed.`p_device_id` = 4)""])",,List(4769),0727-035638-billy1,6.0,WriteSerializable,False,"Map(numTotalRows -> 0, numRemovedFiles -> 9, numAddedFiles -> 0)"
6,2020-07-27T07:12:42.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,5.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 58508, numOutputRows -> 3720, numParts -> 5)"
5,2020-07-27T07:00:52.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,4.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 204, numTargetRowsDeleted -> 0, numFiles -> 1, numTargetFilesAfterSkipping -> 40, numTargetFilesAdded -> 1, numTargetRowsInserted -> 0, numTargetRowsUpdated -> 1, numOutputRows -> 205, numParts -> 1, numOutputBytes -> 4504, numSourceRows -> 1, numTargetFilesRemoved -> 1, numTargetFilesBeforeSkipping -> 40)"
4,2020-07-27T06:48:07.000+0000,100083,jiaxi.li@intellify.com.au,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4769),0727-035638-billy1,3.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numFiles -> 40, numTargetFilesAfterSkipping -> 10, numTargetFilesAdded -> 40, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numParts -> 40, numOutputBytes -> 166790, numSourceRows -> 132, numTargetFilesRemoved -> 10, numTargetFilesBeforeSkipping -> 10)"
3,2020-07-27T06:05:35.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,2.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
2,2020-07-27T06:02:17.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Overwrite, partitionBy -> [""p_device_id""])",,List(4769),0727-035638-billy1,1.0,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 56937, numOutputRows -> 3720, numParts -> 5)"
1,2020-07-27T05:54:55.000+0000,100083,jiaxi.li@intellify.com.au,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4769),0727-035638-billy1,0.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
0,2020-07-27T05:29:57.000+0000,100083,jiaxi.li@intellify.com.au,CONVERT,"Map(numFiles -> 10, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(4769),0727-035638-billy1,,,,Map(numConvertedFiles -> 10)


In [94]:
display(
  spark.read
  .option("versionAsOf", 6)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)