In [None]:
%pip install jellyfish
%pip install faker



In [None]:
import jellyfish
import pandas as pd
import numpy as np
from typing import List
import os
import math
from packages.utils.generate_data_set import SyntheticMatcherDataset
from packages.pandas.pandas_pipeline import DatasetEvaluator

In [2]:
class MyClass:
    def __init__(self, df1: pd.DataFrame, 
                 df2: pd.DataFrame, 
                 matchColumn: str, 
                 on: List = [],
                 method: str = 'column', 
                 threshold: float  = 0.6):
        self.df1 = df1
        self.df2 = df2
        self.on = on
        self.threshold = threshold

        if method not in ["concat", "column"]:
            raise ValueError(f"Method '{method}' is not correct.")
        self.method = method

    
        if matchColumn not in self.df1.columns or matchColumn not in self.df2.columns:
            raise ValueError(f"Column '{matchColumn}' is not found in both DataFrames.")
        self.matchColumn = matchColumn
        
        self.groundTruth = None
        self.totalMatches = None        
    
    def setGroundTruth(self):
        """Sets the ground truth based on matching 'id' columns."""
        self.groundTruth = np.intersect1d(self.df1[self.matchColumn], self.df2[self.matchColumn])

    def soundexDfs(self):
        """Apply soundex transformation to non-id columns."""
        for df in [self.df1, self.df2]:
            for col_name in df.columns:
                if col_name != self.matchColumn:
                    df[col_name] = df[col_name].apply(lambda x: jellyfish.soundex(str(x)))

            if self.method == 'concat':
                non_match_columns = [col for col in df.columns if col != self.matchColumn]
                df['concatenated'] = df[non_match_columns].apply(lambda row: ''.join(row.astype(str)), axis=1)
                df.drop(columns=non_match_columns, inplace=True)

    def setTotalMatches(self):
        """Sets the total matches based on merged DataFrames."""
        
        # if self.method == 'concat':
        #     self.totalMatches = self.df1.merge(self.df2, how="inner", on=['concatenated']).to_numpy()
        # else:   
        #     self.totalMatches = self.df1.merge(self.df2, how="outer", on=self.on + [self.matchColumn]).to_numpy()

        self.totalMatches =  self.df1.merge(pd.concat([self.df1, self.df2]), how='outer', on=self.on)[["0_y"] + self.on]
        self.totalMatches.rename(columns={'0_y': self.matchColumn}, inplace=True)
        
    def printStatistics(self):
        """Print statistics (True Positives, False Positives, Precision)."""
        myStatistics = self.Statistics(groundTruth=self.groundTruth, 
                                       totalMatches=self.totalMatches, 
                                       threshold=self.threshold, 
                                       on=self.on, 
                                       matchColumn=self.matchColumn)
        myStatistics.calculate()

    # Inner class Statistics
    class Statistics:
        def __init__(self,
                     groundTruth: pd.DataFrame, 
                     totalMatches: pd.DataFrame, 
                     threshold : float = 0.8,
                     matchColumn: str | int = 1,
                     on: List =[]):
            self.groundTruth = pd.DataFrame(groundTruth)
            self.totalMatches = pd.DataFrame(totalMatches)
            self.threshold = threshold
            self.matchColumn = matchColumn
            self.on = on

            self._setThresholdValues()
            
        def calculate(self):
            # self.result = self.totalMatches.groupby(self.matchColumn)\
            #         .filter(lambda x : len(x) >=2)\
            #         .groupby(self.matchColumn)\
            #         .apply(lambda x: x.iloc[:, 1:].apply(lambda x: x.nunique() == 1)).sum(axis=1)

            duplicates = self.totalMatches[self.totalMatches[[1,2,3,4,5]].duplicated(keep=False)].sort_values(by=[1,2,3,4,5])
        
            # Function to check if two rows match at least 3/5 columns
            def is_duplicate(row1, row2):
                return sum(row1 == row2) >=  self.matchingRows  # At least 3 matches out of 5

            # print(duplicates)
            
            duplicate_pairs = []
            # Find duplicates
            for i in range(len(duplicates)):
                for j in range(i + 1, len(duplicates)):  # Compare only unique pairs
                    if is_duplicate(duplicates.iloc[i, 1:], duplicates.iloc[j, 1:]):
                        duplicate_pairs.append((i, j, duplicates.iloc[i, 0] == duplicates.iloc[j, 0]))  # Store (index1, index2, same_id)

            # Count same ID and different ID duplicates
            tp = sum(1 for _, _, same_id in duplicate_pairs if same_id)
            fp = len(duplicate_pairs) - same_id_count
            fn = self.groundTruth.size - tp
            
            precision = tp / (tp + fp) if tp + fp != 0 else 0 
            recall = tp / (tp + fn)  if tp + fn != 0 else 0
            f1_score = (2 * precision * recall) / (precision + recall)
            
            print("Total Possible Mathces:", self.groundTruth.size)
            print("True Positives (TP):", tp)
            print("False Positives (FP):", fp)
            print("False Negatives (FN):", fn)
            print("Precision:", f"{precision:.4f}")
            print("Recall:", f"{recall:.4f}")
            print("F1-score:", f"{f1_score:.4f}")

        def _matchingAlgorithm(self, group):
            return group.nunique() == 1
            
        def _setThresholdValues(self) -> List:
            size = len(self.totalMatches.columns) - 1
            limit = math.floor(self.threshold * size)
            
            print(f"We accept at least {limit}/{size} as matches!") 
            self.matchingRows = limit
            # return [i for i in range(size, limit , -1)]
            

