
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Databricks Delta Batch Operations - Upsert

Databricks&reg; Delta allows you to read, write, and query data in data lakes efficiently.

## In this lesson you:
* Use Databricks Delta to UPSERT data into existing Databricks Delta tables

## Audience
* Primary Audience: Data Engineers 
* Secondary Audience: Data Analysts and Data Scientists

## Prerequisites
* Web browser: **Chrome**
* A cluster configured with **8 cores** and **DBR 6.2**
* Suggested Courses from <a href="https://academy.databricks.com/" target="_blank">Databricks Academy</a>:
  - ETL Part 1
  - Spark-SQL

## Datasets Used
We will use online retail datasets from
* `/mnt/training/online_retail` in the demo part and
* `/mnt/training/structured-streaming/events/` in the exercises

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Classroom-Setup

For each lesson to execute correctly, please make sure to run the **`Classroom-Setup`** cell at the<br/>
start of each lesson (see the next cell) and the **`Classroom-Cleanup`** cell at the end of each lesson.

In [4]:
%run "./Includes/Classroom-Setup"

<iframe  
src="//fast.wistia.net/embed/iframe/lofgyqo0bu?videoFoam=true"
style="border:1px solid #1cb1c2;"
allowtransparency="true" scrolling="no" class="wistia_embed"
name="wistia_embed" allowfullscreen mozallowfullscreen webkitallowfullscreen
oallowfullscreen msallowfullscreen width="640" height="360" ></iframe>
<div>
<a target="_blank" href="https://fast.wistia.net/embed/iframe/lofgyqo0bu?seo=false">
  <img alt="Opens in new tab" src="https://files.training.databricks.com/static/images/external-link-icon-16x16.png"/>&nbsp;Watch full-screen.</a>
</div>

Set up relevant paths.

In [7]:
deltaMiniDataPath = workingDir + "/customer-data-mini"

## UPSERT 

UPSERT is a blend of "UPdate" and "inSERT". It means to atomically either INSERT a row or, if the row already exists, UPDATE that row.

It is also called **MERGE INTO**, which is what the Databricks Delta operation is called.  
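
To make the update-or-insert rule concrete, here is a minimal plain-Python sketch (no Spark involved) that models a table as a dictionary keyed by `CustomerID`. The `upsert` helper and the sample rows are hypothetical and only illustrate the semantics; this is not how Databricks Delta implements the operation.

```python
def upsert(table, key, row):
    """Update the row stored under `key` if it exists, otherwise insert it."""
    if key in table:
        # Matched: overwrite only the columns supplied in `row`.
        table[key].update(row)
        return "updated"
    # Not matched: add a brand-new row.
    table[key] = dict(row)
    return "inserted"


# Hypothetical sample data keyed by CustomerID.
customers = {20993: {"StockCode": "32129", "Country": "Sierra Leone"}}

first = upsert(customers, 20993, {"StockCode": "99999"})  # key already exists
second = upsert(customers, 12345, {"StockCode": "11111", "Country": "Iceland"})  # new key
print(first, second)
```

The existing customer keeps its untouched columns and only `StockCode` changes, while the unknown customer is added as a new row.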

In this demo, we alter the data by changing the value in one of the columns for a specific `CustomerID`.

Let's load the CSV file `/mnt/training/online_retail/outdoor-products/outdoor-products-mini.csv`.

In [9]:
miniDataInputPath = "/mnt/training/online_retail/outdoor-products/outdoor-products-mini.csv"
inputSchema = "InvoiceNo STRING, StockCode STRING, Description STRING, Quantity INT, InvoiceDate STRING, UnitPrice DOUBLE, CustomerID INT, Country STRING"

miniDataDF = (spark.read          
  .option("header", "true")
  .schema(inputSchema)
  .csv(miniDataInputPath)                            
)

## UPSERT Using Non-Databricks Delta Pipeline

UPSERT is not supported in non-Delta pipelines.

Without Databricks Delta, an upsert has to be carried out as TWO separate operations, an UPDATE and an INSERT. In other words, it is not an atomic operation.

Running an UPDATE could invalidate data that is accessed by the subsequent INSERT operation.
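
The risk of splitting the work into two steps can be sketched in plain Python. The `two_step_upsert` function, its `fail_before_insert` flag, and the sample rows below are hypothetical and simulate a job that fails between the UPDATE and the INSERT; no Spark is involved.

```python
def two_step_upsert(table, incoming, fail_before_insert=False):
    """Apply UPDATE and INSERT as two separate steps over a list of row dicts."""
    existing_ids = {row["CustomerID"] for row in table}

    # Step 1: UPDATE rows whose key already exists.
    for row in table:
        for new in incoming:
            if new["CustomerID"] == row["CustomerID"]:
                row.update(new)

    if fail_before_insert:
        # Simulated crash between the two steps.
        raise RuntimeError("job failed after UPDATE, before INSERT")

    # Step 2: INSERT rows whose key was not present before step 1.
    table.extend(dict(new) for new in incoming if new["CustomerID"] not in existing_ids)


table = [{"CustomerID": 1, "StockCode": "A"}]
incoming = [{"CustomerID": 1, "StockCode": "B"}, {"CustomerID": 2, "StockCode": "C"}]

try:
    two_step_upsert(table, incoming, fail_before_insert=True)
except RuntimeError as err:
    print("Failure:", err)

# The table is left half-applied: customer 1 was updated, customer 2 is missing.
print(table)
```

An atomic upsert avoids this half-applied state: either both the update and the insert become visible, or neither does.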

## UPSERT Using Databricks Delta Pipeline

With Databricks Delta, however, we can do UPSERTs.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> In this Lesson, we will explicitly create tables as SQL notation works better with UPSERT.

In [12]:
(miniDataDF
  .write
  .mode("overwrite")
  .format("delta")
  .save(deltaMiniDataPath) 
)

spark.sql("""
    CREATE TABLE IF NOT EXISTS {}.customer_data_delta_mini
    USING DELTA 
    LOCATION '{}' 
  """.format(databaseName, deltaMiniDataPath))

List all rows with `CustomerID=20993`.

In [14]:
sqlCmd = "SELECT * FROM {}.customer_data_delta_mini WHERE CustomerID=20993".format(databaseName)
display(spark.sql(sqlCmd))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536371,32129,EverGlow Single,228,1/1/18 9:01,33.85,20993,Sierra Leone


Form a new DataFrame where `StockCode` is `99999` for `CustomerID=20993`.

Create a table `customer_data_delta_to_upsert` that contains this data.

In [16]:
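from pyspark.sql.functions import lit, col

# StockCode is declared as STRING in inputSchema, so use a string literal to keep the column type.
customerSpecificDF = (miniDataDF
  .filter("CustomerID=20993")
  .withColumn("StockCode", lit("99999"))
)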

spark.sql("DROP TABLE IF EXISTS {}.customer_data_delta_to_upsert".format(databaseName))
customerSpecificDF.write.saveAsTable("{}.customer_data_delta_to_upsert".format(databaseName))

Upsert the new data into `customer_data_delta_mini`.

Upsert is done using the `MERGE INTO` syntax.

In [18]:
spark.sql("USE {}".format(databaseName))

sqlCmd = """
  MERGE INTO customer_data_delta_mini
  USING customer_data_delta_to_upsert
  ON customer_data_delta_mini.CustomerID = customer_data_delta_to_upsert.CustomerID
  WHEN MATCHED THEN
    UPDATE SET
      customer_data_delta_mini.StockCode = customer_data_delta_to_upsert.StockCode
  WHEN NOT MATCHED
    THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
    VALUES (
      customer_data_delta_to_upsert.InvoiceNo,
      customer_data_delta_to_upsert.StockCode, 
      customer_data_delta_to_upsert.Description, 
      customer_data_delta_to_upsert.Quantity, 
      customer_data_delta_to_upsert.InvoiceDate, 
      customer_data_delta_to_upsert.UnitPrice, 
      customer_data_delta_to_upsert.CustomerID, 
      customer_data_delta_to_upsert.Country)"""
spark.sql(sqlCmd)

Notice how this data is seamlessly incorporated into `customer_data_delta_mini`.

In [20]:
sqlCmd = "SELECT * FROM {}.customer_data_delta_mini WHERE CustomerID=20993".format(databaseName)
display(spark.sql(sqlCmd))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536371,99999,EverGlow Single,228,1/1/18 9:01,33.85,20993,Sierra Leone
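
Because the `MERGE INTO` statement above repeats the table and column names many times, it can help to generate it. The `build_merge_sql` helper below is a hypothetical sketch, not part of Spark or Databricks Delta; it only assembles a string in the same shape as the demo statement and assumes the table and column names come from trusted code.

```python
def build_merge_sql(target, source, key, update_cols, insert_cols):
    """Return a MERGE INTO statement that upserts `source` into `target` on `key`."""
    # Names are interpolated directly, so pass only trusted identifiers.
    set_clause = ",\n      ".join(
        "{t}.{c} = {s}.{c}".format(t=target, s=source, c=c) for c in update_cols
    )
    values = ", ".join("{}.{}".format(source, c) for c in insert_cols)
    return (
        "MERGE INTO {t}\n"
        "  USING {s}\n"
        "  ON {t}.{k} = {s}.{k}\n"
        "  WHEN MATCHED THEN\n"
        "    UPDATE SET\n"
        "      {set_clause}\n"
        "  WHEN NOT MATCHED THEN\n"
        "    INSERT ({cols})\n"
        "    VALUES ({values})"
    ).format(t=target, s=source, k=key, set_clause=set_clause,
             cols=", ".join(insert_cols), values=values)


columns = ["InvoiceNo", "StockCode", "Description", "Quantity",
           "InvoiceDate", "UnitPrice", "CustomerID", "Country"]
mergeSql = build_merge_sql("customer_data_delta_mini", "customer_data_delta_to_upsert",
                           "CustomerID", ["StockCode"], columns)
print(mergeSql)
```

In the notebook, the resulting string could then be passed to `spark.sql(mergeSql)` in the same way as the hand-written statement.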


# LAB

## Step 1

Write base data to `deltaIotPath`.

We do this for you, so just run the cell below.

In [23]:
from pyspark.sql.functions import expr, col, from_unixtime, to_date
jsonSchema = "action string, time long"
streamingEventPath = "/mnt/training/structured-streaming/events/"
deltaIotPath = workingDir + "/iot-pipeline"

(spark.read 
  .schema(jsonSchema)
  .json(streamingEventPath) 
  .withColumn("date", to_date(from_unixtime(col("time").cast("Long"),"yyyy-MM-dd")))
  .withColumn("deviceId", expr("cast(rand(5) * 100 as int)"))
  .repartition(200)
  .write
  .mode("overwrite")
  .format("delta")
  .partitionBy("date")
  .save(deltaIotPath)
)

## Step 2

Create a DataFrame out of the data sitting in `deltaIotPath`.

In [25]:
# ANSWER
deltaIotPath = workingDir + "/iot-pipeline"

newDataDF = spark.sql("SELECT * FROM delta.`{}` ".format(deltaIotPath))

In [26]:
# TEST  - Run this cell to test your solution.
schema = str(newDataDF.schema)

dbTest("assert-1", True, "action,StringType" in schema)
dbTest("assert-2", True, "time,LongType" in schema)
dbTest("assert-3", True, "date,DateType" in schema)
dbTest("assert-4", True, "deviceId,IntegerType" in schema)

print("Tests passed!")

## Step 3

Create another DataFrame named `newDeviceIdDF`:
* Pick the first row you see that has `action` set to `Open`.
  - <img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** Use the `limit(1)` method.
* Change `action` to `Close`.
  - <img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** Use the `lit()` function.
* We will use the associated `deviceId` in the cells that follow.
* The DataFrame you construct should only have 1 row.

In [28]:
# ANSWER
from pyspark.sql.functions import col, lit

# Filter on `action` before projecting, so the column is still in scope.
devId = (newDataDF
  .filter(col("action") == "Open")
  .select("deviceId")
  .limit(1)
  .first()[0])
  
newDeviceIdDF = (newDataDF
  .filter(col("deviceId") == devId)
  .withColumn("action", lit("Close")) 
  .limit(1)) 

In [29]:
# TEST - Run this cell to test your solution.
actionCount = newDeviceIdDF.filter(col("action") == "Close").count()

dbTest("Delta-L4-actionCount", 1, actionCount)

print("Tests passed!")

## Step 4

Write to a new Databricks Delta table named `iot_data_delta_to_upsert` that contains just our data to be upserted.

In [31]:
# ANSWER
spark.sql("DROP TABLE IF EXISTS {}.iot_data_delta_to_upsert".format(databaseName))
newDeviceIdDF.write.saveAsTable("{}.iot_data_delta_to_upsert".format(databaseName))

In [32]:
# TEST - Run this cell to test your solution.
count = spark.table("{}.iot_data_delta_to_upsert".format(databaseName)).count()

dbTest("Delta-04-demoIotTableHasRow", True, count > 0)  
  
print("Tests passed!")

## Step 5

Create a Databricks Delta table named `demo_iot_data_delta` that contains just the data from `deltaIotPath`.

In [34]:
# ANSWER
sqlCmd = """
  CREATE TABLE IF NOT EXISTS {}.demo_iot_data_delta
  USING DELTA
  LOCATION '{}'""".format(databaseName, deltaIotPath)

spark.sql(sqlCmd)

In [35]:
# TEST - Run this cell to test your solution.
try:
  tableExists = (spark.table("{}.demo_iot_data_delta".format(databaseName)).count() > 0)
except Exception:
  # Any failure to read the table (for example, it does not exist) counts as missing.
  tableExists = False
  
dbTest("Delta-04-demoTableExists", True, tableExists)  

print("Tests passed!")

## Step 6

Upsert the data from `iot_data_delta_to_upsert` into `demo_iot_data_delta`.

You can adapt the SQL syntax for the upsert from the demo example above.

In [37]:
# ANSWER
spark.sql("USE {}".format(databaseName))

sqlCmd = """
  MERGE INTO demo_iot_data_delta
  USING iot_data_delta_to_upsert
  ON demo_iot_data_delta.deviceId = iot_data_delta_to_upsert.deviceId
  WHEN MATCHED THEN
    UPDATE SET
      demo_iot_data_delta.action = iot_data_delta_to_upsert.action
  WHEN NOT MATCHED
    THEN INSERT (action, time, date, deviceId)
    VALUES (
      iot_data_delta_to_upsert.action, 
      iot_data_delta_to_upsert.time, 
      iot_data_delta_to_upsert.date, 
      iot_data_delta_to_upsert.deviceId 
  )"""

spark.sql(sqlCmd)

In [38]:
# TEST - Run this cell to test your solution.
devId = newDeviceIdDF.select("deviceId").first()[0]

sqlCmd1 = "SELECT count(*) as total FROM {}.demo_iot_data_delta WHERE deviceId = {} AND action = 'Open' ".format(databaseName, devId)
countOpen = spark.sql(sqlCmd1).first()[0]

sqlCmd2 = "SELECT count(*) as total FROM {}.demo_iot_data_delta WHERE deviceId = {} AND action = 'Close' ".format(databaseName, devId)
countClose = spark.sql(sqlCmd2).first()[0]

dbTest("Delta-L4-count", True, countOpen == 0 and countClose > 0)

## Step 7

Count the number of items in `demo_iot_data_delta` where 
* `deviceId` is obtained from the query `newDeviceIdDF.select("deviceId").first()[0]`.
* `action` is `Close`.

In [40]:
# ANSWER
sqlCmd = "SELECT count(*) as total FROM {}.demo_iot_data_delta WHERE deviceId = {} AND action = 'Close' ".format(databaseName, devId)
count = spark.sql(sqlCmd).first()[0]

In [41]:
# TEST - Run this cell to test your solution.
dbTest("Delta-L4-demoiot-count", True, count > 0)

print("Tests passed!")

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Classroom-Cleanup<br>

Run the **`Classroom-Cleanup`** cell below to remove any artifacts created by this lesson.

In [43]:
%run "./Includes/Classroom-Cleanup"

## Summary
In this lesson we:
* Learned that it is not possible to do UPSERTs in the traditional pre-Databricks Delta lake.
  - UPSERT is essentially two operations in one step
  - UPdate and inSERT
* Learned that `MERGE INTO` is the SQL statement we use to do UPSERTs.
* Used Databricks Delta to UPSERT data into existing Databricks Delta tables.
* Created tables explicitly because it is easier to work with SQL syntax.

## Review Questions

**Q:** What does it mean to UPSERT?<br>
**A:** To UPSERT is to either INSERT a row, or if the row already exists, UPDATE the row.

**Q:** What happens if you try to UPSERT in a Parquet-based data set?<br>
**A:** That's not possible due to the schema-on-read paradigm; you will get an error until you repair the table.

**Q:** How do you perform an UPSERT in a Databricks Delta dataset?<br>
**A:** Use the `MERGE INTO my-table USING data-to-upsert` syntax.

**Q:** What is the caveat to `USING data-to-upsert`?<br>
**A:** Your source data must contain ALL the data you want to replace. In other words, you create a new DataFrame that holds the data you want to replace in the Databricks Delta table.

## Additional Topics & Resources

* <a href="https://docs.databricks.com/delta/delta-batch.html#" target="_blank">Table Batch Read and Writes</a>

## Next Steps

Start the next lesson, [Streaming]($./Delta 05 - Streaming).

&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>