# Data Preparation Pipeline

This notebook outlines the steps to prepare the data for various baseline methods. Before running the code, please download the relevant datasets from the provided data space and place them in the 'raw' folder.

[Magellan Repository](https://sites.google.com/site/anhaidgroup/useful-stuff/the-magellan-data-repository)

[WDC](https://webdatacommons.org/largescaleproductcorpus/v2/)

The first part of this notebook outlines the steps to process the raw datasets, while the second part focuses on creating datasets tailored for the corresponding methods.

In [None]:
import os
import pandas as pd
import numpy as np
import copy
import json
import random
from autogluon.tabular import TabularPredictor

## General Data Preparation

The Magellan datasets and the WDC dataset need different handling strategies.

### Magellan Datasets

In [None]:
# Raw input folder of every Magellan dataset, keyed by the short dataset name.
magellan_dirs = {
    'abt': 'raw/abt_buy',
    'amgo': 'raw/amazon_google',
    'beer': 'raw/beer',
    'dbac': 'raw/dblp_acm',
    'dbgo': 'raw/dblp_scholar',
    'foza': 'raw/fodors_zagat',
    'itam': 'raw/itunes_amazon',
    'waam': 'raw/walmart_amazon',
    'roim': 'raw/rotten_imdb',
    'zoye': 'raw/zomato_yelp',
}

# Column names assigned (positionally) to both tableA and tableB of each dataset.
magellan_rename_columns = {
    'abt': ['id', 'name', 'description', 'price'],
    'amgo': ['id', 'name', 'manufacturer', 'price'],
    'beer': ['id', 'name', 'factory', 'style', 'ABV'],
    'dbac': ['id', 'title', 'authors', 'venue', 'year'],
    'dbgo': ['id', 'title', 'authors', 'venue', 'year'],
    'foza': ['id', 'name', 'address', 'city', 'phone', 'type', 'class'],
    'itam': ['id', 'name', 'artist', 'album', 'genre', 'price', 'copyright', 'time', 'released'],
    'waam': ['id', 'name', 'category', 'brand', 'modelno', 'price'],
    'roim': ['id', 'name', 'release date', 'director', 'duration', 'genre'],
    'zoye': ['id', 'name', 'votes', 'rating', 'phone', 'address', 'zip', 'cuisine'],
}

# Columns removed after renaming; only 'abt' drops anything.
magellan_drop_columns = {
    d_name: (['description'] if d_name == 'abt' else [])
    for d_name in magellan_dirs
}

In [None]:
def merge_with_id(tableA, tableB, id_pairs):
    """Expand labelled id pairs into full left/right record pairs.

    ``id_pairs`` is joined to ``tableA`` on ``id == ltable_id`` and then to
    ``tableB`` on ``rtable_id == id``. Column names present on both sides get
    the ``_l`` / ``_r`` suffix. The four id helper columns are removed from
    the returned frame.
    """
    with_left_record = tableA.merge(id_pairs, left_on='id', right_on='ltable_id')
    both_records = with_left_record.merge(
        tableB,
        left_on='rtable_id',
        right_on='id',
        suffixes=('_l', '_r'),
    )
    return both_records.drop(columns=['ltable_id', 'rtable_id', 'id_l', 'id_r'])

In [None]:
# NOTE(review): this cell repeats the `merge_with_id` definition of the previous cell;
# on a top-to-bottom run this later definition is the one that stays in effect.
def merge_with_id(tableA, tableB, id_pairs):
    """Expand labelled id pairs into full left/right record pairs.

    ``id_pairs`` is joined to ``tableA`` on ``id == ltable_id`` and then to
    ``tableB`` on ``rtable_id == id``. Column names present on both sides get
    the ``_l`` / ``_r`` suffix. The four id helper columns are removed from
    the returned frame.
    """
    with_left_record = tableA.merge(id_pairs, left_on='id', right_on='ltable_id')
    both_records = with_left_record.merge(
        tableB,
        left_on='rtable_id',
        right_on='id',
        suffixes=('_l', '_r'),
    )
    return both_records.drop(columns=['ltable_id', 'rtable_id', 'id_l', 'id_r'])

In [2]:
def prepare_magellan_row_pairs(dirs: dict, rename_columns: dict, drop_columns: dict,
                               max_test_rows: int = 1250, random_state=None):
    """Build row-pair CSVs for every Magellan dataset.

    For each dataset name in ``dirs`` this reads ``tableA.csv`` / ``tableB.csv``
    from the dataset folder, assigns the column names in ``rename_columns``,
    removes the columns in ``drop_columns``, and expands the id pairs in
    ``train.csv`` / ``valid.csv`` / ``test.csv`` into full record pairs via
    ``merge_with_id``. The results are written to ``prepared/<name>/``.

    Args:
        dirs: dataset name -> folder holding the raw Magellan files.
        rename_columns: dataset name -> new column names for both tables.
        drop_columns: dataset name -> columns to drop after renaming.
        max_test_rows: the test split is down-sampled to this many rows when
            it is larger.
        random_state: seed forwarded to ``DataFrame.sample`` for the test
            down-sampling. The default ``None`` keeps the previous unseeded
            behavior; pass an int to make the test split reproducible.
    """
    for d_name, raw_dir in dirs.items():
        tables = []
        for file_name in ('tableA.csv', 'tableB.csv'):
            table = pd.read_csv(os.path.join(raw_dir, file_name))
            table.columns = rename_columns[d_name]
            tables.append(table.drop(columns=drop_columns[d_name]))
        tableA, tableB = tables

        splits = {}
        for split in ('train', 'valid', 'test'):
            id_pairs = pd.read_csv(os.path.join(raw_dir, f'{split}.csv'))
            splits[split] = merge_with_id(tableA, tableB, id_pairs)

        # Only the test split is capped; train/valid are written in full.
        if len(splits['test']) > max_test_rows:
            splits['test'] = splits['test'].sample(n=max_test_rows, random_state=random_state)

        out_dir = f'prepared/{d_name}'
        os.makedirs(out_dir, exist_ok=True)
        for split, frame in splits.items():
            frame.to_csv(f'{out_dir}/{split}.csv', index=False)

# prepare_magellan_row_pairs(magellan_dirs, magellan_rename_columns, magellan_drop_columns)

### WDC

In [5]:
def prepare_wdc_row_pairs(dir: str, max_test_rows: int = 1250, random_state=None):
    """Build row-pair CSVs for the WDC dataset.

    Reads ``train.pkl.gz`` / ``valid.pkl.gz`` / ``test.pkl.gz`` from ``dir``,
    keeps title, price, currency and label, folds the currency into the price
    column, renames the columns to the ``_l`` / ``_r`` convention and writes
    the three partitions to ``prepared/wdc/``.

    All three partitions now go through the same price/currency merge
    (currency followed by price, missing when either part is missing).
    Previously only the training partition did; valid/test concatenated the
    two parts in the opposite order and turned missing values into strings
    such as ``"nannan"``.

    Args:
        dir: folder holding the WDC pickle files.
        max_test_rows: the test partition is down-sampled to this many rows
            when it is larger.
        random_state: seed forwarded to ``DataFrame.sample``; the default
            ``None`` keeps the previous unseeded sampling.
    """
    used_columns = ['title_left', 'price_left', 'priceCurrency_left', 'label',
                    'title_right', 'price_right', 'priceCurrency_right']
    final_columns = ['title_l', 'price_l', 'label', 'title_r', 'price_r']

    def merge_price_currency(price, currency):
        # A price is only meaningful when both the amount and the currency are known.
        if pd.notna(price) and pd.notna(currency):
            return str(currency) + str(price)
        return None

    def load_partition(file_name):
        # NOTE: unpickling can execute arbitrary code; only load files from a trusted source.
        # `.copy()` so the column assignments below do not write into a slice of the loaded frame.
        df = pd.read_pickle(os.path.join(dir, file_name))[used_columns].copy()
        for side in ('left', 'right'):
            df[f'price_{side}'] = [
                merge_price_currency(price, currency)
                for price, currency in zip(df[f'price_{side}'], df[f'priceCurrency_{side}'])
            ]
        df = df.drop(columns=['priceCurrency_left', 'priceCurrency_right'])
        df.columns = final_columns
        return df

    train_df = load_partition('train.pkl.gz')
    valid_df = load_partition('valid.pkl.gz')
    test_df = load_partition('test.pkl.gz')
    if len(test_df) > max_test_rows:
        test_df = test_df.sample(n=max_test_rows, random_state=random_state)

    os.makedirs('prepared/wdc', exist_ok=True)
    train_df.to_csv('prepared/wdc/train.csv', index=False)
    valid_df.to_csv('prepared/wdc/valid.csv', index=False)
    test_df.to_csv('prepared/wdc/test.csv', index=False)

# prepare_wdc_row_pairs('raw/wdc')

## Method Specific Data Preparation

In [None]:
# Path prefix from which the method-specific cells below read train/valid/test CSVs.
# NOTE(review): the general preparation above writes to 'prepared/' — confirm which
# step is expected to populate 'processed/'.
src_dataset_dir = 'processed/'

# Short names of all datasets handled by the method-specific preparation.
datasets = [
    'abt', 'amgo', 'beer', 'dbac', 'dbgo', 'foza',
    'itam', 'roim', 'waam', 'wdc', 'zoye',
]

### OpenAI GPT

We need to prepare the prompts that will be sent to the OpenAI API.

In [None]:
# Model names for which a prompt file is generated.
models = [
    'gpt-3.5-turbo',
    'gpt-4o-mini',
    'gpt-4',
]
# Seeds used for the attribute-order shuffling; one set of prompts per seed.
rand_seeds = [42, 44, 46, 48, 50]

In [4]:
# System instruction shared by every request.
sys_msg = "Determine whether the two entity descriptions refer to the same real-world entity. Answer with 'Yes' if they do and 'No' if they do not."

# Request skeleton for zero-shot prompts: system message + one user message.
# Every `None` slot is filled in per request.
zero_shot_request_template = {
    "custom_id": None,
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": None,
        "messages": [
            {"role": "system", "content": None},
            {"role": "user", "content": None},
        ],
        "max_tokens": 15,
    },
}

# Request skeleton for few-shot prompts: system message, three demonstration
# (user, assistant) turns, then the final user message to be answered.
few_shot_request_template = {
    "custom_id": None,
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": None,
        "messages": (
            [{"role": "system", "content": None}]
            + [{"role": role, "content": None} for _ in range(3) for role in ("user", "assistant")]
            + [{"role": "user", "content": None}]
        ),
        "max_tokens": 15,
    },
}

