In [1]:
import pandas as pd
import duckdb
import datacompy
import numpy as np
import time
import psutil
import os
con = duckdb.connect()


  import datacompy
  import datacompy


In [22]:
from IPython.display import display_html
from itertools import chain,cycle
def display_side_by_side(*args,titles=cycle([''])):
    html_str=''
    for df,title in zip(args, chain(titles,cycle(['</br>'])) ):
        html_str+='<th style="text-align:center"><td style="vertical-align:top">'
        html_str+=f'<h2 style="text-align: center;">{title}</h2>'
        html_str+=df.to_html().replace('table','table style="display:inline"')
        html_str+='</td></th>'
    display_html(html_str,raw=True)

# 1 Check if dataframe is equal

## Pandas

In [2]:
# Pandas
from pandas.testing import assert_frame_equal

# Create two identical DataFrames
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df3 = pd.DataFrame({'A': [1, 2], 'B': [3, 5]})
# Assert they are equal
assert_frame_equal(df1, df2) # <----
print("DataFrames df1, df2 are equal.")
try:
    assert_frame_equal(df1, df3)
except AssertionError:
    print("DataFrames df1, df3 not equal.")

DataFrames df1, df2 are equal.
DataFrames df1, df3 not equal.


## DuckDB

In [3]:
# Use DuckDB
con.register('df1', df1)
con.register('df2', df2)

# Check for discrepancies
discrepancies = con.execute("""
    SELECT * FROM df1
    EXCEPT ALL
    SELECT * FROM df2
""").df()

if discrepancies.empty:
    print("DataFrames df1, df2 are equal.")

DataFrames df1, df2 are equal.


# 2. Finding Discrepancies Between DataFrames

## Pandas JOIN with indicator=True 

In [4]:
df1 = pd.DataFrame({'Key': [1, 2, 3], 'Value': ['A', 'B', 'C']})
df2 = pd.DataFrame({'Key': [1, 2, 4], 'Value': ['A', 'B', 'D']})
con.register('df1', df1)
con.register('df2', df2)
display(df1)
display(df2)

Unnamed: 0,Key,Value
0,1,A
1,2,B
2,3,C


Unnamed: 0,Key,Value
0,1,A
1,2,B
2,4,D


In [5]:
# Merge with indicator
diff_df = df1.merge(df2, on=['Key', 'Value'], how='outer', indicator=True)
display(diff_df)
# Filter discrepancies
display(diff_df[diff_df['_merge'] != 'both'])

Unnamed: 0,Key,Value,_merge
0,1,A,both
1,2,B,both
2,3,C,left_only
3,4,D,right_only


Unnamed: 0,Key,Value,_merge
2,3,C,left_only
3,4,D,right_only


## [DataComp library](https://capitalone.github.io/datacompy/)

A package to compare two DataFrames, support pandas, spark and polar

In [6]:

compare = datacompy.Compare(
    df1,
    df2,
    join_columns=['Key', 'Value'], 
    df1_name='df1',
    df2_name='df2')

In [7]:
print(compare.df1_unq_rows)

   key value
2    3     C


In [8]:
print(compare.report())

DataComPy Comparison
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        2     3
1       df2        2     3

Column Summary
--------------

Number of columns in common: 2
Number of columns in df1 but not in df2: 0 []
Number of columns in df2 but not in df1: 0 []

Row Summary
-----------

Matched on: key, value
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 2
Number of rows in df1 but not in df2: 1
Number of rows in df2 but not in df1: 1

Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 2

Column Comparison
-----------------

Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 2
Total number of values which compare unequal: 0

Sample Rows Only in df1 (First 10 Columns)
------------------------------------------

   key value
0    3     C

