## Explore the data in the data lake

In [14]:
%%pyspark
df = spark.read.load('abfss://files@datalakefjby2rx.dfs.core.windows.net/products/products.csv', format='csv'
## If header exists uncomment line below
##, header=True
)
display(df.limit(10))

StatementMeta(sparkfjby2rx, 0, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b9045623-1592-4354-a3bd-e5ed1bb47908)

In [15]:
%%pyspark

df = spark.read.load('abfss://files@datalakefjby2rx.dfs.core.windows.net/products/products.csv', format='csv'
## If header exists uncomment line below
, header=True
)
display(df.limit(10))

StatementMeta(sparkfjby2rx, 0, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9d0f5935-30e6-46ce-af66-5abb6c849b15)

## Load the file data into a delta table

In [17]:
 # Folder where the delta table files are written; later cells reuse this variable.
 delta_table_path = "/delta/products-delta"
 # Overwrite mode keeps this cell re-runnable: the default save mode raises an error
 # as soon as the target path already exists (for example after a kernel restart).
 df.write.format("delta").mode("overwrite").save(delta_table_path)


StatementMeta(sparkfjby2rx, 0, 18, Finished, Available, Finished)

In [None]:
 from delta.tables import *
 from pyspark.sql.functions import *

 # Wrap the existing delta folder in a DeltaTable handle so its rows can be modified.
 deltaTable = DeltaTable.forPath(spark, delta_table_path)

 # Cut the list price of product 771 by 10%.
 # Note: this is not idempotent; every run of the cell multiplies the price by 0.9 again.
 deltaTable.update(condition="ProductID == 771", set={"ListPrice": "ListPrice * 0.9"})

 # Show the table contents after the update.
 result = deltaTable.toDF()
 display(result)

StatementMeta(sparkfjby2rx, 0, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d61e4ece-3dd6-418a-a392-b71744d9f88c)

In [19]:
 # Load the current (latest) state of the delta table and print its first ten rows.
 new_df = spark.read.load(delta_table_path, format="delta")
 new_df.show(n=10)

StatementMeta(sparkfjby2rx, 0, 20, Finished, Available, Finished)

+---------+--------------------+--------------+---------+
|ProductID|         ProductName|      Category|ListPrice|
+---------+--------------------+--------------+---------+
|      771|Mountain-100 Silv...|Mountain Bikes| 3059.991|
|      772|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      773|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      774|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      775|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      776|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      777|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      778|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      779|Mountain-200 Silv...|Mountain Bikes|2319.9900|
|      780|Mountain-200 Silv...|Mountain Bikes|2319.9900|
+---------+--------------------+--------------+---------+
only showing top 10 rows



### View a previous version of the table with versionAsOf

In [None]:
 # Time travel: load the table as it was at version 0, i.e. before the price update.
 new_df = (
     spark.read.format("delta")
     .option("versionAsOf", 0)
     .load(delta_table_path)
 )
 new_df.show(10)

StatementMeta(sparkfjby2rx, 0, 25, Finished, Available, Finished)

+---------+--------------------+--------------+---------+
|ProductID|         ProductName|      Category|ListPrice|
+---------+--------------------+--------------+---------+
|      771|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      772|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      773|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      774|Mountain-100 Silv...|Mountain Bikes|3399.9900|
|      775|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      776|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      777|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      778|Mountain-100 Blac...|Mountain Bikes|3374.9900|
|      779|Mountain-200 Silv...|Mountain Bikes|2319.9900|
|      780|Mountain-200 Silv...|Mountain Bikes|2319.9900|
+---------+--------------------+--------------+---------+
only showing top 10 rows



In [25]:
 # Print up to the last 10 commits, untruncated, one field per line (vertical layout).
 deltaTable.history(10).show(n=20, truncate=False, vertical=True)

StatementMeta(sparkfjby2rx, 0, 26, Finished, Available, Finished)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 version             | 1                                                                                                                                                                                                                                  
 timestamp           | 2024-12-02 14:17:01.608                                                                                                                                                                                                            
 userId              | null                                                                                                                                                                                                                            