In [5]:
def serializer(data, rand_seed):
    """Turn every row pair of ``data`` into a prompt string.

    The attribute names are taken from the columns ending in ``_l``, shuffled
    with ``rand_seed``, and the same attribute order is used for the left and
    the right entity. Values are tab-separated.

    Unlike the previous version this does not add a ``text`` column to the
    caller's frame, does not reseed the global ``random`` state (a private
    generator seeded with ``rand_seed`` yields the same shuffle), and returns
    an empty list for an empty frame instead of failing.

    Args:
        data: frame with ``<attr>_l`` / ``<attr>_r`` columns.
        rand_seed: seed for the attribute-order shuffle.

    Returns:
        One prompt string per row, in row order.
    """
    rng = random.Random(rand_seed)
    columns = [col[:-2] for col in data.columns if col.endswith('_l')]
    rng.shuffle(columns)
    columns_l = [col + '_l' for col in columns]
    columns_r = [col + '_r' for col in columns]

    # A row-wise apply on an empty frame does not return a Series, so handle it up front.
    if data.empty:
        return []

    def render(row):
        left = '\t'.join(f'{value}' for value in row[columns_l])
        right = '\t'.join(f'{value}' for value in row[columns_r])
        return f'Entity 1: {left}\nEntity 2: {right}\nYour answer is '

    return data.apply(render, axis=1).to_list()

