In [None]:
import math
import multiprocessing as mp
from functools import partial

import pandas as pd
from tqdm.notebook import tqdm

from utils.dataset_utils import Dataset, get_datasets, systematic_sample

tqdm.pandas()

In [None]:
# Number of CPU threads to leave free (i.e. not use) when parallelizing.
CPU_THREADS_RESERVED = 1
# Target length, in characters, of the slice cut from the middle of a normalized
# body; bodies that are not longer than this are used whole.
BODY_SLICE_SIZE = 50
# Size argument passed to `systematic_sample` for datasets without a "title" column.
# NOTE(review): presumably the maximum number of sampled rows - confirm against
# `systematic_sample`.
BODY_ARTICLES_SAMPLE_MAX_SIZE = 10_000


def prepare_dataset_for_comparison(dataset: Dataset):
    """Normalize the text columns of `dataset.dataframe` in place for matching.

    The "body" column is reduced to lowercase ASCII letters only. If the frame
    has a "title" column it is normalized the same way; otherwise a
    "body_slice" column is added, holding roughly `BODY_SLICE_SIZE` characters
    taken from the middle of each normalized body (or the whole body when it is
    not longer than `BODY_SLICE_SIZE`).
    """
    print(f"preparing {dataset.name}")

    frame = dataset.dataframe
    letters_only_body = frame["body"].str.replace(r"[^a-zA-Z]", "", regex=True)
    frame["body"] = letters_only_body.str.lower()

    if frame.get("title") is not None:
        letters_only_title = frame["title"].str.replace(r"[^a-zA-Z]", "", regex=True)
        frame["title"] = letters_only_title.str.lower()
        return

    def middle_slice(body):
        # Short bodies are kept whole instead of being sliced.
        if not len(body) > BODY_SLICE_SIZE:
            return body
        midpoint = len(body) / 2
        half_window = BODY_SLICE_SIZE / 2
        start = math.floor(midpoint - half_window)
        stop = math.ceil(midpoint + half_window)
        return body[start:stop]

    frame["body_slice"] = frame["body"].map(middle_slice)


def find_matches_chunk(chunk, df2, find_first_match):
    """Count the rows of `chunk` whose "body_slice" yields a match in `df2`.

    `find_first_match(df2, body_slice)` is called once per row; every non-null
    result counts as one match.
    """
    def lookup(body_slice):
        return find_first_match(df2, body_slice)

    first_matches = chunk["body_slice"].apply(lookup)
    # Series.count() ignores null entries, i.e. the rows without a match.
    return first_matches.count()


def parallel_intersection(df1, df2, find_first_match, n_processes=None):
    """Count rows of `df1` whose "body_slice" is matched in `df2`, using a process pool.

    `df1` is split into contiguous row chunks; each chunk is handed to
    `find_matches_chunk` in a worker process and the per-chunk counts are summed.

    Args:
        df1: DataFrame with a "body_slice" column (read by `find_matches_chunk`).
        df2: DataFrame passed unchanged to `find_first_match` as the haystack.
        find_first_match: Callable `(haystack, needle)`; a non-null return value
            counts as a match. It is sent to the workers, so it must be picklable.
        n_processes: Upper bound on the number of worker processes. Defaults to
            the CPU count minus `CPU_THREADS_RESERVED`, but at least 1.

    Returns:
        The total number of matched rows (0 for an empty `df1`).

    Raises:
        ValueError: If `n_processes` is given and is smaller than 1.
    """
    # NOTE(review): with the "spawn" start method, functions defined in a notebook
    # cell may not be importable by the workers - confirm on the target platform.
    if n_processes is None:
        n_processes = max(1, mp.cpu_count() - CPU_THREADS_RESERVED)
    elif n_processes < 1:
        raise ValueError(f"n_processes must be at least 1, got {n_processes}")

    if len(df1) == 0:
        # Nothing to match; do not pay for starting a pool.
        return 0

    # Ceiling division yields at most `n_processes` chunks of near-equal size;
    # floor division would leave a tiny extra chunk holding the remainder.
    chunk_size = (len(df1) + n_processes - 1) // n_processes
    chunks = [df1.iloc[i:i + chunk_size] for i in range(0, len(df1), chunk_size)]

    worker_func = partial(find_matches_chunk, df2=df2, find_first_match=find_first_match)

    # Never start more workers than there are chunks to process.
    with mp.Pool(processes=min(n_processes, len(chunks))) as pool:
        results = list(tqdm(
            pool.imap(worker_func, chunks),
            total=len(chunks),
            desc="processing chunks"
        ))

    return sum(results)


