## Create a Delta file 

Specify the format, `delta` or `parquet`, when writing the file.
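On storage, a Delta table is a directory of Parquet data files plus a `_delta_log` transaction-log directory. A plain Parquet dataset has no such log. The minimal sketch below tells the two apart by checking for `_delta_log`. It assumes a local filesystem path, and `looks_like_delta_table` is a hypothetical helper, not a library function. It is only a heuristic; with the `delta-spark` package, `DeltaTable.isDeltaTable(spark, path)` performs this check from Spark.

```python
import os
import tempfile


def looks_like_delta_table(path: str) -> bool:
    """Heuristic (hypothetical helper): a Delta table directory contains _delta_log."""
    return os.path.isdir(os.path.join(path, "_delta_log"))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        parquet_dir = os.path.join(root, "parquet_data")
        delta_dir = os.path.join(root, "delta_data")
        os.makedirs(parquet_dir)                           # Parquet files only
        os.makedirs(os.path.join(delta_dir, "_delta_log"))  # Parquet files + log
        print(looks_like_delta_table(parquet_dir))  # False
        print(looks_like_delta_table(delta_dir))    # True
```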

First, set the path of the input CSV file that we will load into Delta Lake.

In [1]:
```python
inputPath = 'abfss://delta2021@storagebricks.dfs.core.windows.net/inputfile'
```


Read the data into a DataFrame. Supply the schema explicitly, and set the `header` option to `true` so that the first line of the CSV file is treated as column names rather than data.

In [2]:
```python
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

# Explicit schema so each column gets its proper type
# (without a schema or inferSchema, every CSV column is read as a string)
inputSchema = StructType([
  StructField("InvoiceNo", IntegerType(), True),
  StructField("StockCode", StringType(), True),
  StructField("Description", StringType(), True),
  StructField("Quantity", IntegerType(), True),
  StructField("InvoiceDate", StringType(), True),
  StructField("UnitPrice", DoubleType(), True),
  StructField("CustomerID", IntegerType(), True),
  StructField("Country", StringType(), True)
])

# header=true: the first line holds column names and is not loaded as a data row
rawDataDF = (spark.read
  .option("header", "true")
  .schema(inputSchema)
  .csv(inputPath)
)
```


In [3]:
```python
DataPath = 'abfss://delta2021@storagebricks.dfs.core.windows.net/deltafile'
```


Choose an appropriate partition column. The example below partitions the data by `Country`.

In [4]:
```python
# Write to Delta Lake, partitioned by Country
rawDataDF.write.mode("overwrite").format("delta").partitionBy("Country").save(DataPath)
```
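`partitionBy("Country")` stores the rows in one subdirectory per distinct value, named `Country=<value>`, under the table path. The pure-Python sketch below only illustrates that grouping for a few hypothetical in-memory rows. `partition_dirs` is a made-up helper, not part of Spark. Spark itself also escapes special characters in partition values and writes one or more Parquet files inside each directory.

```python
from collections import defaultdict


def partition_dirs(rows, column):
    """Group rows under Hive-style directory names such as 'Country=France'."""
    groups = defaultdict(list)
    for row in rows:
        groups[f"{column}={row[column]}"].append(row)
    return dict(groups)


# Hypothetical sample rows, for illustration only
sample = [
    {"InvoiceNo": 1, "Country": "France"},
    {"InvoiceNo": 2, "Country": "Germany"},
    {"InvoiceNo": 3, "Country": "France"},
]
layout = partition_dirs(sample, "Country")
print(sorted(layout))  # ['Country=France', 'Country=Germany']
```

A query that filters on `Country` can then skip whole directories (partition pruning). This is why a low-cardinality column that is frequently used in filters is a common choice of partition key.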


Spark SQL can query a directory of data directly. For Delta Lake, use the following syntax:
```sql
SELECT * FROM delta.`/path/to/delta_directory`
```

In [5]:
```python
display(spark.sql("SELECT * FROM delta.`{}` LIMIT 5".format(DataPath)))
```
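The cell above builds the query with `str.format`. If you build such queries often, a small helper keeps the quoting in one place. The sketch below uses a hypothetical `delta_path_query` helper. It wraps the path in backticks and doubles any backtick inside it, which is how Spark SQL escapes a backtick within a quoted identifier. You would then call `display(spark.sql(delta_path_query(DataPath)))`.

```python
def delta_path_query(path: str, limit: int = 5) -> str:
    """Build a Spark SQL query against a Delta directory (hypothetical helper)."""
    quoted = "`" + path.replace("`", "``")  + "`"  # escape backticks by doubling them
    return f"SELECT * FROM delta.{quoted} LIMIT {int(limit)}"


print(delta_path_query("abfss://delta2021@storagebricks.dfs.core.windows.net/deltafile"))
# SELECT * FROM delta.`abfss://delta2021@storagebricks.dfs.core.windows.net/deltafile` LIMIT 5
```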


### Create a Table Using Delta Lake

Create a table named `customer_data_delta` using `DELTA`, based on the data written above.

The notation is:
> `CREATE TABLE <table-name>` <br>
  `USING DELTA` <br>
  `LOCATION <path-to-data>` <br>
  
Note: Tables created with a specified `LOCATION` are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when you `DROP` the table. However, changes to either the registered table or the files will be reflected in both locations.
Since Delta Lake stores the schema and partition information in the `_delta_log` directory, we do not have to specify the schema or the partition columns.
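The transaction log is plain newline-delimited JSON, so you can see exactly where that information lives. The sketch below uses only the standard library, assumes a local path, and uses hypothetical helper names. It replays the numbered JSON commit files and returns the most recent `metaData` action, whose `partitionColumns` and `schemaString` fields hold the partition columns and the schema. It ignores checkpoint files, so treat it as an illustration. In practice, let Spark read the log, for example through `DESCRIBE DETAIL`.

```python
import json
import os
import re
import tempfile


def latest_metadata(table_path):
    """Return the most recent metaData action from the JSON commits in _delta_log.

    Simplified: checkpoint (.parquet) files are ignored, so this only works while
    the full JSON commit history is still present.
    """
    log_dir = os.path.join(table_path, "_delta_log")
    # Commits are named by 20-digit zero-padded version, so a string sort is a version sort
    commits = sorted(f for f in os.listdir(log_dir) if re.fullmatch(r"\d{20}\.json", f))
    metadata = None
    for name in commits:
        with open(os.path.join(log_dir, name)) as fh:
            for line in fh:  # one JSON action per line
                if line.strip():
                    action = json.loads(line)
                    if "metaData" in action:
                        metadata = action["metaData"]  # later commits win
    if metadata is None:
        raise ValueError(f"no metaData action found in {log_dir}")
    return metadata


def partition_and_schema(table_path):
    """Return (partition columns, column names) recorded in the table's log."""
    meta = latest_metadata(table_path)
    schema = json.loads(meta["schemaString"])
    return meta["partitionColumns"], [field["name"] for field in schema["fields"]]


# Demo on a hand-written, minimal commit (a real first commit also contains
# protocol, commitInfo and add actions)
with tempfile.TemporaryDirectory() as table:
    os.makedirs(os.path.join(table, "_delta_log"))
    schema = {"type": "struct", "fields": [
        {"name": "InvoiceNo", "type": "integer", "nullable": True, "metadata": {}},
        {"name": "Country", "type": "string", "nullable": True, "metadata": {}},
    ]}
    commit = {"metaData": {"id": "demo", "format": {"provider": "parquet", "options": {}},
                           "schemaString": json.dumps(schema),
                           "partitionColumns": ["Country"], "configuration": {}}}
    with open(os.path.join(table, "_delta_log", "00000000000000000000.json"), "w") as fh:
        fh.write(json.dumps(commit) + "\n")
    demo_result = partition_and_schema(table)

print(demo_result)  # (['Country'], ['InvoiceNo', 'Country'])
```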

In [6]:
```sql
%%sql
DROP TABLE IF EXISTS customer_data_delta
```


In [None]:
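```python
# Remove any previous registration, then register the existing Delta files
# as an unmanaged table (schema and partitioning come from _delta_log)
spark.sql("""
  DROP TABLE IF EXISTS customer_data_delta
""")
spark.sql("""
  CREATE TABLE customer_data_delta
  USING DELTA
  LOCATION '{}'
""".format(DataPath))
```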

In [None]:
```sql
%%sql
SELECT count(*) FROM customer_data_delta
```

Get the table's metadata, such as its format, location, partition columns, number of files, and size in bytes.

In [None]:
```sql
%%sql
DESCRIBE DETAIL customer_data_delta
```


## Delta Lake Batch Operations - Append

In [None]:
```python
miniDataInputPath = 'abfss://delta2021@storagebricks.dfs.core.windows.net/smallfile/new_data.csv'

# Read a small batch of new records using the same schema as the original load
newDataDF = (spark
  .read
  .option("header", "true")
  .schema(inputSchema)
  .csv(miniDataInputPath)
)
```

In [None]:
```python
display(newDataDF)
```

In [None]:
```python
newDataDF.count()
```

In [None]:
```python
# Append the new batch to the existing Delta table, keeping the same partitioning
(newDataDF
  .write
  .format("delta")
  .partitionBy("Country")
  .mode("append")
  .save(DataPath)
)
```
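Why does the count go up without the existing data being rewritten? In a Delta table, every write is a new commit in `_delta_log`. An append only records `add` actions for the new Parquet files. An overwrite also records a `remove` action for every file that was live before. The removed files stay on storage until `VACUUM` deletes them, which is what makes time travel possible. The set of live files after replaying the log is what `DESCRIBE DETAIL` reports as `numFiles`. The in-memory model below is a simplified sketch with hypothetical helper and file names. Real actions carry more fields, such as size, partition values, statistics, and timestamps.

```python
def replay(commits):
    """Return the set of live data files after applying add/remove actions in order."""
    live = set()
    for actions in commits:
        for action in actions:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


def write_commit(live_files, new_files, mode):
    """Build the actions of a hypothetical write in 'append' or 'overwrite' mode."""
    if mode not in ("append", "overwrite"):
        raise ValueError(f"unsupported mode: {mode}")
    actions = []
    if mode == "overwrite":
        # Overwrite logically deletes every currently live file
        actions += [{"remove": {"path": p}} for p in sorted(live_files)]
    actions += [{"add": {"path": p}} for p in new_files]
    return actions


log = []
log.append(write_commit(replay(log), ["Country=France/part-0.parquet",
                                      "Country=Spain/part-0.parquet"], "overwrite"))
log.append(write_commit(replay(log), ["Country=France/part-1.parquet"], "append"))
print(len(replay(log)))  # 3: the append kept both earlier files
log.append(write_commit(replay(log), ["Country=Spain/part-2.parquet"], "overwrite"))
print(sorted(replay(log)))  # ['Country=Spain/part-2.parquet']
```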

In [None]:
```sql
%%sql
SELECT count(*) FROM customer_data_delta
```

## Upsert into Delta Lake

Plain Parquet files cannot be upserted in place. With Delta Lake we can perform UPSERTs, which combine the following operations into a single atomic transaction:
- INSERT a row if it does not exist yet;
- UPDATE the row if it already exists.

In [None]:
```python
upsertPath = 'abfss://delta2021@storagebricks.dfs.core.windows.net/upsertfile'

# Read the upsert records (JSON) into a DataFrame
upsertDF = spark.read.format("json").load(upsertPath)
display(upsertDF)
```

In [None]:
```python
# Expose the DataFrame to Spark SQL so that MERGE can reference it as upsert_data
upsertDF.createOrReplaceTempView("upsert_data")
```

This upsert:
- adds new records for customer 20993;
- updates the `Country` column for customer 20993 to Iceland;
- updates the `Description` column for StockCode 22837.

In [None]:
```sql
%%sql
MERGE INTO customer_data_delta
USING upsert_data
ON customer_data_delta.InvoiceNo = upsert_data.InvoiceNo
  AND customer_data_delta.StockCode = upsert_data.StockCode
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
```
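The in-memory sketch below shows what the `MERGE` above does row by row. Target rows whose (`InvoiceNo`, `StockCode`) key appears in the source are replaced by the source row (`UPDATE SET *`). Source rows with no matching key are appended (`INSERT *`). All other target rows are left untouched. The helper `merge_upsert` and the sample rows are hypothetical. Delta performs this logic as one atomic commit and raises an error only when several source rows match the same target row, whereas this sketch rejects any duplicate source key. With the `delta-spark` Python API, the same statement can be written as `DeltaTable.forPath(spark, DataPath).alias("t").merge(upsertDF.alias("s"), "t.InvoiceNo = s.InvoiceNo AND t.StockCode = s.StockCode").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()`.

```python
def merge_upsert(target, source, key_cols):
    """Mimic MERGE ... WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT * in memory."""
    def key(row):
        return tuple(row[c] for c in key_cols)

    source_by_key = {}
    for row in source:
        k = key(row)
        if k in source_by_key:  # stricter than Delta: reject duplicate source keys
            raise ValueError(f"multiple source rows for key {k}")
        source_by_key[k] = row

    matched = set()
    result = []
    for row in target:
        k = key(row)
        if k in source_by_key:
            result.append(dict(source_by_key[k]))  # UPDATE SET *: take all source columns
            matched.add(k)
        else:
            result.append(row)  # no match: row unchanged
    # INSERT *: source rows whose key matched no target row
    result.extend(dict(r) for k, r in source_by_key.items() if k not in matched)
    return result


# Hypothetical rows, for illustration only
target = [
    {"InvoiceNo": 1, "StockCode": "A", "Country": "Norway"},
    {"InvoiceNo": 2, "StockCode": "B", "Country": "Norway"},
]
source = [
    {"InvoiceNo": 1, "StockCode": "A", "Country": "Iceland"},
    {"InvoiceNo": 3, "StockCode": "C", "Country": "Iceland"},
]
merged = merge_upsert(target, source, ["InvoiceNo", "StockCode"])
print([(r["InvoiceNo"], r["Country"]) for r in merged])
# [(1, 'Iceland'), (2, 'Norway'), (3, 'Iceland')]
```

Note that invoice 2 keeps its old `Country`. Because the match is on (`InvoiceNo`, `StockCode`), a row changes only if its key appears in the upsert data.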

In [None]:
```sql
%%sql
SELECT * FROM customer_data_delta WHERE CustomerID = 20993
```

In [None]:
```sql
%%sql
-- StockCode is a string column, so compare it with a string literal
SELECT DISTINCT Description
FROM customer_data_delta
WHERE StockCode = '22837'
```