### Create delta tables


In [11]:
%%pyspark
df = spark.read.load('abfss://files@datalakeadk97g2.dfs.core.windows.net/products/products.csv', format='csv', header=True)
display(df.limit(10))

StatementMeta(sparkadk97g2, 1, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4bb9c074-a9c8-48ff-ab6b-05b1d2e856fd)

In [12]:
delta_table_path = "/delta/products-delta"
df.write.format("delta").save(delta_table_path)

StatementMeta(sparkadk97g2, 1, 4, Finished, Available)

AnalysisException: abfss://files@datalakeadk97g2.dfs.core.windows.net/delta/products-delta already exists.

In [13]:
from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of product 771 by 10%)
deltaTable.update(
    condition = "ProductID == 771",
    set = { "ListPrice": "ListPrice * 0.9" })

# View the updated data as a dataframe
deltaTable.toDF().show(10)

StatementMeta(sparkadk97g2, 1, 5, Finished, Available)

+---------+--------------------+--------------+---------+
|ProductID|         ProductName|      Category|ListPrice|
+---------+--------------------+--------------+---------+
|      771|Mountain-100 Silv...|Mountain Bikes|2753.9919|
|      772|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      773|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      774|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      775|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      776|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      777|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      778|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      779|Mountain-200 Silv...|Mountain Bikes|2319.9900|
|      780|Mountain-200 Silv...|Mountain Bikes|2319.9900|
+---------+--------------------+--------------+---------+
only showing top 10 rows



In [6]:
new_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
new_df.show(10)

StatementMeta(sparkadk97g2, 0, 6, Finished, Available)

+---------+--------------------+--------------+---------+
|ProductID|         ProductName|      Category|ListPrice|
+---------+--------------------+--------------+---------+
|      771|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      772|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      773|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      774|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      775|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      776|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      777|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      778|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      779|Mountain-200 Silv...|Mountain Bikes|2319.9900|
|      780|Mountain-200 Silv...|Mountain Bikes|2319.9900|
+---------+--------------------+--------------+---------+
only showing top 10 rows



In [14]:
deltaTable.history(10).show(20, False, True)

StatementMeta(sparkadk97g2, 1, 6, Finished, Available)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 2                                                                                                                                                         
 timestamp           | 2023-05-08 16:14:02                                                                                                                                       
 userId              | null                                                                                                                                                      
 userName            | null                                                                                                                                                      
 operation           | UPDATE                                                                                 

### Create catalog tables

In [15]:
spark.sql("CREATE DATABASE AdventureWorks")
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)

StatementMeta(sparkadk97g2, 1, 7, Finished, Available)

+----------------------------+-----------------------------------------------------------------------+-------+
|col_name                    |data_type                                                              |comment|
+----------------------------+-----------------------------------------------------------------------+-------+
|ProductID                   |string                                                                 |       |
|ProductName                 |string                                                                 |       |
|Category                    |string                                                                 |       |
|ListPrice                   |string                                                                 |       |
|                            |                                                                       |       |
|# Partitioning              |                                                                       |       |
|

In [16]:
%%sql

USE AdventureWorks;

SELECT * FROM ProductsExternal;

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 295 rows and 4 fields>

In [17]:
df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)

StatementMeta(sparkadk97g2, 1, 10, Finished, Available)

+----------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                       |comment|
+----------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------+
|ProductID                   |string                                                                                                                          |       |
|ProductName                 |string                                                                                                                          |       |
|Category                    |string                                                                                                                          | 

In [18]:
%%sql

USE AdventureWorks;

SELECT * FROM ProductsManaged;

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 295 rows and 4 fields>

In [19]:
%%sql

USE AdventureWorks;

SHOW TABLES;

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 2 rows and 3 fields>

In [20]:
%%sql

USE AdventureWorks;

DROP TABLE IF EXISTS ProductsExternal;
DROP TABLE IF EXISTS ProductsManaged;

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

### Create a table using SQL

In [21]:
%%sql

USE AdventureWorks;

CREATE TABLE Products
USING DELTA
LOCATION '/delta/products-delta';

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [22]:
%%sql

USE AdventureWorks;

SELECT * FROM Products;

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 295 rows and 4 fields>

### Use delta tables for streaming data

In [23]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = '/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")

StatementMeta(sparkadk97g2, 1, 22, Finished, Available)

Source stream created...


In [24]:
# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")

StatementMeta(sparkadk97g2, 1, 23, Finished, Available)

Streaming to delta sink...


In [25]:
# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)

StatementMeta(sparkadk97g2, 1, 24, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2662466e-a37c-4d7e-bb72-2347192645db)

In [26]:
# create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

StatementMeta(sparkadk97g2, 1, 25, Finished, Available)

DataFrame[]

In [27]:
%%sql

SELECT * FROM IotDeviceData;

StatementMeta(sparkadk97g2, 1, 26, Finished, Available)

<Spark SQL result set with 9 rows and 2 fields>

In [28]:
# Add more data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

StatementMeta(sparkadk97g2, 1, 27, Finished, Available)

True

In [29]:
%%sql

SELECT * FROM IotDeviceData;

StatementMeta(sparkadk97g2, 1, 28, Finished, Available)

<Spark SQL result set with 16 rows and 2 fields>

In [30]:
deltastream.stop()

StatementMeta(sparkadk97g2, 1, 29, Finished, Available)

### Query a delta table from a serverless SQL pool

In [33]:
-- This is auto-generated code

SELECT
    TOP 100 *
FROM
    OPENROWSET(
        BULK 'https://datalakeadk97g2.dfs.core.windows.net/files/delta/products-delta/',
        FORMAT = 'DELTA'
    ) AS [result]


StatementMeta(sparkadk97g2, 1, 32, Finished, Available)

Error: 
mismatched input '100' expecting {<EOF>, ';'}(line 2, pos 8)

== SQL ==
SELECT
    TOP 100 *
--------^^^
FROM
    OPENROWSET(
        BULK 'https://datalakeadk97g2.dfs.core.windows.net/files/delta/products-delta/',
        FORMAT = 'DELTA'
    ) AS [result]


In [None]:
-- This is auto-generated code
USE AdventureWorks;

SELECT * FROM Products;