In [0]:
# 1) Define schema and sample DataFrame (reuse this for multiple examples)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from datetime import datetime

# Column layout of the sample "people" records as (column name, Spark type) pairs.
# Every column is declared nullable when the StructFields are built below.
_people_columns = [
    ("id", IntegerType()),
    ("firstName", StringType()),
    ("middleName", StringType()),
    ("lastName", StringType()),
    ("gender", StringType()),
    ("birthDate", TimestampType()),
    ("ssn", StringType()),
    ("salary", IntegerType()),
]

schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _people_columns]
)

# Two hand-written sample rows, in the same column order as `schema`.
# Row 1 uses an empty string (not None) for the middle name.
data = [
    (1, "John", "", "Doe", "M", datetime(1990, 5, 14), "111-22-3333", 60000),
    (2, "Mary", "A", "Smith", "F", datetime(1985, 8, 20), "222-33-4444", 75000),
]

# Build the DataFrame with the explicit schema and print its rows.
df = spark.createDataFrame(data, schema)
df.show()


+---+---------+----------+--------+------+-------------------+-----------+------+
| id|firstName|middleName|lastName|gender|          birthDate|        ssn|salary|
+---+---------+----------+--------+------+-------------------+-----------+------+
|  1|     John|          |     Doe|     M|1990-05-14 00:00:00|111-22-3333| 60000|
|  2|     Mary|         A|   Smith|     F|1985-08-20 00:00:00|222-33-4444| 75000|
+---+---------+----------+--------+------+-------------------+-----------+------+



### Creating Delta Table — Method 1: `saveAsTable`
This writes a managed Delta table named `people_method1` into the `default` database.

In [0]:
# Method 1: saveAsTable (managed table)
# Configure a Delta writer in overwrite mode, then save the sample DataFrame
# under the table name default.people_method1.
people_writer = df.write.format('delta').mode('overwrite')
people_writer.saveAsTable('default.people_method1')
print('Created default.people_method1')

# Query the table back through SQL and print its rows.
people_method1_rows = spark.sql("SELECT * FROM default.people_method1")
people_method1_rows.show()


Created default.people_method1
+---+---------+----------+--------+------+-------------------+-----------+------+
| id|firstName|middleName|lastName|gender|          birthDate|        ssn|salary|
+---+---------+----------+--------+------+-------------------+-----------+------+
|  2|     Mary|         A|   Smith|     F|1985-08-20 00:00:00|222-33-4444| 75000|
|  1|     John|          |     Doe|     M|1990-05-14 00:00:00|111-22-3333| 60000|
+---+---------+----------+--------+------+-------------------+-----------+------+



### Creating Delta Table — Method 2: `writeTo(...).createOrReplace()`
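The `writeTo` API (`DataFrameWriterV2`) is available in PySpark 3.1+ and requires a catalog that supports v2 table operations. `createOrReplace()` creates the table in the metastore, or replaces it if it already exists.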

In [0]:
# Method 2: writeTo (DataFrameWriterV2)
# Only the write itself is guarded: if the read-back query below fails, that
# error should surface as-is instead of being reported as a writeTo problem
# and triggering the overwrite fallback.
try:
    # Name the provider explicitly so the table is created as Delta even when
    # the session's default table format is something else.
    df.writeTo('default.people_method2').using('delta').createOrReplace()
    print('Created or replaced default.people_method2')
except Exception as e:
    print('writeTo API may not be supported in this runtime:', e)
    # fallback: saveAsTable
    df.write.format('delta').mode('overwrite').saveAsTable('default.people_method2')
    print('Created default.people_method2 via saveAsTable fallback')

# Read the table back regardless of which write path succeeded.
spark.sql('SELECT * FROM default.people_method2').show()


Created or replaced default.people_method2
+---+---------+----------+--------+------+-------------------+-----------+------+
| id|firstName|middleName|lastName|gender|          birthDate|        ssn|salary|
+---+---------+----------+--------+------+-------------------+-----------+------+
|  2|     Mary|         A|   Smith|     F|1985-08-20 00:00:00|222-33-4444| 75000|
|  1|     John|          |     Doe|     M|1990-05-14 00:00:00|111-22-3333| 60000|
+---+---------+----------+--------+------+-------------------+-----------+------+



### Creating Delta Table — Method 3: Path-based Delta (external table)
Write Delta files to a path and optionally register a table that points to that path.

In [0]:
# Method 3: path-based
path = '/tmp/delta/people_method3'

# Step 1: write the DataFrame in Delta format at `path`, in overwrite mode.
path_writer = df.write.format('delta').mode('overwrite')
path_writer.save(path)

# Step 2: register a table (external) that points at that location.
# IF NOT EXISTS means an already-registered default.people_method3 is left untouched.
create_table_stmt = f"CREATE TABLE IF NOT EXISTS default.people_method3 USING DELTA LOCATION '{path}'"
spark.sql(create_table_stmt)

# Step 3: query through the table name and print the rows.
people_method3_rows = spark.sql('SELECT * FROM default.people_method3')
people_method3_rows.show()


+---+---------+----------+--------+------+-------------------+-----------+------+
| id|firstName|middleName|lastName|gender|          birthDate|        ssn|salary|
+---+---------+----------+--------+------+-------------------+-----------+------+
|  2|     Mary|         A|   Smith|     F|1985-08-20 00:00:00|222-33-4444| 75000|
|  1|     John|          |     Doe|     M|1990-05-14 00:00:00|111-22-3333| 60000|
+---+---------+----------+--------+------+-------------------+-----------+------+



## Merge & Upsert (SCD Type 1 example)
We demonstrate MERGE to update existing rows and insert new rows (UPSERT).

In [0]:
from delta.tables import DeltaTable

# Source rows for the upsert, shaped like `schema`:
#   id 2 -> intended to hit an existing row (new last name and salary)
#   id 3 -> intended to be a new row
updates = [
    (2, 'Mary', 'A', 'Johnson', 'F', datetime(1985, 8, 20), '222-33-4444', 80000),
    (3, 'James', '', 'Brown', 'M', datetime(1992, 1, 10), '333-44-5555', 65000),
]
updates_df = spark.createDataFrame(updates, schema)

# Target of the merge: the table default.people_method1, aliased 't';
# the source DataFrame is aliased 'u' and rows are paired on equal id.
deltaTable = DeltaTable.forName(spark, 'default.people_method1')
merge_builder = deltaTable.alias('t').merge(updates_df.alias('u'), 't.id = u.id')

# Matched rows take every column from the source; unmatched source rows are inserted.
merge_builder = merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll()
merge_builder.execute()

# Print the merged table ordered by id.
merged_rows = spark.sql('SELECT * FROM default.people_method1 ORDER BY id')
merged_rows.show()


+---+---------+----------+--------+------+-------------------+-----------+------+
| id|firstName|middleName|lastName|gender|          birthDate|        ssn|salary|
+---+---------+----------+--------+------+-------------------+-----------+------+
|  1|     John|          |     Doe|     M|1990-05-14 00:00:00|111-22-3333| 60000|
|  2|     Mary|         A| Johnson|     F|1985-08-20 00:00:00|222-33-4444| 80000|
|  3|    James|          |   Brown|     M|1992-01-10 00:00:00|333-44-5555| 65000|
+---+---------+----------+--------+------+-------------------+-----------+------+



## Internals of Delta Table
- Data files are stored as Parquet under the table location.
- Transaction log is stored in `_delta_log/` as JSON and checkpoint Parquet files.
- Delta keeps a commit history; you can inspect it with `deltaTable.history()` or `DESCRIBE HISTORY table`.

Example commands below show how to inspect details and history.


In [0]:
# Table-level metadata: run DESCRIBE DETAIL and print the result without
# truncating column values.
print('Describe detail:')
detail_df = spark.sql("DESCRIBE DETAIL default.people_method1")
detail_df.show(truncate=False)

# Commit history through the DeltaTable API. This is best-effort: any failure
# is printed rather than raised so the rest of the notebook can still run.
dt = DeltaTable.forName(spark, 'default.people_method1')
try:
    history_df = dt.history()
    history_df.show(truncate=False)
except Exception as e:
    print('history() may not be available in this runtime or requires permissions:', e)


Describe detail:
+------+------------------------------------+------------------------------------+-----------+----------------------------------------+-----------------------+-------------------+----------------+-----------------+--------+-----------+-------------------------------------+----------------+----------------+-----------------------------------------+---------------------------------------------------------------+-------------+
|format|id                                  |name                                |description|location                                |createdAt              |lastModified       |partitionColumns|clusteringColumns|numFiles|sizeInBytes|properties                           |minReaderVersion|minWriterVersion|tableFeatures                            |statistics                                                     |clusterByAuto|
+------+------------------------------------+------------------------------------+-----------+---------------------------------

## Optimize Delta Table
`OPTIMIZE` compacts small data files, and `ZORDER BY` colocates related rows in the same files for faster reads. Both are available on Databricks Runtime and in open-source Delta Lake 2.0+.

Use `VACUUM` to remove data files that are no longer referenced by the table and are older than the retention period (7 days by default).


In [0]:
# OPTIMIZE and VACUUM (table maintenance)
# Each command is attempted independently and failures are printed rather than
# raised, so a runtime lacking one command can still run the other.

# Compact the table's files and Z-order them by id.
try:
    spark.sql('OPTIMIZE default.people_method1 ZORDER BY (id)')
    print('OPTIMIZE executed')
except Exception as e:
    print('OPTIMIZE may not be available in this runtime:', e)

# VACUUM with an explicit retention of 168 hours (7 days).
# NOTE(review): 168 hours matches Delta's documented default retention, so the
# retention-duration safety check should not need to be disabled here; that
# check only matters when retaining for LESS than the default. Confirm for
# this runtime.
try:
    spark.sql('VACUUM default.people_method1 RETAIN 168 HOURS')
    print('VACUUM executed')
except Exception as e:
    print('VACUUM may not be available or permission denied:', e)


OPTIMIZE executed (Databricks only)
VACUUM executed (Databricks only)


## Show tables 

In [0]:
# List the tables registered in the `default` database.
tables_df = spark.sql('SHOW TABLES IN default')
tables_df.show()

# Table-level metadata for people_method1, printed without truncating values.
detail_df = spark.sql("DESCRIBE DETAIL default.people_method1")
detail_df.show(truncate=False)

# Up to 20 rows of the table's current contents.
preview_df = spark.sql("SELECT * FROM default.people_method1 LIMIT 20")
preview_df.show()


+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
| default|        export|      false|
| default|          loan|      false|
| default|people_method1|      false|
| default|people_method2|      false|
| default|people_method3|      false|
+--------+--------------+-----------+

+------+------------------------------------+------------------------------------+-----------+----------------------------------------+-----------------------+-------------------+----------------+-----------------+--------+-----------+-------------------------------------+----------------+----------------+-----------------------------------------+---------------------------------------------------------------+-------------+
|format|id                                  |name                                |description|location                                |createdAt              |lastModified       |partitionColumns|clusteringColumns|numFiles|sizeIn