In [0]:
username = "zainafzal"
dbutils.widgets.text("username", username)
spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy_{username}")
spark.sql(f"USE dbacademy_{username}")
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", 8)


In [0]:
%sh

wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2_late.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_3.json

In [0]:
%sh ls

In [0]:
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json", 
              health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2.json", 
              health_tracker + "raw/health_tracker_data_2020_2.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2_late.json", 
              health_tracker + "raw/health_tracker_data_2020_2_late.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_3.json", 
              health_tracker + "raw/health_tracker_data_2020_3.json")

In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"
 
health_tracker_data_2020_1_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
display(health_tracker_data_2020_1_df)


device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [0]:
dbutils.fs.rm(health_tracker + "processed", recurse=True)


In [0]:
from pyspark.sql.functions import col, from_unixtime
 
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)

In [0]:
processedDF.show(3)

In [0]:
(processedDF.write
 .mode("overwrite")
 .format("parquet")
 .partitionBy("p_device_id")
 .save(health_tracker + "processed"))

In [0]:
%sql 

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed                        
USING PARQUET                
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [0]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [0]:
%sql

MSCK REPAIR TABLE health_tracker_processed


In [0]:
health_tracker_processed.count()


In [0]:
from delta.tables import DeltaTable

parquet_table = f"parquet.`{health_tracker}processed`"
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)

In [0]:
%sql

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [0]:
%sql
DESCRIBE DETAIL health_tracker_processed


format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,b0c8a195-ecc6-45b8-81c8-4e576beb224d,dbacademy_zainafzal.health_tracker_processed,,dbfs:/dbacademy/zainafzal/DLRS/healthtracker/processed,2021-08-17T12:28:28.004+0000,2021-08-17T12:28:34.000+0000,List(p_device_id),5,57263,Map(),1,2


In [0]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [0]:
dbutils.fs.rm(health_tracker + "gold/health_tracker_user_analytics",
              recurse=True)

In [0]:
from pyspark.sql.functions import col, avg, max, stddev

health_tracker_gold_user_analytics = (
  health_tracker_processed
  .groupby("p_device_id")
  .agg(avg(col("heartrate")).alias("avg_heartrate"),
       max(col("heartrate")).alias("max_heartrate"),
       stddev(col("heartrate")).alias("stddev_heartrate"))
)

In [0]:
(health_tracker_gold_user_analytics.write
 .format("delta")
 .mode("overwrite")
 .save(health_tracker + "gold/health_tracker_user_analytics"))

In [0]:
%sql

DROP TABLE IF EXISTS health_tracker_gold_user_analytics;

CREATE TABLE health_tracker_gold_user_analytics
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/gold/health_tracker_user_analytics"

In [0]:
display(spark.read.table("health_tracker_gold_user_analytics"))


p_device_id,avg_heartrate,max_heartrate,stddev_heartrate
1,78.5776567337699,168.114687819,31.61967903784856
0,81.21484441523789,186.4790827731,31.343789198032887
3,82.65419819635204,171.8435388833,30.92932874000444
4,83.08377376550952,173.5770785921,34.16032267669617
2,79.99574196662837,184.7433209566,31.408007741222


In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_2.json"
 
health_tracker_data_2020_2_df = (
  spark.read
  .format("json")
  .load(file_path)
)


In [0]:
processedDF = process_health_tracker_data(health_tracker_data_2020_2_df)


In [0]:
(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))

In [0]:
(spark.read
 .option("versionAsOf", 0)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

In [0]:
  health_tracker_processed.count()


In [0]:
from pyspark.sql.functions import count

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id")
  .agg(count("*"))
)

p_device_id,count(1)
1,1440
3,1440
2,1440
4,1368
0,1440


In [0]:
display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,55.2272036665,Minh Nguyen,3
2020-01-01,2020-01-01T01:00:00.000+0000,56.035689123,Minh Nguyen,3
2020-01-01,2020-01-01T02:00:00.000+0000,55.6403282219,Minh Nguyen,3
2020-01-01,2020-01-01T03:00:00.000+0000,56.3692513843,Minh Nguyen,3
2020-01-01,2020-01-01T04:00:00.000+0000,56.5412281859,Minh Nguyen,3
2020-01-01,2020-01-01T05:00:00.000+0000,55.8311481148,Minh Nguyen,3
2020-01-01,2020-01-01T06:00:00.000+0000,54.9402513831,Minh Nguyen,3
2020-01-01,2020-01-01T07:00:00.000+0000,92.2205431894,Minh Nguyen,3
2020-01-01,2020-01-01T08:00:00.000+0000,93.8159033652,Minh Nguyen,3
2020-01-01,2020-01-01T09:00:00.000+0000,92.0210547557,Minh Nguyen,3