In [6]:
def demonstrations(loo_dataset, mode='manual'):
    """Collect few-shot demonstration prompts from every dataset except ``loo_dataset``.

    Args:
        loo_dataset: name of the left-out dataset; its examples are never used.
        mode: ``'manual'`` loads hand-picked pairs from
            ``gpt/demonstrations.json``; any other value serializes the
            ``test.csv`` of each remaining dataset (seed 42).

    Returns:
        ``(candidates, labels)``: prompt strings and the matching
        ``'Yes'`` / ``'No'`` answers, index-aligned.
    """
    leaved_datasets = [d for d in datasets if d != loo_dataset]
    candidates = []
    labels = []
    if mode == 'manual':
        with open('gpt/demonstrations.json') as f:
            examples = json.load(f)
        for dataset in leaved_datasets:
            mismatches = examples[dataset]['mismatches']
            matches = examples[dataset]['matches']
            for pair_text in mismatches + matches:
                # Each stored example is "<left entity>\n<right entity>".
                text_l, text_r = pair_text.split('\n')
                candidates.append(f'Entity 1: {text_l}\nEntity 2: {text_r}\nYour answer is ')
            # Derive the labels from the actual list lengths so candidates and labels
            # stay aligned whatever number of examples the file holds per dataset.
            labels += ['No'] * len(mismatches) + ['Yes'] * len(matches)
    else:
        for dataset in leaved_datasets:
            # Read the file once and reuse it for both the labels and the prompts.
            test_df = pd.read_csv(f'{src_dataset_dir}{dataset}/test.csv')
            labels += ['Yes' if label == 1 else 'No' for label in test_df['label']]
            candidates += serializer(test_df, 42)
    return candidates, labels

