# Exercise 03 : Databricks Delta
With a Delta table, you can handle streaming data more effectively, with high-performance reads, concurrent data modification, consistency guarantees, and more. (This requires the **Premium** tier and **Databricks Runtime 4.1 or above**.)

In [2]:
# Same setup as exercise04: empty the streaming input folder, then seed it
# with a first file of events. The payload is newline-delimited JSON, one
# record per line, with a numeric event_time (looks like Unix epoch seconds).
dbutils.fs.rm("/tmp/structured-streaming/events", recurse=True)

file01_events = """{"event_name":"Open","event_time":1540601000}
{"event_name":"Open","event_time":1540601010}
{"event_name":"Fail","event_time":1540601020}
{"event_name":"Open","event_time":1540601030}
{"event_name":"Open","event_time":1540601040}
{"event_name":"Open","event_time":1540601050}
{"event_name":"Open","event_time":1540601060}
{"event_name":"Fail","event_time":1540601070}
{"event_name":"Open","event_time":1540601080}
{"event_name":"Open","event_time":1540601090}
"""

# overwrite=True replaces the file if it is already there.
dbutils.fs.put(
  "/tmp/structured-streaming/events/file01.json",
  file01_events,
  overwrite=True)

In [3]:
# Recursively delete everything under /tmp/delta so the exercise starts clean.
dbutils.fs.rm("/tmp/delta", True)

Create a Delta table as follows:

In [5]:
%sql
-- Make sure the demo database exists and select it.
CREATE DATABASE IF NOT EXISTS testdb;
USE testdb;

-- Recreate the table definition from scratch; its data is kept in Delta
-- format at the given storage path.
DROP TABLE IF EXISTS testdb.table01;
CREATE TABLE testdb.table01 (
  event_name STRING,
  event_time TIMESTAMP
)
USING DELTA
LOCATION "/tmp/delta/events"

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Write data without streaming into delta table
# read_schema = StructType([
#  StructField("event_name", StringType(), True),
#  StructField("event_time", TimestampType(), True)])
# df1 = spark.read.schema(read_schema).json("/tmp/structured-streaming/events")
# df1.write.mode("overwrite").format("delta").save("/tmp/delta/events")

# Start the streaming job: read the JSON event files as a stream and append
# every record to the Delta table's storage path.
read_schema = (StructType()
  .add("event_name", StringType(), True)
  .add("event_time", TimestampType(), True))

# maxFilesPerTrigger=1 caps each micro-batch at one new input file.
df2 = (spark.readStream
  .schema(read_schema)
  .option("maxFilesPerTrigger", "1")
  .json("/tmp/structured-streaming/events"))

# The checkpoint folder is where the stream records its progress.
delta_writer = (df2.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/checkpoint")
  .option("path", "/tmp/delta/events"))

# Kept as the last expression so the cell still returns the started query.
delta_writer.start()

In [7]:
%sql
-- Run OPTIMIZE against the Delta files stored at this path
-- (the same location that backs testdb.table01).
USE testdb;
OPTIMIZE "/tmp/delta/events"

path
/tmp/delta/events


In [8]:
# Append data: drop two more event files into the folder the stream reads
# from. Each payload is newline-delimited JSON, one record per line.
file02_events = """{"event_name":"Open","event_time":1540601100}
{"event_name":"Open","event_time":1540601110}
{"event_name":"Fail","event_time":1540601120}
{"event_name":"Open","event_time":1540601130}
{"event_name":"Open","event_time":1540601140}
{"event_name":"Open","event_time":1540601150}
{"event_name":"Open","event_time":1540601160}
{"event_name":"Fail","event_time":1540601170}
{"event_name":"Open","event_time":1540601180}
{"event_name":"Open","event_time":1540601190}
"""

file03_events = """{"event_name":"Open","event_time":1540601200}
{"event_name":"Open","event_time":1540601210}
{"event_name":"Fail","event_time":1540601220}
{"event_name":"Open","event_time":1540601230}
{"event_name":"Open","event_time":1540601240}
{"event_name":"Open","event_time":1540601250}
{"event_name":"Open","event_time":1540601260}
{"event_name":"Fail","event_time":1540601270}
{"event_name":"Open","event_time":1540601280}
{"event_name":"Open","event_time":1540601290}
"""

# overwrite=True replaces a file of the same name if it is already there.
dbutils.fs.put(
  "/tmp/structured-streaming/events/file02.json",
  file02_events,
  overwrite=True)
dbutils.fs.put(
  "/tmp/structured-streaming/events/file03.json",
  file03_events,
  overwrite=True)

In [9]:
%sql
-- Count events per event name in 1-minute windows over event_time.
-- The aggregation runs in a subquery so the result can be ordered by the
-- real window-end timestamp. Ordering by the formatted "MMM-dd HH:mm" label
-- would sort alphabetically by month abbreviation (e.g. "Nov-01" before
-- "Oct-31") instead of chronologically.
select
  event_name,
  date_format(window_end, "MMM-dd HH:mm") as event_time,
  event_count
from (
  select
    event_name,
    window.end as window_end,
    count(1) as event_count
  from testdb.table01
  group by event_name, window(event_time, "1 minute")
) as per_minute
order by window_end, event_name

event_name,event_time,event_count
Fail,Oct-27 00:44,1
Open,Oct-27 00:44,3
Fail,Oct-27 00:45,1
Open,Oct-27 00:45,5
Fail,Oct-27 00:46,1
Open,Oct-27 00:46,5
Fail,Oct-27 00:47,1
Open,Oct-27 00:47,5
Fail,Oct-27 00:48,2
Open,Oct-27 00:48,4
