## Working with Delta Tables in Fabric


#### Configuring the Optimize Write feature for the Spark session

In [None]:
# Optimize Write is controlled per Spark session through a single config key.
optimize_write_key = "spark.microsoft.delta.optimizeWrite.enabled"

# Switch the feature off for this session ...
spark.conf.set(optimize_write_key, False)

# ... then switch it back on, so the session ends with Optimize Write enabled.
spark.conf.set(optimize_write_key, True)

# Read the setting back to show the value the session now reports.
print(spark.conf.get(optimize_write_key))


#### Setting V-Order functionality for the Spark Session

In [None]:
# V-Order is toggled through a session-level Spark setting.
vorder_key = 'spark.sql.parquet.vorder.enabled'
spark.conf.set(vorder_key, 'true')

# Read the setting back to show the value the session now reports.
print(spark.conf.get(vorder_key))

#### Reading 2019.csv and loading it into a dataframe

In [None]:
# Load the CSV with no reader options set and no explicit schema.
df = spark.read.format("csv").load("Files/2019.csv")

#### Displaying 2019.csv


In [None]:
# Render the DataFrame loaded above with the notebook's display() helper.
display(df)

#### Defining the schema for the dataframe

In [None]:
# Import only the types this cell uses rather than a wildcard, so the origin of
# every name is visible and nothing unexpected is pulled into the namespace.
from pyspark.sql.types import (
    DateType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema for the sales-order file: column names and types are declared
# here instead of being left to the CSV reader.
orderSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("Email", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("Tax", FloatType())
])

# Re-read the same file, this time applying the declared schema.
# NOTE(review): no "header" option is set, so the first line is read as data —
# confirm that Files/2019.csv has no header row.
df = spark.read.format("csv").schema(orderSchema).load("Files/2019.csv")

display(df)

#### Creating a Managed Table from the Dataframe

In [None]:
 # Save the DataFrame as a Delta table; no path is given, so Spark chooses the
 # storage location. mode("overwrite") replaces an existing table of the same
 # name, so re-running this cell no longer fails with "table already exists".
 df.write.format("delta").mode("overwrite").saveAsTable("Sales_Managed")

#### Creating an External Table from the Dataframe

In [None]:
# Save the DataFrame as a Delta table whose files are written to an explicit path.
# NOTE(review): "abfs_path" looks like a placeholder — replace it with the real
# ABFS path of the target folder before running (confirm).
# mode("overwrite") replaces an existing table of the same name, so re-running
# this cell no longer fails with "table already exists".
df.write.format("delta").mode("overwrite").saveAsTable("Sales_External", path="abfs_path/Sales_External")

#### Compare the External and Managed Tables

In [None]:
 %%sql
 -- Show the detailed metadata Spark holds for the managed table.
 DESCRIBE TABLE FORMATTED sales_managed;

In [None]:
 %%sql
 -- Show the detailed metadata Spark holds for the external table.
 DESCRIBE TABLE FORMATTED sales_external;

#### Drop both tables to see the difference between managed and external tables

In [None]:
 %%sql
 -- Remove both table definitions. IF EXISTS keeps the cell re-runnable: a table
 -- that has already been dropped no longer raises an error.
 DROP TABLE IF EXISTS sales_managed;
 DROP TABLE IF EXISTS sales_external;

#### Use SQL to create a Table

In [None]:
 %%sql
 -- Register a Delta table over the files stored at the given location.
 -- IF NOT EXISTS keeps the cell re-runnable: if the table is already
 -- registered the statement is skipped instead of failing.
 -- NOTE(review): this location must be the folder the external table's files
 -- were written to — confirm it matches the path used when saving Sales_External.
 CREATE TABLE IF NOT EXISTS sales_external
 USING DELTA
 LOCATION 'Files/Sales_External';

In [None]:
%%sql
-- Return every row and column of sales_external.
SELECT *
FROM sales_external;

#### Exploring Table Versioning

In [None]:
%%sql
-- Set Quantity to 5 on every row for this one item; the change is recorded
-- as a new entry in the table's history.
UPDATE sales_external
   SET Quantity = 5
 WHERE Item = 'Mountain-100 Silver, 44';


In [None]:
 %%sql
 -- List the recorded history of sales_external.
 DESCRIBE HISTORY sales_external;

#### Exploring the "VACUUM" Command

In [None]:
%%sql
-- Run VACUUM on the table. No RETAIN clause is given, so Delta's default
-- retention threshold applies.
VACUUM Sales_External

#### Partitioning the data by the "Item" field and creating an external "sales_partitioned" table

In [None]:
# Write the DataFrame as a Delta table partitioned by the "Item" column and
# register it as "sales_partitioned" with its files at an explicit path.
# NOTE(review): "abfs_path" looks like a placeholder — replace it with the real
# ABFS path of the target folder before running (confirm).
# mode("overwrite") replaces an existing table of the same name, so re-running
# this cell no longer fails with "table already exists".
df.write.format("delta").mode("overwrite").partitionBy("Item").saveAsTable("sales_partitioned", path="abfs_path/partitioned_products")


#### Streaming Data with Delta Tables

In [None]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Folder the stream will watch for incoming files; create it up front.
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Schema of the incoming JSON events: two string fields declared non-nullable.
jsonSchema = StructType([
    StructField("device", StringType(), nullable=False),
    StructField("status", StringType(), nullable=False),
])

# Streaming reader over the folder, limited to one file per trigger.
iotstream = (
    spark.readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

# Sample events, one JSON document per line.
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

# Drop the events into the watched folder; the trailing True asks put() to
# overwrite the file if it already exists.
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)

print("Source stream created...")

In [None]:
 # Sink location for the Delta table and the folder holding the stream's checkpoint.
 delta_stream_table_path = 'Tables/iotdevicedata'
 checkpointpath = 'Files/delta/checkpoint'

 # Start a streaming query that writes iotstream to the Delta sink.
 deltastream = (
     iotstream.writeStream
     .format("delta")
     .option("checkpointLocation", checkpointpath)
     .start(delta_stream_table_path)
 )
 print("Streaming to delta sink...")

In [None]:
 %%sql
 -- Return every row currently in the IotDeviceData table.
 SELECT *
 FROM IotDeviceData;


In [None]:
 %%sql
 -- Count the rows currently in the IotDeviceData table.
 SELECT COUNT(*)
 FROM IotDeviceData;


In [None]:
 # A second batch of events, one JSON document per line.
 more_data = '''{"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"error"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}'''

 # Write the batch into the watched folder as a new file; the trailing True
 # asks put() to overwrite the file if it already exists.
 mssparkutils.fs.put(f"{inputPath}more-data.txt", more_data, True)

In [None]:
%%sql
-- Count the rows again after more data was added to the source folder.
SELECT COUNT(*)
FROM IotDeviceData;

In [None]:
 # Stop the streaming query held in deltastream.
 deltastream.stop()