In [7]:
def prepare_requests(texts, model, dataset, rand_seed, mode='zero-shot'):
    """Wrap prompt texts into chat-completion request dicts.

    Args:
        texts: prompt strings, one per request.
        model: model name written into each request body.
        dataset: dataset name; used in ``custom_id`` and, for few-shot modes,
            as the dataset whose examples are excluded from the demonstrations.
        rand_seed: seed; used in ``custom_id`` and to seed the demonstration
            sampling so that few-shot prompts are reproducible.
        mode: ``'zero-shot'``, ``'few-shot-manual'`` or ``'few-shot-random'``.

    Returns:
        A list of request dicts, or ``None`` (after printing a message) when
        ``mode`` is not supported.
    """
    if mode == 'zero-shot':
        template = zero_shot_request_template
        candidates, labels, rng = None, None, None
    elif mode in ('few-shot-manual', 'few-shot-random'):
        template = few_shot_request_template
        demo_mode = 'manual' if mode == 'few-shot-manual' else 'random'
        candidates, labels = demonstrations(dataset, mode=demo_mode)
        # Private generator: the draw no longer depends on (or disturbs) numpy's global state.
        rng = np.random.default_rng(rand_seed)
    else:
        print('Not supported mode')
        return None

    requests = []
    for i, text in enumerate(texts):
        request = copy.deepcopy(template)
        request['custom_id'] = f'{dataset}-{rand_seed}-{i}'
        request['body']['model'] = model
        messages = request['body']['messages']
        messages[0]['content'] = sys_msg
        if candidates is not None:
            # Three distinct demonstrations fill the (user, assistant) slots 1..6.
            picked = rng.choice(len(candidates), 3, replace=False)
            for slot, idx in enumerate(picked):
                messages[1 + 2 * slot]['content'] = candidates[idx]
                messages[2 + 2 * slot]['content'] = labels[idx]
        # The last message is always the pair to be classified.
        messages[-1]['content'] = text
        requests.append(request)

    return requests

In [8]:
def prepare_gpt_input(model, mode='zero-shot'):
    """Write one JSONL file with the requests for all seeds and datasets.

    For every seed in ``rand_seeds`` and every dataset in ``datasets`` the
    ``test.csv`` is serialized and wrapped into requests; all requests are
    written, one JSON object per line, under ``gpt/prompts/``.

    Args:
        model: model name written into each request and into the file name.
        mode: ``'zero-shot'``, ``'few-shot-manual'`` or ``'few-shot-random'``.

    Raises:
        ValueError: if ``mode`` is not one of the supported values. This is
            checked before any data is read; previously an unsupported mode
            only failed after the work had started.
    """
    output_paths = {
        'zero-shot': f'gpt/prompts/em-{model}.jsonl',
        'few-shot-manual': f'gpt/prompts/em-few-shot-manual-{model}.jsonl',
        'few-shot-random': f'gpt/prompts/em-few-shot-random-{model}.jsonl',
    }
    if mode not in output_paths:
        raise ValueError(f'Not supported mode: {mode!r}')
    file_path = output_paths[mode]

    requests = []
    for seed in rand_seeds:
        for dataset in datasets:
            data = pd.read_csv(f'{src_dataset_dir}{dataset}/test.csv')
            requests += prepare_requests(serializer(data, seed), model, dataset, seed, mode)

    # Make sure the target folder exists so the write below cannot fail on a fresh checkout.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w") as file:
        for item in requests:
            file.write(json.dumps(item) + "\n")

