In [None]:
from pathlib import Path
import pandas as pd
pd.set_option("display.max_columns", None) 

ROOT = Path.cwd()

DATA_DIR = ROOT / "parquet"
MLDS_DIR = ROOT / "ml-datasets"
OUTPUT_DIR = ROOT / "output"
LOGS = ROOT / "logs"
BLOCK_EVAL_DIR = OUTPUT_DIR / "blocking_evaluation"
CORR_DIR = OUTPUT_DIR / "correspondences"

BLOCK_EVAL_DIR.mkdir(parents=True, exist_ok=True)
CORR_DIR.mkdir(parents=True, exist_ok=True)

LOGS.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

In [2]:
from PyDI.io import load_parquet

metacritic = load_parquet(
    DATA_DIR / "df_metacritic.parquet",
    name="metacritic"
)

playtime = load_parquet(
    DATA_DIR / "df_playtime.parquet",
    name="playtime"
)

vgsales = load_parquet(
  DATA_DIR / "df_videogamesales.parquet",
  name="videogamesales"

)

In [None]:
import re, unicodedata

def normalize_title(text):
    if not isinstance(text, str):
        return ""
    # remove accents/diacritics (Ã± -> n)
    text = unicodedata.normalize("NFKD", text).encode("ASCII", "ignore").decode("ASCII")
    cleaned = re.sub(r"[^a-zA-Z0-9]+", " ", text)
    return cleaned.lower().strip()


metacritic["title_norm"] = metacritic["title"].apply(normalize_title)
playtime["title_norm"] = playtime["title"].apply(normalize_title)
vgsales["title_norm"] = vgsales["title"].apply(normalize_title)

In [4]:
from PyDI.entitymatching import (StandardBlocker,
                                 SortedNeighbourhoodBlocker,
                                 TokenBlocker,
                                 EmbeddingBlocker,
                                 RuleBasedMatcher,
                                 StringComparator,
                                 NumericComparator,
                                 EntityMatchingEvaluator)

In [5]:
train_p2m = load_parquet(
    MLDS_DIR / "train_PM.parquet",
    name="train_playtime_metacritic",
    add_index=False
)

test_p2m = load_parquet(
    MLDS_DIR / "test_PM.parquet",
    name="test_playtime_metacritic",
    add_index=False
)

train_p2s = load_parquet(
    MLDS_DIR / "train_PS.parquet",
    name="train_playtime_sales",
    add_index=False
)

test_p2s = load_parquet(
    MLDS_DIR / "test_PS.parquet",
    name="test_playtime_sales",
    add_index=False
)
train_p2m = train_p2m.rename(columns={"id_left": "id1", "id_right": "id2"})
test_p2m = test_p2m.rename(columns={"id_left": "id1", "id_right": "id2"})
train_p2s = train_p2s.rename(columns={"id_left": "id1", "id_right": "id2"})
test_p2s = test_p2s.rename(columns={"id_left": "id1", "id_right": "id2"})

In [14]:
st_blocker_p2m = StandardBlocker(
    playtime, metacritic,
    on=['title_norm'],
    batch_size=1000,
    output_dir=BLOCK_EVAL_DIR,
    id_column='id'
)

standard_candidates_p2m = st_blocker_p2m.materialize()

sn_blocker_p2m = SortedNeighbourhoodBlocker(
    playtime, metacritic,
    key='title_norm',
    window=20,
    batch_size=750,
    output_dir=BLOCK_EVAL_DIR,
    id_column='id'
)

sn_candidates_p2m = sn_blocker_p2m.materialize()


embedding_blocker_p2m = EmbeddingBlocker(
    playtime, metacritic,
    text_cols=['title_norm', 'platform','release_year'],
    model="sentence-transformers/all-MiniLM-L6-v2",
    index_backend="sklearn",
    top_k=10,
    batch_size=500,
    output_dir=BLOCK_EVAL_DIR,
    threshold=0.7,
    # Otherwise retrieves irrelevant candidates
    id_column='id'
)

embedding_candidates_p2m = embedding_blocker_p2m.materialize()

In [15]:
st_blocker_p2s = StandardBlocker(
    playtime, vgsales,
    on=['title_norm'],
    batch_size=1000,
    output_dir=BLOCK_EVAL_DIR,
    id_column='id'
)

standard_candidates_p2s = st_blocker_p2s.materialize()

sn_blocker_p2s = SortedNeighbourhoodBlocker(
    playtime, vgsales,
    key='title',
    window=20,
    batch_size=750,
    output_dir=BLOCK_EVAL_DIR,
    id_column='id'
)

sn_candidates_p2s = sn_blocker_p2s.materialize()