In [29]:
# Full commit history of the delta table, rendered with the notebook's display widget.
table_history = deltaTable.history()
display(table_history)


StatementMeta(sparkfjby2rx, 0, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 409a29d8-4d38-4b59-8471-77920b1fbde4)

### Create catalog tables

#### Create an external table

In [30]:
 # Create the database only when it is missing, so the cell can be re-run without error.
 spark.sql("CREATE DATABASE IF NOT EXISTS AdventureWorks")
 # External table: the catalog entry points at the delta files already stored at
 # delta_table_path. IF NOT EXISTS keeps a re-run from failing on the existing entry.
 spark.sql("CREATE TABLE IF NOT EXISTS AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
 # Show the table's columns followed by its detailed metadata.
 spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)

StatementMeta(sparkfjby2rx, 0, 31, Finished, Available, Finished)

+----------------------------+-----------------------------------------------------------------------+-------+
|col_name                    |data_type                                                              |comment|
+----------------------------+-----------------------------------------------------------------------+-------+
|ProductID                   |string                                                                 |null   |
|ProductName                 |string                                                                 |null   |
|Category                    |string                                                                 |null   |
|ListPrice                   |string                                                                 |null   |
|                            |                                                                       |       |
|# Detailed Table Information|                                                                       |       |
|

In [31]:
 %%sql

 -- Make AdventureWorks the current database for the unqualified table name below.
 USE AdventureWorks;

 -- Return every row and column of the external table.
 SELECT *
 FROM ProductsExternal;

StatementMeta(sparkfjby2rx, 0, 32, Finished, Available, Finished)

<Spark SQL result set with 295 rows and 4 fields>

#### Create a managed table

In [32]:

 # Managed table: saveAsTable registers the table in the catalog under AdventureWorks.
 # Overwrite mode keeps the cell re-runnable; the default save mode raises an error
 # when the table already exists.
 df.write.format("delta").mode("overwrite").saveAsTable("AdventureWorks.ProductsManaged")
 # Show the table's columns followed by its detailed metadata.
 spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)

StatementMeta(sparkfjby2rx, 0, 33, Finished, Available, Finished)

+----------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                       |comment|
+----------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------+
|ProductID                   |string                                                                                                                          |null   |
|ProductName                 |string                                                                                                                          |null   |
|Category                    |string                                                                                                                          |n

In [33]:
 %%sql

 -- Make AdventureWorks the current database for the unqualified table name below.
 USE AdventureWorks;

 -- Return every row and column of the managed table.
 SELECT *
 FROM ProductsManaged;

