Synapse has the Azure Open Datasets package pre-installed. You will use the [NYC Yellow Taxi trip records](https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-yellow?tabs=azureml-opendatasets) data in this notebook.

In [6]:
// Connection settings for the NYC yellow taxi trip records in Azure Open Datasets
val blob_account_name = "azureopendatastorage"
val nyc_blob_container_name = "nyctlc"
val nyc_blob_relative_path = "yellow"
// The SAS token registered for the container below is an empty string
val nyc_blob_sas_token = ""

// Path the Delta copy of the trip data is written to and read from
val nycYellowPath = "/poc/delatalake/nyc_yellow_taxi_trips"

// wasbs:// URL of the source data, assembled from the settings above
val nyc_wasbs_path =
  s"wasbs://${nyc_blob_container_name}@${blob_account_name}.blob.core.windows.net/${nyc_blob_relative_path}"

spark.conf.set(
  s"fs.azure.sas.${nyc_blob_container_name}.${blob_account_name}.blob.core.windows.net",
  nyc_blob_sas_token
)

## Setup

In [36]:
%%sql
-- Setup: delete every row from both demo Delta tables, addressed by storage path.
-- NOTE(review): presumably both paths must already hold Delta tables for these
-- statements to succeed (i.e. not on a very first run) - confirm.
DELETE FROM delta.`/poc/ddl/person`;
DELETE FROM delta.`/poc/ddl/person_df_write`;

## Create

A delta table can be created in 2 ways

1. Write a dataframe to storage in Delta format or
2. Explicitly create a table definition in catalog and define the schema

### 1. Write a dataframe to storage in Delta format

In [20]:
// Copy the source parquet data to nycYellowPath in Delta format, partitioned by puYear
{
    spark.read
        .parquet(nyc_wasbs_path)
        .write
        .format("delta")
        .partitionBy("puYear")
        .save(nycYellowPath)
}

### 2. Explicitly create table definition in catalog

#### 2.1 Use `saveAsTable`

Define the table within the metastore using the `saveAsTable` method or a `CREATE TABLE` statement.

In [13]:
// Load the Delta data stored at nycYellowPath
val nyc_tlc_df = spark.read.format("delta").load(nycYellowPath)

// Register it in the metastore as a managed Delta table, partitioned by puYear
{
    nyc_tlc_df.write
        .format("delta")
        .partitionBy("puYear")
        .saveAsTable("test.delta_nyc_yellow_taxi_trips")
}

#### 2.2 Use `CREATE TABLE`

%%sql

-- Defines the external table `person` in the metastore with an explicit schema,
-- a storage location, a table comment and descriptive table properties.
-- NOTE(review): there is no USING clause, so this presumably does not create a
-- Delta table - confirm the intended format.
-- NOTE(review): LOCATION is '/poc/ddl/' while the path-based cells use
-- '/poc/ddl/person' - confirm the intended path.
-- NOTE(review): the table comment says 'Student' although the table is `person`.
CREATE EXTERNAL TABLE IF NOT EXISTS person (
    id INT NOT NULL, 
    name STRING NOT NULL, 
    crt_ts TIMESTAMP NOT NULL, 
    crt_usr STRING NOT NULL, 
    UPD_ts TIMESTAMP, 
    UPD_usr STRING
) 
LOCATION '/poc/ddl/'
COMMENT 'Student external table'
TBLPROPERTIES ('createBy'='Notebook', 'createdOn'='2023-06-20', 'app'='Customer Satisfaction', 'dataZone'='refined', 'pipelineRunId'='UUID', 'notebook'='nb_ddl')

In [37]:
%%sql

-- Creates (if absent) a Delta table addressed by its storage path instead of a
-- metastore table name.
-- id, name, crt_ts and crt_usr are declared NOT NULL; upd_ts and upd_usr accept NULL.
CREATE TABLE IF NOT EXISTS delta.`/poc/ddl/person` (
    id INT NOT NULL, 
    name STRING NOT NULL, 
    crt_ts TIMESTAMP NOT NULL, 
    crt_usr STRING NOT NULL, 
    upd_ts TIMESTAMP, 
    upd_usr STRING
) 
USING DELTA
COMMENT 'Person unmanaged table'
TBLPROPERTIES ('type' = 'unmanaged', 'createBy'='Notebook', 'createdOn'='2023-06-20', 'app'='Customer Satisfaction', 'dataZone'='refined', 'pipelineRunId'='UUID', 'notebook'='nb_ddl')

