In [0]:
file_path = "dbfs:/FileStore/HW4-1.json"

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import pandas as pd



In [0]:
# Initialize Spark Session with Delta Lake
spark = (SparkSession.builder
    .appName("DeltaLakeHomework")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

In [0]:
# Load the JSON file into a DataFrame
df = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("multiline", "true") \
    .json(file_path)

In [0]:
df.show()

+---+---------------+--------+
|Age|     Department|    Name|
+---+---------------+--------+
| 34|        Manager|  Alice1|
| 30|        Analyst|  David1|
| 28|Senior Engineer|    Bob1|
| 29|         Intern|    Eve1|
| 45|             HR|  Cathy1|
| 31|        Manager|   John1|
| 32|Senior Engineer|   Emma1|
| 40|        Analyst|Michael1|
| 27|         Intern| Sophia1|
| 35|             HR| Daniel1|
+---+---------------+--------+



In [0]:
print("\nDataFrame Summary Statistics:")
df.describe().show()


DataFrame Summary Statistics:
+-------+-----------------+---------------+-------+
|summary|              Age|     Department|   Name|
+-------+-----------------+---------------+-------+
|  count|               10|             10|     10|
|   mean|             33.1|           null|   null|
| stddev|5.665686189686118|           null|   null|
|    min|               27|        Analyst| Alice1|
|    max|               45|Senior Engineer|Sophia1|
+-------+-----------------+---------------+-------+



In [0]:
# Additional Profiling using Pandas
pandas_df = df.toPandas()
print("\nPandas Profiling:")
print(pandas_df.info())
print("\nValue Counts for Department:")
print(pandas_df['Department'].value_counts())


Pandas Profiling:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   Age         10 non-null     int64 
 1   Department  10 non-null     object
 2   Name        10 non-null     object
dtypes: int64(1), object(2)
memory usage: 368.0+ bytes
None

Value Counts for Department:
Manager            2
Analyst            2
Senior Engineer    2
Intern             2
HR                 2
Name: Department, dtype: int64


In [0]:
# Try direct saving without saveAsTable
delta_path = "/FileStore/delta_tables/employee_table"

# Ensure the directory exists
dbutils.fs.mkdirs("/FileStore/delta_tables")

# Save Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(delta_path)

print(f"Delta table saved at: {delta_path}")

Delta table saved at: /FileStore/delta_tables/employee_table


In [0]:
# Import necessary libraries
from delta.tables import DeltaTable
from pyspark.sql.functions import *

# Function to update DataFrame
def update_dataframe(spark, delta_path):
    # Add a new record
    new_data = [{"Name": "Sarah1", "Age": 33, "Department": "Marketing"}]
    new_df = spark.createDataFrame(new_data)
    
    # Union with existing data
    full_df = df.union(new_df)
    
    # Save as Delta table
    full_df.write.format("delta").mode("overwrite").save(delta_path)

# Create DeltaTable object
delta_table = DeltaTable.forPath(spark, delta_path)

# Show initial version
print("Initial Version:")
delta_table.toDF().show()

# Update DataFrame to create a new version
update_dataframe(spark, delta_path)

# Refresh DeltaTable object
delta_table = DeltaTable.forPath(spark, delta_path)

# Show updated version
print("\nUpdated Version:")
delta_table.toDF().show()

# Display version history
print("\nTable Version History:")
delta_table.history().show()

# Get the current version number from the history
history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
current_version = history.select("version").collect()[0][0]

# Read the current version of the table
current_version_df = spark.read.format("delta").option("versionAsOf", current_version).load(delta_path)
print("Current Version Data:")
current_version_df.show()

# Read the prior version of the table
prior_version_df = spark.read.format("delta").option("versionAsOf", current_version - 1).load(delta_path)
print("Prior Version Data:")
prior_version_df.show()



Initial Version:
+---+---------------+--------+
|Age|     Department|    Name|
+---+---------------+--------+
| 34|        Manager|  Alice1|
| 30|        Analyst|  David1|
| 28|Senior Engineer|    Bob1|
| 29|         Intern|    Eve1|
| 45|             HR|  Cathy1|
| 31|        Manager|   John1|
| 32|Senior Engineer|   Emma1|
| 40|        Analyst|Michael1|
| 27|         Intern| Sophia1|
| 35|             HR| Daniel1|
+---+---------------+--------+


Updated Version:
+---+---------------+--------+
|Age|     Department|    Name|
+---+---------------+--------+
| 34|        Manager|  Alice1|
| 30|        Analyst|  David1|
| 28|Senior Engineer|    Bob1|
| 29|         Intern|    Eve1|
| 45|             HR|  Cathy1|
| 31|        Manager|   John1|
| 32|Senior Engineer|   Emma1|
| 40|        Analyst|Michael1|
| 27|         Intern| Sophia1|
| 35|             HR| Daniel1|
| 33|      Marketing|  Sarah1|
+---+---------------+--------+


Table Version History:
+-------+-------------------+-----------

In [0]:
# Show updated version
print("\nUpdated Version:")
delta_table.toDF().show()


Updated Version:
+---+---------------+--------+
|Age|     Department|    Name|
+---+---------------+--------+
| 34|        Manager|  Alice1|
| 30|        Analyst|  David1|
| 28|Senior Engineer|    Bob1|
| 29|         Intern|    Eve1|
| 45|             HR|  Cathy1|
| 31|        Manager|   John1|
| 32|Senior Engineer|   Emma1|
| 40|        Analyst|Michael1|
| 27|         Intern| Sophia1|
| 35|             HR| Daniel1|
| 33|      Marketing|  Sarah1|
+---+---------------+--------+



In [0]:
# Attempt to read change data using readChangeFeed
try:
    # Start from the most recent version
    recent_version_df = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", 0)  # Provide the starting version number
    recent_version_df = recent_version_df.load(delta_path)
    
    # Show change data
    print("Change Data:")
    recent_version_df.show()
except Exception as e:
    print("\nError with readChangeFeed:", str(e))
    print("Note: Change feed needs to be explicitly enabled.")


Change Data:

Error with readChangeFeed: Error getting change data for range [0 , 14] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.
Note: Change feed needs to be explicitly enabled.


In [0]:
# CDF Not Enabled Initially:

# Change Data Feed (CDF) was not enabled from the very first version (version 0) of the Delta table. Therefore, attempting to read changes starting from version 0 will fail because no change data was recorded for that version.
# CDF Needs Explicit Enabling:

# CDF needs to be explicitly enabled on the Delta table using the ALTER TABLE command. If it was enabled at a later version, you can only read change data from that version onwards.

In [0]:
# Check Change Data Feed history
history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
history.show()


+-------+-------------------+---------------+--------------------+-----------------+--------------------+----+-----------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|            userName|        operation| operationParameters| job|         notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+--------------------+-----------------+--------------------+----+-----------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     14|2024-12-14 04:39:17|616905300709848|surbhisharma9099@...|            WRITE|{mode -> Overwrit...|null|{151653019208167}|1214-034253-tpnn7g8k|         13|WriteSerializable|        false|{numFiles -> 2, n...|        null|Databricks-Runtim...|
|   

In [0]:
# Q5 Enable CDC on the same table, do some changes, and read the change data between
# the last two versions? Change the configuration of the api to read between version 0 and any of the
# other versions? Reflect on why it worked/did not.

# Enable Change Data Feed
spark.sql(
    f"ALTER TABLE delta.`{delta_path}` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

# Make additional changes (you can modify this to match your use case)
def make_more_changes(spark):
    more_data = [{"Name": "Tom1", "Age": 36, "Department": "Sales"}]
    more_df = spark.createDataFrame(more_data)
    more_df.write.format("delta").mode("append").save(delta_path)


# Create additional version
make_more_changes(spark)

# Check Change Data Feed history
history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
history.show()

# Read CDC between last two versions
print("\nChange Data between Versions:")
# Adjust startingVersion to the version when CDF was first enabled
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 17)  # Adjust starting version based on history
    .option("endingVersion", 18)  # Adjust ending version based on history
    .load(delta_path)
)
changes.show()

+-------+-------------------+---------------+--------------------+-----------------+--------------------+----+-----------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|            userName|        operation| operationParameters| job|         notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+--------------------+-----------------+--------------------+----+-----------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     20|2024-12-14 04:51:37|616905300709848|surbhisharma9099@...|            WRITE|{mode -> Append, ...|null|{151653019208167}|1214-034253-tpnn7g8k|         19|WriteSerializable|         true|{numFiles -> 1, n...|        null|Databricks-Runtim...|
|   

In [0]:
# Read CDC between version 0 and another version
print("\nChange Data between Versions:")
changes = (spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # Starting from version 0
    .option("endingVersion", 10)   # Adjust ending version based on history
    .load(delta_path))
changes.show()


Change Data between Versions:


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-204473638056691>:9[0m
[1;32m      2[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124mChange Data between Versions:[39m[38;5;124m"[39m)
[1;32m      3[0m changes [38;5;241m=[39m (spark[38;5;241m.[39mread
[1;32m      4[0m     [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)
[1;32m      5[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mreadChangeFeed[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)
[1;32m      6[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mstartingVersion[39m[38;5;124m"[39m, [38;5;241m0[39m)  [38;5;66;03m# Starting from version 0[39;00m
[1;32m      7[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mendingVersion[39m[38;5;124m"[39m

In [0]:
# Explanation
# Reflection on Why It Worked/Did Not Work
# Why It Worked:

# If the Change Data Feed (CDF) was enabled from the very beginning (version 0), then reading changes from version 0 to any other version should work without issues. This is because all changes would have been recorded from the start.
# Why It Did Not Work:

# If the CDF was not enabled from the beginning, then attempting to read changes starting from version 0 will fail. This is because the changes were not recorded for the versions before CDF was enabled. In the provided history, CDF was enabled at version 9, so trying to read changes from version 0 will result in an error.

