# Delta Lake features

Use `spark.read.csv()` to load the data from the source public blob storage account and display its schema and shape.

In [None]:
from pyspark.sql.types import *
import numpy as np
import pandas as pd
from delta.tables import DeltaTable

manualSchema = StructType([
  StructField("CustomerId", StringType(), True),
  StructField("ProductId", StringType(), True),
  StructField("Rating", LongType(), True),
  StructField("Cost", FloatType(), True),
  StructField("Size", FloatType(), True),
  StructField("Price", FloatType(), True),
  StructField("PrimaryBrandId", LongType(), True),
  StructField("GenderId", LongType(), True),
  StructField("MaritalStatus", LongType(), True),
  StructField("LowerIncomeBound", FloatType(), True),
  StructField("UpperIncomeBound", FloatType(), True)
])

url = "wasbs://files@synapsemlpublic.blob.core.windows.net/PersonalizedData.csv"
raw_data = spark.read.csv(url, header=True, schema=manualSchema)
print("Schema: ")
raw_data.printSchema()

df = raw_data.toPandas()
print("Shape: ", df.shape)

Save the customer rating dataframe as a Delta Lake table.

In [None]:
delta_table_path = 'abfss://delta@#DATA_LAKE_ACCOUNT_NAME#.dfs.core.windows.net/customer-rating'

raw_data.write.format('delta').save(delta_table_path)

mssparkutils.fs.ls(delta_table_path)

Explore the layout of files and inspect the Delta log file.

In [None]:
delta_log_path = mssparkutils.fs.ls(f'{delta_table_path}/_delta_log')[0].path
print(delta_log_path)
mssparkutils.fs.head(delta_log_path)

Load the Delta lake table into a Spark dataframe.

In [None]:
data = spark.read.format('delta').load(delta_table_path)
data.show()

Use the dedicated `DeltaTable` class to manage the Delta Lake table. Explore the Delta Lake table history.

In [None]:
# Get all versions
delta_table = DeltaTable.forPath(spark, delta_table_path)
display(delta_table.history())

Perform an update on the Delta Lake table using a SQL-style condition.

In [None]:
# Declare the predicate by using a SQL-formatted string.
delta_table.update(
  condition = "Price < 1500",
  set = { "Price": "Price * 1.05" }
)


Check again the history of the Delta Lake table and notice the new entry corresponding to the update that has just been performed.

In [None]:
display(delta_table.history())

It's possible to query previous snapshots of your Delta Lake table by using a feature called Time Travel. If you want to access the data that you overwrote, you can query a snapshot of the table before you overwrote the first set of data using the versionAsOf option.

In [None]:
display(spark.read.format("delta").option("versionAsOf", "0").load(delta_table_path))

In [None]:
display(spark.read.format("delta").option("versionAsOf", "1").load(delta_table_path))


Create the metadata to expose the Delta Lake table in the default Spark database.

In [None]:
spark.sql("CREATE TABLE CustomerRating USING DELTA LOCATION '{0}'".format(delta_table_path))

List all tables that exist in the default Spark database.

In [None]:
spark.sql("SHOW TABLES").show()

Explore the properties of the `CustomerRating` Spark database table.

In [None]:
spark.sql("DESCRIBE EXTENDED customerrating").show(truncate=False)

To query the delta table from the serverless SQL pool, navigate to the `Develop` hub in Synapse Studio and create a new SQL script. Make sure `Built-in` is selected for the `Connect to` option and `default` is selected for the `Use database` option.

Enter the query below and make sure you replace `<your_data_lake_account_name>` with the name of the Data Lake account with the one from your lab environment.


```sql
SELECT TOP 10 *
FROM OPENROWSET(
    BULK 'abfss://delta@<your_data_lake_account_name>.dfs.core.windows.net/customer-rating/',
    FORMAT = 'delta') as rows
```

![Query Delta Lake with serverless SQL pool](https://solliancepublicdata.blob.core.windows.net/synapse-l400/notebook-images/query-delta-table.png)

This concludes the Delta Lake section of this notebook.

To learn more about Delta Lake support in Syanspe Spark, take a look at the [Work with Delta Lake](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-delta-lake-overview?pivots=programming-language-python) section in the Azure Synapse Analytics documentation.

# Mount storage account containers

The `mssparkutils` utility can be used to mount storage account containers. In the example below, you will use an already created linked service to manage the authentication with the storage account.

In [60]:
mssparkutils.fs.mount( 
    "abfss://delta@#DATA_LAKE_ACCOUNT_NAME#.dfs.core.windows.net", 
    "/test", 
    {"linkedService":"#DATA_LAKE_ACCOUNT_NAME#"} 
) 

StatementMeta(SparkPool02, 7, 6, Finished, Available)

True

Explore the content of the mounted volume using the local path. Note the `synfs:/{jobId}` prefix used by `mssparkutils` for the local path.

In [61]:
jobId = mssparkutils.env.getJobId() 

log_files = mssparkutils.fs.ls(f'synfs:/{jobId}/test/customer-rating/_delta_log')
log_files

StatementMeta(SparkPool02, 7, 7, Finished, Available)

[FileInfo(path=synfs:/7/test/customer-rating/_delta_log/00000000000000000000.json, name=00000000000000000000.json, size=2299),
 FileInfo(path=synfs:/7/test/customer-rating/_delta_log/00000000000000000001.json, name=00000000000000000001.json, size=1466)]

When using regular PySpark classes, the syntax of the prefix is slightly different - `/synfs/{jobId}`. Use this prefix to load and display the first 500 characters from the first Delta Lake log file.

In [62]:
with open(f'/synfs/{jobId}/test/customer-rating/_delta_log/{log_files[0].name}', 'r') as f:
    f.read()[:500]

StatementMeta(SparkPool02, 7, 8, Finished, Available)

'{"commitInfo":{"timestamp":1644308193124,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"135572","numOutputRows":"5000"}}}\n{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}\n{"metaData":{"id":"ae14ef8d-1182-44af-9a85-76f991697c24","format":{"provider":"parquet","options":{}},"schemaString":"{\\"type\\":\\"struct\\",\\"fields\\":[{\\"name\\":\\"CustomerId\\",\\"type\\":\\"string\\",\\"nullable\\"'