## Inclusion Dependency Discovery and Validation

In [1]:
from typing import Tuple, List, Dict, Any
from dataclasses import dataclass, field
import re
import textwrap
from thefuzz import process, fuzz
from collections import defaultdict

import pandas as pd
import desbordante
import desbordante.ind.algorithms as ind_algorithms

In [2]:
# Input selection: the CSV test sample is the active input. The parquet path
# below is an alternative that is currently disabled.
# file_path = "../../data/Food_Inspections_20250216_preprocessed.parquet"
file_path = "../../data/Food_Inspections_20250216_test.csv"

# Load the raw inspections table and preview the first rows.
df = pd.read_csv(file_path)
df.head()

Unnamed: 0,Inspection ID,DBA Name,AKA Name,License #,Facility Type,Risk,Address,City,State,Zip,Inspection Date,Inspection Type,Results,Violations,Latitude,Longitude,Location
0,1106427,BLOOMING BUD DAYCARE,BLOOMING BUD DAYCARE,2215789.0,Daycare Combo 1586,Risk 1 (High),5715 N LINCOLN AVE,CHICAGO,IL,60659.0,3/7/13,License,Pass,Risk 1 (High),41.98539,-87.698734,"(41.98538950526786, -87.69873407149943)"
1,2608378,Babas Halal,Babas Halal,2684170.0,Restaurant,Risk 1 (High),7901 S DAMEN AVE,CHICAGO,IL,60620.0,12/3/24,Complaint,Fail,Risk 1 (High),41.750189,-87.672986,"(41.750189342293375, -87.67298583977204)"
2,1106406,FIRST ZABIHA MEAT BAZAAR,FIRST ZABIHA MEAT BAZAAR,2232559.0,Grocery Store,Risk 2 (Medium),2907 W DEVON AVE,CHICAGO,IL,60659.0,02/20/2013,License,Fail,Risk 2 (Medium),41.997401,-87.702385,"(41.99740137039031, -87.70238538227812)"
3,2609909,HAPPY MARKET,HAPPY MARKET,2912802.0,Grocery Store,Risk 2 (Medium),2334 S WENTWORTH AVE,CHICAGO,IL,60616.0,1/2/25,Canvass,Pass w/ Conditions,Risk 2 (Medium),41.849954,-87.632094,"(41.84995400192252, -87.63209419559098)"
4,2609927,SAT KAIVAL FOOD INC/SUBWAY,SAT KAIVAL FOOD INC/SUBWAY,2728400.0,Restaurant,Risk 1 (High),1916 S STATE ST,CHICAGO,IL,60616.0,1/2/25,Canvass,Pass,Risk 1 (High),41.856053,-87.627311,"(41.85605269621059, -87.62731125804903)"


