# Compare two Spark DataFrames and highlight data drift between them

In [None]:

import sys
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.window import Window

def compare_dataframes(df1: DataFrame, df2: DataFrame, pk: list = ["rownum"],
                       noOfOutputRecords: int = 10):
    """Compare two Spark DataFrames and return a styled view of the rows that differ.

    The schemas are compared first. If they differ, a side-by-side table of
    column names / dtypes is displayed and the function exits. Otherwise the
    rows present in one frame but not the other are collected to pandas,
    aligned on ``pk`` and laid out with the ``DF1`` / ``DF2`` values of every
    column next to each other.

    Args:
        df1: First Spark DataFrame.
        df2: Second Spark DataFrame.
        pk: Column name(s) used to align rows of the two frames. When left at
            the default ``["rownum"]`` a ``rownum`` column is generated on each
            set of differing rows and used as the key. A single column name may
            also be passed as a plain string.
        noOfOutputRecords: Maximum number of differing rows in the output.

    Returns:
        ``None`` when both frames contain the same rows; otherwise a pandas
        ``Styler`` in which cells that differ from the ``DF1`` value are pink
        and null cells are cyan.

    Raises:
        SystemExit: If the schemas of ``df1`` and ``df2`` are not identical.

    NOTE(review): per the PySpark documentation ``DataFrame.subtract`` has
    EXCEPT DISTINCT semantics, so frames that differ only in how many times a
    row is duplicated are reported as identical -- confirm this is acceptable.
    """
    def colorCodeDF(df):
        """Build the CSS frame that paints mismatching cells pink.

        Every column is compared with the ``DF1`` column of the same name, so
        only cells whose value differs from the ``DF1`` value are coloured.
        """
        differs_from_df1 = df.ne(df.xs('DF1', axis="columns", level=1), level=0)
        return pd.DataFrame(np.where(differs_from_df1, 'background-color: pink', ''),
                            index=df.index, columns=df.columns)

    # Normalise to a fresh list: a bare string such as "rownum" must behave
    # like ["rownum"], and the shared default list must never be aliased.
    pk = [pk] if isinstance(pk, str) else list(pk)

    # Check if the schemas are identical else no point in checking data rows
    if df1.schema == df2.schema:
        print("The schemas are identical. Proceeding to data comparison")
    else:
        print("The schemas are NOT identical. Cannot proceed to data comparison.")
        schema_1 = pd.DataFrame(df1.dtypes, columns=['df1_cols', 'df1_dtype'])
        schema_2 = pd.DataFrame(df2.dtypes, columns=['df2_cols', 'df2_dtype'])
        print("Please check the table for schema differences.")
        # Outer merge on position so that columns missing on either side show
        # up as nulls, which are then highlighted.
        display(pd.merge(schema_1, schema_2, how="outer",
                         left_index=True, right_index=True)
                .style.highlight_null("cyan"))
        sys.exit()

    df_1_subtract_2 = df1.subtract(df2)
    df_2_subtract_1 = df2.subtract(df1)

    row_difference_df1_count = df_1_subtract_2.count()
    row_difference_df2_count = df_2_subtract_1.count()

    if row_difference_df1_count == 0 and row_difference_df2_count == 0:
        print("The data rows are identical")
        return None

    print(f"""There are {row_difference_df1_count} rows in df1 which are not present 
        or different in df2 and There are {row_difference_df2_count} rows in df2 which are not 
        present or different in df1.""")

    if pk == ["rownum"]:
        print("A Primary Key column name has NOT been provided. Generating a ROWNUM "
              "column and using it to compare the data rows")
        # Ordering by a constant gives every row a number without imposing any
        # real sort order, so rows of the two frames are paired in whatever
        # order Spark happens to return them.
        rownum = F.row_number().over(Window.orderBy(F.lit('1')))
        df_1_subtract_2 = df_1_subtract_2.withColumn("rownum", rownum)
        df_2_subtract_1 = df_2_subtract_1.withColumn("rownum", rownum)
    else:
        print(f"The provided Primary Key columns is: {pk}")

    pdf1 = df_1_subtract_2.toPandas()
    pdf2 = df_2_subtract_1.toPandas()

    pdf_combined = pd.concat([pdf1.set_index(pk), pdf2.set_index(pk)], axis="columns",
                             keys=["DF1", "DF2"], sort=False)

    # Unique column names in their original order. A list is required here:
    # pandas does not accept a set as an indexer and a set would also make the
    # column order arbitrary.
    column_names = list(dict.fromkeys(name for _, name in pdf_combined.columns))

    # After the swap the column name is the outer level, so selecting by name
    # places the DF1 and DF2 values of each column side by side.
    pdf_result = pdf_combined.swaplevel(axis="columns")[column_names]
    return (pdf_result.head(noOfOutputRecords)
            .style.apply(colorCodeDF, axis=None)
            .highlight_null("cyan"))