In [22]:
import java.sql._
import scala.collection.mutable
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.types._
import io.delta.tables.DeltaTable

// Schema of the person rows built in this notebook: id and name are non-nullable,
// the four audit columns (crt_*/upd_*) are nullable.
val TargetSchema = {
    def required(fieldName: String, fieldType: DataType) = StructField(fieldName, fieldType, nullable = false)
    def optional(fieldName: String, fieldType: DataType) = StructField(fieldName, fieldType, nullable = true)

    StructType(Seq(
        required("id", IntegerType),
        required("name", StringType),
        optional("crt_ts", TimestampType),
        optional("crt_usr", StringType),
        optional("upd_ts", TimestampType),
        optional("upd_usr", StringType)
    ))
}

/** Builds a DataFrame with the given schema from an in-memory sequence of rows. */
def createDataframeFromCollection(schema: StructType, data: Seq[Row]) = {
    val rowsRdd = spark.sparkContext.parallelize(data)
    spark.createDataFrame(rowsRdd, schema)
}

/**
 * Merges `sourceDf` into the Delta table stored at `targetPath`: rows matching on
 * `mergeOnCols` are updated, all other source rows are inserted.
 *
 * @param targetPath         storage path of the target Delta table
 * @param sourceDf           rows to merge into the target
 * @param sourceExcludeCols  source columns that are NOT copied to the target
 * @param mergeOnCols        key columns; compared for equality and combined with AND.
 *                           Must not be empty.
 * @param partitionPruneCols currently unused by this implementation
 * @param customInsertExpr   extra/overriding insert assignments, keyed as `target.<col>`
 *                           with a SQL expression string as value
 * @param customUpdateExpr   extra/overriding update assignments, same shape as above.
 *                           NOTE: these are also applied to inserted rows unless
 *                           overridden by `customInsertExpr`.
 * @throws IllegalArgumentException if `mergeOnCols` is empty
 */
def upsert(targetPath: String, sourceDf: DataFrame, sourceExcludeCols: List[String] = List(),
             mergeOnCols: List[String], partitionPruneCols: List[String] = List(),
             customInsertExpr: Map[String, String] = Map(), customUpdateExpr: Map[String, String] = Map()): Unit = {
    // Without key columns the merge condition would be an empty string; fail early with a clear message.
    require(mergeOnCols.nonEmpty, "mergeOnCols must contain at least one column to merge on")

    println(s"targetPath: $targetPath, sourceExcludeCols: $sourceExcludeCols, mergeOnCols: $mergeOnCols")
    val targetTable = DeltaTable.forPath(spark, targetPath)
    val targetDf = targetTable.toDF
    println(s"Target table cols: (${targetDf.columns.length})" + targetDf.columns.mkString(", "))
    println(s"Source Dataframe cols: (${sourceDf.columns.length})" + sourceDf.columns.mkString(", "))

    // Source columns that are carried over to the target as-is
    val excludedCols = sourceExcludeCols.toSet
    val mergeCols = sourceDf.columns.filterNot(excludedCols.contains)

    // Default assignment target.<col> = source.<col>; custom update expressions win on key clashes
    val updateExpr = mergeCols.map(colName => s"target.$colName" -> s"source.$colName").toMap
    val finalUpdateExpr = updateExpr ++ customUpdateExpr

    // Inserts start from the final update assignments; custom insert expressions win on key clashes
    val finalInsertExpr = finalUpdateExpr ++ customInsertExpr

    println(s"updateExpr: $finalUpdateExpr")
    println(s"insertExpr: $finalInsertExpr")

    val mergeCondition = mergeOnCols.map(col => s"target.$col = source.$col").mkString(" AND ")
    println(s"mergeCondition: $mergeCondition")

    targetTable.as("target")
      .merge(sourceDf.as("source"), mergeCondition)
      .whenMatched
      .updateExpr(finalUpdateExpr)
      .whenNotMatched
      .insertExpr(finalInsertExpr)
      .execute()
}