In [None]:
PATH  =  "data/"

df1 = pd.read_csv(os.path.join(PATH, 'df1.csv'), header=None)[[0,1,2,3,4,5]]
df2 = pd.read_csv(os.path.join(PATH, 'df2.csv'), header=None)[[0,1,2,3,4,5]]
df3 = pd.read_csv(os.path.join(PATH, 'df3.csv'), header=None)[[0,1,2,3,4,5]]
df4 = pd.read_csv(os.path.join(PATH, 'df4.csv'), header=None)[[0,1,2,3,4,5]]
df5 = pd.read_csv(os.path.join(PATH, 'df5.csv'), header=None)[[0,1,2,3,4,5]]

# Run pipeline and see statistics
pipeline = MyClass(df1, df2, matchColumn=0, on=[1,2,3,4,5], method="column", threshold = 0.6) #  --> this means at least 3/5 of the fields must match 
pipeline.setGroundTruth()
pipeline.soundexDfs()

df1, df2 = pipeline.df1.copy(), pipeline.df2.copy()

# Run pipeline and see statistics
pipeline = MyClass(df3, df4, matchColumn=0, on=[1,2,3,4,5], method="column", threshold = 0.6) #  --> this means at least 3/5 of the fields must match 
pipeline.setGroundTruth()
pipeline.soundexDfs()

df3, df4 = pipeline.df1.copy(), pipeline.df2.copy()

pipeline = MyClass(df1.copy(), df5, matchColumn=0, on=[1,2,3,4,5], method="column", threshold = 0.6) #  --> this means at least 3/5 of the fields must match 
pipeline.setGroundTruth()
pipeline.soundexDfs()

df5 = pipeline.df2.copy()

In [4]:
evaluator = DatasetEvaluator(pipeline.df1, pipeline.df2, threshold=3)
evaluator.evaluate()
evaluator.printResults()

Expected: {}
Ground Truth Size: 25000
True Positives: 19036
False Positives: 17830
False Negatives: 5964
Precision: 0.5164
Recall: 0.7614
Elapsed Time: 33682.16 seconds


In [4]:
33682.16/60/60

9.356155555555556

In [None]:

Expected: {}
Ground Truth Size: 25000
True Positives: 19036
False Positives: 17830
False Negatives: 5964
Precision: 0.5164
Recall: 0.7614
Elapsed Time: 33682.16 seconds

# df1 -> df2
thresh = 3
Expected: {}
Ground Truth Size: 25000
True Positives: 19036
False Positives: 17830
False Negatives: 5964
Precision: 0.5164
Recall: 0.7614
Elapsed Time: 81137.75 seconds


# df1 -> df5
thresh = 2
Expected: {}
Ground Truth Size: 25000
True Positives: 24977
False Positives: 74055
False Negatives: 23
Precision: 0.2522
Recall: 0.9991
Elapsed Time: 4861.54 seconds

# df1 -> df5
thresh = 3
Expected: {}
Ground Truth Size: 25000
True Positives: 19125
False Positives: 16323
False Negatives: 5875
Precision: 0.5395
Recall: 0.7650
Elapsed Time: 80256.47 seconds

In [None]:
import pyspark
import jellyfish
import pandas as pd
import numpy as np
from typing import List
import os
import math
from itertools import combinations, product
import time
from concurrent.futures import ProcessPoolExecutor
from collections import defaultdict
from packages.generateDataSets import SyntheticMatcherDataset
from packages.pandas.pandas_pipeline import DatasetEvaluator

In [114]:
# Convert to DataFrame
df1 = pd.DataFrame(data1)
df2 = pd.DataFrame(data2)

# Run pipeline and see statistics
pipeline = MyClass(df1, df2, matchColumn=0, on=[1, 2, 3, 4, 5], threshold=0.4)
pipeline.setGroundTruth()
pipeline.soundexDfs()
pipeline.setTotalMatches()
# pipeline.printStatistics()

