# Use Delta Lake with Spark in Azure Synapse Analytics

Delta Lake is an open source project to build a transactional data storage layer on top of a data lake. Delta Lake adds support for relational semantics for both batch and streaming data operations, and enables the creation of a Lakehouse architecture in which Apache Spark can be used to process and query data in tables that are based on underlying files in the data lake.

## Create delta tables

In [None]:
%%pyspark
df = spark.read.load(
    'abfss://data@asadatalake9slnoqw.dfs.core.windows.net/spark/products.csv',
    format='csv'
    # If the file has a header row, uncomment the line below
    # , header=True
)
display(df.limit(10))

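The first row of the file contains the column names, so the following code reloads the file with the `header` option enabled.

In [None]:
%%pyspark
df = spark.read.load(
    'abfss://data@asadatalake9slnoqw.dfs.core.windows.net/spark/products.csv',
    format='csv',
    header=True  # The first row contains the column names
)
display(df.limit(10))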

### Load the file data into a delta table

In [None]:
delta_table_path = "/delta/products-delta"
df.write.format("delta").save(delta_table_path)

The following code loads the delta table into a **deltaTable** object and updates it. The update is reflected in the query results.

In [None]:
from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of product 771 by 10%)
deltaTable.update(
    condition = "ProductID == 771",
    set = { "ListPrice": "ListPrice * 0.9" })

# View the updated data as a dataframe
deltaTable.toDF().show(10)

The following code loads the delta table data into a data frame from its location in the data lake, verifying that the change you made via the **deltaTable** object has been persisted.

In [None]:
new_df = spark.read.format("delta").load(delta_table_path)
new_df.show(10)

The following code modifies the previous read by specifying the `versionAsOf` option, which uses the time travel feature of Delta Lake to view a previous version of the data. The results show the original version of the data (version 0), as it was before the price update.

In [None]:
new_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
new_df.show(10)

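The following code displays up to the last 10 operations recorded in the transaction history of the table.

In [None]:
deltaTable.history(10).show(20, False, True)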
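The version numbers and timestamps listed in the table history can both be used for time travel. The following is a minimal sketch, assuming the `spark` session and `delta_table_path` defined earlier; `read_delta_snapshot` is a hypothetical helper name introduced here for illustration, not part of Delta Lake. It relies on the `versionAsOf` and `timestampAsOf` reader options.

```python
def read_delta_snapshot(spark, path, version=None, timestamp=None):
    """Load a Delta table as of a version number or a timestamp string.

    Hypothetical helper for illustration; with neither argument set,
    it loads the latest version of the table.
    """
    if version is not None and timestamp is not None:
        raise ValueError("Specify either version or timestamp, not both")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)

# Usage in the notebook (assumes `spark` and `delta_table_path` exist):
# read_delta_snapshot(spark, delta_table_path, version=0).show(10)
```

Passing a version that does not appear in the history causes the load to fail, so it is worth checking the history output first.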

# Create catalog tables
So far you’ve worked with delta tables by loading data from the folder containing the Parquet files on which the table is based. You can define catalog tables that encapsulate the data and provide a named table entity that you can reference in SQL code. Spark supports two kinds of catalog tables for Delta Lake:

- External tables, which are defined by the path to the Parquet files containing the table data.
- Managed tables, which are defined in the Hive metastore for the Spark pool.

## Create an external table
This code creates a new database named **AdventureWorks** and then creates an external table named **ProductsExternal** in that database, based on the path to the Parquet files you defined previously. It then displays a description of the table’s properties. Note that the **Location** property is the path you specified.

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS AdventureWorks")
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)

The following code uses SQL to switch context to the **AdventureWorks** database (which returns no data) and then query the **ProductsExternal** table (which returns a result set containing the products data in the Delta Lake table).

In [None]:
%%sql

USE AdventureWorks;

SELECT * FROM ProductsExternal;

## Create a managed table
This code creates a managed table named **ProductsManaged** based on the DataFrame you originally loaded from the **products.csv** file (before you updated the price of product 771). You do not specify a path for the Parquet files used by the table. The location is managed for you in the Hive metastore and is shown in the **Location** property in the table description (in the tempdata/synapse/workspaces/asaworkspacexxxxxxx/warehouse path).

In [None]:
df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)
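To compare where the external and managed tables store their data without reading the full description, the **Location** row can be picked out of the `DESCRIBE EXTENDED` output. This is a minimal sketch, assuming the `spark` session from the notebook; `table_location` is a hypothetical helper name, and it assumes the description uses the standard `col_name` and `data_type` output columns.

```python
def table_location(spark, table_name):
    """Return the Location value from DESCRIBE EXTENDED, or None if absent.

    Hypothetical helper for illustration only.
    """
    rows = spark.sql("DESCRIBE EXTENDED {0}".format(table_name)).collect()
    for row in rows:
        # The property name is in col_name and its value is in data_type
        if row.col_name == "Location":
            return row.data_type
    return None

# Usage in the notebook (assumes both tables have been created):
# print(table_location(spark, "AdventureWorks.ProductsExternal"))
# print(table_location(spark, "AdventureWorks.ProductsManaged"))
```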

The following code uses SQL to query the **ProductsManaged** table.

In [None]:
%%sql

USE AdventureWorks;

SELECT * FROM ProductsManaged;

# Compare external and managed tables
This code lists the tables in the AdventureWorks database.

In [None]:
%%sql

USE AdventureWorks;

SHOW TABLES;

This code drops the tables from the metastore.

In [None]:
%%sql

USE AdventureWorks;

DROP TABLE IF EXISTS ProductsExternal;
DROP TABLE IF EXISTS ProductsManaged;

- Return to the **files** tab and view the **tempdata/delta/products-delta** folder. Note that the data files still exist in this location. Dropping the external table has removed the table from the metastore, but left the data files intact.
- View the **tempdata/synapse/workspaces/asaworkspacexxxxxxx/warehouse** folder, and note that there is no folder for the **ProductsManaged** table data. Dropping a managed table removes the table from the metastore and also deletes the table’s data files.

## Create a table using SQL

In [None]:
%%sql

USE AdventureWorks;

CREATE TABLE Products
USING DELTA
LOCATION '/delta/products-delta';

Observe that the new catalog table was created for the existing Delta Lake table folder, which reflects the changes that were made previously.

In [None]:
%%sql

USE AdventureWorks;

SELECT * FROM Products;

# Use delta tables for streaming data
Delta Lake supports streaming data. Delta tables can be a sink or a source for data streams created using the Spark Structured Streaming API. In this example, you’ll use a delta table as a sink for some streaming data in a simulated internet of things (IoT) scenario.

In [None]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = '/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")

Ensure that the message `Source stream created...` is printed. The code you just ran creates a streaming data source based on a folder to which some data has been saved, representing readings from hypothetical IoT devices.

The following code writes the streaming device data in delta format.

In [None]:
# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")
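The stream runs asynchronously, so a read issued immediately after `start` may find the sink empty or not yet initialized. The following is a minimal sketch of one way to wait for the pending data, assuming the `deltastream` query object created above; `wait_for_stream` is a hypothetical helper name. It uses the `processAllAvailable` method of the streaming query, which is intended for testing and interactive use rather than production jobs.

```python
def wait_for_stream(query):
    """Block until the query has processed all data currently available.

    Hypothetical helper for illustration; returns the query status
    so it can be inspected afterwards.
    """
    if not query.isActive:
        raise RuntimeError("The streaming query is not running")
    query.processAllAvailable()
    return query.status

# Usage in the notebook (assumes `deltastream` was started above):
# print(wait_for_stream(deltastream))
```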

The following code reads the streamed data in delta format into a dataframe. Note that the code to load streaming data is no different from that used to load static data from a delta folder.

In [None]:
# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)

The following code creates a catalog table named **IotDeviceData** (in the **default** database) based on the delta folder. Again, this code is the same as would be used for non-streaming data.

In [None]:
# Create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

This code queries the IotDeviceData table, which contains the device data from the streaming source.

In [None]:
%%sql

SELECT * FROM IotDeviceData;

This code writes more hypothetical device data to the streaming source.

In [None]:
# Add more data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

This code queries the **IotDeviceData** table again, which should now include the additional data that was added to the streaming source.

In [None]:
%%sql

SELECT * FROM IotDeviceData;
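A streaming query keeps running until it is stopped, so it is usually worth stopping it once you have finished exploring the data. The following is a minimal sketch, assuming the `deltastream` query object created earlier; `stop_stream` is a hypothetical helper name that wraps the `isActive` property and `stop` method of the streaming query.

```python
def stop_stream(query):
    """Stop a streaming query if it is still running.

    Hypothetical helper for illustration; returns True if the query
    was stopped by this call and False if it was already inactive.
    """
    if query.isActive:
        query.stop()
        return True
    return False

# Usage in the notebook (assumes `deltastream` was started earlier):
# stop_stream(deltastream)
```

Stopping the query does not remove the data already written to the delta folder, so the **IotDeviceData** table can still be queried afterwards.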

# Query a delta table from a serverless SQL pool
In addition to Spark pools, Azure Synapse Analytics includes a built-in serverless SQL pool. You can use the relational database engine in this pool to query delta tables using SQL.

1. In the **tempdata** tab, browse to the **tempdata/delta** folder.
2. Select the **products-delta** folder, and on the toolbar, in the **New SQL script** drop-down list, select **Select TOP 100 rows**.
3. In the **Select TOP 100 rows** pane, in the **File type** list, select **Delta format** and then select **Apply**.
4. Review the SQL code that is generated, which should look like this:

```
 -- This is auto-generated code
 SELECT
     TOP 100 *
 FROM
     OPENROWSET(
         BULK 'https://datalakexxxxxxx.dfs.core.windows.net/files/delta/products-delta/',
         FORMAT = 'DELTA'
     ) AS [result]
```
5. Use the ▷ Run icon to run the script, and review the results.

    This demonstrates how you can use a serverless SQL pool to query delta format files that were created using Spark, and use the results for reporting or analysis.
6. Replace the query with the following SQL code:
```
 USE AdventureWorks;

 SELECT * FROM Products;
```
7. Run the code and observe that you can also use the serverless SQL pool to query Delta Lake data in catalog tables that are defined in the Spark metastore.