# Overview

This notebook runs Valentine matchers on test data generated from the SANTOS data lake tables.

## Setup

1. Download the SANTOS labeled benchmark.
2. The data lake tables -- in the `datalake` directory -- are the tables to be sampled. Update the `table_dir` variable under the *Cluster Tables* section to point to this directory, relative to this notebook.
3. The test results are written to a separate `.csv` file for each matcher. Update the `matcher_out_dir` variable (output directory) and the `matcher_out` variable (per-matcher file names) under the *Valentine Tests* section to choose where the results are saved.

# Imports

In [None]:
import os
import pandas as pd
from itertools import combinations
from valentine import valentine_match, valentine_metrics
from valentine.algorithms import Coma, Cupid, DistributionBased, JaccardLevenMatcher, SimilarityFlooding

# Cluster Tables

In [None]:
table_dir = "datalake"
all_tables = os.listdir(table_dir)
# A cluster name is everything before the final "_" of a file name.
# dict.fromkeys de-duplicates while keeping first-seen order; file names
# without any "_" produce an empty cluster name and are discarded.
table_sets = [
    cluster
    for cluster in dict.fromkeys(name.rpartition("_")[0] for name in all_tables)
    if cluster
]
table_sets

['311_calls_historic_data',
 'abandoned_wells',
 'albums',
 'animal_tag_data',
 'biodiversity',
 'business_rates',
 'cdc_nutrition_physical_activity_and_obesity_legislation',
 'cihr_co-applicant',
 'civic_building_locations',
 'complaint_by_practice',
 'contributors_parties',
 'data_mill',
 'deaths_2012_2018',
 'film_locations_in_san_francisco',
 'HMRC_exceptions_to_spending_controls_April_to_June_2018',
 'HMRC_exceptions_to_spending_controls_April_to_June_2018_facilities',
 'HMRC_exceptions_to_spending_controls_April_to_June_2019',
 'HMRC_WMI_headcount_and_payroll_data_Mar',
 'HMRC_exceptions_to_spending_controls_October_to_December_2017',
 'HMRC_Officials_meetings_with_tobacco_stakeholders_Apr_2015_to_Jun',
 'HMRC_Officials_meetings_with_tobacco_stakeholders_Apr_2017_to_June',
 'HMRC_Officials_meetings_with_tobacco_stakeholders_Apr_2018_to_June',
 'HMRC_Officials_meetings_with_tobacco_stakeholders_Jan_2020_to_Mar',
 'HMRC_Officials_meetings_with_tobacco_stakeholders_Jul_2014_to_Sept'

# Valentine Tests

## Setup Matchers

In [None]:
# Zero-argument factories, one per matcher type. The keys are the matcher names
# that are also used by the result and output-path dicts below.
matcher_builders = dict(
    coma_opt_matcher=lambda: Coma(strategy="COMA_OPT"),            # COMA (schema based matching)
    coma_opt_inst_matcher=lambda: Coma(strategy="COMA_OPT_INST"),  # COMA (schema and instance based matching)
    cupid_matcher=lambda: Cupid(),                                 # Cupid
    distr_based_matcher=lambda: DistributionBased(),               # DistributionBased
    jaccard_matcher=lambda: JaccardLevenMatcher(),                 # JaccardLevenMatcher
    sim_flooding_matcher=lambda: SimilarityFlooding(),             # SimilarityFlooding
)

## Setup Results

In [None]:
# Output location: one CSV file per matcher inside matcher_out_dir
matcher_out_dir = "valentine_experiments"
matcher_out = {
    "coma_opt_matcher": "results__coma_opt.csv",
    "coma_opt_inst_matcher": "results__coma_opt_inst.csv",
    "cupid_matcher": "results__cupid.csv",
    "distr_based_matcher": "results__distribution_based.csv",
    "jaccard_matcher": "results__jaccard_levenshtein.csv",
    "sim_flooding_matcher": "results__similarity_flooding.csv",
}

# Accumulated results: every matcher starts with its own empty list of records
matcher_res = {matcher_key: [] for matcher_key in matcher_out}

## Run All

In [None]:
def record_cluster_metrics(cluster_name, dfs, matcher):
  '''
  Run one matcher over every unordered pair of tables in a cluster and
  append the resulting metrics to `matcher_res[matcher]`.

  Params:
  cluster_name: prefix of the table set, stored under the "dataset" key of each record
  dfs: list of dataframes of the table set
  matcher: key of the matcher to use (looked up in `matcher_builders` and `matcher_res`)
  '''
  for left, right in combinations(dfs, 2):
    expected = get_ground_truth(left, right)
    # a fresh matcher instance is built for every pair
    found = valentine_match(left, right, matcher_builders[matcher]())
    scores = valentine_metrics.all_metrics(found, expected)
    scores["dataset"] = cluster_name
    matcher_res[matcher].append(scores)

In [None]:
def record_all_cluster_metrics(cluster_name, dfs):
  '''
  Run every enabled matcher on a set of tables and record the results.
  Only the distribution-based and Jaccard-Levenshtein matchers are
  currently enabled.

  Params:
  cluster_name: prefix of table set, recorded for analysis
  dfs: list of dataframes of the table set; nothing is done when it is empty
  '''
  if len(dfs) == 0:
    return
  # Currently disabled matchers: "coma_opt_matcher", "coma_opt_inst_matcher",
  # "cupid_matcher", "sim_flooding_matcher"
  for enabled in ("distr_based_matcher", "jaccard_matcher"):
    record_cluster_metrics(cluster_name, dfs, enabled)

In [None]:
def save_metrics():
  '''
  Save the contents of the matcher results out to their respective files.

  Each entry of `matcher_res` is written as a CSV (without the index column)
  to `matcher_out_dir`/`matcher_out[matcher]`. Existing files are overwritten.
  The output directory is created when it does not exist yet.
  '''
  # Create the output directory up front; to_csv does not create missing
  # parent directories. An empty directory name means the current directory.
  if matcher_out_dir:
    os.makedirs(matcher_out_dir, exist_ok=True)
  for matcher, records in matcher_res.items():
    out_path = os.path.join(matcher_out_dir, matcher_out[matcher])
    # records is a list of metric dicts, one row per table pair
    pd.DataFrame(records).to_csv(out_path, index=False)

# Iterate over Test Sets

In [None]:
def get_test_set(pre: str) -> list:
  '''
  Get the test split of a table cluster; the split sizes are meant to
  correspond to the SANTOS benchmark splits.

  The cluster's tables are taken in `all_tables` order and split by cluster size:
    fewer than 6 tables: first table is test, last table is validation, the rest train
    6 or 7 tables:       first 2 are test, last 2 are validation, the rest train
    8 or more tables:    first 2 are test, next 4 are train, last 2 are validation
  Only the test split is used, so only it is computed and returned.

  Params:
  pre: cluster name, i.e. the table file name up to (excluding) its final "_"

  Returns:
  list of table file names in the test split (empty if the cluster has no tables)

  NOTE: a one-table test split yields no table pairs for pairwise matching.
  '''
  # Match the cluster name exactly rather than by prefix, so that a cluster
  # whose name extends another's (e.g. "x_2018" vs "x_2018_facilities") does
  # not leak its tables into the shorter-named cluster.
  cluster = [ t for t in all_tables if t.rpartition("_")[0] == pre ]
  # Small clusters contribute one test table, all others two.
  test_ct = 1 if len(cluster) < 6 else 2
  return cluster[:test_ct]

In [None]:
TOKEN_CT = 15  # rows sampled per table (original note: MAX TOKENS SERIALIZED PER COLUMN)

def get_dfs(table_lst: list) -> list:
  '''
  Load and pre-process the tables; meant to correspond to the SANTOS pre-processing.

  Each table is read from `table_dir`, reduced to its non-numeric columns,
  stripped of rows containing missing values, and then randomly sampled down
  to TOKEN_CT rows. Tables left with fewer than TOKEN_CT rows are dropped
  from the result.

  Params:
  table_lst: list of table file names

  Returns:
  list of pre-processed dataframes
  '''
  # read every table before any further processing
  loaded = [ pd.read_csv(os.path.join(table_dir, file_name)) for file_name in table_lst ]
  prepared = []
  for table in loaded:
    # keep non-numeric columns only, then discard incomplete rows
    cleaned = table.select_dtypes(exclude=['number']).dropna()
    # tables that are too short to sample are skipped
    if len(cleaned) >= TOKEN_CT:
      prepared.append(cleaned.sample(n=TOKEN_CT))
  return prepared

In [None]:
def get_ground_truth(df1: pd.DataFrame, df2: pd.DataFrame) -> list:
  '''
  Build the ground truth for a table pair from its column names: a column
  is considered a match exactly when both tables contain a column with
  the same name.

  Params:
  df1, df2: dataframes to pair

  Returns:
  list of (name, name) tuples, one per shared column name, for use with the
  Valentine matchers; the order of the tuples is not defined
  '''
  shared = set(df1.columns).intersection(set(df2.columns))
  # pair each shared column name with itself
  return list(zip(shared, shared))

In [None]:
# MAIN LOOP
# Clusters listed here are skipped: their tables error when run through Valentine models.
excluded = {"deaths_2012_2018"}
for pre in table_sets:
  if pre not in excluded:
    print(f"In cluster: {pre}")
    table_names = get_test_set(pre)
    test_dfs = get_dfs(table_names)
    record_all_cluster_metrics(pre, test_dfs)
    # results are written out again after every cluster
    save_metrics()

In cluster: 311_calls_historic_data
In cluster: abandoned_wells
In cluster: albums
In cluster: animal_tag_data
In cluster: biodiversity
In cluster: business_rates
In cluster: cdc_nutrition_physical_activity_and_obesity_legislation
In cluster: cihr_co-applicant
In cluster: civic_building_locations
In cluster: complaint_by_practice
In cluster: contributors_parties
In cluster: data_mill
In cluster: film_locations_in_san_francisco
In cluster: HMRC_exceptions_to_spending_controls_April_to_June_2018
In cluster: HMRC_exceptions_to_spending_controls_April_to_June_2018_facilities
In cluster: HMRC_exceptions_to_spending_controls_April_to_June_2019
In cluster: HMRC_WMI_headcount_and_payroll_data_Mar
In cluster: HMRC_exceptions_to_spending_controls_October_to_December_2017
In cluster: HMRC_Officials_meetings_with_tobacco_stakeholders_Apr_2015_to_Jun
In cluster: HMRC_Officials_meetings_with_tobacco_stakeholders_Apr_2017_to_June
In cluster: HMRC_Officials_meetings_with_tobacco_stakeholders_Apr_2018_