## Create a notebook and ingest data

In [0]:
 %sh
 rm -r /dbfs/delta_lab
 mkdir /dbfs/delta_lab
 wget -O /dbfs/delta_lab/products.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/products.csv

--2024-04-22 13:18:33--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/products.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14372 (14K) [text/plain]
Saving to: ‘/dbfs/delta_lab/products.csv’

     0K .......... ....                                       100% 1.85M=0.007s

2024-04-22 13:18:33 (1.85 MB/s) - ‘/dbfs/delta_lab/products.csv’ saved [14372/14372]



In [0]:
df = spark.read.load('/delta_lab/products.csv', format='csv', header=True)
display(df.limit(10))

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.99
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.99
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.99
774,"Mountain-100 Silver, 48",Mountain Bikes,3399.99
775,"Mountain-100 Black, 38",Mountain Bikes,3374.99
776,"Mountain-100 Black, 42",Mountain Bikes,3374.99
777,"Mountain-100 Black, 44",Mountain Bikes,3374.99
778,"Mountain-100 Black, 48",Mountain Bikes,3374.99
779,"Mountain-200 Silver, 38",Mountain Bikes,2319.99
780,"Mountain-200 Silver, 42",Mountain Bikes,2319.99


## 

## Load the file data into a delta table

In [0]:
delta_table_path = "/delta/products-delta"
df.write.format("delta").save(delta_table_path)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1021970034712716>, line 2[0m
[1;32m      1[0m delta_table_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124m/delta/products-delta[39m[38;5;124m"[39m
[0;32m----> 2[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39msave(delta_table_path)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m   

The data for a delta lake table is stored in Parquet format. A log file is also created to track modifications made to the data.

In [0]:
 %sh
 ls /dbfs/delta/products-delta

_delta_log
part-00000-1c22e100-0045-4401-b27d-cd62e511781f.c000.snappy.parquet
part-00000-d2e25713-7290-4f89-9cac-c632f8906126.c000.snappy.parquet


In [0]:
from delta.tables import *
from pyspark.sql.functions import *
   
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of product 771 by 10%)
deltaTable.update(
    condition = "ProductID == 771",
    set = { "ListPrice": "ListPrice * 0.9" })
# View the updated data as a dataframe
deltaTable.toDF().show(10)

+---------+--------------------+--------------+---------+
|ProductID|         ProductName|      Category|ListPrice|
+---------+--------------------+--------------+---------+
|      771|Mountain-100 Silv...|Mountain Bikes|2753.9919|
|      772|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      773|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      774|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      775|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      776|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      777|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      778|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      779|Mountain-200 Silv...|Mountain Bikes|2319.9900|
|      780|Mountain-200 Silv...|Mountain Bikes|2319.9900|
+---------+--------------------+--------------+---------+
only showing top 10 rows



In [0]:
new_df = spark.read.format("delta").load(delta_table_path)
new_df.show(10)

+---------+--------------------+--------------+---------+
|ProductID|         ProductName|      Category|ListPrice|
+---------+--------------------+--------------+---------+
|      771|Mountain-100 Silv...|Mountain Bikes|2753.9919|
|      772|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      773|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      774|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      775|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      776|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      777|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      778|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      779|Mountain-200 Silv...|Mountain Bikes|2319.9900|
|      780|Mountain-200 Silv...|Mountain Bikes|2319.9900|
+---------+--------------------+--------------+---------+
only showing top 10 rows



## Explore logging and time-travel

In [0]:
new_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
new_df.show(10)

+---------+--------------------+--------------+---------+
|ProductID|         ProductName|      Category|ListPrice|
+---------+--------------------+--------------+---------+
|      771|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      772|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      773|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      774|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      775|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      776|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      777|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      778|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      779|Mountain-200 Silv...|Mountain Bikes|2319.9900|
|      780|Mountain-200 Silv...|Mountain Bikes|2319.9900|
+---------+--------------------+--------------+---------+
only showing top 10 rows



In [0]:
deltaTable.history(10).show(10, False, True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 2                                                                                                                                                                                                                                                                                                 
 timestamp           | 2024-04-22 13:20:25                                                                                                                                                                                                                                                                               
 userId              | 5049762535763620                   

## Create catalog tables

#### Create an external table

In [0]:
spark.sql("CREATE DATABASE AdventureWorks")
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)

+----------------------------+-------------------------------------------+-------+
|col_name                    |data_type                                  |comment|
+----------------------------+-------------------------------------------+-------+
|ProductID                   |string                                     |NULL   |
|ProductName                 |string                                     |NULL   |
|Category                    |string                                     |NULL   |
|ListPrice                   |string                                     |NULL   |
|                            |                                           |       |
|# Delta Statistics Columns  |                                           |       |
|Column Names                |ProductID, ProductName, Category, ListPrice|       |
|Column Selection Method     |first-32                                   |       |
|                            |                                           |       |
|# D

In [0]:
%sql
USE AdventureWorks;
SELECT * FROM ProductsExternal;

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,2753.9919
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.99
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.99
774,"Mountain-100 Silver, 48",Mountain Bikes,3399.99
775,"Mountain-100 Black, 38",Mountain Bikes,3374.99
776,"Mountain-100 Black, 42",Mountain Bikes,3374.99
777,"Mountain-100 Black, 44",Mountain Bikes,3374.99
778,"Mountain-100 Black, 48",Mountain Bikes,3374.99
779,"Mountain-200 Silver, 38",Mountain Bikes,2319.99
780,"Mountain-200 Silver, 42",Mountain Bikes,2319.99


#### Create a managed table

In [0]:
df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)

+----------------------------+-----------------------------------------------------------+-------+
|col_name                    |data_type                                                  |comment|
+----------------------------+-----------------------------------------------------------+-------+
|ProductID                   |string                                                     |NULL   |
|ProductName                 |string                                                     |NULL   |
|Category                    |string                                                     |NULL   |
|ListPrice                   |string                                                     |NULL   |
|                            |                                                           |       |
|# Delta Statistics Columns  |                                                           |       |
|Column Names                |ProductID, ProductName, Category, ListPrice                |       |
|Column Se

In [0]:
%sql
USE AdventureWorks;
SELECT * FROM ProductsManaged;

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.99
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.99
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.99
774,"Mountain-100 Silver, 48",Mountain Bikes,3399.99
775,"Mountain-100 Black, 38",Mountain Bikes,3374.99
776,"Mountain-100 Black, 42",Mountain Bikes,3374.99
777,"Mountain-100 Black, 44",Mountain Bikes,3374.99
778,"Mountain-100 Black, 48",Mountain Bikes,3374.99
779,"Mountain-200 Silver, 38",Mountain Bikes,2319.99
780,"Mountain-200 Silver, 42",Mountain Bikes,2319.99


#### Compare external and managed tables

In [0]:
%sql
USE AdventureWorks;
SHOW TABLES;

database,tableName,isTemporary
adventureworks,productsexternal,False
adventureworks,productsmanaged,False


In [0]:
 %sh
 echo "External table:"
 ls /dbfs/delta/products-delta
 echo
 echo "Managed table:"
 ls /dbfs/user/hive/warehouse/adventureworks.db/productsmanaged

External table:
_delta_log
part-00000-1c22e100-0045-4401-b27d-cd62e511781f.c000.snappy.parquet
part-00000-54e2dd00-fba4-4e7e-a39b-0d196f7a3ee7.c000.snappy.parquet
part-00000-d2e25713-7290-4f89-9cac-c632f8906126.c000.snappy.parquet

Managed table:
_delta_log
part-00000-9f076280-c4dd-43e6-ac11-36e3c13f0b95.c000.snappy.parquet


In [0]:
%sql
USE AdventureWorks;
DROP TABLE IF EXISTS ProductsExternal;
DROP TABLE IF EXISTS ProductsManaged;
SHOW TABLES;

database,tableName,isTemporary


In [0]:
 %sh
 echo "External table:"
 ls /dbfs/delta/products-delta
 echo
 echo "Managed table:"
 ls /dbfs/user/hive/warehouse/adventureworks.db/productsmanaged

External table:
_delta_log
part-00000-1c22e100-0045-4401-b27d-cd62e511781f.c000.snappy.parquet
part-00000-54e2dd00-fba4-4e7e-a39b-0d196f7a3ee7.c000.snappy.parquet
part-00000-d2e25713-7290-4f89-9cac-c632f8906126.c000.snappy.parquet

Managed table:


In [0]:
%sql
USE AdventureWorks;
CREATE TABLE Products
USING DELTA
LOCATION '/delta/products-delta';

In [0]:
%sql
USE AdventureWorks;
SELECT * FROM Products;

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,2753.9919
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.99
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.99
774,"Mountain-100 Silver, 48",Mountain Bikes,3399.99
775,"Mountain-100 Black, 38",Mountain Bikes,3374.99
776,"Mountain-100 Black, 42",Mountain Bikes,3374.99
777,"Mountain-100 Black, 44",Mountain Bikes,3374.99
778,"Mountain-100 Black, 48",Mountain Bikes,3374.99
779,"Mountain-200 Silver, 38",Mountain Bikes,2319.99
780,"Mountain-200 Silver, 42",Mountain Bikes,2319.99


## Use delta tables for streaming data

In [0]:
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}

{'device': 'Dev1', 'status': 'ok'}

In [0]:
 %sh
 rm -r /dbfs/device_stream
 mkdir /dbfs/device_stream
 wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json

rm: cannot remove '/dbfs/device_stream': No such file or directory
--2024-04-22 13:24:07--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 296 [text/plain]
Saving to: ‘/dbfs/device_stream/devices1.json’

     0K                                                       100% 4.28M=0s

2024-04-22 13:24:07 (4.28 MB/s) - ‘/dbfs/device_stream/devices1.json’ saved [296/296]



In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
   
# Create a stream that reads data from the folder, using a JSON schema
inputPath = '/device_stream/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
print("Source stream created...")

Source stream created...


In [0]:
# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")

Streaming to delta sink...


In [0]:
# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)

device,status
Dev1,ok
Dev1,ok
Dev1,ok
Dev2,error
Dev1,ok
Dev1,error
Dev2,ok
Dev2,error
Dev1,ok


In [0]:
# create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

DataFrame[]

In [0]:
%sql
SELECT * FROM IotDeviceData;

device,status
Dev1,ok
Dev1,ok
Dev1,ok
Dev2,error
Dev1,ok
Dev1,error
Dev2,ok
Dev2,error
Dev1,ok


In [0]:
 %sh
 wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json

--2024-04-22 13:39:34--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 229 [text/plain]
Saving to: ‘/dbfs/device_stream/devices2.json’

     0K                                                       100% 4.88M=0s

2024-04-22 13:39:34 (4.88 MB/s) - ‘/dbfs/device_stream/devices2.json’ saved [229/229]



In [0]:
%sql
SELECT * FROM IotDeviceData;

device,status
Dev1,ok
Dev1,ok
Dev1,ok
Dev2,error
Dev1,ok
Dev1,error
Dev2,ok
Dev2,error
Dev1,ok
Dev1,ok


In [0]:
deltastream.stop()