## Delta Lake Batch Operations

> ### Append
> ### UPSERT

In [0]:
inputPath = "/mnt/training/online_retail/data-001/data.csv"
dataPath = "dbfs:/tmp/customer-data"

In [0]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

inputSchema = StructType([
  StructField("InvoiceNo", IntegerType(), True),
  StructField("StockCode", StringType(), True),
  StructField("Description", StringType(), True),
  StructField("Quantity", IntegerType(), True),
  StructField("InvoiceDate", StringType(), True),
  StructField("UnitPrice", DoubleType(), True),
  StructField("CustomerID", IntegerType(), True),
  StructField("Country", StringType(), True)
])

rawDataDF = (spark.read
  .option("header", "true")
  .schema(inputSchema)
  .csv(inputPath)
)

# write to Delta Lake
rawDataDF.write.mode("overwrite").format("delta").partitionBy("Country").save(dataPath)

In [0]:
#registering the delta lake table
spark.sql("""
  CREATE TABLE customer_data_delta
  USING DELTA
  LOCATION '{}'
""".format(dataPath))

In [0]:
%sql
select count(*) from customer_data_delta --No of Records: 65499

### APPEND Using Delta Lake

Adding to our existing Delta Lake is as easy as modifying our write statement and specifying the `append` mode. 

Here we save to our previously created Delta Lake at `tmp/customer-data/`.

In [0]:
#create a new dataframe with the records needed to be appended

miniDataInputPath = "/mnt/training/online_retail/outdoor-products/outdoor-products-mini.csv"

newDataDF = (spark
  .read
  .option("header", "true")
  .schema(inputSchema)
  .csv(miniDataInputPath)
)

In [0]:
newDataDF.count() #No of records: 36

In [0]:
newDataDF.write.format("delta").mode("append").partitionBy("country").save(dataPath)

Check the number of records appended

In [0]:
%sql
select count(*) from customer_data_delta --No of Records: 65535

Changes in the delta lake file immediately reflects in the delta table as well

### Key Takeaways
With Delta Lake, you can easily append new data without schema-on-read issues.

Changes to Delta Lake files will immediately be reflected in registered Delta tables.

## Delta Lake Batch Operations - Upsert

To UPSERT means to "UPdate" and "inSERT". In other words, UPSERT is literally TWO operations. It is not supported in traditional data lakes, as running an UPDATE could invalidate data that is accessed by the subsequent INSERT operation.

Using Delta Lake, however, we can do UPSERTS. Delta Lake combines these operations to guarantee atomicity to
- INSERT a row 
- if the row already exists, UPDATE the row.

### Scenario
We have a small amount of batch data to write to your Delta table. This is currently staged in a JSON in a mounted blob store.

In [0]:
upsertDF = spark.read.format("json").load("/mnt/training/enb/commonfiles/upsert-data.json")
display(upsertDF)

We'll register this as a temporary view so that this table doesn't persist in DBFS (but we can still use SQL to query it).

In [0]:
upsertDF.createOrReplaceTempView("upsert_data")

Included in this data are:
- Some new orders for customer 20993
- An update to a previous order correcting the country for customer 20993 to Iceland
- Corrections to some records for StockCode 22837 where the Description was incorrect

We can use UPSERT to simultaneously INSERT our new data and UPDATE our previous records.

In [0]:
%sql

SELECT COUNT(*) FROM customer_data_delta WHERE customerID=20993 --result: 1

In [0]:
%sql
SELECT Country FROM customer_data_delta WHERE customerID = 20993 --result: "Sierra Leone"

In [0]:
%sql
SELECT DISTINCT Description FROM customer_data_delta WHERE StockCode = 22837 --NULL

In [0]:
%sql
MERGE INTO customer_data_delta --Delta table
USING upsert_data  --temp view
ON customer_data_delta.InvoiceNo = upsert_data.InvoiceNo     --keys
  AND customer_data_delta.StockCode = upsert_data.StockCode
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

In [0]:
%sql

SELECT COUNT(*) FROM customer_data_delta WHERE customerID=20993 --result: 11

In [0]:
%sql
SELECT DISTINCT Country FROM customer_data_delta WHERE customerID = 20993 --result: "Iceland"

In [0]:
%sql
SELECT DISTINCT Description FROM customer_data_delta WHERE StockCode = 22837 --"HOT WATER BOTTLE BABUSHKA"