embedding_blocker_p2s = EmbeddingBlocker(
    playtime, vgsales,
    text_cols=['title_norm', 'platform','release_year'],
    model="sentence-transformers/all-MiniLM-L6-v2",
    index_backend="sklearn",
    top_k=10,
    batch_size=500,
    output_dir=BLOCK_EVAL_DIR,
    threshold=0.7,
    # Otherwise retrieves irrelevant candidates
    id_column='id'
)

embedding_candidates_p2s = embedding_blocker_p2s.materialize()

In [16]:
common_comparators= [   
    StringComparator(
        column='title_norm',
        similarity_function='cosine'
    ),
    StringComparator(
        column='platform',
        similarity_function='identity',
        preprocess=str.lower
    ),
    NumericComparator(
    column="release_year",
    method="within_range",
    max_difference=0,   # exact match only
)]
comparators_p2m = common_comparators + [
    StringComparator(
        column='developer',
        similarity_function='cosine',
        preprocess=str.lower
    )
]
comparators_p2s= common_comparators + [
    StringComparator(
        column='publisher',
        similarity_function='jaccard',
        preprocess=str.lower
    )
]

In [135]:
matcher = RuleBasedMatcher()

correspondences_p2m,debug_p2m = matcher.match(
    df_left=playtime,
    df_right=metacritic, 
    candidates=embedding_candidates_p2m,
    comparators=comparators_p2m,
    weights=[0.5, 0.2, 0.2, 0.1],
    threshold=0.8,
    id_column='id',
    debug=True
)

correspondences_p2s,debug_p2s = matcher.match(
    df_left=playtime,
    df_right=vgsales, 
    candidates=embedding_candidates_p2s,
    comparators=comparators_p2s,
    weights=[0.5, 0.2, 0.2, 0.1],
    threshold=0.8,
    id_column='id',
    debug=True
)

In [136]:
cluster_analysis_dir = OUTPUT_DIR / "cluster_analysis"
cluster_analysis_dir.mkdir(parents=True, exist_ok=True)
cluster_distribution_p2m = EntityMatchingEvaluator.create_cluster_size_distribution(
    correspondences=correspondences_p2m,
    out_dir=cluster_analysis_dir
)
cluster_distribution_p2s = EntityMatchingEvaluator.create_cluster_size_distribution(
    correspondences=correspondences_p2s,
    out_dir=cluster_analysis_dir
)
print(f"\nðŸ“Š Cluster Size Distribution Results (Playtime -> Metacritic):")
display(cluster_distribution_p2m)
print(f"\nðŸ“Š Cluster Size Distribution Results (Playtime -> VGSales):")
display(cluster_distribution_p2s)


ðŸ“Š Cluster Size Distribution Results (Playtime -> Metacritic):


Unnamed: 0,cluster_size,frequency,percentage
0,2,8520,97.326936
1,3,169,1.930546
2,4,42,0.479781
3,5,20,0.228467
4,6,1,0.011423
5,7,2,0.022847



ðŸ“Š Cluster Size Distribution Results (Playtime -> VGSales):


Unnamed: 0,cluster_size,frequency,percentage
0,2,6010,96.252402
1,3,170,2.722614
2,4,46,0.736707
3,5,13,0.2082
4,6,2,0.032031
5,7,1,0.016015
6,8,2,0.032031


In [137]:
from PyDI.entitymatching import MaximumBipartiteMatching, StableMatching

# We are using Maxmimum Bipartite Matching to refine results to 1:1 matches
clusterer = MaximumBipartiteMatching()
mbm_correspondences_p2m = clusterer.cluster(correspondences_p2m)
mbm_correspondences_p2s = clusterer.cluster(correspondences_p2s)
cluster_distribution_p2m = EntityMatchingEvaluator.create_cluster_size_distribution(
    correspondences=mbm_correspondences_p2m
)
cluster_distribution_p2s = EntityMatchingEvaluator.create_cluster_size_distribution(
    correspondences=mbm_correspondences_p2s
)
print(f"\nðŸ“Š Cluster Size Distribution Results (Playtime -> Metacritic):")
display(cluster_distribution_p2m)
print(f"\nðŸ“Š Cluster Size Distribution Results (Playtime -> VGSales):")
display(cluster_distribution_p2s)


ðŸ“Š Cluster Size Distribution Results (Playtime -> Metacritic):


Unnamed: 0,cluster_size,frequency,percentage
0,2,8797,100.0



ðŸ“Š Cluster Size Distribution Results (Playtime -> VGSales):


Unnamed: 0,cluster_size,frequency,percentage
0,2,6306,100.0


In [138]:
all_correspondences = pd.concat([mbm_correspondences_p2m, mbm_correspondences_p2s], ignore_index=True)
len(all_correspondences)

15103

In [139]:
import numpy as np

datasets = [playtime, metacritic, vgsales]

for i, df in enumerate(datasets):
    df["genres"] = df["genres"].apply(
        lambda x: x.tolist() if isinstance(x, np.ndarray) else x
    )
    datasets[i] = df