def find_first_match(haystack, needle):
    """Return the first "body" in `haystack` that contains `needle`, or None.

    Args:
        haystack: DataFrame with a string "body" column.
        needle: Literal substring to look for (not interpreted as a regex).

    Returns:
        The "body" value of the first matching row, or None when nothing matches
        or when `needle` is empty or not a string.
    """
    # An empty needle is a substring of every string, so it would "match" every
    # row and inflate the measured overlap; treat it as no match instead.
    if not isinstance(needle, str) or not needle:
        return None

    bodies = haystack["body"]
    # na=False: rows with a missing body count as non-matches rather than
    # producing NA values, which cannot be used as a boolean mask.
    matches = bodies.str.contains(needle, regex=False, na=False)
    if not matches.any():
        return None
    return bodies[matches].iloc[0]


def _estimate_body_intersection(sampled_df, searched_df):
    """Estimate how many rows of `sampled_df` have their "body_slice" found in `searched_df`.

    A sample of `sampled_df` is matched against `searched_df` and the hit count
    is scaled back up to the full size of `sampled_df`. Returns 0 when the
    sample is empty.
    """
    sample = systematic_sample(sampled_df, BODY_ARTICLES_SAMPLE_MAX_SIZE)
    if len(sample) == 0:
        # Nothing to match; also avoids dividing by the length of an empty sample.
        return 0
    matched = parallel_intersection(sample, searched_df, find_first_match)
    return matched * len(sampled_df) / len(sample)


# Pairwise overlap table: cell [i][j] holds the measured intersection size of
# datasets i and j, followed by that size as a percentage of dataset i.
datasets_count = len(list(get_datasets()))
results = [["" for _ in range(datasets_count)] for _ in range(datasets_count)]

for i1, dataset1 in enumerate(get_datasets()):
    prepare_dataset_for_comparison(dataset1)
    df1 = dataset1.dataframe

    for i2, dataset2 in enumerate(get_datasets()):
        # Skip the first `i1` datasets to avoid measuring the intersections twice.
        if i2 < i1:
            continue

        print(f"measuring {dataset1.name} & {dataset2.name}")

        if i1 == i2:
            intersection_size = len(df1)
        else:
            prepare_dataset_for_comparison(dataset2)
            df2 = dataset2.dataframe

            if df1.get("title") is None:
                # No titles in the first dataset: look its body slices up in the second.
                intersection_size = _estimate_body_intersection(df1, df2)
            elif df2.get("title") is None:
                # Only the second dataset lacks titles: look its body slices up in the first.
                intersection_size = _estimate_body_intersection(df2, df1)
            else:
                # Both datasets have titles: compare the sets of normalized titles.
                intersection_size = len(set(df1["title"].str.strip())
                                        .intersection(set(df2["title"].str.strip())))

        # To avoid division by zero.
        df1_length = max(len(df1), 1)
        df2_length = max(len(dataset2.dataframe), 1)

        intersection_size = round(intersection_size)
        # Fill both mirrored cells; each percentage is relative to its own row's dataset.
        results[i1][i2] = f"{intersection_size} ({intersection_size / df1_length * 100:.1f} %)"
        results[i2][i1] = f"{intersection_size} ({intersection_size / df2_length * 100:.1f} %)"

In [None]:
# Label both the rows and the columns of the results table with the dataset names.
dataset_names = [dataset.name for dataset in get_datasets()]
results_df = pd.DataFrame(data=results, index=dataset_names, columns=dataset_names)
results_df