In [5]:
PATH  =  "data/"

df1 = pd.read_csv(os.path.join(PATH, 'df1.csv'), header=None)[[0,1,2,3,4,5]]
df2 = pd.read_csv(os.path.join(PATH, 'df2.csv'), header=None)[[0,1,2,3,4,5]]

# Run pipeline and see statistics
pipeline = MyClass(df1, df2, matchColumn=0, on=[1,2,3,4,5], method="column", threshold = 0.6) #  --> this means at least 3/5 of the fields must match 
pipeline.setGroundTruth()
pipeline.soundexDfs()


columns = [1,2,3,4,5]

# Store result as (column_triplet, unique_count)
unique_counts = []
for cols in combinations(columns, 3):
    count = df2[list(cols)].agg(''.join, axis=1).nunique()
    unique_counts.append((cols, count))

# Find the combination with the minimum unique count
best_combination = min(unique_counts, key=lambda x: x[1])

print("Best column triplet (least unique values):", best_combination[0])
print("Number of unique combinations:", best_combination[1])

# evaluator = DatasetEvaluator(pipeline.df1, pipeline.df2, expected, threshold=3)
# evaluator.evaluate()
# evaluator.printResults()

Best column triplet (least unique values): (1, 4, 5)
Number of unique combinations: 77477


In [7]:
best_combination

((1, 4, 5), 77477)

In [6]:
80256.47/60/60

22.293463888888887

In [19]:
df3, df4 = pipeline.df1, pipeline.df2

In [None]:
matchedData = pd.concat([df2]).drop_duplicates().reset_index(drop=True)

evaluator = DatasetEvaluator(df1.sample(frac=0.01, random_state=42), matchedData, threshold=3)
evaluator.preproccess()
evaluator.evaluate()
evaluator.calculate_statistics()
evaluator.printResults()



In [None]:
                
df1 = pd.read_csv("data/df1.csv", header=None, dtype=str)[[0, 1, 2, 3, 4, 5]]
df2 = pd.read_csv("data/df2.csv", header=None, dtype=str)[[0, 1, 2, 3, 4, 5]]
# df3 = pd.read_csv("data/df3.csv", header=None, dtype=str)[[0, 1, 2, 3, 4, 5]]
# df4 = pd.read_csv("data/df4.csv", header=None, dtype=str)[[0, 1, 2, 3, 4, 5]]
# df5 = pd.read_csv("data/df5.csv", header=None, dtype=str)[[0, 1, 2, 3, 4, 5]]

# matched_data = (
#     pd.concat([df2, df3, df4, df5]).drop_duplicates().reset_index(drop=True)
# )

for df in [df1, df2]:
    for col_name in df.columns:
        if col_name != 0:
            df[col_name] = df[col_name].apply(lambda x: jellyfish.soundex(str(x)))
                
pipeline = DatasetEvaluator(df1.sample(frac=0.01, random_state=42), df2.sample(frac=0.01, random_state=55), threshold=3)
pipeline.preprocess()
pipeline.evaluate()
pipeline.calculate_statistics()
pipeline.printResults()

preprocess took 0.1120 seconds
evaluate took 2.0147 seconds
calculateStatistics took 0.0001 seconds
Expected: {}
Ground Truth Size: 5
True Positives: 4
False Positives: 1
False Negatives: 1
Precision: 0.8000
Recall: 0.8000


In [10]:
pipeline.df1_proc

