# Databricks Delta Batch Operations - Upsert

Databricks® Delta lets you read, write, and query data in data lakes efficiently.

## Datasets Used
We will use online retail datasets from
* `/mnt/training/online_retail` in the demo part and
* `/mnt/training/structured-streaming/events/` in the exercises

### Getting Started

Run the following cell to configure our "classroom."

In [3]:
%run ./Includes/Classroom-Setup

Set up relevant paths.

In [5]:
deltaMiniDataPath = userhome + "/delta/customer-data-mini/"
genericMiniDataPath = userhome + "/generic/customer-data-mini/"
miniDataInputPath = "/mnt/training/online_retail/outdoor-products/outdoor-products-mini.csv"

## UPSERT 

UPSERT literally means "UPdate" and "inSERT": atomically insert a row or, if the row already exists, update it.

Alter data by changing the values in one of the columns for a specific `CustomerID`.

Let's load the CSV file `../outdoor-products-mini.csv`.

In [7]:
# First, let's load some new data that we want to save to our delta table
miniDataDF = (spark       
  .read                                              # Call the read method returning a DataFrame
  .option("inferSchema","true")                      # Infer schema
  .option("header","true")                           # File has a header
  .csv(miniDataInputPath)                            # Path to file
)

## UPSERT Using Non-Databricks Delta Pipeline

This feature is not supported in non-Delta pipelines.

To UPSERT means to "UPdate" and "inSERT". Without Databricks Delta, an UPSERT has to be carried out as TWO separate operations, an UPDATE followed by an INSERT, so it is not atomic.

Running an UPDATE could invalidate data that is accessed by the subsequent INSERT operation.
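The hazard of a two-step upsert can be sketched in plain Python, without Spark. Everything here is an assumption made for illustration: the in-memory `table` dictionary, the helper names `update_existing` and `insert_missing`, and the sample values. It is not how any particular pipeline is implemented; it only shows that a failure between the two steps leaves the data half-applied.

```python
# Hypothetical in-memory "table" keyed by CustomerID. Not a Spark or Delta API.
table = {20993: {"StockCode": 21730}}

def update_existing(table, changes):
    """Step 1: UPDATE rows whose key already exists."""
    for key, row in changes.items():
        if key in table:
            table[key] = {**table[key], **row}

def insert_missing(table, changes, fail=False):
    """Step 2: INSERT rows whose key is absent; `fail` simulates a crash."""
    if fail:
        raise RuntimeError("job died between UPDATE and INSERT")
    for key, row in changes.items():
        table.setdefault(key, dict(row))

# One existing customer to update, one new customer to insert (made-up values).
changes = {20993: {"StockCode": 99999}, 20994: {"StockCode": 11111}}

try:
    update_existing(table, changes)
    insert_missing(table, changes, fail=True)  # the second step never completes
except RuntimeError:
    pass

# The update landed but the insert did not: the upsert is only half-applied.
half_applied = table[20993]["StockCode"] == 99999 and 20994 not in table
print(half_applied)
```

An atomic upsert avoids this state: readers see either both changes or neither.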

## UPSERT Using Databricks Delta Pipeline

Using Databricks Delta, however, we can perform UPSERTs.

In [10]:
(miniDataDF
  .write
  .mode("overwrite")
  .format("delta")
  .save(deltaMiniDataPath) 
)

spark.sql("""
    DROP TABLE IF EXISTS customer_data_delta_mini
  """)
spark.sql("""
    CREATE TABLE customer_data_delta_mini
    USING DELTA 
    LOCATION '{}' 
  """.format(deltaMiniDataPath))

List all rows with `CustomerID=20993`.

In [12]:
%sql
SELECT * FROM customer_data_delta_mini WHERE CustomerID=20993

Form a new DataFrame where `StockCode` is `99999` for `CustomerID=20993`.

Create a table `customer_data_delta_to_upsert` that contains this data.

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** You need to convert `InvoiceNo` to a `String` because Delta infers types and `InvoiceNo` looks like it should be an integer.

In [14]:
from pyspark.sql.functions import lit, col
customerSpecificDF = (miniDataDF
  .filter("CustomerID=20993")
  .withColumn("StockCode", lit(99999))
  .withColumn("InvoiceNo", col('InvoiceNo').cast("String")) 
 )

spark.sql("DROP TABLE IF EXISTS customer_data_delta_to_upsert")
customerSpecificDF.write.saveAsTable("customer_data_delta_to_upsert")

Upsert the new data into `customer_data_delta_mini`.

Upsert is done using the `MERGE INTO` syntax.

In [16]:
%sql
MERGE INTO customer_data_delta_mini
USING customer_data_delta_to_upsert
ON customer_data_delta_mini.CustomerID = customer_data_delta_to_upsert.CustomerID
WHEN MATCHED THEN
  UPDATE SET
    customer_data_delta_mini.StockCode = customer_data_delta_to_upsert.StockCode
WHEN NOT MATCHED
  THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
  VALUES (
    customer_data_delta_to_upsert.InvoiceNo,
    customer_data_delta_to_upsert.StockCode, 
    customer_data_delta_to_upsert.Description, 
    customer_data_delta_to_upsert.Quantity, 
    customer_data_delta_to_upsert.InvoiceDate, 
    customer_data_delta_to_upsert.UnitPrice, 
    customer_data_delta_to_upsert.CustomerID, 
    customer_data_delta_to_upsert.Country)

Notice how this data is seamlessly incorporated into `customer_data_delta_mini`.

In [18]:
%sql
SELECT * FROM customer_data_delta_mini WHERE CustomerID=20993
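To make the two clauses of the `MERGE INTO` statement above concrete, here is a minimal pure-Python sketch of the same logic on lists of dictionaries. The function `merge_into`, its parameters, and the sample rows are hypothetical and exist only for illustration. The sketch also assumes the source holds at most one row per key, and it is not how Databricks Delta executes a merge.

```python
def merge_into(target, source, key, update_cols):
    """Sketch of MERGE INTO: update `update_cols` on matches, insert the rest."""
    merged = [dict(row) for row in target]           # work on a copy of the target
    source_by_key = {row[key]: row for row in source}
    target_keys = {row[key] for row in target}

    for row in merged:                               # WHEN MATCHED THEN UPDATE SET ...
        match = source_by_key.get(row[key])
        if match is not None:
            for col in update_cols:
                row[col] = match[col]

    for row in source:                               # WHEN NOT MATCHED THEN INSERT ...
        if row[key] not in target_keys:
            merged.append(dict(row))
    return merged

# Made-up rows that reuse a few column names from the demo table.
target = [
    {"CustomerID": 20993, "StockCode": 21730, "Quantity": 6},
    {"CustomerID": 17850, "StockCode": 22752, "Quantity": 2},
]
source = [
    {"CustomerID": 20993, "StockCode": 99999, "Quantity": 0},
    {"CustomerID": 13047, "StockCode": 84879, "Quantity": 32},
]

result = merge_into(target, source, key="CustomerID", update_cols=["StockCode"])
for row in result:
    print(row)
```

As in the SQL, a matched row only has `StockCode` overwritten (its `Quantity` is left alone), while an unmatched source row is inserted whole.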

## Exercise 1

Create a DataFrame out of the table `demo_iot_data_delta`.

In [20]:
# TODO
newDataDF =  spark.sql("FILL_IN")

In [21]:
# TEST  - Run this cell to test your solution.
from pyspark.sql.types import StructField, StructType, StringType, LongType, DateType, IntegerType

expectedSchema = StructType([
   StructField("action",StringType(), True),
   StructField("time",LongType(), True),
   StructField("date",DateType(), True),
   StructField("deviceId",IntegerType(), True),
])

dbTest("Delta-04-schemas", set(expectedSchema), set(newDataDF.schema))

print("Tests passed!")

## Exercise 2

Create another DataFrame in which `action` is changed to `Close` for `date = '2018-06-01'` and `deviceId = 485`.

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** Use `distinct`.

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** Consider using `selectExpr()`, as we did in [Lesson 3]($./03-Append).

In [23]:
# TODO
from pyspark.sql.types import LongType

newDeviceId485DF =  (newDataDF
 .selectExpr(FILL_IN)
 .FILL_IN
)

In [24]:
# TEST - Run this cell to test your solution.
actionCount = newDeviceId485DF.select("Action").count()

dbTest("Delta-L4-actionCount", 1, actionCount)

print("Tests passed!")

## Exercise 3

Write to a new Databricks Delta table that contains just our data to be upserted.

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** You can adapt the SQL syntax for the upsert from our demo example, above.

In [26]:
# TODO
spark.sql("FILL_IN")
newDeviceId485DF.write.saveAsTable("FILL_IN")

In [27]:
# TEST - Run this cell to test your solution.
try:
  tableExists = (spark.table("iot_data_delta_to_upsert") is not None)
  count = spark.table("iot_data_delta_to_upsert").count()
except Exception:
  # The table is missing or unreadable: make both checks below fail cleanly.
  tableExists = False
  count = 0
  
dbTest("Delta-04-demoIotTableExists", True, tableExists)  
dbTest("Delta-04-demoIotTableHasRow", 1, count)  


print("Tests passed!")

In [28]:
%sql
--TODO
MERGE INTO demo_iot_data_delta
USING iot_data_delta_to_upsert
FILL_IN

In [29]:
spark.sql("SELECT * FROM demo_iot_data_delta").count()

## Exercise 4

Count the number of items in `demo_iot_data_delta` where the `deviceId` is `485` and `action` is `Close`.

In [31]:
# TODO
count = spark.sql("FILL_IN").collect()[0][0]

In [32]:
# TEST - Run this cell to test your solution.
dbTest("Delta-L4-demoiot-count", 17, count)

print("Tests passed!")

## Summary
In this lesson, we used Databricks Delta to UPSERT data into existing Databricks Delta tables.

## Review Questions

**Q:** What does it mean to UPSERT?<br>
**A:** To UPSERT is to either INSERT a row, or if the row already exists, UPDATE the row.

**Q:** What happens if you try to UPSERT in a parquet-based data set?<br>
**A:** That's not possible because of the schema-on-read paradigm; you will get an error until you refresh the table.

**Q:** What is schema-on-read?<br>
**A:** It stems from Hive and roughly means: the schema for a data set is unknown until you perform a read operation.

**Q:** How do you perform an UPSERT in a Databricks Delta dataset?<br>
**A:** Using the `MERGE INTO my-table USING data-to-upsert` syntax.

**Q:** What is the caveat to `USING data-to-upsert`?<br>
**A:** Your source data must contain ALL the data you want to replace: in other words, you create a new DataFrame that holds the data you want to replace in the Databricks Delta table.

## Additional Topics & Resources

* <a href="https://docs.azuredatabricks.net/delta/delta-batch.html" target="_blank">Table Batch Read and Writes</a>

## Next Steps

Start the next lesson, [Streaming]($./05-Streaming).