Sample Rows Only in df2 (First 10 Co

[Benchmark provided by datacomp](https://capitalone.github.io/datacompy/benchmark.html)



## DuckDB

In [9]:
anti_join_df = con.execute("""
    SELECT df1.*
    FROM df1
    ANTI JOIN df2
    ON df1.Key = df2.Key AND df1.value = df2.value
""").df()

print(anti_join_df)

   Key Value
0    3     C


# 3. Fuzzy Temporal Data Diff, As of JOIN

## Data preparation

In [24]:
# Data from Vendor A
data_a = {
    'company': ['Company X', 'Company Y', 'Company Z'],
    'filing_date': pd.to_datetime(['2021-02-25', '2021-03-01', '2021-03-05']),
    'revenue': [500, 800, 600],
    'net_income': [50, 80, 60]
}
df_vendor_a = pd.DataFrame(data_a)
# Data from Vendor B (filing dates slightly different)
data_b = {
    'company': ['Company X', 'Company Y', 'Company Z'],
    'filing_date': pd.to_datetime(['2021-02-26', '2021-02-28', '2021-03-06']),
    'revenue': [500, 810, 600],  # Note: Revenue for Company Y is different
    'net_income': [50, 82, 60]   # Note: Net income for Company Y is different
}
df_vendor_b = pd.DataFrame(data_b)
display_side_by_side(df_vendor_a, df_vendor_b)

Unnamed: 0,company,filing_date,revenue,net_income
0,Company X,2021-02-25,500,50
1,Company Y,2021-03-01,800,80
2,Company Z,2021-03-05,600,60

Unnamed: 0,company,filing_date,revenue,net_income
0,Company X,2021-02-26,500,50
1,Company Y,2021-02-28,810,82
2,Company Z,2021-03-06,600,60


In [11]:
df_vendor_a.sort_values(['company', 'filing_date'], inplace=True)
df_vendor_b.sort_values(['company', 'filing_date'], inplace=True)
df_vendor_b['filing_date_b'] = df_vendor_b['filing_date']

In [12]:
# Perform as-of join
merged_df = pd.merge_asof(
    df_vendor_a,
    df_vendor_b,
    on='filing_date', # We align the DataFrames based on the filing date.
    by='company', # The as-of join is performed within each company.
    direction='nearest', # Chooses the nearest filing date from Vendor B for each filing date in Vendor A.
    tolerance=pd.Timedelta('2D'),  # Allow a 2-day difference in filing dates
    suffixes=('_a', '_b')
)

print("Merged Data:")
merged_df

Merged Data:


Unnamed: 0,company,filing_date,revenue_a,net_income_a,revenue_b,net_income_b,filing_date_b
0,Company X,2021-02-25,500,50,500,50,2021-02-26
1,Company Y,2021-03-01,800,80,810,82,2021-02-28
2,Company Z,2021-03-05,600,60,600,60,2021-03-06


In [13]:
merged_df['revenue_discrepancy'] = merged_df['revenue_a'] != merged_df['revenue_b']
merged_df['net_income_discrepancy'] = merged_df['net_income_a'] != merged_df['net_income_b']

discrepancies = merged_df[
    merged_df['revenue_discrepancy'] | merged_df['net_income_discrepancy']
]

print("\nDiscrepancies Found with Increased Tolerance:")
discrepancies


Discrepancies Found with Increased Tolerance:


Unnamed: 0,company,filing_date,revenue_a,net_income_a,revenue_b,net_income_b,filing_date_b,revenue_discrepancy,net_income_discrepancy
1,Company Y,2021-03-01,800,80,810,82,2021-02-28,True,True


## DuckDB

In [14]:
con.register('vendor_a', df_vendor_a)
con.register('vendor_b', df_vendor_b)

<duckdb.duckdb.DuckDBPyConnection at 0x7143c569e7f0>

In [15]:
query = """
SELECT
    vendor_a.company,
    vendor_a.filing_date AS filing_date_a,
    vendor_b.filing_date AS filing_date_b,
    vendor_a.revenue AS revenue_a,
    vendor_b.revenue AS revenue_b,
    vendor_a.net_income AS net_income_a,
    vendor_b.net_income AS net_income_b
FROM vendor_a
ASOF JOIN vendor_b
ON vendor_a.company = vendor_b.company
AND vendor_a.filing_date > vendor_b.filing_date
"""
merged_duckdb = con.execute(query).df()
merged_duckdb

Unnamed: 0,company,filing_date_a,filing_date_b,revenue_a,revenue_b,net_income_a,net_income_b
0,Company Y,2021-03-01,2021-02-28,800,810,80,82


# Let's find the needle in haystack

In [21]:
# Parameters and setup
num_rows = 5_000_000  # Adjusted to 1 million
np.random.seed(0)

# Generate data
df1 = pd.DataFrame({
    'company_id': np.random.randint(1, 100_000, size=num_rows),
    'filing_date': pd.date_range('2021-01-01', periods=num_rows, freq='min'),
    'revenue': np.random.uniform(1000, 1_000_000, size=num_rows)
})
df1['net_income'] = df1['revenue'] * np.random.uniform(0.05, 0.15, size=num_rows)

# Save File 1
df1.to_parquet('file1.parquet')
print("File 1 created with shape:", df1.shape)

# Copy and extend for File 2
df2 = pd.concat([
    df1,
    pd.DataFrame({'company_id': [100_001], 'filing_date': [pd.Timestamp('2021-12-31')], 'revenue': [500_000], 'net_income': [50_000]})
], ignore_index=True)

# Save File 2
df2.to_parquet('file2.parquet')
print("File 2 created with shape:", df2.shape)

File 1 created with shape: (5000000, 4)
File 2 created with shape: (5000001, 4)


In [18]:
# Register Parquet files as tables in DuckDB
con.execute("""
CREATE TABLE df1 AS SELECT * FROM 'file1.parquet';
CREATE TABLE df2 AS SELECT * FROM 'file2.parquet';
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7143c569e7f0>

In [19]:
start_time = time.time()

# Measure initial memory usage
process = psutil.Process(os.getpid())
mem_before = process.memory_info().rss / 1024 ** 2  # Memory in MB

# Perform anti-join to find rows in df2 not in df1
result_duckdb = con.execute("""
SELECT * FROM df2
EXCEPT
SELECT * FROM df1
""").df()

end_time = time.time()
duckdb_time = end_time - start_time

# Measure final memory usage
mem_after = process.memory_info().rss / 1024 ** 2  # Memory in MB
mem_usage_duckdb = mem_after - mem_before

print("DuckDB anti-join completed in {:.2f} seconds.".format(duckdb_time))
print("DuckDB memory usage: {:.2f} MB".format(mem_usage_duckdb))
print("Additional rows found:", result_duckdb.shape[0])

DuckDB anti-join completed in 0.01 seconds.
DuckDB memory usage: 0.12 MB
Additional rows found: 1


In [20]:
start_time = time.time()
process = psutil.Process(os.getpid())
mem_before = process.memory_info().rss / 1024 ** 2  # Memory in MB
df1_pandas = pd.read_parquet('file1.parquet')
df2_pandas = pd.read_parquet('file2.parquet')
df_diff = df2_pandas.merge(
    df1_pandas,
    on=['company_id', 'filing_date', 'revenue', 'net_income'],
    how='outer',
    indicator=True
)
additional_rows = df_diff[df_diff['_merge'] == 'left_only']
end_time = time.time()
pandas_time = end_time - start_time
mem_after = process.memory_info().rss / 1024 ** 2  # Memory in MB
mem_usage_pandas = mem_after - mem_before
print("Pandas merge completed in {:.2f} seconds.".format(pandas_time))
print("Pandas memory usage: {:.2f} MB".format(mem_usage_pandas))
print("Additional rows found:", additional_rows.shape[0])

Pandas merge completed in 12.79 seconds.
Pandas memory usage: 700.36 MB
Additional rows found: 1