In [3]:
def preprocess(df):
    """
    Clean the raw food-inspection frame and return the analysis-ready frame.

    Steps: rename the 17 raw columns to snake_case, drop unused columns, drop
    rows with missing location fields, cast ``zip``/``license_`` to nullable
    integers, then drop ``inspection_type`` and ``state``.

    NOTE: the rename, the first column drop and the row drop are applied to the
    caller's frame *in place*; this side effect is kept for backward
    compatibility. Because of it the function accepts a frame that was already
    processed by a previous call instead of failing on the column count.

    Args:
    - df (pd.DataFrame): Raw frame with the 17 source columns in source order,
      or a frame whose columns were already renamed by an earlier call.

    Returns:
    - pd.DataFrame: New frame holding the cleaned columns.

    Raises:
    - ValueError: If the columns are neither the 17 raw ones nor already renamed.
    """

    def fuzzy_normalize_column(df, column_name, threshold=80):
        """
        Normalize text values in a DataFrame column using fuzzy matching.

        Values are lower-cased and every cluster of similar values is replaced
        by one representative of that cluster.

        Args:
        - df (pd.DataFrame): Input DataFrame (modified in place).
        - column_name (str): Column name to normalize.
        - threshold (int): Similarity threshold for fuzzy matching (default is 80).

        Returns:
        - pd.DataFrame: The same DataFrame with ``column_name`` overwritten by
          the normalized values.
        """
        # Fill missing values BEFORE casting: astype(str) turns NaN into the
        # literal string 'nan', after which fillna has nothing left to fill.
        df[column_name] = df[column_name].fillna('').astype(str)

        # Sorted so the clustering does not depend on set iteration order.
        unique_values = sorted(set(df[column_name].str.lower()))

        # value -> representative of its cluster
        reference_mapping = {}

        for value in unique_values:
            # Already assigned to a cluster by an earlier value
            if value in reference_mapping:
                continue

            # Find similar words
            matches = process.extract(value, unique_values, limit=10, scorer=fuzz.ratio)
            matches = [(match, score) for match, score in matches if score >= threshold]

            if matches:
                best_match = max(matches, key=lambda x: x[1])[0]  # Pick the best-scoring match
            else:
                best_match = value  # Keep original if no good match found

            # If the best match already belongs to a cluster, join that cluster
            # so every mapping target is itself a representative.
            best_match = reference_mapping.get(best_match, best_match)

            # The value itself must always be mapped, otherwise .map() below
            # would turn it into NaN when nothing passed the threshold.
            reference_mapping.setdefault(value, best_match)

            # Assign similar words that are not clustered yet; earlier
            # assignments are never overwritten.
            for match, _score in matches:
                reference_mapping.setdefault(match, best_match)

        # Apply normalization mapping
        df[column_name] = df[column_name].str.lower().map(reference_mapping)

        return df

    # renaming column names to snake_case
    COLUMN_NAMES = [
        'inspection_id',
        'dba_name',
        'aka_name',
        'license_',
        'facility_type',
        'risk',
        'address',
        'city',
        'state',
        'zip',
        'inspection_date',
        'inspection_type',
        'results',
        'violations',
        'latitude',
        'longitude',
        'location'
    ]

    if len(df.columns) == len(COLUMN_NAMES):
        df.columns = COLUMN_NAMES
    elif not set(df.columns) <= set(COLUMN_NAMES):
        # Neither the raw layout nor a frame renamed by a previous call.
        raise ValueError(
            f"preprocess expects the {len(COLUMN_NAMES)} raw columns or an already "
            f"renamed frame; got columns: {list(df.columns)}"
        )

    # drop irrelevant columns (already absent when the frame was processed before)
    df.drop(columns=['inspection_id', 'aka_name', 'location'], errors='ignore', inplace=True)

    # drop missing values
    df.dropna(subset=['city', 'state', 'zip', 'latitude', 'longitude'], inplace=True)

    # fix data type; astype returns a new frame, so the caller's frame is not
    # touched from here on
    df = df.astype({'zip':'Int64', 'license_':'Int64'})

    # consolidate redundant values using fuzzy matching
    # NOTE(review): the normalized column is dropped right away, so the fuzzy
    # pass has no effect on the returned frame - confirm whether it should be kept.
    df = fuzzy_normalize_column(df, 'inspection_type', threshold=80).drop(['inspection_type'],axis=1)

    # drop columns after post processing
    df.drop(['state'], axis=1, inplace=True)

    print("[bold green]SUCCESS[/bold green] File preprocessing completed.")
    print(df.head())

    return df

In [4]:
# Clean the raw frame. Note that preprocess also renames and drops columns on
# `df` itself, so `df` no longer has its original layout after this cell.
preprocessed_df = preprocess(df)

[bold green]SUCCESS[/bold green] File preprocessing completed.
                     dba_name  license_       facility_type             risk  \
0        BLOOMING BUD DAYCARE   2215789  Daycare Combo 1586    Risk 1 (High)   
1                 Babas Halal   2684170          Restaurant    Risk 1 (High)   
2    FIRST ZABIHA MEAT BAZAAR   2232559       Grocery Store  Risk 2 (Medium)   
3                HAPPY MARKET   2912802       Grocery Store  Risk 2 (Medium)   
4  SAT KAIVAL FOOD INC/SUBWAY   2728400          Restaurant    Risk 1 (High)   

                address     city    zip inspection_date             results  \
0    5715 N LINCOLN AVE  CHICAGO  60659          3/7/13                Pass   
1      7901 S DAMEN AVE  CHICAGO  60620         12/3/24                Fail   
2      2907 W DEVON AVE  CHICAGO  60659      02/20/2013                Fail   
3  2334 S WENTWORTH AVE  CHICAGO  60616          1/2/25  Pass w/ Conditions   
4       1916 S STATE ST  CHICAGO  60616          1/2/25      