array([('AA223369', 'BRINSKELLEISABELEVE455  CEDARWOOD DRBURLINGTON'),
       ('AA41454', 'SMITHWILLIAMKENT2125  HAWTHORNE LNBURLINGTON'),
       ('AA144378', 'COOPERBARTHENAHEATHER414  BANKS STGRAHAM'),
       ('AA25364', 'KERNODLEERICWATSON3540 N NC HWY 87ELON'),
       ('AA8583', 'CARDWELLLARRYRAY2805  DURHAM ST EXTBURLINGTON'),
       ('AA24201', 'JOHNSONLIONELLWEBSTER1140 E HARDEN STGRAHAM'),
       ('AA57788', 'BERTINODONALDWILLIAM3490  THAMESFORD RDFAYETTEVILLE'),
       ('AA45000', 'TORAINJOHNNYLEE2527  MCKINNEY STBURLINGTON'),
       ('AA205928', 'PASSJACKSONCOLBY2032  BRUCEWOOD RDHAW RIVER'),
       ('AA193095', 'PATELYESHAPANKAJBHAI519  APPLECROSS DRMEBANE'),
       ('AA157869', 'BRYANTGWENDOLYNFITZGERALD10  ARBOR HILL PLMCLEANSVILLE'),
       ('AA182660', 'KENDALLHALEYREBECCA3021  BERWICK DRBURLINGTON'),
       ('AA186589', 'REVISJEFFERYCLAYTON3431  UNION RIDGE RDBURLINGTON'),
       ('AA138813', 'HUFFINESALICEFAYE1203 E MAIN STGRAHAM'),
       ('AA145313', 'SHAWADAMLEE107 

In [None]:
preproccess took 8.6420 seconds
evaluate took 134.6332 seconds
calculateStatistics took 0.0088 seconds
Expected: {}
Ground Truth Size: 271
True Positives: 202
False Positives: 248
False Negatives: 69
Precision: 0.4489
Recall: 0.7454

preproccess took 8.0907 seconds
evaluate took 140.7979 seconds
calculateStatistics took 0.3955 seconds
Expected: {}
Ground Truth Size: 271
True Positives: 202
False Positives: 248
False Negatives: 69
Precision: 0.4489
Recall: 0.7454

In [None]:
matchedData = pd.concat([df2, df3, df4, df5]).drop_duplicates().reset_index(drop=True)

evaluator = DatasetEvaluator(df1, matchedData, threshold=3, trim=0)
evaluator.preproccess()
evaluator.evaluate()
evaluator.calculate_statistics()
evaluator.printResults()

preproccess took 29.9362 seconds


In [None]:
preproccess took 28.2823 seconds
evaluate took 17617.6667 seconds
calculateStatistics took 0.5669 seconds
Expected: {}
Ground Truth Size: 25000
True Positives: 21695
False Positives: 18613
False Negatives: 3305
Precision: 0.5382
Recall: 0.8678

In [None]:
evaluator.printResults()

In [None]:
df1.merge(df2, how="outer").merge(df3, how="outer").drop_duplicates().reset_index(drop=True)

In [16]:
matchedData = pd.concat([df2, df3, df4, df5]).drop_duplicates().reset_index(drop=True)

In [18]:
result = df1.merge(matchedData, on=[1,2,3,4,5], how='inner')
gt = np.intersect1d(df1[0], matchedData[0]).size

tp = (result['0_x'] == result['0_y']).sum()
fn = gt - tp
fp = (result['0_x'] != result['0_y']).sum()

tp, fp, fn, tp/(tp+fp), tp / (tp + fn)

(6833, 18, 18167, 0.997372646329003, 0.27332)

In [15]:
df1

Unnamed: 0,0,1,2,3,4,5
0,AA100000,B323,J520,A620,3523,G650
1,AA100004,B620,M240,E421,2251,B645
2,AA100006,B620,D520,W460,3345,H616
3,AA100007,K260,K600,L500,1225,G650
4,AA100008,K536,J600,F652,4235,B645
...,...,...,...,...,...,...
99995,AB16955,W231,D242,R410,5325,T462
99996,AB16957,R163,C642,A636,1152,T462
99997,AB16959,B550,C645,D150,1155,T462
99998,AB16960,B530,L253,Y130,3246,T462


In [None]:
 PATH  =  "data/"

df1 = pd.read_csv(os.path.join(PATH, 'df1.csv'), header=None)[[0,1,2,3,4,5]]
df2 = pd.read_csv(os.path.join(PATH, 'df2.csv'), header=None)[[0,1,2,3,4,5]]

# Run pipeline and see statistics
pipeline = MyClass(df1.copy(), df2.copy(), matchColumn=0, on=[1,2,3,4,5], method="column", threshold = 0.6) #  --> this means at least 3/5 of the fields must match 
pipeline.soundexDfs()

evaluator = DatasetEvaluator(pipeline.df1.copy(), pipeline.df2.copy(), threshold=3, match_column=0)
evaluator.preproccess()
evaluator.evaluate()
evaluator.calculate_statistics()
evaluator.printResults()

preproccess took 13.7715 seconds
evaluate took 11634.1816 seconds
calculateStatistics took 0.1742 seconds
Expected: {}
Ground Truth Size: 25000
True Positives: 15636
False Positives: 3318
False Negatives: 9364
Precision: 0.8249
Recall: 0.6254


In [10]:
9879.75 //60//60

2.0

In [7]:
30485.83 //60//60

8.0

In [None]:
Expected: {}
Ground Truth Size: 25000
True Positives: 21180
False Positives: 12559
False Negatives: 3820
Precision: 0.6278
Recall: 0.8472
Elapsed Time: 30485.83 seconds


trim = 1
preproccess took 28.2823 seconds
evaluate took 17617.6667 seconds
calculateStatistics took 0.5669 seconds
Expected: {}
Ground Truth Size: 25000
True Positives: 21695
False Positives: 18613
False Negatives: 3305
Precision: 0.5382
Recall: 0.8678