In [140]:
golden_fusion_dataset=pd.read_csv(MLDS_DIR / "Golden_Fusion_Dataset.csv")
golden_fusion_dataset.head(2)

Unnamed: 0,id,title_provenance,title,platform_provenance,platform,release_year_provenance,release_year,developer_provenance,developer,publisher_provenance,publisher,genres_provenance,genres,global_sales_mil_provenance,global_sales_mil,main_story_hours_provenance,main_story_hour,main_plus_sides_hour_provenance,main_plus_sides_hour,completionist_hour_provenance,completionist_hour,critic_score_provenance,critic_score,esrb_rating_provenance,esrb_rating
0,playtime_21+sales_5232+metacritic_14639,metacritic_14639,007 Legends,metacritic_14639,PlayStation 3,metacritic_14639,2012,playtime_21,Eurocom,playtime_21,Activision,playtime_21+sales_5232+metacritic_14639,"['First-Person', 'Shooter', 'Action', 'Modern'...",sales_5232,0.36,playtime_21,6.7,playtime_21,7.95,playtime_21,9.58,metacritic_14639,41,metacritic_14639,T
1,playtime_4075+sales_841+metacritic_261,metacritic_261,Dead Space,metacritic_261,PlayStation 3,metacritic_261,2008,playtime_4075,Visceral Games,playtime_4075,Electronic Arts,playtime_4075+sales_841+metacritic_261,"['Horror', 'Shooter', 'Survival', 'Third-Perso...",sales_841,2.02,playtime_4075,11.07,playtime_4075,13.18,playtime_4075,20.42,metacritic_261,88,metacritic_261,M


In [141]:
from PyDI.fusion import (DataFusionStrategy,
                         DataFusionEngine,
                         longest_string,
                         union,
                         prefer_higher_trust,
                         voting,
                         favour_sources)



metacritic.attrs["trust_score"] = 3
playtime.attrs["trust_score"] = 2
vgsales.attrs["trust_score"] = 1

# merge rule based correspondences
all_correspondences = pd.concat([correspondences_p2m, correspondences_p2s], ignore_index=True)

# define data fusion strategy
strategy = DataFusionStrategy('video_games_fusion_strategy')

strategy.add_attribute_fuser('title', prefer_higher_trust, trust_key="trust_score")
strategy.add_attribute_fuser('platform', voting)
strategy.add_attribute_fuser('release_year', voting)
strategy.add_attribute_fuser('genres', union)
strategy.add_attribute_fuser('developer', prefer_higher_trust, trust_key="trust_score")
strategy.add_attribute_fuser('publisher', prefer_higher_trust, trust_key="trust_score")
strategy.add_attribute_fuser('critic_score', prefer_higher_trust, trust_key="trust_score")
strategy.add_attribute_fuser('main_story_hour', prefer_higher_trust, trust_key="trust_score")
strategy.add_attribute_fuser('global_sales_mil', prefer_higher_trust, trust_key="trust_score")


# run fusion
engine = DataFusionEngine(strategy, debug=True, debug_format='json',
                          debug_file="output/data_fusion/debug_fusion_rb_embedding_blocker.jsonl")

# fuse rule based matches
fused_dataset = engine.run(
    datasets=datasets,
    correspondences=all_correspondences,
    id_column="id",
    include_singletons=False,
)
def collect_ids_ordered(meta, prefixes=("playtime_", "sales_", "metacritic_")):
    buckets = {p: [] for p in prefixes}

    def add(s):
        if isinstance(s, str):
            for p in prefixes:
                if s.startswith(p) and s not in buckets[p]:
                    buckets[p].append(s)
                    break

    def walk(obj):
        if isinstance(obj, dict):
            for v in obj.values():
                walk(v)
        elif isinstance(obj, list):
            for v in obj:
                walk(v)
        else:
            add(obj)

    walk(meta)
    ordered = []
    for p in prefixes:
        ordered.extend(buckets[p])
    return "+".join(ordered)

# Apply to a DataFrame column of metadata dicts
fused_dataset["fusion_id"] = fused_dataset["_fusion_metadata"].apply(collect_ids_ordered)
fused_dataset["_id"]= fused_dataset["fusion_id"]
fused_dataset.drop(columns=["fusion_id"], inplace=True)

print(f'Fused rows: {len(fused_dataset):,}')
fused_dataset.sample(2)

Fused rows: 11,834


