In [None]:
import shutil

import pyspark
from delta import configure_spark_with_delta_pip

# Stop any session left over from a previous run of this cell so that the
# configuration below is applied to a freshly created session.
try:
    spark.stop()
except NameError:
    # First run in a fresh kernel: no `spark` session exists yet.
    pass

# This will get wiped each time the container is shut down and is just for demo purposes.
warehouse_location = "/tmp/spark-warehouse"
# Clean out the warehouse on each run of this cell for the demo. shutil.rmtree is used
# instead of the `%rm` shell alias so the cleanup is pure Python and does not depend on
# a POSIX shell; ignore_errors covers the first run, when the directory does not exist.
shutil.rmtree(warehouse_location, ignore_errors=True)

builder = (
    pyspark.sql.SparkSession.builder.appName("Delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.warehouse.dir", warehouse_location)
)

# configure_spark_with_delta_pip adds the Delta Lake package dependency to the builder
# before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
# Create in-memory SQL DataFrame from JSON
df = spark.read.format("json").load("/sampledata/health_tracker_data_2020_01.json")

In [None]:
df.printSchema()

In [None]:
# No tables yet
spark.catalog.listTables()

In [None]:
from pyspark.sql.functions import column, date_format, from_unixtime

# Create the cleaner silver table from the raw January data:
#  - rename device_id to p_device_id (the partition column used below),
#  - convert the Unix-epoch "timestamp" to a "yyyy-MM-dd HH:mm:ss" string,
#  - add a calendar "date" column.
# Spark datetime patterns are case-sensitive: "dd" is day-of-month whereas "DD" is
# day-of-year (15 Feb would render as "2020-02-46"), hence "yyyy-MM-dd".
df_silver = (
    df
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("timestamp", from_unixtime(column("timestamp")))
    .withColumn("date", date_format(column("timestamp"), format="yyyy-MM-dd"))
)
df_silver.write.saveAsTable(
    "health_tracker_silver",
    format="delta",
    partitionBy="p_device_id",
    path=warehouse_location + "/DLRS/healthtracker/silver",
)

In [None]:
# The new table is now registered in the catalog; its data is stored on disk under
# the path passed to saveAsTable, not only in memory.
spark.catalog.listTables()

In [None]:
# Table-level metadata for the Delta table (format, location, partition columns, ...).
spark.sql("DESCRIBE DETAIL health_tracker_silver").toPandas()

In [None]:
# Files are partitioned by the requested column: expect one p_device_id=<value>
# sub-directory per device (Hive-style partition layout).
%ls {warehouse_location}/DLRS/healthtracker/silver 

In [None]:
# Row count of the January silver data; print's default " " separator yields the same text.
print("Number of rows", df_silver.count())

In [None]:
# Create gold table/data mart for basic summary data. Generally created when the performance of SQL
# queries on the silver tables is not good enough.
# (For this demo the aggregate is only displayed, not persisted as a table.)

# SQL
# CREATE TABLE health_tracker_user_analytics
# USING DELTA
# LOCATION '<warehouse_location>/DLRS/healthtracker/gold/health_tracker_user_analytics'
# AS (
#   SELECT p_device_id,
#          AVG(heartrate) AS avg_heartrate,
#          STD(heartrate) AS std_heartrate,
#          MAX(heartrate) AS max_heartrate
#   FROM health_tracker_silver GROUP BY p_device_id
# )

# Python DataFrame API equivalent of the SELECT above, with the same column aliases.
# The functions module is imported under an alias instead of importing `max` directly,
# which would shadow Python's built-in max() for the rest of the notebook.
from pyspark.sql import functions as F

df_gold = df_silver.groupBy("p_device_id").agg(
    F.avg("heartrate").alias("avg_heartrate"),
    F.stddev("heartrate").alias("std_heartrate"),
    F.max("heartrate").alias("max_heartrate"),
)
df_gold.show()

In [None]:
# Update silver tables with data from February.
# The silver DataFrame is rebuilt from both raw monthly files in a single read instead
# of unioning February onto the existing df_silver; the latter duplicates the January
# rows every time this cell is re-run. "yyyy-MM-dd" is used because "DD" is
# day-of-year in Spark datetime patterns, not day-of-month.
df_silver = (
    spark.read.format("json")
    .load([
        "/sampledata/health_tracker_data_2020_01.json",
        "/sampledata/health_tracker_data_2020_02.json",
    ])
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("timestamp", from_unixtime(column("timestamp")))
    .withColumn("date", date_format(column("timestamp"), format="yyyy-MM-dd"))
)

In [None]:
df_silver.show()
print(f"Number of rows {df_silver.count()}")

In [None]:
df_silver.write.saveAsTable("health_tracker_silver", mode="overwrite", format="delta", partitionBy="p_device_id", path=warehouse_location + "/DLRS/healthtracker/silver")

In [None]:
# Each write to the Delta table is recorded as a version. On a clean Run All, version 0
# is the initial January write and version 1 is the overwrite above.
spark.sql("DESCRIBE HISTORY health_tracker_silver").show()

In [None]:
# Delta tables retain their history: the current version holds the combined data...
spark.sql("SELECT COUNT(*) FROM health_tracker_silver").show()

In [None]:
# ...while time travel to version 0 still returns the row count of the original write.
spark.sql("SELECT COUNT(*) FROM health_tracker_silver VERSION AS OF 0").show()