StatementMeta(sparkfjby2rx, 0, 35, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 295 rows and 4 fields>

### Compare external and managed tables

In [34]:
 %%sql

 -- Switch the session to the AdventureWorks database.
 USE AdventureWorks;

 -- List the tables registered in that database.
 SHOW TABLES IN AdventureWorks;

StatementMeta(sparkfjby2rx, 0, 37, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 2 rows and 3 fields>

In [35]:
 %%sql

 -- Switch the session to the AdventureWorks database.
 USE AdventureWorks;

 -- Remove both catalog tables. IF EXISTS makes each drop a no-op when the table is already gone.
 -- The delta files behind the external table stay in place (a later cell registers them again).
 DROP TABLE IF EXISTS AdventureWorks.ProductsExternal;
 DROP TABLE IF EXISTS AdventureWorks.ProductsManaged;

StatementMeta(sparkfjby2rx, 0, 40, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

### Create a table using SQL

In [37]:
 %%sql

 -- Switch the session to the AdventureWorks database.
 USE AdventureWorks;

 -- Register the existing delta folder as a catalog table.
 -- IF NOT EXISTS lets the cell be re-run without failing on the already-registered table.
 CREATE TABLE IF NOT EXISTS Products
 USING DELTA
 LOCATION '/delta/products-delta';

StatementMeta(sparkfjby2rx, 0, 44, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [38]:
 %%sql

 -- Make AdventureWorks the current database for the unqualified table name below.
 USE AdventureWorks;

 -- Return every row and column of the Products table.
 SELECT *
 FROM Products;

StatementMeta(sparkfjby2rx, 0, 46, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 295 rows and 4 fields>

### Use delta tables for streaming data

In [59]:
 from notebookutils import mssparkutils
 from pyspark.sql.types import *
 from pyspark.sql.functions import *

 # Make sure the folder that the stream will watch exists.
 inputPath = '/data/'
 mssparkutils.fs.mkdirs(inputPath)

 # Schema of each JSON event: two string fields, both declared non-nullable.
 jsonSchema = StructType([
     StructField("device", StringType(), nullable=False),
     StructField("status", StringType(), nullable=False),
 ])

 # Streaming reader over the folder; each trigger picks up at most one new file.
 iotstream = (
     spark.readStream
     .schema(jsonSchema)
     .option("maxFilesPerTrigger", 1)
     .json(inputPath)
 )

 # Seed the folder with a first batch of events, one JSON object per line.
 device_data = '''{"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"error"}
 {"device":"Dev2","status":"ok"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev2","status":"Coronel"}
 {"device":"Dev1","status":"Romaro Coronel"}'''
 # Third argument True: replace the file if it is already there.
 mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
 print("Source stream created...")

StatementMeta(sparkfjby2rx, 0, 69, Finished, Available, Finished)

Source stream created...


 #### Write the stream to a delta table

In [54]:
 # Sink folder for the streamed rows, plus the folder where the query keeps its checkpoint.
 delta_stream_table_path = '/delta/iotdevicedata'
 checkpointpath = '/delta/checkpoint'

 # Start a streaming query that writes the source stream to the delta folder.
 deltastream = (
     iotstream.writeStream
     .format("delta")
     .option("checkpointLocation", checkpointpath)
     .start(delta_stream_table_path)
 )
 print("Streaming to delta sink...")

StatementMeta(sparkfjby2rx, 0, 64, Finished, Available, Finished)

Streaming to delta sink...


In [61]:
 # Load whatever the streaming query has written to the delta sink so far.
 # NOTE: this rebinds df, which earlier cells used for the products data.
 df = spark.read.load(delta_stream_table_path, format="delta")

 # Fetch up to 100 rows and render them.
 first_rows = df.head(100)
 display(first_rows)


StatementMeta(sparkfjby2rx, 0, 71, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 807dd7b7-b8c8-4866-875a-468269cac1df)

In [65]:
 # Register the streaming sink folder as a catalog table so it can be queried with SQL.
 # IF NOT EXISTS lets the cell be re-run without failing on the already-registered table.
 spark.sql("CREATE TABLE IF NOT EXISTS IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

StatementMeta(sparkfjby2rx, 0, 75, Finished, Available, Finished)

DataFrame[]

In [66]:
 %%sql

 -- Return every event currently stored in the streaming sink table.
 SELECT *
 FROM IotDeviceData;

StatementMeta(sparkfjby2rx, 0, 76, Finished, Available, Finished)

<Spark SQL result set with 9 rows and 2 fields>

In [67]:
 # Second batch of events; dropping it into the watched folder feeds it to the stream.
 more_data = '''{"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"error"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}'''

 # Third argument True: replace the file if it is already there.
 more_data_path = inputPath + "more-data.txt"
 mssparkutils.fs.put(more_data_path, more_data, True)

StatementMeta(sparkfjby2rx, 0, 77, Finished, Available, Finished)

True

In [68]:
 %%sql

 -- Query the sink table again to see the rows added by the second batch of events.
 SELECT *
 FROM IotDeviceData;

StatementMeta(sparkfjby2rx, 0, 78, Finished, Available, Finished)

<Spark SQL result set with 16 rows and 2 fields>

In [69]:
 # Stop the streaming query that was started when the stream was written to the delta sink.
 deltastream.stop()

StatementMeta(sparkfjby2rx, 0, 79, Finished, Available, Finished)

### Query a delta table from a serverless SQL pool

In [72]:
 %%sql

 -- Make AdventureWorks the current database for the unqualified table name below.
 USE AdventureWorks;

 -- Return every row and column of the Products table.
 SELECT *
 FROM Products;

StatementMeta(sparkfjby2rx, 0, 83, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 295 rows and 4 fields>