# What's new in Delta Lake Release 3.3.0?

In [None]:
!pip install delta-spark==3.3.0 pandas

In [None]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-spark_2.12:3.3.0,io.delta:delta-iceberg_2.12:3.3.0 pyspark-shell'

## Initialize SparkSession

In [None]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.master("local").appName("What's New in Delta Lake 3.3.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# #1 - Support for Declaring Identity Columns

Identity columns are a type of generated column that assigns a unique value to each record inserted into a table. Delta Lake 3.3.0 adds support for declaring identity columns with the Python or Scala `DeltaTableBuilder` class, so that new rows receive unique keys.

Identity columns come in two flavors: `generatedAlwaysAs` and `generatedByDefaultAs`.

- `generatedAlwaysAs` - Delta Lake always generates the unique value for each inserted record; the writer does not supply one
- `generatedByDefaultAs` - the writer may specify a value for the inserted record. If no value is specified, a new, unique value is generated

Let’s take a look at an example together.

In [None]:
from delta.tables import DeltaTable, IdentityGenerator

# Identity columns only support LONG data type
DeltaTable.createOrReplace(spark) \
  .tableName("Customer") \
  .addColumn("c_custkey", "LONG", generatedByDefaultAs=IdentityGenerator(start=1, step=1)) \
  .addColumn("c_name", "STRING") \
  .addColumn("c_address", "STRING") \
  .addColumn("c_nationkey", "LONG") \
  .addColumn("c_phone", "STRING") \
  .addColumn("c_acctbal", "DOUBLE") \
  .addColumn("c_mktsegment", "STRING") \
  .addColumn("c_comment", "STRING") \
.execute()

In [None]:
# Let's add a few rows
spark.sql("""
INSERT INTO Customer (c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment)
VALUES
   ('ACME, Inc.', '123 Fake Street', 23, '12-475-733-1633', 523.10, 'MACHINERY', 'run about the final theodolites. blithely unusual dolphins are furio'),
   ('Just Parts, LLC', '123 Pines Lane', 23, '11-461-373-7563', 1220.45, 'AUTOMOBILE', 'ges doze according to the carefully pending'),
   ('Wing Nuts & More, Inc.', '123 Fake Avenue', 23, '32-136-289-9352', 6324.46, 'BUILDING', 'jole slyly unusual deposits. furiously ironic requests cajole blithely'),
   ('Build Stuff, Inc.', '123 Fake Court', 23, '17-263-616-2325', 9296.71, 'BUILDING', 'leep carefully slyly express sentiments. slyly pending instructions wake above the carefully'),
   ('Munchin Machines', '123 Meadow Lane', 23, '24-339-805-7967', 8362.10, 'MACHINERY', 'quickly. carefully regular requests cajole above the special, pending pa')
""")

In [None]:
# With `generatedByDefaultAs`, I can choose to provide a value for the id col
spark.sql("""
INSERT INTO Customer (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment)
VALUES
   (1206, 'Just Wheels, LLC', '123 Fake ST', 23, '22-353-733-1783', 7683.10, 'AUTOMOBILE', 'about carefully slyly express sentiments. blithely unusual')
""")


In [None]:
# Take a peek!
display(spark.table("Customer").toPandas())
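
The table above uses `generatedByDefaultAs`. For contrast, here is a minimal sketch of the `generatedAlwaysAs` flavor. The table name `CustomerStrict` and both helper functions are hypothetical names introduced for illustration. The sketch assumes that an insert which supplies its own key is rejected with an exception; the exact exception type is not checked here.

```python
def create_strict_customer_table(spark, table_name="CustomerStrict"):
    """Create a small table whose key is always generated by Delta Lake."""
    # Imported lazily so the helper can be defined before delta-spark is loaded.
    from delta.tables import DeltaTable, IdentityGenerator

    return (
        DeltaTable.createOrReplace(spark)
        .tableName(table_name)
        # generatedAlwaysAs: the writer is not expected to supply c_custkey.
        .addColumn("c_custkey", "LONG", generatedAlwaysAs=IdentityGenerator(start=1, step=1))
        .addColumn("c_name", "STRING")
        .execute()
    )


def explicit_key_insert_succeeds(spark, table_name="CustomerStrict"):
    """Return True if an INSERT that supplies c_custkey is accepted, else False."""
    try:
        spark.sql(
            f"INSERT INTO {table_name} (c_custkey, c_name) "
            "VALUES (42, 'Explicit Key, Inc.')"
        )
        return True
    except Exception as exc:  # the rejection surfaces as an exception
        print(f"Insert rejected: {type(exc).__name__}")
        return False
```

In this notebook you could call `create_strict_customer_table(spark)` followed by `explicit_key_insert_succeeds(spark)`; under the assumption above, the second call should return `False`, whereas the same insert against the `generatedByDefaultAs` table is accepted.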

# #2 - Faster Table Vacuums using VACUUM LITE
This release introduces two vacuum types: `FULL` and `LITE`.

- `VACUUM FULL` (default) - recursively lists all the files and subdirectories under the table's root path
- `VACUUM LITE` - determines which files are eligible for deletion from the Delta transaction log, by looking at the earliest and latest commits outside of the retention window (`RETAIN N HOURS` clause)

`VACUUM LITE` doesn't replace `VACUUM FULL` entirely. Keep in mind that Delta transaction log files are deleted automatically after each log checkpoint operation (see `delta.logRetentionDuration`), so there is still the possibility that some old table files are no longer referenced in the log and fall through the cracks.

In [None]:
# Disable the retention-duration safety check and set the retention window to 0 hours
# so that all of the table history can be removed (do NOT do this in production!)
# _Note_: we just created this table, so there's not much to clean up!
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)
spark.conf.set("delta.deletedFileRetentionDuration", "interval 0 hours")

In [None]:
# Delta Lake 3.3.0 introduces vacuum types `LITE` and `FULL`
spark.sql("""
VACUUM Customer
   LITE
   RETAIN 240 HOURS   
   DRY RUN
""").show()

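To compare the two vacuum types side by side, it can help to generate the statement instead of typing it twice. The helper below is a small sketch; `build_vacuum_sql` is a hypothetical name, and the clause order simply mirrors the statement used in the cell above.

```python
def build_vacuum_sql(table, mode="FULL", retain_hours=None, dry_run=True):
    """Compose a VACUUM statement; FULL is the default type, as in Delta Lake 3.3.0."""
    mode = mode.upper()
    if mode not in ("FULL", "LITE"):
        raise ValueError(f"Unknown vacuum type: {mode}")
    parts = ["VACUUM", table, mode]
    if retain_hours is not None:
        parts.append(f"RETAIN {retain_hours} HOURS")
    if dry_run:
        # DRY RUN only reports the files that would be deleted.
        parts.append("DRY RUN")
    return " ".join(parts)


for vacuum_type in ("LITE", "FULL"):
    print(build_vacuum_sql("Customer", mode=vacuum_type, retain_hours=240))
```

Each generated string can be passed to `spark.sql(...).show()` to compare what the two types report for the same table.
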
# #3 - Enhanced Support for Row Tracking

Delta Lake 3.3.0 adds support for backfilling an existing Delta table with unique row identifiers.

You can enable row tracking by setting the table property `delta.enableRowTracking`. 

For example:

```ALTER TABLE my_table SET TBLPROPERTIES ('delta.enableRowTracking' = 'true');```

When enabled, row tracking adds two new **metadata** fields to a Delta table:

- `_metadata.row_id` - a unique identifier of the row
- `_metadata.row_commit_version` - table version at which the row was last inserted or updated

By default, these metadata fields are hidden, but they can be read through the `_metadata` column.

This release adds support for enabling row tracking on existing Delta tables with `ALTER TABLE`. Behind the scenes, this triggers a backfill process that populates the existing table rows with unique row IDs.

In [None]:
spark.sql("""
ALTER TABLE Customer
SET TBLPROPERTIES (
   'delta.enableRowTracking' = 'true'
);
""")

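Before querying the `_metadata` fields, you may want to confirm that the property actually took effect. The sketch below reads it back with `SHOW TBLPROPERTIES`; `is_row_tracking_enabled` is a hypothetical helper name, and it assumes the result exposes a `value` column, as Spark's `SHOW TBLPROPERTIES` output does.

```python
def is_row_tracking_enabled(spark, table_name):
    """Return True if delta.enableRowTracking is set to 'true' on the table."""
    rows = spark.sql(
        f"SHOW TBLPROPERTIES {table_name} ('delta.enableRowTracking')"
    ).collect()
    # An unset property does not yield the literal value 'true'.
    return any(str(row["value"]).strip().lower() == "true" for row in rows)
```

For example, `is_row_tracking_enabled(spark, "Customer")` can be called right after the `ALTER TABLE` statement above.
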
In [None]:
spark.sql("""
SELECT 
   _metadata.row_id, 
   _metadata.row_commit_version,
   * 
FROM
   Customer
ORDER BY _metadata.row_id ASC;
""").show()

Now that row tracking is enabled on our Delta table, let's merge a new record and update an existing record to generate some table history. 

In [None]:
# One update to an existing customer and one brand-new customer
new_customers = [
    # An update to `c_name`
    (5, 'Machine Parts', '123 Meadow Lane', 23, '24-339-805-7967', 8362.10, 'MACHINERY', 'quickly. carefully regular requests cajole above the special, pending pa'),
    (1207, 'Sockets & Rockets', '4456 Swim Lane', 23, '14-292-888-5532', 342.45, 'MACHINERY', 'slyly unusual deposits. furiously ironic requ, pending special things'),
]
df = spark.createDataFrame(data=new_customers, schema="c_custkey LONG, c_name STRING, c_address STRING, c_nationkey LONG, c_phone STRING, c_acctbal DOUBLE, c_mktsegment STRING, c_comment STRING")
df.createOrReplaceTempView("new_customers_vw")

# MERGE new customers into the existing Delta table
spark.sql("""
MERGE INTO Customer t
USING new_customers_vw s
ON t.c_custkey = s.c_custkey
WHEN MATCHED
   THEN UPDATE SET
      c_name = s.c_name,
      c_address = s.c_address,
      c_nationkey = s.c_nationkey,
      c_phone = s.c_phone,
      c_acctbal = s.c_acctbal,
      c_mktsegment = s.c_mktsegment,
      c_comment = s.c_comment
WHEN NOT MATCHED
   THEN INSERT (
      c_custkey,
      c_name,
      c_address,
      c_nationkey,
      c_phone,
      c_acctbal,
      c_mktsegment,
      c_comment
   ) VALUES (
      s.c_custkey,
      s.c_name,
      s.c_address,
      s.c_nationkey,
      s.c_phone,
      s.c_acctbal,
      s.c_mktsegment,
      s.c_comment
   )
""")

spark.table("Customer").show()

## Tracking row-level lineage

When row tracking is enabled, users can follow the same row across multiple versions of the table by filtering on `_metadata.row_id` together with Delta's time travel feature.

In [None]:
display(spark.sql("""
(SELECT '5' AS `version`, *
  FROM Customer
VERSION AS OF 5
WHERE _metadata.row_id = 4)
UNION ALL
(SELECT '6' as `version`, *
  FROM Customer
VERSION AS OF 6
WHERE _metadata.row_id = 4)
""").toPandas())

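The version numbers in the query above are hard-coded, so they only match if every earlier cell ran exactly once. As a hedged alternative, the sketch below looks up the most recent versions with `DESCRIBE HISTORY` and builds the same `UNION ALL` query from them. Both function names are hypothetical.

```python
def latest_versions(spark, table_name, n=2):
    """Return the n most recent table versions in ascending order."""
    rows = spark.sql(f"DESCRIBE HISTORY {table_name} LIMIT {int(n)}").collect()
    return sorted(int(row["version"]) for row in rows)


def build_row_lineage_query(table_name, row_id, versions):
    """Build one SELECT per version for a single row ID and join them with UNION ALL."""
    selects = [
        f"(SELECT '{int(v)}' AS `version`, * FROM {table_name} "
        f"VERSION AS OF {int(v)} WHERE _metadata.row_id = {int(row_id)})"
        for v in versions
    ]
    return "\nUNION ALL\n".join(selects)
```

A possible use in this notebook is `spark.sql(build_row_lineage_query("Customer", 4, latest_versions(spark, "Customer")))`, which compares the row before and after the most recent commit.
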
# #4 - Support for Fully Re-clustering a Liquid Table

This release adds support for enabling liquid clustering on an existing Delta table.

Previously, liquid clustering could only be enabled upon table creation.

Let's look at how easy it is to enable liquid clustering on our existing, unpartitioned Delta table. 

In [None]:
spark.sql("""
ALTER TABLE Customer
CLUSTER BY (c_mktsegment)
""")

This release adds support for fully re-clustering a Delta table by new columns using an `OPTIMIZE FULL` command. 

`OPTIMIZE FULL` will optimize all records in a Delta table that uses liquid clustering, including data that might have been previously clustered by other columns.

In [None]:
display(spark.sql("""
OPTIMIZE Customer FULL
""").toPandas())

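To check which columns the table is clustered by after these commands, one option is to inspect `DESCRIBE DETAIL`. This is a sketch that assumes the output includes a `clusteringColumns` field; the helper name `clustering_columns` is hypothetical, and it returns an empty list if the field is absent.

```python
def clustering_columns(spark, table_name):
    """Return the table's clustering columns, or [] if none are reported."""
    row = spark.sql(f"DESCRIBE DETAIL {table_name}").collect()[0]
    # Assumption: DESCRIBE DETAIL exposes a `clusteringColumns` field.
    return list(row.asDict().get("clusteringColumns") or [])
```

Calling `clustering_columns(spark, "Customer")` after the `ALTER TABLE ... CLUSTER BY` statement above is a quick way to confirm that `c_mktsegment` was registered.
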
# #5 - Enable UniForm Iceberg In-place
Previously, enabling UniForm Iceberg on an existing Delta table meant that the table's data files would need to be rewritten along with an Iceberg metadata layer. 

This release supports enabling UniForm Iceberg on a Delta table in place, meaning that the data layer doesn't need to be rewritten. 

Using an `ALTER TABLE` statement, you can enable UniForm Iceberg on an existing Delta table.

In [None]:
# Create the TPC-H Region table
DeltaTable.createOrReplace(spark) \
  .tableName("Region") \
  .addColumn("r_regionkey", "LONG") \
  .addColumn("r_name", "STRING") \
  .addColumn("r_comment", "STRING") \
.execute()

spark.sql("""
INSERT INTO Region
VALUES (23, 'EUROPE', 'hs use ironic, even requests. s'),
       (24, 'ASIA', 'uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl'),
       (25, 'AFRICA', 'lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to '),
       (26, 'MIDDLE EAST', 'ges. thinly even pinto beans cages. thinly even pinto beans ca'),
       (27, 'AMERICA', 'lithely final packages cajole. regular waters are final requests. regular ')
""")

display(spark.table("Region").toPandas())

In [None]:
# UniForm requires column mapping to be enabled on the Delta table
spark.sql("""
ALTER TABLE Region
SET TBLPROPERTIES(
  'delta.columnMapping.mode' = 'name'
);
""")

## Important Note

You will need a **Hive Metastore** to execute the following command.


In [None]:
spark.sql("""
ALTER TABLE Region
SET TBLPROPERTIES(
  'delta.enableIcebergCompatV2' = 'true',
  'delta.universalFormat.enabledFormats' = 'iceberg'
);
""")