In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession, Row
from datetime import datetime
from delta.tables import DeltaTable
import uuid
import os

In [0]:
# Delta locations in the "gold" container: the audit log and the curated output table.
audit_path = "abfss://gold@rmpyru.dfs.core.windows.net/Audit"
target_path = "abfss://gold@rmpyru.dfs.core.windows.net/zomato"

# Layout of one audit record; every column is declared nullable.
metadata_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("batch_id", StringType()),
        ("timestamp", TimestampType()),
        ("source_path", StringType()),
        ("record_count", LongType()),
        ("delta_table_version", LongType()),
        ("status", StringType()),
        ("rollback_flag", StringType()),  # 'Y' or 'N'
    )
])

In [0]:
# Function to safely get Delta version
def get_delta_version(path):
    """
    Return the latest commit version of the Delta table stored at ``path``.

    Args:
        path: Storage path of the Delta table.

    Returns:
        The ``version`` value of the most recent history entry, or ``None`` when
        the table cannot be opened/read or its history is empty.
    """
    try:
        delta_table = DeltaTable.forPath(spark, path)
        # history() lists commits newest-first, so asking for a single entry is
        # enough and avoids running a count() plus a head() over the full log.
        latest = delta_table.history(1).collect()
    except Exception as e:
        # Best-effort lookup: keep returning None, but surface the real cause so
        # that e.g. a permission error is not mistaken for a missing table.
        print(f"Delta table not found at {path}. Initializing... ({type(e).__name__}: {e})")
        return None

    if not latest:
        print("Delta table exists but has no version history.")
        return None
    return latest[0]['version']

# Capture the target table's Delta version as seen at this point in the run.
# get_delta_version returns None when the table cannot be read; nothing is
# created or initialized by this call.
current_version = get_delta_version(target_path)

In [0]:
def rollback_batch(batch_id: str, rollback_version: int):
    """
    Roll the target Delta table back to ``rollback_version`` and audit the event.

    The most recent audit record for ``batch_id`` supplies the ``source_path``
    and ``record_count`` that are copied into a new audit row with status
    ``ROLLEDBACK`` and rollback_flag ``Y``.

    Args:
        batch_id: Identifier of the batch whose changes are being rolled back.
        rollback_version: Delta version of the target table to restore.

    Raises:
        ValueError: If the audit table holds no record for ``batch_id``. The
            check runs before the target table is modified.
    """
    # Resolve the batch's audit record first so an unknown batch_id aborts
    # before any data is touched.
    audit_df = spark.read.format("delta").load(audit_path)
    rollback_info = (
        audit_df.filter(col("batch_id") == batch_id)
                .orderBy(col("timestamp").desc())
                .first()
    )
    if rollback_info is None:
        raise ValueError(f"No audit record found for batch_id '{batch_id}'; rollback aborted.")

    source_path = rollback_info["source_path"]
    record_count = rollback_info["record_count"]

    # Use Delta's native restore instead of reading the old snapshot and
    # overwriting the table with it: no full data rewrite is issued from here
    # and the table is restored as a single Delta operation.
    DeltaTable.forPath(spark, target_path).restoreToVersion(rollback_version)

    # Prepare rollback metadata; delta_table_version records the version that
    # was restored.
    timestamp = datetime.now()
    updated_metadata_df = spark.createDataFrame([
        (
            batch_id,
            timestamp,
            source_path,
            record_count,
            rollback_version,
            "ROLLEDBACK",
            "Y"
        )
    ], schema=metadata_schema)

    # Log rollback event
    updated_metadata_df.write.format("delta").mode("append").save(audit_path)

    print(f"✅ Rollback completed for batch {batch_id} to version {rollback_version}")