## Load

### Initial load

In [38]:
// Rows for the initial load of the person table
val baseDf = createDataframeFromCollection(TargetSchema, Seq(
                    //id, name, crt_ts, crt_usr, upd_ts, upd_usr
                    Row(1, "Alice", Timestamp.valueOf("2023-01-01 00:00:00"), "user1", Timestamp.valueOf("2023-01-01 00:00:00"), "user1"),
                    Row(2, "Bob", Timestamp.valueOf("2023-01-01 00:00:00"), "user2", Timestamp.valueOf("2023-01-01 00:00:00"), "user2")
                ))

//Nothing to update
val updateMap: Map[String, String] = Map()

//Do not carry upd_ts, upd_usr from source
val excludeCols = List("upd_ts", "upd_usr")

//Override crt_ts and crt_usr on insert.
//The map values are SQL expression strings, so the timestamp expression is written
//directly as SQL instead of being derived from the string form of a Column object.
val insertMap: Map[String, String] = Map(
    "target.crt_ts" -> "current_timestamp()",
    "target.crt_usr" -> s"'${mssparkutils.env.getUserName()}'"
)

upsert("/poc/ddl/person", baseDf, sourceExcludeCols = excludeCols, mergeOnCols = List("id"), partitionPruneCols = List(), 
    customInsertExpr = insertMap, customUpdateExpr = updateMap)

println("Complete upsert")

In [39]:
%%sql
-- Show all rows of the path-based person Delta table
SELECT * FROM delta.`/poc/ddl/person`

In [40]:
%%sql
-- Show the columns and the extended table metadata of the path-based person Delta table
DESCRIBE TABLE EXTENDED delta.`/poc/ddl/person`

### Upsert

In [41]:
// Schema of the incoming rows: only id, name and the creation audit columns
val SourceSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("crt_ts", TimestampType, nullable = true),
      StructField("crt_usr", StringType, nullable = true)
))

val sourceDf = createDataframeFromCollection(SourceSchema, Seq(
      Row(1, "Alice-update", Timestamp.valueOf("2023-01-02 00:00:00"), "user1"), //update
      Row(3, "Charlie-insert", Timestamp.valueOf("2023-01-02 00:00:00"), "user2") //insert
    ))

//Modify the upd_ts and upd_usr for merged rows.
//The map values are SQL expression strings, so the timestamp expression is written
//directly as SQL instead of being derived from the string form of a Column object.
val updateCols = Map(
    "target.upd_ts" -> "current_timestamp()",
    "target.upd_usr" -> "'testUser'"
)

upsert("/poc/ddl/person", sourceDf,
    sourceExcludeCols = List(),
    mergeOnCols = List("id"), partitionPruneCols = List(), customUpdateExpr = updateCols)
println("Complete upsert")

In [27]:
%%sql
-- Show all rows of the path-based person Delta table after the upsert
SELECT * FROM delta.`/poc/ddl/person`

In [42]:
// Load the person Delta table from its storage path and print the schema Spark reports for it
val df = spark.read.format("delta").load("/poc/ddl/person")
df.printSchema()

## Read

1. Read directly from storage
2. Read from metastore defined table

### 1. Read directly from storage

In [10]:
// Read directly from storage
{
    // limit(2) keeps the result a DataFrame, so display receives a DataFrame
    // (take(2) would instead collect an Array[Row] to the driver).
    val df = spark.read.format("delta").load(nycYellowPath).limit(2)
    display(df)
}

In [5]:
%%sql
-- Read two rows of the trip data directly from its Delta storage path
SELECT * from delta.`/poc/delatalake/nyc_yellow_taxi_trips` LIMIT 2

### 2. Read from metastore defined table