In [None]:
# The next set of cells fixes and inserts new data to cope with common problems:
#  - corrupted source data (in our case negative heartrate)
#  - missing data (in our case we will load in the March data)
# Both are handled in one shot with an "upsert" (Delta MERGE).

# Rows with a negative heartrate are treated as broken readings; list them per device.
df_broken = df_silver.where(df_silver["heartrate"] < 0.0)
df_broken.orderBy("p_device_id").show()

In [None]:
# Create a DF containing only the corrected versions of the broken rows.
# Each negative heartrate is replaced by the mean of the nearest *valid* (non-negative)
# readings before and after it for the same device. Searching for the nearest valid
# neighbour, rather than the immediately adjacent row via lag/lead, prevents runs of
# consecutive broken readings from being averaged with each other. When a broken
# reading is the first or last one for its device, the single available neighbour is
# used instead of producing a null heartrate.
from pyspark.sql import Window
from pyspark.sql.functions import lag, lead, monotonically_increasing_id
from pyspark.sql.functions import coalesce, first, last, when

# timestamp is a "yyyy-MM-dd HH:mm:ss" string here, so lexical order is chronological.
device_window = Window.partitionBy("p_device_id").orderBy("timestamp")
rows_before = device_window.rowsBetween(Window.unboundedPreceding, -1)
rows_after = device_window.rowsBetween(1, Window.unboundedFollowing)
# Null for broken readings, so ignorenulls=True skips them when looking for neighbours.
valid_hr = when(column("heartrate") >= 0.0, column("heartrate"))

df_broken_with_neighbours = (
    df_silver
    .withColumn("prev_hr", last(valid_hr, ignorenulls=True).over(rows_before))
    .withColumn("next_hr", first(valid_hr, ignorenulls=True).over(rows_after))
    # Filter only after the window columns exist, so neighbours come from all rows.
    .where(column("heartrate") < 0.0)
)
df_updates = (
    df_broken_with_neighbours
    .withColumn(
        "heartrate",
        # Mean of both neighbours when available, otherwise whichever one exists.
        coalesce(0.5 * (column("prev_hr") + column("next_hr")), column("prev_hr"), column("next_hr")),
    )
    .select("p_device_id", "heartrate", "name", "timestamp", "date")
)
df_updates.show()


In [None]:
# Create a DF with values to insert: the March readings, run through the same
# raw -> silver transformation as the earlier months.
# For the demo the broken March readings are simply dropped, since we just want a clean
# dataset to insert (note that the > 0 filter also drops exact zeros).
# "yyyy-MM-dd" is used because "DD" is day-of-year in Spark datetime patterns.
df_inserts = (
    spark.read.format("json")
    .load("/sampledata/health_tracker_data_2020_03.json")
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("timestamp", from_unixtime(column("timestamp")))
    .withColumn("date", date_format(column("timestamp"), format="yyyy-MM-dd"))
    .where(column("heartrate") > 0.0)
)

In [None]:
# Create an upserts DF containing all data for corrections plus the new March rows.
# unionByName matches columns by name rather than position: df_updates has an explicit
# select() column order while df_inserts keeps the order Spark inferred from the JSON,
# so a positional union() could silently pair up mismatched columns.
df_upserts = df_updates.unionByName(df_inserts)

In [None]:
# Merge the corrections and new rows into the existing silver table ("upsert").
# MERGE is a Delta Lake operation, hence the DeltaTable API instead of a plain DataFrame.
from delta.tables import DeltaTable

print(f"Number of silver records before upsert: {df_silver.count()}")

# Handle onto the silver Delta table registered in the catalog.
delta_table_silver = DeltaTable.forName(spark, "health_tracker_silver")

# A reading is identified by (device, timestamp): matched rows get their heartrate
# replaced, unmatched rows are inserted with every column copied from the source.
merge_condition = "silver.p_device_id = upserts.p_device_id and silver.timestamp = upserts.timestamp"
insert_columns = ["p_device_id", "heartrate", "name", "timestamp", "date"]
(
    delta_table_silver.alias("silver")
    .merge(df_upserts.alias("upserts"), merge_condition)
    .whenMatchedUpdate(set={"heartrate": "upserts.heartrate"})
    .whenNotMatchedInsert(values={col_name: f"upserts.{col_name}" for col_name in insert_columns})
    .execute()
)

# Re-point df_silver at the table so later cells see the merged data.
df_silver = delta_table_silver.toDF()
print(f"Number of silver records after upsert: {df_silver.count()}")
print(f"Number of broken records after upsert: {df_silver.where(df_silver['heartrate'] < 0.0).count()}")


In [None]:
# Plot the cleaned heartrate series for device 0.
# Rows are sorted by timestamp before collecting, because Spark gives no row-order
# guarantee and an unsorted line plot zig-zags. The "yyyy-MM-dd HH:mm:ss" timestamp
# strings are parsed into datetimes so the x axis is a real time axis.
import pandas as pd

df_device0 = df_silver.where(df_silver["p_device_id"] == 0).orderBy("timestamp").toPandas()
df_device0["timestamp"] = pd.to_datetime(df_device0["timestamp"])
df_device0.plot(x="timestamp", y="heartrate", title="Heartrate for device 0");

In [None]:
# The table history now also lists the MERGE commit made by the upsert cell.
spark.sql("DESCRIBE HISTORY health_tracker_silver").show()