In [9]:
# for model in models:
#     prepare_gpt_input(model, 'few-shot-random')

### Ditto

Ditto accepts input in txt format, so we need to convert the original csv files into the given txt format, """COL col VAL val..."""

In [30]:
def textualize(data, rand_seed, mode='train'):
    """Render every row pair of ``data`` as one line of ``COL ... VAL ...`` text.

    Each line is ``<left entity>\\t<right entity>\\t<label>\\n``. The attribute
    names come from the columns ending in ``_l`` and are shuffled with
    ``rand_seed``; both entities use the same order. With ``mode='test'`` every
    attribute name is written as ``unknown``; any other mode writes the real
    attribute names.

    Unlike the previous version this does not add a ``text`` column to the
    caller's frame, does not reseed the global ``random`` state (a private
    generator seeded with ``rand_seed`` yields the same shuffle), and returns
    an empty list for an empty frame instead of failing.

    Args:
        data: frame with ``<attr>_l`` / ``<attr>_r`` columns and a ``label`` column.
        rand_seed: seed for the attribute-order shuffle.
        mode: ``'test'`` hides attribute names, anything else keeps them.

    Returns:
        One text line per row, in row order.
    """
    rng = random.Random(rand_seed)
    columns = [col[:-2] for col in data.columns if col.endswith('_l')]
    rng.shuffle(columns)

    # A row-wise apply on an empty frame does not return a Series, so handle it up front.
    if data.empty:
        return []

    hide_names = mode == 'test'

    def render_side(row, suffix):
        parts = []
        for col in columns:
            shown_name = 'unknown' if hide_names else col
            parts.append(f'COL {shown_name} VAL {row[col + suffix]}')
        return ' '.join(parts)

    def render(row):
        fields = [render_side(row, '_l'), render_side(row, '_r'), f'{row["label"]}']
        return '\t'.join(fields) + '\n'

    return data.apply(render, axis=1).to_list()

In [31]:
def prepare_ditto_dataset(loo_dataset, rand_seed):
    """Write the leave-one-out Ditto text files for one dataset and seed.

    Training and validation lines come from every dataset except
    ``loo_dataset`` (at most 1500 train and 500 valid rows per dataset,
    sampled with ``rand_seed``); test lines come from the ``test.csv`` of
    ``loo_dataset`` and are rendered in test mode. The three files are written
    to ``ditto/loo-<loo_dataset>-<rand_seed>/``.
    """
    leaved_datasets = [d for d in datasets if d != loo_dataset]

    def sampled_lines(csv_path, cap):
        # Sample at most `cap` rows and render them in the default (train) mode.
        frame = pd.read_csv(csv_path)
        subset = frame.sample(n=min(cap, len(frame)), random_state=rand_seed).reset_index(drop=True)
        return textualize(subset, rand_seed)

    train_texts, valid_texts = [], []
    for name in leaved_datasets:
        train_texts += sampled_lines(f'{src_dataset_dir}{name}/train.csv', 1500)
        valid_texts += sampled_lines(f'{src_dataset_dir}{name}/valid.csv', 500)
    test_texts = textualize(pd.read_csv(f'{src_dataset_dir}{loo_dataset}/test.csv'), rand_seed, mode='test')

    save_dir = f'ditto/loo-{loo_dataset}-{rand_seed}'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    outputs = (('train', train_texts), ('valid', valid_texts), ('test', test_texts))
    for split, lines in outputs:
        with open(f'{save_dir}/{split}.txt', 'w') as f:
            f.writelines(lines)
    

In [34]:
# configs = []
# config_template = {
#     "name": None,
#     "task_type": "classification",
#     "vocab": ["0", "1"],
#     "trainset": None,
#     "validset": None,
#     "testset": None
#   }

# for seed in rand_seeds:
#     for dataset in datasets:
#         config = copy.deepcopy(config_template)
#         config['name'] = f'loo-{dataset}-{seed}'
#         config['trainset'] =  f'../data/ditto/loo-{dataset}-{seed}/train.txt'
#         config['validset'] =  f'../data/ditto/loo-{dataset}-{seed}/valid.txt'
#         config['testset'] =  f'../data/ditto/loo-{dataset}-{seed}/test.txt'
#         configs.append(config)
# with open('../ditto/configs.json', "w") as file:
#     json.dump(configs, file, indent=4)