In [0]:
def check_and_trigger_rollback():
    """
    Roll back the target table if the latest FAILED batch is ahead of the latest SUCCESS batch.

    Reads the audit table, takes the most recent FAILED row and the most recent
    SUCCESS row (by ``timestamp``), and calls
    ``rollback_batch(failed_batch_id, success_version)`` when the FAILED row's
    ``delta_table_version`` is greater than the SUCCESS row's. Nothing happens
    when either row is missing or either version is NULL.
    """
    # Load audit table
    audit_df = spark.read.format("delta").load(audit_path)

    # Get latest FAILED batch
    failed_df = audit_df.filter(col("status") == "FAILED").orderBy(col("timestamp").desc())
    failed_batch = failed_df.limit(1).collect()

    # Get latest SUCCESS batch
    success_df = audit_df.filter(col("status") == "SUCCESS").orderBy(col("timestamp").desc())
    success_batch = success_df.limit(1).collect()

    if not (failed_batch and success_batch):
        print("ℹ️ No rollback triggered — missing FAILED or SUCCESS batch in audit table.")
        return

    failed_version = failed_batch[0]["delta_table_version"]
    success_version = success_batch[0]["delta_table_version"]
    failed_batch_id = failed_batch[0]["batch_id"]

    # delta_table_version is nullable in the audit schema; comparing None with
    # an int would raise TypeError, so treat a missing version as "cannot decide".
    if failed_version is None or success_version is None:
        print("ℹ️ No rollback triggered — delta_table_version is missing for the FAILED or SUCCESS batch.")
        return

    if failed_version > success_version:
        print(f"⚠️ Triggering rollback: FAILED version {failed_version} > SUCCESS version {success_version}")
        rollback_batch(failed_batch_id, success_version)
    else:
        print(f"✅ No rollback needed: FAILED version {failed_version} ≤ SUCCESS version {success_version}")

In [0]:
def log_failure_to_audit(
    batch_id: str
):
    """
    Mark the audit record(s) of ``batch_id`` as FAILED, then evaluate a rollback.

    Sets ``status`` to ``FAILED``, ``rollback_flag`` to ``N`` and
    ``delta_table_version`` to the target table's current version (NULL when
    ``get_delta_version`` returns None), and finally calls
    ``check_and_trigger_rollback()``.

    Args:
        batch_id: Identifier of the batch whose audit row is updated.
    """
    current_version = get_delta_version(target_path)

    # Load the audit Delta table
    delta_table = DeltaTable.forPath(spark, audit_path)

    # Update status to FAILED for the given batch_id.
    # Column expressions are used instead of formatted SQL strings so that a
    # None version becomes a typed NULL (the string "None" is not a valid SQL
    # expression) and batch_id needs no manual quoting.
    delta_table.update(
        condition = col("batch_id") == lit(batch_id),
        set = {
            "status": lit("FAILED"),
            "rollback_flag": lit("N"),
            "delta_table_version": lit(current_version).cast("long"),
        }
    )
    check_and_trigger_rollback()

In [0]:
def log_success_to_audit(
    batch_id: str
):
    """
    Mark the audit record(s) of ``batch_id`` as SUCCESS.

    Sets ``status`` to ``SUCCESS``, ``rollback_flag`` to ``N`` and
    ``delta_table_version`` to the target table's current version (NULL when
    ``get_delta_version`` returns None).

    Args:
        batch_id: Identifier of the batch whose audit row is updated.
    """
    current_version = get_delta_version(target_path)

    # Load the audit Delta table
    delta_table = DeltaTable.forPath(spark, audit_path)

    # Update status to SUCCESS for the given batch_id.
    # Column expressions replace formatted SQL strings: a None version becomes
    # a typed NULL rather than the invalid expression "None", and batch_id
    # needs no manual quoting.
    delta_table.update(
        condition = col("batch_id") == lit(batch_id),
        set = {
            "status": lit("SUCCESS"),
            "rollback_flag": lit("N"),
            "delta_table_version": lit(current_version).cast("long"),
        }
    )

In [0]:
json = 'abfss://gold@rmpyru.dfs.core.windows.net'

# Read the restaurant JSON file from the container root into df_zone.
df_zone = (
    spark.read
    .format('json')
    .options(inferSchema=True, multiLine=True, header=True)
    .load(f'{json}/resturant_json_data.json')
)

In [0]:
# Audit metadata for this run: a random UUID batch id, the wall-clock time at
# this point, the path of the file loaded into df_zone, and df_zone's row count.
# Note: df_zone.count() runs a Spark job, so timestamp is taken before it.
batch_id = str(uuid.uuid4())
timestamp = datetime.now()
source_path = "abfss://gold@rmpyru.dfs.core.windows.net/resturant_json_data.json"
record_count = df_zone.count()


In [0]:
# Register this batch in the audit table with status STARTED and no rollback.
started_record = (
    batch_id,
    timestamp,
    source_path,
    record_count,
    current_version,
    "STARTED",
    "N",
)
metadata_df = spark.createDataFrame([started_record], schema=metadata_schema)

(
    metadata_df.write
    .format("delta")
    .mode("append")
    .save(audit_path)
)