In [0]:
broken_readings = (
  health_tracker_processed
  .select(col("heartrate"), col("dte"))
  .where(col("heartrate") < 0)
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)
 
broken_readings.createOrReplaceTempView("broken_readings")

In [0]:
%sql

SELECT * FROM broken_readings

dte,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [0]:
%sql
 
SELECT SUM(`count(heartrate)`) FROM broken_readings


sum(count(heartrate))
60


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead
 
dteWindow = Window.partitionBy("p_device_id").orderBy("dte")
 
interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(col("dte"),
          col("time"),
          col("heartrate"),
          lag(col("heartrate")).over(dteWindow).alias("prev_amt"),
          lead(col("heartrate")).over(dteWindow).alias("next_amt"),
          col("name"),
          col("p_device_id"))
)

In [0]:
updatesDF = (
  interpolatedDF
  .where(col("heartrate") < 0)
  .select(col("dte"),
          col("time"),
          ((col("prev_amt") + col("next_amt"))/2).alias("heartrate"),
          col("name"),
          col("p_device_id"))
)

In [0]:
health_tracker_processed.printSchema()
updatesDF.printSchema()

In [0]:
updatesDF.count()


In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_2_late.json"
 
health_tracker_data_2020_2_late_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
insertsDF = process_health_tracker_data(health_tracker_data_2020_2_late_df)


In [0]:
insertsDF.printSchema()


In [0]:
upsertsDF = updatesDF.union(insertsDF)


In [0]:
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""

update = { "heartrate" : "upserts.heartrate" }

insert = {
  "p_device_id" : "upserts.p_device_id",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "time" : "upserts.time",
  "dte" : "upserts.dte"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [0]:
(spark.read
 .option("versionAsOf", 1)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

In [0]:
health_tracker_processed.count()


In [0]:
display(processedDeltaTable.history())


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
2,2021-08-17T12:55:20.000+0000,4952005494363215,zainafzal003@gmail.com,MERGE,"Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(370147426899128),0817-110527-root527,1,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 5, executionTimeMs -> 14626, numTargetRowsInserted -> 72, scanTimeMs -> 5596, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numTargetChangeFilesAdded -> 0, numSourceRows -> 132, numTargetFilesRemoved -> 10, rewriteTimeMs -> 8914)",
1,2021-08-17T12:37:45.000+0000,4952005494363215,zainafzal003@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(370147426899128),0817-110527-root527,0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 53042, numOutputRows -> 3408)",
0,2021-08-17T12:28:34.000+0000,4952005494363215,zainafzal003@gmail.com,CONVERT,"Map(numFiles -> 5, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(370147426899128),0817-110527-root527,-1,Serializable,False,Map(numConvertedFiles -> 5),


In [0]:
%sql 

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
1


In [0]:
%sql 

SELECT SUM(`count(heartrate)`) FROM broken_readings WHERE dte < '2020-02-25'


sum(count(heartrate))
""


In [0]:
updatesDF.count()


In [0]:
upsertsDF = updatesDF

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [0]:
%sql

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
""


In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

health_tracker_data_2020_3_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [0]:
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "device_type", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)

In [0]:
(processedDF.write
 .mode("append")
 .option("mergeSchema", True)
 .format("delta")
 .save(health_tracker + "processed"))

In [0]:
health_tracker_processed.count()


In [0]:
processedDeltaTable.delete("p_device_id = 4")


In [0]:
from pyspark.sql.functions import lit

upsertsDF = (
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
)

In [0]:
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

insert = {
  "dte" : "upserts.dte",
  "time" : "upserts.time",
  "device_type" : "upserts.device_type",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "p_device_id" : "upserts.p_device_id"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [0]:
health_tracker_processed.count()


In [0]:
display(health_tracker_processed.where("p_device_id = 4"))


dte,time,heartrate,name,p_device_id
2020-03-01,2020-03-01T00:00:00.000+0000,97.8678768636,,4
2020-03-01,2020-03-01T01:00:00.000+0000,97.586595396,,4
2020-03-01,2020-03-01T02:00:00.000+0000,97.188151848,,4
2020-03-01,2020-03-01T03:00:00.000+0000,97.4361573672,,4
2020-03-01,2020-03-01T04:00:00.000+0000,95.8997954454,,4
2020-03-01,2020-03-01T05:00:00.000+0000,96.5277339825,,4
2020-03-01,2020-03-01T06:00:00.000+0000,98.1774838993,,4
2020-03-01,2020-03-01T07:00:00.000+0000,95.8929343311,,4
2020-03-01,2020-03-01T08:00:00.000+0000,95.545442375,,4
2020-03-01,2020-03-01T09:00:00.000+0000,95.3578614286,,4