# rand_seeds = [42, 44, 46, 48, 50]

# for seed in rand_seeds:
#     for dataset in datasets:
#         prepare_ditto_dataset(dataset, seed)
    

### Unicorn

In [42]:
def prepare_unicorn_dataset(dataset, rand_seed):
    """Write the Unicorn JSON files for one dataset and seed.

    Each partition of the dataset is read, missing values are replaced by the
    empty string, train/valid are sampled down to at most 1500/500 rows, and
    every row becomes ``[left_text, right_text, label]`` where the texts are
    the comma-joined attribute values. Output goes to
    ``unicorn/<dataset>-<rand_seed>/<partition>.json``.
    """
    random.seed(rand_seed)
    sample_caps = {'train.csv': 1500, 'valid.csv': 500}  # test.csv is used in full
    for partition in ['train.csv', 'valid.csv', 'test.csv']:
        df = pd.read_csv(f'{src_dataset_dir}{dataset}/{partition}').fillna('')
        if partition in sample_caps:
            n_rows = min(sample_caps[partition], len(df))
            df = df.sample(n=n_rows, random_state=rand_seed).reset_index(drop=True)

        left_columns = [col for col in df.columns if col.endswith('_l')]
        right_columns = [col for col in df.columns if col.endswith('_r')]
        # NOTE(review): the two sides are shuffled independently, so the attribute order
        # of the left and right text can differ (the GPT and Ditto serializers in this
        # notebook use one shared order) — confirm this is intended.
        random.shuffle(left_columns)
        random.shuffle(right_columns)

        item_list = []
        for i in range(len(df)):
            record = df.iloc[i]
            left_str = ','.join('{}'.format(value) for value in record[left_columns].values)
            right_str = ','.join('{}'.format(value) for value in record[right_columns].values)
            item_list.append([left_str, right_str, str(record['label'])])

        out_dir = f'unicorn/{dataset}-{rand_seed}'
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)
        with open(f'{out_dir}/{partition[:-4]}.json', 'w') as f:
            json.dump(item_list, f)

In [43]:
# for dataset in datasets:
#     for seed in rand_seeds:
#         prepare_unicorn_dataset(dataset, seed)

### AnyMatch
AnyMatch requires attribute augmentation and AutoML results; for simplicity, they are placed directly in the 'processed' folder.

In [None]:
def nan_check(value):
    """Return 1 if ``value`` counts as missing, else 0.

    A value is missing when pandas reports it as NA or when it equals one of
    the placeholder entries listed below.
    """
    null_strings = (None, 'nan', 'NaN', 'NAN', 'null', 'NULL', 'Null',
                    'None', 'none', 'NONE', '', '-', '--', '---')
    is_missing = pd.isna(value) or value in null_strings
    return 1 if is_missing else 0

def numerical_check(value):
    """Return 1 if ``value`` is a number, else 0.

    Besides the built-in ``int`` / ``float`` this accepts numpy integer and
    floating scalars (numpy integers are not ``int`` subclasses, so they were
    previously treated as non-numeric). Non-numbers now yield an explicit 0
    instead of an implicit ``None``, matching the other check helpers.
    """
    numeric_types = (int, float, np.integer, np.floating)
    return 1 if isinstance(value, numeric_types) else 0

def string_identical_check(left_value, right_value, row_label):
    """Return 1 if the two strings are treated as the same attribute value, else 0.

    The values match when they are equal or when one contains the other.
    Values that do not match this way are still reported as 1 when the row is
    labelled as a match (``row_label == 1``).
    """
    overlapping = (
        left_value == right_value
        or left_value in right_value
        or right_value in left_value
    )
    if overlapping or row_label == 1:
        return 1
    return 0

def numerical_identical_check(left_value, right_value, row_label):
    """Return 1 if the two numbers compare equal, else 0.

    ``row_label`` is accepted for signature parity with
    ``string_identical_check`` but is not used.
    """
    return 1 if left_value == right_value else 0