In [0]:
# Ad-hoc look at the data: row counts per rating text for restaurants in
# Columbus. The result is not assigned; the cell only displays it.
(
    df_zone
    .withColumn("restaurants", explode("restaurants"))
    .withColumn("restaurant id", col("restaurants.restaurant.id"))
    .withColumn("restaurant name", col("restaurants.restaurant.name"))
    .withColumn("cuisines", col("restaurants.restaurant.cuisines"))
    .withColumn("ratings", col("restaurants.restaurant.user_rating.rating_text"))
    .withColumn("city", col("restaurants.restaurant.location.city"))
    .withColumn("establishment_types", explode_outer(col("restaurants.restaurant.establishment_types")))
    .drop("code", "message", "results_found", "results_shown", "results_start", "status")
    .where(col("city") == "Columbus")
    .groupBy("ratings")
    .count()
    .alias("restaurant_ratings")
)
                                
                                

In [0]:
# One row per restaurant with its id, name and city; the response envelope
# columns and the exploded struct are dropped.
df_restaurant = (
    df_zone
    .withColumn("restaurants", explode("restaurants"))
    .withColumn("restaurant id", col("restaurants.restaurant.id"))
    .withColumn("restaurant name", col("restaurants.restaurant.name"))
    .withColumn("city", col("restaurants.restaurant.location.city"))
    .drop("code", "message", "results_found", "results_shown", "results_start", "status", "restaurants")
)

In [0]:
# One row per restaurant with its id and rating text.
df_restaurant_rating = (
    df_zone
    .withColumn("restaurants", explode("restaurants"))
    .withColumn("restaurant id", col("restaurants.restaurant.id"))
    .withColumn("ratings", col("restaurants.restaurant.user_rating.rating_text"))
    .drop("code", "message", "results_found", "results_shown", "results_start", "status", "restaurants")
)

In [0]:
# One row per restaurant with its id and cuisines value.
df_restaurant_cuisines = (
    df_zone
    .withColumn("restaurants", explode("restaurants"))
    .withColumn("restaurant id", col("restaurants.restaurant.id"))
    .withColumn("cuisines", col("restaurants.restaurant.cuisines"))
    .drop("code", "message", "results_found", "results_shown", "results_start", "status", "restaurants")
)

In [0]:
#df_restaurant.display()
#df_restaurant_rating.cache()
#df_restaurant_cuisines.display()

# Join restaurants to their ratings (left, broadcast) and cuisines (inner),
# then count rows per rating text.
rating_match = df_restaurant["restaurant id"] == df_restaurant_rating["restaurant id"]
cuisine_match = df_restaurant["restaurant id"] == df_restaurant_cuisines["restaurant id"]

df_final = (
    df_restaurant
    .join(broadcast(df_restaurant_rating), rating_match, how="left")
    .join(df_restaurant_cuisines, cuisine_match, how="inner")
    # NOTE(review): keeps only rows whose cuisines value is the empty string —
    # confirm this is intended rather than `!= ""`.
    .filter((col("restaurant name") != "") & (col("cuisines") == ""))
    .select(df_restaurant["restaurant id"], "restaurant name", "ratings", "cuisines")
    .groupBy("ratings")
    .count()
)


In [0]:
#df_final.partitionBy("ratings")
#df_final.write.mode("overwrite").format("delta").save(f'{json}/zomato')
# Write the aggregated result to the target Delta table, partitioned by rating,
# and record the outcome in the audit table.
try:
    df_final.write.partitionBy("ratings").mode("overwrite").format("delta").save(target_path)
except Exception as e:
    # Report the cause instead of discarding it, then mark the batch FAILED
    # (log_failure_to_audit also evaluates whether a rollback is needed).
    print(f"❌ Write to {target_path} failed for batch {batch_id}: {type(e).__name__}: {e}")
    log_failure_to_audit(batch_id=batch_id)
else:
    # Only reached when the write succeeded. Keeping this outside the try block
    # means an error while updating the audit table is not mistaken for a write
    # failure, which would mark a good batch FAILED and could roll it back.
    log_success_to_audit(batch_id=batch_id)

In [0]:
#df_final = df_zone

In [0]:
%sql
-- Show the contents of the target Delta table.
SELECT *
FROM delta.`abfss://gold@rmpyru.dfs.core.windows.net/zomato`

In [0]:
%sql
-- Show the contents of the audit Delta table.
SELECT *
FROM delta.`abfss://gold@rmpyru.dfs.core.windows.net/Audit`