In [5]:
# Drop every row that still has a missing value in any remaining column.
preprocessed_df = preprocessed_df.dropna()

In [26]:

def find_inds(df:list [pd.DataFrame] | pd.DataFrame, algorithm_name:str='Default'):
    """
    Finds inclusion dependencies in a given DataFrame using a specified algorithm.
    
    Parameters:
        df (pd.DataFrame): The input DataFrame.
        algorithm_name (str): The name of the FD algorithm to use. Defaults to 'Default'.
    
    Returns:
        list: A list of discovered approximate functional dependencies.
    """
    try:

        # Get the algorithm class dynamically from desbordante.fd.algorithms
        algo_class = getattr(ind_algorithms, algorithm_name, ind_algorithms.Default)

        print(f"Algorthm: {algo_class}")
        
        algo = algo_class()
        algo.load_data(tables=df)
        algo.execute(
            allow_duplicates=False,  # Ignore duplicate INDs
        )
        
        # Filter out self-dependencies
        return [
            ind for ind in algo.get_inds()
            if ind.get_lhs().column_indices != ind.get_rhs().column_indices
        ]
    except AttributeError:
        raise ValueError(f"Algorithm '{algorithm_name}' not found. Available algorithms: {dir(ind_algorithms)}")
    

@dataclass
class InclusionDependency:
    """One inclusion dependency, stored as two lists of attribute names."""

    lhs: List[str]  # attribute names on the left-hand side
    rhs: List[str]  # attribute names on the right-hand side

    def __str__(self):
        """Render both sides, with the number of LHS attributes in parentheses."""
        return f"LHS={self.lhs} ({len(self.lhs)}), RHS={self.rhs}"
    
@dataclass
class InclusionDependencySet:
    """Duplicate-free, insertion-ordered collection of inclusion dependencies."""

    dependencies: List["InclusionDependency"] = field(default_factory=list)
    # Filled by validate_ind:
    # (lhs column names, rhs column names) -> {"holds": bool, "error": float}
    validation_results: Dict[Tuple[Tuple[str, ...], Tuple[str, ...]], Dict[str, Any]] = field(default_factory=dict)

    def add_dependency(self, lhs: List[str], rhs: List[str]):
        """Adds a new inclusion dependency to the set unless an equal one is already stored."""

        ind = InclusionDependency(lhs, rhs)

        if ind not in self.dependencies:
            self.dependencies.append(ind)

    def __len__(self):
        """Returns the number of inclusion dependencies."""
        return len(self.dependencies)

    def __iter__(self):
        """Allows iteration over inclusion dependencies."""
        return iter(self.dependencies)

    def validate_ind(self, df):
        """
        Verify every stored dependency against ``df`` and report the outcome.

        For each dependency the LHS/RHS column names are resolved to column
        positions in ``df``, the desbordante IND verifier is run with ``df`` as
        both the LHS and the RHS table, the outcome is printed, and it is
        recorded in ``self.validation_results`` (which is reset on every call).
        Dependencies with an empty side or with a column that is not present in
        ``df`` are reported and skipped.

        Args:
            df (pd.DataFrame): Table whose columns the dependencies refer to.
        """
        GREEN_CODE = "\033[1;42m"
        RED_CODE = "\033[1;41m"
        RESET_CODE = "\033[0m"

        def ind_str(lhs, rhs):
            def cc_str(cc):
                (table, indices) = cc
                columns = [table.columns[idx] for idx in indices]
                return ", ".join(f"{col}" for col in columns)

            return f"[{cc_str(lhs)}] -> [{cc_str(rhs)}]"

        def ind_verify(lhs, rhs):
            (lhs_table, lhs_indices) = lhs
            (rhs_table, rhs_indices) = rhs

            print(f"Checking the IND {ind_str((lhs_table, lhs_indices), (rhs_table, rhs_indices))}")

            algo = desbordante.ind_verification.algorithms.Default()
            algo.load_data(tables=[lhs_table, rhs_table])
            algo.execute(lhs_indices=lhs_indices, rhs_indices=rhs_indices)

            return algo

        def print_results_for_ind(verifier):
            if verifier.get_error() == 0:
                print(GREEN_CODE, "IND holds", RESET_CODE)
            else:
                print(RED_CODE, f"AIND holds with error = {verifier.get_error():.2}",
                    RESET_CODE)

        # Results always describe the most recent validation run.
        self.validation_results.clear()

        for ind in self.dependencies:
            # get_indexer returns a numpy array with -1 for unknown names;
            # convert to a flat list of plain ints for the verifier.
            lhs_idx = [int(i) for i in df.columns.get_indexer(ind.lhs)]
            rhs_idx = [int(i) for i in df.columns.get_indexer(ind.rhs)]

            # Every column on both sides must exist in the table.
            if not lhs_idx or not rhs_idx or -1 in lhs_idx or -1 in rhs_idx:
                print(f"Skipping the IND {ind}: column(s) not found in the table")
                continue

            algo = ind_verify((df, lhs_idx), (df, rhs_idx))
            print_results_for_ind(algo)

            error = algo.get_error()
            self.validation_results[(tuple(ind.lhs), tuple(ind.rhs))] = {
                "holds": error == 0,
                "error": error,
            }

