In [4]:
import os
import shutil

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    NumericType,
    StringType,
    StructField,
    StructType,
)

# Spark settings for this job. Keys prefixed with "spark.hadoop." are copied
# into the Hadoop configuration (here: hadoop.security.authentication=simple).
conf = SparkConf()
conf.set("spark.hadoop.security.authentication", "simple")

# The conf must be handed to the builder; building it alone has no effect.
# NOTE(review): getOrCreate() returns an already-running session if the kernel
# has one, in which case static settings may not be re-applied -- restart the
# kernel after changing them.
spark = (
    SparkSession.builder
    .appName("Diff Calculation")
    .config(conf=conf)
    .getOrCreate()
)

In [5]:


# Columns that identify one line item in both snapshots; used as join keys.
key_columns = ['InvoiceNo', 'StockCode', 'CustomerID', 'InvoiceDate']


def read_snapshot(path, string_columns):
    """Read a headered CSV with inferred types, keeping `string_columns` as text.

    Type inference is needed so the diff step can tell numeric columns apart
    (without it every CSV column is StringType). The key columns are forced
    back to StringType so the join compares the raw text exactly: inference
    could otherwise drop leading zeros or choose different types for the same
    key in the two files.

    Args:
        path: CSV file with a header row.
        string_columns: column names to read as StringType regardless of content.

    Returns:
        A DataFrame using the inferred schema, with `string_columns` overridden.
    """
    # inferSchema costs one extra pass over the file.
    inferred_schema = (
        spark.read.option("header", "true").option("inferSchema", "true").csv(path).schema
    )
    schema = StructType([
        StructField(field.name, StringType(), True) if field.name in string_columns else field
        for field in inferred_schema.fields
    ])
    return spark.read.option("header", "true").schema(schema).csv(path)


pre_df = read_snapshot("pre.csv", key_columns)
post_df = read_snapshot("post.csv", key_columns)

# Fail early with a readable message instead of a Spark analysis error.
for label, frame in (("pre.csv", pre_df), ("post.csv", post_df)):
    missing_keys = [name for name in key_columns if name not in frame.columns]
    if missing_keys:
        raise ValueError(f"{label} is missing key columns: {missing_keys}")

# Outer join on the key names: each key appears once in the result, and rows
# present in only one file are kept with nulls for the other side.
merged_df = pre_df.alias("pre").join(post_df.alias("post"), key_columns, "outer")

# Every non-key column of pre.csv is compared; the diff step reads post.<name>
# for each of them, so they must exist in post.csv too.
columns_to_compare = [name for name in pre_df.columns if name not in key_columns]
missing_in_post = [name for name in columns_to_compare if name not in post_df.columns]
if missing_in_post:
    raise ValueError(f"post.csv is missing compared columns: {missing_in_post}")


In [6]:
def qualified(alias, name):
    """Return the Column `alias`.`name`, backtick-quoting the column name.

    Quoting keeps names with spaces, dots or other special characters from
    being misparsed; embedded backticks are escaped by doubling them.
    """
    return col(f"{alias}.`{name.replace('`', '``')}`")


def diff_column(name):
    """Build the `<name>_diff` column comparing the post and pre values of `name`.

    - Numeric columns (per pre_df's schema): post - pre, both cast to double.
      The result is null if either side is null, e.g. for rows that exist in
      only one file.
    - All other columns: a boolean "changed" flag using null-safe equality.
      A value that appears, disappears or changes gives True; two nulls give
      False (plain `!=` would give null in all of those one-sided cases).
    """
    pre_value = qualified("pre", name)
    post_value = qualified("post", name)
    if isinstance(pre_df.schema[name].dataType, NumericType):
        delta = post_value.cast("double") - pre_value.cast("double")
    else:
        delta = ~post_value.eqNullSafe(pre_value)
    return delta.alias(f"{name}_diff")


diff_columns = [diff_column(name) for name in columns_to_compare]

# Output layout: keys, then all pre values, then all post values, then diffs.
select_columns = (
    key_columns
    + [qualified("pre", name).alias(f"{name}_pre") for name in columns_to_compare]
    + [qualified("post", name).alias(f"{name}_post") for name in columns_to_compare]
    + diff_columns
)
result_df = merged_df.select(*select_columns)

output_dir = "output_single_file_temp"
# coalesce(1) yields a single part file (all data goes through one task).
# mode("overwrite"): an interrupted earlier run can leave this directory
# behind, and Spark's default save mode refuses to write to an existing path.
result_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_dir)

                                                                                

In [7]:
# Spark writes a directory holding part files plus metadata (_SUCCESS, .crc).
# After coalesce(1) there should be exactly one part-*.csv data file: promote
# it to a standalone CSV and delete the directory.
# NOTE(review): os/shutil assume output_dir is on the local filesystem -- confirm
# if Spark's default filesystem is HDFS or another remote store.
part_files = [
    name for name in os.listdir(output_dir)
    if name.startswith("part-") and name.endswith(".csv")
]
if len(part_files) != 1:
    raise RuntimeError(
        f"expected exactly one part-*.csv file in {output_dir!r}, found {part_files}"
    )
output_file = part_files[0]
shutil.move(os.path.join(output_dir, output_file), "output_single_file.csv")
shutil.rmtree(output_dir)
spark.stop()