In [0]:
# Mount the Azure Blob Storage container at /mnt/iotdata. Authentication uses a
# SAS token read from the "week16-scope" secret scope instead of a hard-coded
# credential. Replace <container-name> and <storage-account> before running.
MOUNT_POINT = "/mnt/iotdata"

# dbutils.fs.mount fails when the mount point is already in use, so check the
# existing mounts first to keep this cell safe to re-run (Restart & Run All).
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    print(f"{MOUNT_POINT} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
        source="wasbs://<container-name>@<storage-account>.blob.core.windows.net",
        mount_point=MOUNT_POINT,
        extra_configs={
            "fs.azure.sas.<container-name>.<storage-account>.blob.core.windows.net":
                dbutils.secrets.get(scope="week16-scope", key="sas-storage-container")
        },
    )

In [0]:
# Read the sample flight-delay dataset from the DBFS root (/databricks-datasets)

In [0]:
# Load the departure-delays CSV. The first row holds the column names, and
# inferSchema makes Spark derive column types from the data (an extra pass
# over the file). Parentheses replace the backslash continuations: one was
# missing before, which caused a SyntaxError.
flights_df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load('dbfs:/databricks-datasets/flights/departuredelays.csv')
)


In [0]:
# Save the dataset to the mounted Azure storage account (/mnt/iotdata), partitioned by origin. We save it in both Parquet and Delta formats so the two can be compared.

In [0]:
# Persist the flights as Parquet under the mount, with one sub-folder per
# origin value; any previous output at this path is replaced.
(
    flights_df.write
    .format('parquet')
    .mode('overwrite')
    .partitionBy('origin')
    .save('/mnt/iotdata/flight-parquet')
)

In [0]:
# Persist the same flights in Delta format, also partitioned by origin,
# replacing any existing contents at this path.
(
    flights_df.write
    .format('delta')
    .mode('overwrite')
    .partitionBy('origin')
    .save('/mnt/iotdata/flight-delta')
)

In [0]:
#  Create Spark tables

In [0]:
%sql
-- Database that holds the Parquet and Delta flight tables.
CREATE DATABASE IF NOT EXISTS week16db

In [0]:
%sql
-- Register the Parquet files under /mnt/iotdata/flight-parquet as a table.
-- No schema is given, so it is taken from the existing files.
CREATE TABLE IF NOT EXISTS week16db.flights_parquet_table
USING parquet
LOCATION '/mnt/iotdata/flight-parquet'

In [0]:
%sql
-- Register the Delta data under /mnt/iotdata/flight-delta as a table.
-- No schema is given, so it is taken from the existing Delta table at that path.
CREATE TABLE IF NOT EXISTS week16db.flights_delta_table
USING delta
LOCATION '/mnt/iotdata/flight-delta'


In [0]:
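# Because a LOCATION is given, these statements register external (unmanaged) tables in the Hive metastore: dropping such a table removes only its metadata, not the files in storage. Browse to the Data tab in the Databricks workspace to see the week16db database and both tables inside it.
# (Refer to screenshot below)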

In [0]:
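# Delta tables record the history of every transaction (INSERT, UPSERT/MERGE, DELETE, ALTER, ...) in a write-ahead transaction log kept in the table's _delta_log folder. The log also stores per-file column statistics (such as min/max values), which let queries skip files that cannot match their filters (data skipping).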

In [0]:
# Time-Travel using Delta Tables
# ===================
# Delta tables keep a transaction log, which can be used to query or revert to a previous version of the dataset.
# Here, we insert a new record: this writes a new part file and adds a new commit entry to the _delta_log folder.

In [0]:
%sql
-- Append one hand-made flight row (values are positional, in table column order).
-- This commit creates a new version of the Delta table.
INSERT INTO week16db.flights_delta_table
VALUES (3011901, 10, 1000, 'ATL', 'AUS')

In [0]:
# Delta caching (the Databricks disk cache) keeps local copies of remote data files on the worker nodes' local storage, so repeated reads of the same data are faster.

In [0]:
# Turn on the Databricks disk (Delta) cache for the current Spark session.
disk_cache_setting = "spark.databricks.io.cache.enabled"
spark.conf.set(disk_cache_setting, True)

In [0]:
# To populate the cache explicitly, prefix a query with the CACHE keyword (CACHE SELECT ...).

In [0]:
%sql
-- Pre-load into the disk cache the origin column of flights bound for BHM.
-- CACHE SELECT accepts a plain column list plus an optional WHERE predicate.
-- Its purpose is to warm the cache, so DISTINCT would not change which data
-- gets cached; it is dropped to match the documented syntax.
CACHE SELECT origin FROM week16db.flights_delta_table WHERE destination = 'BHM'

In [0]:
# Demonstrate the commonly occurring Small File Problem

In [0]:
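# Let us reuse the flight dataset above, repartitioned into 10 partitions and written with partitionBy into ~250 origin folders. Each of the 10 tasks writes its own file into every folder it has rows for, so this should create ~2,500 files (about 10 per folder).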

In [0]:
# Rewrite the Delta table from 10 in-memory partitions, still partitioned by
# origin, to reproduce the small-file problem described above.
(
    flights_df
    .repartition(10)
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("origin")
    .save("/mnt/iotdata/flight-delta")
)

In [0]:
%sql
-- Baseline: aggregate over the many small files, to compare with the same query after OPTIMIZE.
SELECT AVG(delay)
FROM week16db.flights_delta_table
GROUP BY destination

In [0]:
# We can OPTIMIZE the Delta table to compact its data, merging the large number of small files into a few large files.

In [0]:
%sql
-- Compact the table's many small files into fewer, larger files.
OPTIMIZE
  week16db.flights_delta_table

In [0]:
# After running OPTIMIZE, the command output shows 2,543 files compacted into 255 new files.

In [0]:
%sql
-- Same aggregation as before, now reading the compacted files.
SELECT AVG(delay)
FROM week16db.flights_delta_table
GROUP BY destination

In [0]:
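# Explain with an example how data-skipping is achieved in Partitioning and Bucketing.

# PARTITIONING
# ==============
# When writing a dataset to disk, we can logically segregate data into partitions (folders on disk) based on a column's value. For low-cardinality columns, one folder is created per distinct value, and it holds only the rows with that value.
# Queries that filter on the partition column then read files only from the matching folders. Example: the flight data above is partitioned by origin, so a query with WHERE origin = 'ATL' reads only the origin=ATL folder.
# BUCKETING
# ============
# If a column has high cardinality (for example, a customer ID or a timestamp), partitioning by it is not feasible because it would create a huge number of folders. Bucketing instead hashes the column's value into a fixed number of bins (buckets). If the table is also partitioned, this happens within each partition. A filter or join on the bucketed column then only needs to read the bucket its value hashes to. Delta tables do not support bucketing; Z-ordering (below) is the Delta way to co-locate related rows for data skipping.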

In [0]:
# Before Z-Ordering

In [0]:
%sql
-- Baseline: per-destination counts read from version 0 (the initial write),
-- before any OPTIMIZE or Z-ORDER.
SELECT COUNT(origin)
FROM week16db.flights_delta_table VERSION AS OF 0
GROUP BY destination

In [0]:
# After Z-Ordering

In [0]:
%sql
-- Rewrite the files so rows with similar destination values are stored together,
-- which improves file skipping for filters on destination.
OPTIMIZE week16db.flights_delta_table
ZORDER BY (destination)

In [0]:
%sql
-- Re-run the per-destination count against the current (latest) table version,
-- which is the Z-ordered table produced by the cell above. VERSION AS OF 1 would
-- instead read an early commit (around the single-row INSERT), made long before
-- OPTIMIZE and Z-ORDER ran.
SELECT COUNT(origin)
FROM week16db.flights_delta_table
GROUP BY destination

In [0]:
# The VACUUM command removes data files that are no longer referenced by the current table version and are older than the retention threshold (7 days by default). Such files accumulate after many INSERT, UPDATE and DELETE operations on Delta tables, so VACUUM acts as a storage cleanup. The disadvantage of VACUUM is that we can no longer time-travel to, or restore, the old versions whose files it deleted.

In [0]:
%sql
-- Delete short-haul flights. This commits a new table version; data files that
-- the new version no longer references stay in storage until VACUUM removes them.
DELETE FROM week16db.flights_delta_table
WHERE distance < 500

In [0]:
%sql
-- Turn off Delta's safety check on VACUUM retention periods so the demo below can
-- use a tiny RETAIN value.
-- WARNING: only do this on tables that no other job is reading or writing.
SET spark.databricks.delta.retentionDurationCheck.enabled = false

In [0]:
%sql
-- DRY RUN: list the files a VACUUM with a 0.01-hour (36-second) retention would
-- delete, without removing anything.
VACUUM week16db.flights_delta_table
RETAIN 0.01 HOURS
DRY RUN