In [0]:
# Delta Lake is pre-installed in Databricks
from delta.tables import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

#Create sample employee data

In [0]:

data = [
    (1, "John Doe", "Engineering", 75000, "2023-01-15"),
    (2, "Jane Smith", "Marketing", 65000, "2023-02-20"),
    (3, "Bob Johnson", "Engineering", 80000, "2023-01-10"),
    (4, "Alice Brown", "HR", 60000, "2023-03-01"),
    (5, "Charlie Wilson", "Finance", 70000, "2023-02-15")
]

#Define Schema

In [0]:
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("department", StringType(), False),
    StructField("salary", IntegerType(), False),
    StructField("hire_date", StringType(), False)
])

#Create a dataframe

In [0]:
df = spark.createDataFrame(data, schema)
df.show()

+---+--------------+-----------+------+----------+
| id|          name| department|salary| hire_date|
+---+--------------+-----------+------+----------+
|  1|      John Doe|Engineering| 75000|2023-01-15|
|  2|    Jane Smith|  Marketing| 65000|2023-02-20|
|  3|   Bob Johnson|Engineering| 80000|2023-01-10|
|  4|   Alice Brown|         HR| 60000|2023-03-01|
|  5|Charlie Wilson|    Finance| 70000|2023-02-15|
+---+--------------+-----------+------+----------+



#Write Data as Delta Table

In [0]:
# delta_table_path = "/FileStore/delta-table/employees"

#Write as managed table (no path needed)
df.write.format("delta").mode("overwrite").saveAsTable("employees_delta")

print("Delta table created successfully!")

#Read data from Delta Table (Managed table example)

In [0]:
# For managed table:
df_read = spark.read.table("employees_delta")
df_read.show()

#Read Delta table (SQL example)

In [0]:
%sql
SELECT * FROM employees_delta;

id,name,department,salary,hire_date
1,John Doe,Engineering,75000,2023-01-15
2,Jane Smith,Marketing,65000,2023-02-20
3,Bob Johnson,Engineering,80000,2023-01-10
4,Alice Brown,HR,60000,2023-03-01
5,Charlie Wilson,Finance,70000,2023-02-15
6,David Lee,Engineering,85000,2023-04-01
7,Sarah Connor,Marketing,67000,2023-04-15


#Insert records into Delta Table

In [0]:
new_data = [
    (6, "David Lee", "Engineering", 85000, "2023-04-01"),
    (7, "Sarah Connor", "Marketing", 67000, "2023-04-15")
]

new_df = spark.createDataFrame(new_data, schema)

In [0]:
new_df.write.format("delta").mode("append").saveAsTable("employees_delta")

print("New records inserted!")

#Time Travel & Versioning

In [0]:
%sql
DESCRIBE HISTORY employees_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-08-19T04:52:37.000Z,74411854265083,findashishagarwal@yahoo.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(4155338705883615),0819-042924-96o6c2sp-v2n,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1497)",,Databricks-Runtime/17.1.x-photon-scala2.13
0,2025-08-19T04:47:52.000Z,74411854265083,findashishagarwal@yahoo.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(4155338705883615),0819-042924-96o6c2sp-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numOutputRows -> 5, numOutputBytes -> 1629)",,Databricks-Runtime/17.1.x-photon-scala2.13


In [0]:
%sql
Select * from employees_delta version as of 0;

id,name,department,salary,hire_date
1,John Doe,Engineering,75000,2023-01-15
2,Jane Smith,Marketing,65000,2023-02-20
3,Bob Johnson,Engineering,80000,2023-01-10
4,Alice Brown,HR,60000,2023-03-01
5,Charlie Wilson,Finance,70000,2023-02-15


In [0]:
%sql
Select * from employees_delta timestamp as of '2025-08-19T04:48:52';

id,name,department,salary,hire_date
1,John Doe,Engineering,75000,2023-01-15
2,Jane Smith,Marketing,65000,2023-02-20
3,Bob Johnson,Engineering,80000,2023-01-10
4,Alice Brown,HR,60000,2023-03-01
5,Charlie Wilson,Finance,70000,2023-02-15


#Schema Evolution Example

In [0]:
# Add new column
evolved_data = [
    (8, "Emma Stone", "Design", 72000, "2023-06-01", "emma.stone@company.com")
]

evolved_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("department", StringType(), False),
    StructField("salary", IntegerType(), False),
    StructField("hire_date", StringType(), False),
    StructField("email", StringType(), True)  # New column
])

evolved_df = spark.createDataFrame(evolved_data, evolved_schema)

In [0]:
# Write with schema merge option
evolved_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("employees_delta")

print("Schema evolved - new column added!")
# Read the evolved table
evolved_df_read = spark.read.table("employees_delta")
evolved_df_read.show()

In [0]:
%sql
SELECT * from employees_delta;

id,name,department,salary,hire_date,email
8,Emma Stone,Design,72000,2023-06-01,emma.stone@company.com
1,John Doe,Engineering,75000,2023-01-15,
2,Jane Smith,Marketing,65000,2023-02-20,
3,Bob Johnson,Engineering,80000,2023-01-10,
4,Alice Brown,HR,60000,2023-03-01,
5,Charlie Wilson,Finance,70000,2023-02-15,
6,David Lee,Engineering,85000,2023-04-01,
7,Sarah Connor,Marketing,67000,2023-04-15,
