In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Start Spark session
spark = SparkSession.builder \
    .appName("Create DataFrame Example") \
    .getOrCreate()

# File location and type
file_location = "/FileStore/tables/sales_data_sample-1.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)


df= df.select(col("country"),col("year_id"),col("sales"))
#df = df.withColumn("date", date_format(to_date(col("year_id"), "yyyy-MM-dd HH:mm"), "MM-yyyy"))

# Group by 'country' and 'date' and aggregate sales
result_df = df.groupBy("country", "year_id").agg(
    sum("sales").alias("tot_sales")
)
#.filter(col("country")=="USA")
result_df.write.format("DELTA").saveAsTable("tblcountrysales")



In [0]:

df = spark.table("tblcountrysales")
print(df)


DataFrame[country: string, year_id: string, tot_sales: double]


In [0]:
history_df = spark.sql("DESCRIBE HISTORY tblcountrysales")
latest_version = history_df.select("version").orderBy("version", ascending=False).first()["version"]
previous_version = latest_version - 1

current_df = spark.read.format("delta").option("versionAsOf", latest_version).table("tblcountrysales")
previous_df = spark.read.format("delta").option("versionAsOf", previous_version).table("tblcountrysales")


# Step 5: Compare changes (records in current not in previous)
added_rows = current_df.exceptAll(previous_df)
deleted_rows = previous_df.exceptAll(current_df)

join_keys = ["country", "year_id"]

joined_df = previous_df.alias("old").join(
    current_df.alias("new"),
    on=join_keys,
    how="inner"
)

# Identify updated rows (rows where any column other than keys changed)
updated_rows = joined_df.filter(
    col("old.tot_sales") != col("new.tot_sales")  # Adjust for additional columns
).select(
    *[col(f"old.{k}").alias(k) for k in join_keys],
    col("old.tot_sales").alias("old_tot_sales"),
    col("new.tot_sales").alias("new_tot_sales")
)


print("✅ Added Rows:")
added_rows.show()

print("❌ Deleted Rows:")
deleted_rows.show()

print("🔄 Updated Rows:")
updated_rows.show(truncate=False)


✅ Added Rows:
+-------+-------+---------+
|country|year_id|tot_sales|
+-------+-------+---------+
|  nepal|   2025|  1000.12|
+-------+-------+---------+

❌ Deleted Rows:
+-------+-------+---------+
|country|year_id|tot_sales|
+-------+-------+---------+
+-------+-------+---------+

🔄 Updated Rows:
+-------+-------+-------------+-------------+
|country|year_id|old_tot_sales|new_tot_sales|
+-------+-------+-------------+-------------+
+-------+-------+-------------+-------------+

