In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

In [51]:
# Creating spark session
spark = SparkSession.builder.getOrCreate()

In [52]:
# Creating spark datframes by reading csv files.
spark_pre_df = spark.read.csv('pre.csv', header=True, inferSchema=True)
spark_post_df = spark.read.csv('post.csv', header=True, inferSchema=True)

keys = ['InvoiceNo', 'StockCode', 'CustomerID', 'InvoiceDate']

In [53]:
# Dropping duplicate keys.
spark_pre_df = spark_pre_df.dropDuplicates(keys)
spark_post_df = spark_post_df.dropDuplicates(keys)

In [54]:
# Joining two dataframes.
for column in spark_pre_df.columns:
    if column not in keys:
        spark_pre_df = spark_pre_df.withColumnRenamed(column, column + "_pre")
        spark_post_df = spark_post_df.withColumnRenamed(column, column + "_post")
        
spark_merge_df = spark_pre_df.join(spark_post_df, on=keys, how='inner')



In [60]:
spark_diff_df = spark_merge_df.withColumn('Quantity_diff', col('Quantity_post') - col('Quantity_pre')).withColumn('UnitPrice_diff', col('UnitPrice_post') - col('UnitPrice_pre')).withColumn('Description_diff', when(col('Description_post') != col('Description_pre'), col('Description_post') + ' | ' + col('Description_pre')).otherwise('No Change'))

In [61]:
spark_diff_df = spark_diff_df.fillna('')

columns = spark_diff_df.columns

selected_col = ['Description_pre', 'Description_post', 'Description_diff', 'UnitPrice_pre', 'UnitPrice_post', 'UnitPrice_diff', 'Quantity_pre', 'Quantity_post', 'Quantity_diff', 'Country_pre', 'Country_post']

order = keys + selected_col


In [57]:
filtered_data = spark_diff_df.filter(col('Quantity_diff') != 0)
ordered_df = filtered_data.select(*order)
ordered_df.show()

+---------+---------+----------+---------------+--------------------+--------------------+----------------+-------------+--------------+--------------+------------+-------------+-------------+--------------+--------------+
|InvoiceNo|StockCode|CustomerID|    InvoiceDate|     Description_pre|    Description_post|Description_diff|UnitPrice_pre|UnitPrice_post|UnitPrice_diff|Quantity_pre|Quantity_post|Quantity_diff|   Country_pre|  Country_post|
+---------+---------+----------+---------------+--------------------+--------------------+----------------+-------------+--------------+--------------+------------+-------------+-------------+--------------+--------------+
|   544392|   84997D|     14680|2/18/2011 11:54|PINK 3 PIECE POLK...|PINK 3 PIECE POLK...|            NULL|         3.75|          3.75|           0.0|          72|           77|            5|United Kingdom|United Kingdom|
|   544391|    22292|     15755|2/18/2011 11:53|HANGING CHICK  YE...|HANGING CHICK  YE...|            NULL| 

In [58]:
filter_data = spark_diff_df.filter(col('UnitPrice_diff') != 0)
ordered_df = filter_data.select(*order)
ordered_df.show()

+---------+---------+----------+--------------+--------------------+--------------------+----------------+-------------+--------------+-------------------+------------+-------------+-------------+-----------+------------+
|InvoiceNo|StockCode|CustomerID|   InvoiceDate|     Description_pre|    Description_post|Description_diff|UnitPrice_pre|UnitPrice_post|     UnitPrice_diff|Quantity_pre|Quantity_post|Quantity_diff|Country_pre|Country_post|
+---------+---------+----------+--------------+--------------------+--------------------+----------------+-------------+--------------+-------------------+------------+-------------+-------------+-----------+------------+
|   536370|    22728|     12583|12/1/2010 8:45|ALARM CLOCK BAKEL...|ALARM CLOCK BAKEL...|            NULL|         3.75|          3.94|0.18999999999999995|          24|           24|            0|     France|      France|
|   536370|    22659|     12583|12/1/2010 8:45|LUNCH BOX I LOVE ...|LUNCH BOX I LOVE ...|            NULL|      

In [62]:
description_filtered_data = spark_diff_df.filter(col('Description_post') != col('Description_pre'))
ordered_df = description_filtered_data.select(*order)
ordered_df.show()

+---------+---------+----------+--------------+--------------------+--------------------+----------------+-------------+--------------+--------------+------------+-------------+-------------+--------------+--------------+
|InvoiceNo|StockCode|CustomerID|   InvoiceDate|     Description_pre|    Description_post|Description_diff|UnitPrice_pre|UnitPrice_post|UnitPrice_diff|Quantity_pre|Quantity_post|Quantity_diff|   Country_pre|  Country_post|
+---------+---------+----------+--------------+--------------------+--------------------+----------------+-------------+--------------+--------------+------------+-------------+-------------+--------------+--------------+
|   536367|    22745|     13047|12/1/2010 8:34|POPPY'S PLAYHOUSE...|POPPY'S PLAYHOUSE...|                |          2.1|           2.1|           0.0|           6|            6|            0|United Kingdom|United Kingdom|
|   536367|    48187|     13047|12/1/2010 8:34| DOORMAT NEW ENGLAND|DOORMAT NEW ENGLA...|                |      