<a href="https://colab.research.google.com/github/menicacci/CE_for_ER/blob/main/ditto-data-integration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Set up

In [None]:
# Set-up

# clone the repo
!git clone https://github.com/megagonlabs/ditto
%cd ditto/
!git pull

# install requirements
!pip install -r requirements.txt

!pip install transformers
!pip install tensorboardX

%cd ..

# support for the model

import nltk
nltk.download('stopwords')

!git clone https://github.com/NVIDIA/apex
%cd apex
!python setup.py install
%cd ..

!pip install --upgrade "urllib3==1.25.4" awscli
!pip install jsonlines

!pip install recordlinkage
import recordlinkage

# subprocess is part of the Python standard library: it must not be pip-installed
# (there is nothing to install), a plain import is all that is needed.
import subprocess

import os
import random
import shutil
import json

import pandas as pd

Cloning into 'ditto'...
remote: Enumerating objects: 291, done.[K
remote: Counting objects: 100% (291/291), done.[K
remote: Compressing objects: 100% (144/144), done.[K
remote: Total 291 (delta 146), reused 262 (delta 140), pack-reused 0[K
Receiving objects: 100% (291/291), 26.87 MiB | 16.73 MiB/s, done.
Resolving deltas: 100% (146/146), done.
Updating files: 100% (128/128), done.
/content/ditto
Already up to date.
Collecting gensim==3.8.1 (from -r requirements.txt (line 1))
  Downloading gensim-3.8.1.tar.gz (23.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.4/23.4 MB[0m [31m34.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting numpy==1.19.2 (from -r requirements.txt (line 2))
  Downloading numpy-1.19.2.zip (7.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.3/7.3 MB[0m [31m78.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting req

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


Cloning into 'apex'...
remote: Enumerating objects: 11632, done.[K
remote: Counting objects: 100% (3700/3700), done.[K
remote: Compressing objects: 100% (563/563), done.[K
remote: Total 11632 (delta 3341), reused 3263 (delta 3134), pack-reused 7932[K
Receiving objects: 100% (11632/11632), 15.46 MiB | 18.06 MiB/s, done.
Resolving deltas: 100% (8170/8170), done.
/content/apex
No CUDA runtime is found, using CUDA_HOME='/usr/local/cuda'

 If your intention is to cross-compile, this is not an error.
By default, Apex will cross-compile for Pascal (compute capabilities 6.0, 6.1, 6.2),
Volta (compute capability 7.0), Turing (compute capability 7.5),
and, if the CUDA version is >= 11.0, Ampere (compute capability 8.0).
If you wish to cross-compile for a single specific architecture,
export TORCH_CUDA_ARCH_LIST="compute capability" before running setup.py.



torch.__version__  = 2.1.0+cu121


running install
!!

        ***********************************************************************

In [None]:
def execute_command(command, check=False):
    """Run ``command`` as a subprocess with ``CUDA_VISIBLE_DEVICES`` set to ``"0"``.

    stdout and stderr are captured together (stderr is redirected into stdout)
    and decoded as text. When the command exits with a non-zero status its
    captured output is printed, so a failing run is no longer silent.

    Args:
        command: Argument list passed to ``subprocess.run`` (no shell involved).
        check: When true, raise ``subprocess.CalledProcessError`` on a non-zero
            exit status. Defaults to ``False`` so existing callers keep their
            non-raising behaviour.

    Returns:
        The ``subprocess.CompletedProcess`` of the run; ``.stdout`` holds the
        combined output and ``.returncode`` the exit status.
    """
    # Work on a copy so the notebook's own environment is left untouched.
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = "0"

    result = subprocess.run(
        command,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors="replace",  # never fail on undecodable bytes in a log
    )

    if result.returncode != 0:
        print(f"Command {command!r} exited with status {result.returncode}:\n{result.stdout}")
        if check:
            raise subprocess.CalledProcessError(
                result.returncode, command, output=result.stdout
            )

    return result

def train_ditto(task):
    """Launch ``train_ditto.py`` for ``task`` through ``execute_command``.

    The script path is relative, so the current working directory must be the
    one containing ``train_ditto.py``. All hyper-parameters other than the task
    name are fixed below.
    """
    # Each entry is either (option, value) or a bare (flag,).
    options = (
        ("--task", task),
        ("--batch_size", "32"),
        ("--max_len", "64"),
        ("--lr", "3e-5"),
        ("--n_epochs", "10"),
        ("--finetuning",),
        ("--lm", "distilbert"),
        ("--fp16",),
        ("--save_model",),
    )

    command = ["python", "train_ditto.py"]
    for option in options:
        command.extend(option)

    execute_command(command)

def run_matcher(task, input_path, output_path):
    """Launch ``matcher.py`` for ``task`` through ``execute_command``.

    ``input_path`` and ``output_path`` are forwarded unchanged as
    ``--input_path`` / ``--output_path``. The script path and the
    ``checkpoints/`` directory are relative, so the current working directory
    must be the one containing ``matcher.py``.
    """
    # Each entry is either (option, value) or a bare (flag,).
    options = (
        ("--task", task),
        ("--input_path", input_path),
        ("--output_path", output_path),
        ("--lm", "distilbert"),
        ("--max_len", "64"),
        ("--use_gpu",),
        ("--fp16",),
        ("--checkpoint_path", "checkpoints/"),
    )

    command = ["python", "matcher.py"]
    for option in options:
        command.extend(option)

    execute_command(command)

# Ditto training using two different datasets

In [None]:
# Load the training data

%cd ditto/data

# Function to read and extract data from files
def extract_data(directory_path, file_names):
    """Collect the lines of every listed file found in ``directory_path``.

    Args:
        directory_path: Directory that is searched for the files.
        file_names: File names to read, in order.

    Returns:
        A list with the lines of all files that exist, in file order. Every
        returned line ends with a newline, including a file's last line when
        the file itself does not end with one; otherwise that record would be
        glued to the next one once the lines are shuffled and written back
        with ``writelines``.
    """
    merge_data = []

    for file_name in file_names:
        file_path = os.path.join(directory_path, file_name)

        # Files that do not exist are skipped rather than treated as errors.
        if not os.path.exists(file_path):
            continue

        with open(file_path, 'r') as file:
            for line in file:
                if not line.endswith('\n'):
                    line += '\n'
                merge_data.append(line)

    return merge_data

# Function to gather the lines of the given files from every sub-directory and shuffle them
def merge_and_write_data(path, file_names):
    """Return the shuffled lines of ``file_names`` found in each sub-directory of ``path``.

    Despite its name, this function writes nothing: it only reads (via
    ``extract_data``), shuffles with the global ``random`` state and returns
    the resulting list.
    """
    collected = []

    for entry in os.listdir(path):
        candidate = f"{path}/{entry}"
        # Only direct sub-directories are data sources; plain files are ignored.
        if not os.path.isdir(candidate):
            continue
        collected += extract_data(candidate, file_names)

    random.shuffle(collected)
    return collected


# Remove 'wdc/all' if present: the 'wdc' scan below reads every sub-directory,
# so a leftover 'all' directory would be read as one more source.
if os.path.exists('wdc/all'):
    shutil.rmtree('wdc/all')

# Gather every record from both dataset families, then mix them together.
# NOTE(review): no random seed is set, so the split differs between runs.
# NOTE(review): the output directory below (er_magellan/Structured/Beer) is
# itself scanned here, so re-running this cell re-reads the merged files it
# wrote on the previous run -- confirm this cell is only run on a fresh clone.
data = (
    merge_and_write_data('er_magellan/Structured', ['train.txt', 'valid.txt', 'test.txt'])
    + merge_and_write_data('wdc', ['test.txt', 'valid.txt.small', 'train.txt.small'])
)

random.shuffle(data)

# 70% train / 20% validation / remainder test
total_lines = len(data)
train_split = int(0.7 * total_lines)
valid_split = int(0.2 * total_lines)
valid_end = train_split + valid_split

train_data, valid_data, test_data = (
    data[:train_split],
    data[train_split:valid_end],
    data[valid_end:],
)

# The merged splits overwrite the files of this existing task directory.
dataset = 'er_magellan'
task = 'Structured/Beer'

merge_directory = f"{dataset}/{task}"
if not os.path.exists(merge_directory):
  os.makedirs(merge_directory)

for split_name, split_lines in (
  ('train.txt', train_data),
  ('valid.txt', valid_data),
  ('test.txt', test_data),
):
  with open(os.path.join(merge_directory, split_name), 'w') as split_file:
    split_file.writelines(split_lines)

%cd ..

/content/ditto/data
/content/ditto


In [None]:
# Fine-tune on the merged data written under data/{dataset}/{task}
train_ditto(task)

# Run the matcher on the held-out split of the merged data
run_matcher(task, f"data/{dataset}/{task}/test.txt", "output/output_small.jsonl")

In [None]:
%cd ..

/content


# Blocking (finding candidate pairs)

In [None]:
# Load the data source on which entity matching is performed.
# The path is relative, so 'output_file.csv' must be in the current working directory.
df = pd.read_csv('output_file.csv')
df.head()

Unnamed: 0,company_name,category,founded,employees,CEO,valuation,country,twitter,marketcap,revenue,website,trade_name
0,(DIA) Distribuidora Internacional de Alimentación,supermarkets food retail,,,,,spain,,$0.87 B,,,
1,(HLBank) Hong Leong Bank,banks financial-services,,,,,malaysia,,$9.97 B,,,
2,0x,Fintech,2016.0,,,$1.05B,United States,,,,,
3,0x,Fintech,,,,$1.05,United States,,,,,
4,1&1,telecommunication internet,,,,,germany,,$2.41 B,,,


In [None]:
def print_dict_sorted(dictionary):
    """Print one ``key: value`` line per entry, ordered by ascending value.

    Entries with equal values keep the dictionary's own order.
    """
    for key in sorted(dictionary, key=dictionary.get):
        print(f'{key}: {dictionary[key]}')

# Share of missing values per column, as a percentage of all rows
total_rows = len(df)
null_percentage_per_column = {
    column: (df[column].isnull().sum() / total_rows) * 100
    for column in df.columns
}

print("Percentage of null values for each column:")
print_dict_sorted(null_percentage_per_column)

Percentage of null values for each column:
company_name: 0.0
category: 30.97777681228684
revenue: 33.58677470510286
country: 43.37540460104707
marketcap: 45.55644863467512
founded: 61.82303999304847
employees: 65.3770121434623
CEO: 73.96215758260378
website: 85.14543914148545
trade_name: 94.1932961136576
valuation: 94.81458953359547
twitter: 97.54089457563052


In [None]:
# Perform blocking on datasource: build candidate pairs of rows of df
# that share the same value in a blocking column.

# First blocking key: company_name
indexer_1 = recordlinkage.Index()
indexer_1.block('company_name')
pairs_1 = indexer_1.index(df)

# Second blocking key: revenue
indexer_2 = recordlinkage.Index()
indexer_2.block('revenue')
pairs_2 = indexer_2.index(df)

# Union of both pair sets, without duplicates. The resulting list order comes
# from set iteration; later cells address candidate_pairs by position, so this
# list must not be rebuilt between writing the matcher input and reading its output.
candidate_pairs = list(set(pairs_1).union(set(pairs_2)))

# Entity Matching with Ditto

In [None]:
# Convert records into Ditto supported format
def to_string(r1, r2, l):
    """Serialize two records and a label into one tab-separated line.

    Each record is either an already-serialized string, used as is, or a
    mapping-like object whose entries are rendered as ``COL <key> VAL <value> ``
    (note the trailing space). The line is
    ``<record 1>\\t<record 2>\\t<label>\\n``.
    """
    def serialize(ent):
        if isinstance(ent, str):
            return ent
        return ''.join('COL %s VAL %s ' % (attr, ent[attr]) for attr in ent.keys())

    fields = [serialize(r1), serialize(r2), str(l)]
    return '\t'.join(fields) + '\n'

def get_record(candidate_pair):
  """Serialize the two rows of ``df`` referenced by ``candidate_pair`` with label 0.

  NOTE(review): the pair members are used as *positions* (``iloc``); this
  presumably relies on ``df`` having its default 0..n-1 index -- confirm.
  """
  left_row = df.iloc[candidate_pair[0]]
  right_row = df.iloc[candidate_pair[1]]
  return to_string(left_row, right_row, 0)

In [None]:
# One serialized line per candidate pair, in the same order as candidate_pairs
records = [get_record(pair) for pair in candidate_pairs]


%cd ditto

# Both paths are relative to the current working directory
input_file = "test.txt"
output_file = "output/result.jsonl"

# Write the serialized candidate pairs as the matcher input
with open(input_file, "w") as file:
  file.writelines(records)

# `task` is the task name defined in the training cell
run_matcher(task, input_file, output_file)

/content/ditto


In [None]:
def read_matching_result(output_path):
    """Return the 0-based line numbers of the records flagged as matches.

    The file at ``output_path`` is read as JSON Lines. A line is a match when
    its ``match`` field converts to the integer 1.

    Lines that cannot be used are reported on stdout and skipped, so one bad
    record does not abort the whole read. This covers invalid JSON as well as
    valid JSON without a usable ``match`` field (missing key, non-object
    value, non-numeric content). Skipped lines still count towards the line
    numbering.

    Args:
        output_path: Path of the JSON Lines file to read.

    Returns:
        List of line indices whose record has ``match == 1``.
    """
    matching_pairs_index = []

    with open(output_path, 'r') as file:
        for i, line in enumerate(file):
            try:
                json_data = json.loads(line)
                # Check if the pairs are in match
                if int(json_data['match']) == 1:
                    matching_pairs_index.append(i)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON at line {i}: {e}")
            except (KeyError, TypeError, ValueError) as e:
                print(f"Skipping malformed record at line {i}: {e!r}")

    return matching_pairs_index

In [None]:
# Line numbers of the matcher output that were flagged as matches
matching_pairs_index = read_matching_result(output_file)

# Map each line number back to the candidate pair at the same position.
# NOTE(review): assumes output line i corresponds to the i-th input record -- confirm.
matching_pairs = [candidate_pairs[index] for index in matching_pairs_index]

In [None]:
# Matching is transitive: if pairs (A, B) and (B, C) both match, then (A, C) matches as well
def merge_tuples(tuple_array):
    """Group pairwise links into connected components.

    Every 2-tuple in ``tuple_array`` is treated as an undirected edge. The
    result is a list with one tuple per connected component, holding the
    component's members in ascending order. Components appear in the order in
    which their first member is first seen in the input.
    """
    # Adjacency sets: node -> nodes it is directly linked to
    adjacency = {}
    for left, right in tuple_array:
        adjacency.setdefault(left, set()).add(right)
        adjacency.setdefault(right, set()).add(left)

    # Iterative depth-first search from every node not reached yet
    seen = set()
    components = []
    for start in adjacency:
        if start in seen:
            continue

        members = set()
        pending = [start]
        while pending:
            node = pending.pop()
            if node in seen:
                continue
            seen.add(node)
            members.add(node)
            pending.extend(adjacency[node])

        components.append(tuple(sorted(members)))

    return components

In [None]:
%cd ..


# Close the matches transitively: each resulting tuple holds the row ids of one group
matching_records_ditto = merge_tuples(matching_pairs)

# Persist one group per line, using the tuple's text form, e.g. "(1, 2, 3)"
with open("matching.txt", "w") as match_file:
  match_file.writelines([f"{pair}\n" for pair in matching_records_ditto])

If you already have `matching.txt` (the file listing the records in match), you can skip the previous steps and resume from the next cell.

In [None]:
# To load the indexes of records in match

with open('matching.txt', 'r') as file:
    lines = file.readlines()

# Each line of matching.txt is the text form of a tuple of row ids, e.g.
# "(1, 2)" or "(3, 8, 9, 10)". Groups can hold more than two ids, so every id
# on the line is kept, not just the first two.
pairs_in_match = []

for line in lines:
    line = line.strip().replace('(', '').replace(')', '')
    # Tolerate blank lines (e.g. a trailing newline at the end of the file)
    if not line:
        continue
    # Empty fragments come from the trailing comma of a one-element tuple "(7,)"
    numbers = [number for number in line.split(',') if number.strip()]
    pairs_in_match.append(tuple(int(number) for number in numbers))