def identical_check(left_value, right_value, row_label):
    """Label an attribute-value pair as identical (1) or different (0).

    Two missing values are identical; one missing and one present value are
    different. Two numbers are compared numerically; everything else is
    compared as lower-cased strings.
    """
    left_missing = nan_check(left_value)
    right_missing = nan_check(right_value)

    if left_missing and right_missing:
        return 1
    if left_missing or right_missing:
        return 0
    if numerical_check(left_value) and numerical_check(right_value):
        return numerical_identical_check(left_value, right_value, row_label)
    return string_identical_check(str(left_value).lower(), str(right_value).lower(), row_label)

In [None]:
def row2attribute_pairs(row):
    """Split one row pair into per-attribute value pairs.

    Every column ending in ``_l`` is paired with the ``_r`` column of the same
    attribute name. Pairing by name rather than by position keeps the pairs
    correct when the left and right columns are not stored in the same order;
    left columns without a right counterpart are skipped.

    Args:
        row: a row with ``<attr>_l`` / ``<attr>_r`` entries and a ``label`` entry.

    Returns:
        A list of ``[left_value, right_value, identical_flag, attribute_name]``
        lists, in the order of the left columns.
    """
    row_label = row['label']
    attr_pairs = []
    for left_column in row.index:
        if not left_column.endswith('_l'):
            continue
        attribute = left_column[:-2]
        right_column = attribute + '_r'
        if right_column not in row.index:
            continue
        left_value = row[left_column]
        right_value = row[right_column]
        flag = identical_check(left_value, right_value, row_label)
        attr_pairs.append([left_value, right_value, flag, attribute])
    return attr_pairs

In [None]:
def prepare_all_attribute_pairs(names: list):
    """Derive attribute-pair CSVs from the row-pair CSVs of each dataset.

    For every name, ``prepared/<name>/{train,valid,test}.csv`` are read, each
    row is expanded with ``row2attribute_pairs``, duplicate attribute pairs
    are removed, and the results are written to
    ``prepared/<name>/attr_{train,valid,test}.csv``.
    """
    partitions = ('train', 'valid', 'test')
    pair_columns = ['left_value', 'right_value', 'label', 'attribute']

    for name in names:
        # Read every partition before producing any output.
        row_pairs = {part: pd.read_csv(f'prepared/{name}/{part}.csv') for part in partitions}

        attr_frames = {}
        for part in partitions:
            collected = []
            # The row-wise apply is used only for its side effect of filling `collected`.
            row_pairs[part].apply(lambda row: collected.extend(row2attribute_pairs(row)), axis=1)
            attr_frames[part] = pd.DataFrame(collected, columns=pair_columns).drop_duplicates()

        for part in partitions:
            attr_frames[part].to_csv(f'prepared/{name}/attr_{part}.csv', index=False)

In [None]:
# prepare_all_attribute_pairs(datasets)

In [None]:
def prepare_automl_predictions():
    """Fit an AutoGluon tabular model per dataset and store its predictions.

    For every dataset the model is fitted on ``prepared/<name>/train.csv``
    with ``valid.csv`` as tuning data. Predicted labels and class
    probabilities for both partitions are written to
    ``automl/<name>/train_preds.csv`` and ``automl/<name>/valid_preds.csv``.
    The validation predictions used to be computed but never saved.
    """
    dataset_names = ['abt', 'amgo', 'beer', 'dbac', 'dbgo', 'foza', 'itam', 'roim', 'waam', 'wdc', 'zoye']
    for name in dataset_names:
        train_df = pd.read_csv(f'prepared/{name}/train.csv')
        valid_df = pd.read_csv(f'prepared/{name}/valid.csv')

        predictor = TabularPredictor(label='label').fit(train_data=train_df, tuning_data=valid_df, verbosity=-1)

        out_dir = f'automl/{name}'
        os.makedirs(out_dir, exist_ok=True)
        for split, frame in (('train', train_df), ('valid', valid_df)):
            preds = predictor.predict(frame)
            # The probability columns are looked up by class label (0 and 1).
            preds_proba = predictor.predict_proba(frame)
            preds_df = pd.DataFrame({'prediction': preds, 'proba_0': preds_proba[0], 'proba_1': preds_proba[1]})
            preds_df.to_csv(f'{out_dir}/{split}_preds.csv', index=False)


In [None]:
# prepare_automl_predictions()

### ZeroER
Please create a separate 'metadata.txt' file for each dataset; it should include the paths of tableA and tableB and the id pairs of the matching entities.

### Jellyfish & MatchGPT

These two methods directly use the data in the 'processed' folder to form their prompts.