Upserting hourly batch bikesharing data
1. Create an initial gold table that leaves out the most recent records (every hour from four hours before the latest `dteday` onward)
2. Upsert with the entire dataset

In [2]:
# Read in our data
df = spark.read.format('delta').load("/mnt/delta/silver/bikeSharing/hourly")

In [3]:
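# Subtract four hours from the latest date to get a cutoff to filter on
# (dteday holds midnight of each day, so the cutoff falls at 20:00 on the day before the last one)
from datetime import timedelta
max_datetime = df.agg({"dteday": "max"}).collect()[0][0] - timedelta(hours=4)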
max_datetime
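Because `dteday` holds midnight of each day, subtracting four hours from its maximum lands the cutoff in the evening of the day before the last one. Here is a quick plain-Python check of that arithmetic, assuming the latest date in the data is 2012-12-31 (the date the upsert adds later); the `kept` helper is hypothetical and only mirrors the filter in the next cell.

```python
from datetime import datetime, timedelta

# Assumption: the latest dteday value is midnight on 2012-12-31
max_dteday = datetime(2012, 12, 31)
cutoff = max_dteday - timedelta(hours=4)
print(cutoff)  # 2012-12-30 20:00:00


def kept(day: datetime, hr: int) -> bool:
    """Mirror the filter: keep an hourly row only if dteday + hr is before the cutoff."""
    return day + timedelta(hours=hr) < cutoff


print(kept(datetime(2012, 12, 30), 19))  # True
print(kept(datetime(2012, 12, 30), 20))  # False
print(kept(datetime(2012, 12, 31), 0))   # False
```

Under that assumption, the initial gold table misses all of 2012-12-31 and the last four hours of 2012-12-30, which is why those are the two dates that change after the upsert.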

In [4]:
# Summarize data: drop rows at or after the cutoff, then sum cnt per day
from pyspark.sql.functions import col, unix_timestamp
from pyspark.sql.types import DateType
summary_df = (df
              # Combine dteday (midnight) and hr into one timestamp to filter on
              .withColumn("datetime", (unix_timestamp(col("dteday")) + col("hr") * 3600).cast("timestamp"))
              .filter(col("datetime") < max_datetime)
              .select("dteday", "cnt")
              .groupBy("dteday")
              .sum("cnt")
              .select(col("dteday").alias("date"), col("sum(cnt)").alias("cnt")))
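The cell above turns `dteday` plus `hr` into one timestamp, keeps rows strictly before the cutoff, and sums `cnt` per day. Here are the same steps in plain Python on a few invented hourly rows (not the real data), assuming the 2012-12-30 20:00 cutoff from the previous step:

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Invented hourly rows: (dteday at midnight, hr, cnt)
rows = [
    (datetime(2012, 12, 30), 18, 5),
    (datetime(2012, 12, 30), 19, 7),
    (datetime(2012, 12, 30), 20, 3),
    (datetime(2012, 12, 31), 0, 2),
]
cutoff = datetime(2012, 12, 30, 20)  # assumed cutoff from the previous step

daily = defaultdict(int)
for dteday, hr, cnt in rows:
    ts = dteday + timedelta(hours=hr)  # like unix_timestamp(dteday) + hr * 3600
    if ts < cutoff:                    # like .filter(col("datetime") < max_datetime)
        daily[dteday] += cnt           # like .groupBy("dteday").sum("cnt")

print(dict(daily))  # {datetime.datetime(2012, 12, 30, 0, 0): 12}
```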

In [5]:
# Display the summary, then sort the table by date to inspect the '2012-12-30' row
display(summary_df)

In [6]:
# Write the data
summary_df.write.format("delta").save("/mnt/delta/gold/bikeSharing/daily_summar_for_upsert")

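Let's take a quick look at how Delta enforces a table's schema.

Notice the following in the next cell:
- The final `.select` casts the `date` column to a date type (it was a timestamp)
- The write uses `mode("overwrite")`, so we are replacing the Delta table's contents rather than appending

Because the new `date` type doesn't match the type stored in the table, Delta rejects the write, even in overwrite mode.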

In [8]:
#### This should throw an error
## See schema enforcement in action: date is now a date, but the table stores a timestamp
# Summarize data: drop rows at or after the cutoff, then sum cnt per day
from pyspark.sql.functions import col, unix_timestamp
from pyspark.sql.types import DateType
summary_df = (df
              .withColumn("datetime", (unix_timestamp(col("dteday")) + col("hr") * 3600).cast("timestamp"))
              .filter(col("datetime") < max_datetime)
              .select("dteday", "cnt")
              .groupBy("dteday")
              .sum("cnt")
              .select(col("dteday").cast("date").alias("date"), col("sum(cnt)").alias("cnt")))

summary_df.write.format("delta").mode("overwrite").save("/mnt/delta/gold/bikeSharing/daily_summar_for_upsert")
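Delta compares the incoming DataFrame's schema with the table's schema and, by default, rejects the write on a type mismatch, which is why the cell above fails. Below is a toy model of that check in plain Python, not Delta's implementation; the `check_write` function and its `overwrite_schema` flag are hypothetical. The flag stands in for Delta's real `overwriteSchema` write option, which you would set with `.option("overwriteSchema", "true")` together with `mode("overwrite")` when you really do intend to change the table's schema.

```python
from typing import Dict


class SchemaMismatchError(Exception):
    """Stands in for the error Delta raises when a write's schema doesn't match."""


def check_write(table_schema: Dict[str, str], incoming: Dict[str, str],
                overwrite_schema: bool = False) -> Dict[str, str]:
    """Return the table schema after a toy write, or raise on a mismatch.

    This toy treats any difference as a mismatch; Delta's real rules are looser
    in places (for example, an append may omit table columns, which become null).
    """
    if incoming == table_schema:
        return dict(table_schema)
    if overwrite_schema:
        # Models .option("overwriteSchema", "true"): the new schema replaces the old one
        return dict(incoming)
    diff = sorted(set(table_schema.items()) ^ set(incoming.items()))
    raise SchemaMismatchError(f"schema mismatch: {diff}")


# Hypothetical schemas: the gold table as first written vs. the cell above
table_schema = {"date": "timestamp", "cnt": "bigint"}
new_schema = {"date": "date", "cnt": "bigint"}

try:
    check_write(table_schema, new_schema)
except SchemaMismatchError as err:
    print("rejected:", err)

print(check_write(table_schema, new_schema, overwrite_schema=True))
```

Running the real `overwriteSchema` write against this path would commit a new table version, so the time-travel cells below would then show three versions rather than two.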

Let's do an upsert: update the dates that already exist in the gold table and insert the ones that don't.

In [10]:
# Re-read the full silver data and summarize it per day
whole_df = spark.read.format('delta').load("/mnt/delta/silver/bikeSharing/hourly")

upsert_df = (whole_df
              .withColumn("date", col("dteday").cast(DateType()))
              .select("date", "cnt")
              .groupBy("date")
              .sum("cnt")
              .select(col("date"), col("sum(cnt)").alias("cnt")))

# Register as a temporary view so the SQL MERGE can reference it
# (createOrReplaceTempView replaces the deprecated registerTempTable)
upsert_df.createOrReplaceTempView("bike_upsert")

In [11]:
# Display the full summary: filter it to see that the '2012-12-31' row now exists and '2012-12-30' has a different cnt
display(upsert_df)

In [12]:
# Register the Delta table in the metastore. Open the "Data" tab to see it in the default database.
spark.sql("""
  DROP TABLE IF EXISTS bike_counts
""")

spark.sql("""
  CREATE TABLE bike_counts
  USING DELTA
  LOCATION '{}'
""".format("/mnt/delta/gold/bikeSharing/daily_summar_for_upsert"))

In [13]:
%sql

MERGE INTO bike_counts
USING bike_upsert
ON bike_counts.date = bike_upsert.date
WHEN MATCHED THEN
  UPDATE SET cnt = bike_upsert.cnt
WHEN NOT MATCHED THEN
  INSERT *
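`MERGE` matches each source row to the target on `date`, updates `cnt` where a match exists, and inserts the row otherwise. Below is a plain-Python model of those two clauses, with dicts keyed by date standing in for `bike_counts` and `bike_upsert`; the `merge` function and the counts are invented for illustration. Delta itself rewrites the affected data files and commits the result as a new table version.

```python
from datetime import date
from typing import Dict, Tuple


def merge(target: Dict[date, int], source: Dict[date, int]) -> Tuple[Dict[date, int], int, int]:
    """Upsert source into a copy of target; return (result, rows_updated, rows_inserted)."""
    result = dict(target)
    updated = inserted = 0
    for key, cnt in source.items():
        if key in result:  # WHEN MATCHED THEN UPDATE SET cnt = bike_upsert.cnt
            updated += 1
        else:              # WHEN NOT MATCHED THEN INSERT *
            inserted += 1
        result[key] = cnt
    return result, updated, inserted


# Invented daily totals
bike_counts = {date(2012, 12, 29): 100, date(2012, 12, 30): 80}
bike_upsert = {date(2012, 12, 29): 100, date(2012, 12, 30): 95, date(2012, 12, 31): 60}

merged, updated, inserted = merge(bike_counts, bike_upsert)
print(updated, inserted)  # 2 1
```

Keying by date also reflects a real `MERGE` rule: each target row may be matched by at most one source row, and the `groupBy("date")` in the upsert cell guarantees that.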

In [14]:
# Load and display delta table to see that it was updated appropriately
display(spark.sql("select * from bike_counts"))

Now let's check out Delta's time travel feature!

The Delta table we just created has two versions: version 0 from the initial write and version 1 from the `MERGE` (the rejected overwrite didn't create one). Let's look at both.
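Conceptually, each committed write produces a new version, and `VERSION AS OF n` reads the table as it stood after commit `n`. Below is a toy plain-Python model of that idea; the `VersionedTable` class is hypothetical and the counts are invented. Unlike this toy, Delta doesn't copy the whole table per version: it records file additions and removals in its transaction log (`_delta_log`).

```python
from datetime import date
from typing import Dict, List


class VersionedTable:
    """Toy table that keeps every committed snapshot, oldest first (hypothetical)."""

    def __init__(self) -> None:
        self._snapshots: List[Dict[date, int]] = []

    def commit(self, rows: Dict[date, int]) -> int:
        """Store a copy of rows as a new snapshot and return its version number."""
        self._snapshots.append(dict(rows))
        return len(self._snapshots) - 1

    def version_as_of(self, version: int) -> Dict[date, int]:
        """Return a copy of the snapshot committed as `version`."""
        if not 0 <= version < len(self._snapshots):
            raise ValueError(f"no version {version}")
        return dict(self._snapshots[version])

    def latest(self) -> Dict[date, int]:
        return self.version_as_of(len(self._snapshots) - 1)


# Invented counts, not the real data
table = VersionedTable()
v0 = table.commit({date(2012, 12, 30): 80})                          # initial write
v1 = table.commit({date(2012, 12, 30): 95, date(2012, 12, 31): 60})  # after the MERGE
print(v0, v1)  # 0 1
print(table.version_as_of(0) == {date(2012, 12, 30): 80})  # True
```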

In [16]:
%sql 
DESCRIBE HISTORY bike_counts

In [17]:
%sql
SELECT *
FROM bike_counts
VERSION AS OF 0

You can even query across versions and compare them. The query below counts the rows the `MERGE` inserted; rows it updated in place don't change the count.

In [19]:
%sql
SELECT count(*) - (
  SELECT count(*)
  FROM bike_counts
  VERSION AS OF 0 ) AS new_entries
FROM bike_counts
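Since a count difference hides updated rows, it can help to separate inserted and updated rows explicitly. Here is a plain-Python sketch comparing two invented snapshots keyed by date, standing in for `VERSION AS OF 0` and the current version:

```python
from datetime import date

# Invented snapshots: version 0 and the current version
v0 = {date(2012, 12, 29): 100, date(2012, 12, 30): 80}
v1 = {date(2012, 12, 29): 100, date(2012, 12, 30): 95, date(2012, 12, 31): 60}

new_entries = len(v1) - len(v0)  # what the SQL query above computes
inserted = sorted(v1.keys() - v0.keys())
updated = sorted(k for k in v1.keys() & v0.keys() if v1[k] != v0[k])

print(new_entries)  # 1
print(inserted)     # [datetime.date(2012, 12, 31)]
print(updated)      # [datetime.date(2012, 12, 30)]
```

In SQL, `SELECT * FROM bike_counts EXCEPT SELECT * FROM bike_counts VERSION AS OF 0` should list both kinds of changed rows in their current form.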