In [7]:
def convert_ind(ind:"desbordante.ind.IND") -> Tuple[list, list]:
    """
    Extract the LHS and RHS column names from the text form of an IND.

    Relies on ``str(ind)`` looking like
    ``(table, [col_a]) -> (table, [col_b])``: the text is split on ``->`` and
    the content of the square brackets on each side is read as column names.

    A bracket group holding several names is split on commas so that each
    column becomes its own list entry.
    NOTE(review): assumes multi-column sides are rendered comma-separated and
    that column names contain no commas - confirm against desbordante output.

    Args:
        ind: The IND object; only its string representation is used.

    Returns:
        Tuple[list, list]: (lhs column names, rhs column names).
    """
    parts = str(ind).split("->")  # split the IND into its lhs and rhs text
    lhs_text = parts[0].strip()
    rhs_text = parts[-1].strip()

    # Regex to match content within square brackets
    pattern = r"\[([^\[\]]+)\]"

    def column_names(side):
        # One bracket group may list several columns: "[a, b]" -> ["a", "b"]
        return [
            name.strip()
            for group in re.findall(pattern, side)
            for name in group.split(",")
            if name.strip()
        ]

    return column_names(lhs_text), column_names(rhs_text)

In [27]:
# Mine INDs with the default algorithm. The same preprocessed frame is passed
# as both tables.
# NOTE(review): presumably this is why each IND is listed several times in the
# output of this cell - confirm.
results = find_inds([preprocessed_df, preprocessed_df])

print(f"There are {len(results)} inclusion dependencies using Default algorithm.")

# Print the text form of every discovered IND.
for ind in results:
    print(ind)

Algorthm: <class 'desbordante.ind.algorithms.Spider'>
There are 8 inclusion dependencies using Default algorithm.
(Pandas dataframe, [risk]) -> (Pandas dataframe, [violations])
(Pandas dataframe, [risk]) -> (Pandas dataframe, [violations])
(Pandas dataframe, [violations]) -> (Pandas dataframe, [risk])
(Pandas dataframe, [violations]) -> (Pandas dataframe, [risk])
(Pandas dataframe, [risk]) -> (Pandas dataframe, [violations])
(Pandas dataframe, [risk]) -> (Pandas dataframe, [violations])
(Pandas dataframe, [violations]) -> (Pandas dataframe, [risk])
(Pandas dataframe, [violations]) -> (Pandas dataframe, [risk])


In [28]:
# Collect the discovered INDs as column-name pairs; add_dependency ignores an
# IND that is already stored, so repeated results collapse to one entry.
ind_set = InclusionDependencySet()
for result in results:
    lhs, rhs =  convert_ind(result)
    ind_set.add_dependency(lhs, rhs)

In [29]:
# Validate all dependencies on the frame they were discovered on. The raw `df`
# is a different table (rows with other missing values are still present) and
# only carries the snake_case column names as a side effect of preprocess.
ind_set.validate_ind(preprocessed_df)

Checking the IND [Index(['risk'], dtype='object')] -> [Index(['violations'], dtype='object')]


  algo.execute(lhs_indices=lhs_indices, rhs_indices=rhs_indices)


[1;42m IND holds [0m
Checking the IND [Index(['violations'], dtype='object')] -> [Index(['risk'], dtype='object')]


  algo.execute(lhs_indices=lhs_indices, rhs_indices=rhs_indices)


[1;42m IND holds [0m