In [1]:
%%sql
-- Read two rows of the trip data through the table name test.delta_nyc_yellow_taxi_trips
SELECT * from test.delta_nyc_yellow_taxi_trips LIMIT 2

## Caveats

### Nullability handling 

Handling nulls depends on the method used to create the table. Using `CREATE TABLE` or `saveAsTable` describes the schema in the catalog, hence honoring the non-null columns. If the DataFrame is written to ADLS using `df.write`, then all columns are treated as `nullable = true`.

**Explanation**

Writing in parquet, the underlying format of delta lake, can't guarantee the nullability of the column.

You may have written a parquet file whose column certainly contains no nulls, but parquet never validates the schema on write, and anyone could later append data with the same schema but containing nulls. **So Spark always marks the columns as nullable, as a precaution.**

This behavior can be prevented using a catalog, that will validate that the dataframe follows the expected schema.

In [43]:
// Build two sample person rows and write them to storage with df.write only
// (no CREATE TABLE / saveAsTable for this path).
val testDf = {
    val createdAt = Timestamp.valueOf("2023-01-01 00:00:00")
    //id, name, crt_ts, crt_usr, upd_ts, upd_usr
    val sampleRows = Seq(
        Row(1, "Alice", createdAt, "user1", createdAt, "user1"),
        Row(2, "Bob", createdAt, "user2", createdAt, "user2")
    )
    createDataframeFromCollection(TargetSchema, sampleRows)
}
// NOTE(review): no save mode is set - presumably this fails when the path already
// holds a table; confirm whether this cell is meant to be re-runnable.
testDf.write.format("delta").save("/poc/ddl/person_df_write")

In [44]:
// Print, for each of the two demo tables, a heading followed by the schema Spark reports
Seq(
    "Schema when described in catalog using saveAsTable or CREATE TABLE:" -> "/poc/ddl/person",
    "Schema when df.write is used:" -> "/poc/ddl/person_df_write"
).foreach { case (heading, tablePath) =>
    println(heading)
    spark.read.format("delta").load(tablePath).printSchema()
}

#### Write test

Create dataframe with `name` column as `null`

In [58]:
// Person schema in which only id is non-nullable; name is nullable here
val NullableSchema = {
    val idField = StructField("id", IntegerType, nullable = false)
    val nullableFields = Seq[(String, DataType)](
        "name" -> StringType, //Change name to nullable
        "crt_ts" -> TimestampType,
        "crt_usr" -> StringType,
        "upd_ts" -> TimestampType,
        "upd_usr" -> StringType
    ).map { case (fieldName, fieldType) => StructField(fieldName, fieldType, nullable = true) }

    StructType(idField +: nullableFields)
}

// Single-row DataFrame whose name, upd_ts and upd_usr values are null
val nullDf = {
    //id, name, crt_ts, crt_usr, upd_ts, upd_usr
    val nullNameRow = Row(100, null, Timestamp.valueOf("2023-01-01 00:00:00"), "user1", null, null)
    createDataframeFromCollection(NullableSchema, Seq(nullNameRow))
}

Save to `/poc/ddl/person_df_write` should succeed as the table wasn't described in catalog.

In [50]:
// Merge the null-name row into the table that was written with df.write
upsert(
    targetPath = "/poc/ddl/person_df_write",
    sourceDf = nullDf,
    sourceExcludeCols = Nil,
    mergeOnCols = List("id"),
    partitionPruneCols = Nil,
    customUpdateExpr = updateCols
)

In [52]:
%%sql
-- Show all rows of the table that was written with df.write
SELECT * from delta.`/poc/ddl/person_df_write`

Save to `/poc/ddl/person` should FAIL as the table IS described in catalog.

In [59]:
// Merge the null-name row into the table whose name column was declared NOT NULL
upsert(
    targetPath = "/poc/ddl/person",
    sourceDf = nullDf,
    sourceExcludeCols = Nil,
    mergeOnCols = List("id"),
    partitionPruneCols = Nil,
    customUpdateExpr = updateCols
)

In [60]:
%%sql
-- Show all rows of the path-based person Delta table after the null-name upsert attempt
SELECT * from delta.`/poc/ddl/person`