Unnamed: 0,_id,_fusion_group_id,_fusion_sources,completionist_hour,jp_sales_mil,na_sales_mil,title,main_plus_sides_hour,global_sales_mil,esrb_rating,title_norm,genres,release_year,other_sales_mil,critic_score,main_story_hour,publisher,platform,developer,user_score,id,eu_sales_mil,_fusion_confidence,_fusion_metadata
4788,playtime_58637+metacritic_10661,group_4788,"[playtime, metacritic]",18.4,,,Onee Chanbara ORIGIN,10.37,,,onee chanbara origin,"[Action, Beat Em Up, Compilation, Horror, Misc...",2020.0,,62.0,4.9,D3Publisher,PC,D3Publisher,,playtime_58637,,0.730769,"{'platform_rule': 'voting', 'platform_sources'..."
3702,playtime_26333+sales_4136+metacritic_6921,group_3702,"[videogamesales, playtime, metacritic]",32.77,0.01,0.25,Fable Anniversary,21.03,0.48,M,fable anniversary,"[Action, Action RPG, Role-Playing]",2014.0,0.04,68.0,13.4,Microsoft Game Studios,Xbox 360,Microsoft Game Studios,7.1,playtime_26333,0.19,0.75,"{'_id_rule': 'first_non_null', '_id_inputs': [..."


In [142]:
from PyDI.fusion import tokenized_match, boolean_match,numeric_tolerance_match,set_equality_match
from PyDI.fusion import DataFusionEvaluator
import numpy as np
import re, ast, numpy as np, pandas as pd


def categories_set_equal(a, b) -> bool:
    """Return True if a and b contain the same unique categories (order/type agnostic)."""
    def to_set(x):
        def items(v):
            # missing
            if v is None or (isinstance(v, float) and np.isnan(v)): return []
            # numpy array â†’ recurse over elements
            if isinstance(v, np.ndarray): 
                out=[]; [out.extend(items(e)) for e in v.flatten()]; return out
            # python containers â†’ recurse over elements
            if isinstance(v, (list, tuple, set)):
                out=[]; [out.extend(items(e)) for e in v]; return out
            # scalar/string: try parse stringified list; else split by delimiters
            s = str(v).strip()
            if s == "" or s.lower() in {"nan","none"}: return []
            try:
                parsed = ast.literal_eval(s)
                if isinstance(parsed, (list, tuple, set)): return items(parsed)
            except Exception:
                pass
            return [p.strip() for p in re.split(r"[|,;/]", s) if p.strip()]
        return {it.lower() for it in items(x)}
    return to_set(a) == to_set(b)

strategy.add_evaluation_function("title", tokenized_match)
strategy.add_evaluation_function("platform", tokenized_match)
strategy.add_evaluation_function("release_year", tokenized_match)
strategy.add_evaluation_function("genres", categories_set_equal)
strategy.add_evaluation_function("developer", tokenized_match)
strategy.add_evaluation_function("publisher", tokenized_match)
strategy.add_evaluation_function("critic_score", numeric_tolerance_match)
strategy.add_evaluation_function("main_story_hour", numeric_tolerance_match)
strategy.add_evaluation_function("global_sales_mil", numeric_tolerance_match)


# Create evaluator with our fusion strategy
evaluator = DataFusionEvaluator(strategy, debug=True, debug_file=OUTPUT_DIR / "data_fusion" / "debug_fusion_eval.jsonl", debug_format="json")

# Evaluate the fused results against the gold standard
print("Evaluating fusion results against gold standard...")
evaluation_results = evaluator.evaluate(
    fused_df=fused_dataset,
    fused_id_column='_id',
    gold_df=golden_fusion_dataset,
    gold_id_column='id',
)

# Display evaluation metrics
print("\nFusion Evaluation Results:")
print("=" * 40)
for metric, value in evaluation_results.items():
    if isinstance(value, float):
        print(f"  {metric}: {value:.3f}")
    else:
        print(f"  {metric}: {value}")
        
print(f"\nOverall Accuracy: {evaluation_results.get('overall_accuracy', 0):.1%}")

Evaluating fusion results against gold standard...

Fusion Evaluation Results:
  overall_accuracy: 0.868
  macro_accuracy: 0.871
  num_evaluated_records: 22
  num_evaluated_attributes: 12
  total_evaluations: 258
  total_correct: 224
  platform_accuracy: 1.000
  platform_count: 22
  main_plus_sides_hour_accuracy: 1.000
  main_plus_sides_hour_count: 21
  global_sales_mil_accuracy: 1.000
  global_sales_mil_count: 22
  genres_accuracy: 1.000
  genres_count: 22
  completionist_hour_accuracy: 1.000
  completionist_hour_count: 18
  release_year_accuracy: 0.000
  release_year_count: 22
  developer_accuracy: 0.545
  developer_count: 22
  esrb_rating_accuracy: 1.000
  esrb_rating_count: 22
  critic_score_accuracy: 1.000
  critic_score_count: 22
  main_story_hour_accuracy: 1.000
  main_story_hour_count: 21
  publisher_accuracy: 0.955
  publisher_count: 22
  title_accuracy: 0.955
  title_count: 22

Overall Accuracy: 86.8%
