# BrewER Experiments
Este notebook replica as funcionalidades do experiments.ipynb para o algoritmo BrewER

In [41]:
import pandas as pd
import pickle as pkl
from collections import deque
import time
import itertools as it
import numpy as np
import math
import pprint

import re

from brewer import Task, brewer, blocking, matching, fusion, parser

import variables as var

from itertools import combinations

In [42]:
# Directory holding every input file of the experiment.
# Change this single value to point the notebook at another dataset.
DATASET_DIR = "datasets/alaska_cameras_small"

# Raw dataset (one row per record)
data = pd.read_csv(f"{DATASET_DIR}/dataset.csv")

# Matches (dataframe with l_id and r_id pairs)
matches = pd.read_csv(f"{DATASET_DIR}/matches.csv")

# Pickled candidate pairs file (list of pairs).
# NOTE: unpickling can execute arbitrary code; only load files you trust.
with open(f"{DATASET_DIR}/blocking_functions/candidates_sparker.pkl", "rb") as f:
    candidates = pkl.load(f)

In [43]:
# If your matches aren't using l_id and r_id, or you have a different matching function change here
def matcher(l, r):
    """Tell whether records `l` and `r` are listed as a match, in either order.

    Looks the pair up in the module-level `matches` dataframe, checking both
    the (l_id, r_id) and the (r_id, l_id) orientation.
    """
    left_ids = matches["l_id"]
    right_ids = matches["r_id"]
    forward = (left_ids == l) & (right_ids == r)
    backward = (left_ids == r) & (right_ids == l)
    return (forward | backward).any()

In [44]:
def _vote(column):
    """Return the most frequent non-null value of `column`, or NaN if there is none."""
    modes = column.mode()
    # mode() drops nulls, so it is empty for an empty or all-null column;
    # indexing it with [0] would raise instead of yielding a missing value.
    if modes.empty:
        return float("nan")
    return modes.iloc[0]


# Change based on the aggregation functions you want to use for BrewER
def resolve_batch(records):
    """Fuse the records of one cluster into a single entity.

    Args:
        records: dataframe with the columns `_id`, `description`, `brand`,
            `price` and `mp`, holding the records of one cluster.

    Returns:
        dict with the same five keys: majority vote for `_id`, `description`
        and `brand`, minimum `price`, and mean `mp`. A voted attribute with no
        non-null value is NaN.
    """
    entity = {}
    entity["_id"] = _vote(records["_id"])
    entity["description"] = _vote(records["description"])
    entity["brand"] = _vote(records["brand"])
    entity["price"] = records["price"].min()
    entity["mp"] = records["mp"].mean()
    return entity

In [45]:
def match_batch(candidate_pairs):
    """Cluster the records of the global `data` frame using the candidate pairs.

    For every record not yet assigned to a cluster, the block of records that
    share a candidate pair with it is explored breadth-first: each member of
    the growing cluster is compared, through `matcher`, against the block
    members that are not in the cluster yet. Comparison outcomes are cached in
    both directions so a pair is never sent to `matcher` twice.

    Records that appear in no candidate pair are skipped and therefore do not
    show up in the result.
    NOTE(review): this means they are not emitted even as singleton clusters;
    confirm this is what the baseline is supposed to do.

    Args:
        candidate_pairs: iterable of pairs (tuples) of record ids.

    Returns:
        (clusters, comparisons): the list of clusters, each a set of record
        ids, and the number of calls made to `matcher`.
    """
    # Index the pairs by record id once, instead of rescanning the whole
    # candidate list for every record. Each per-record list keeps the original
    # pair order, so every block is built from the same pairs in the same order.
    pairs_by_record = {}
    for pair in candidate_pairs:
        for member in pair:
            pairs_by_record.setdefault(member, []).append(pair)

    seen = set()
    matches_set = set()
    not_matches_set = set()
    result = []
    comparisons = 0
    for _, row in data.iterrows():
        # Change if your id column is different
        root = row["_id"]
        if root in seen:
            continue

        pairs = pairs_by_record.get(root)
        if not pairs:
            continue

        block = set().union(*pairs)

        entity_cluster = {root}
        to_analyze = deque([root])

        while to_analyze:
            current = to_analyze.popleft()
            for candidate in block:
                # Every id taken from the queue is already in the cluster, so
                # this check also covers the candidate == current case.
                if candidate in entity_cluster:
                    continue
                if (current, candidate) in matches_set:
                    entity_cluster.add(candidate)
                    to_analyze.append(candidate)
                    continue
                if (current, candidate) in not_matches_set:
                    continue
                comparisons += 1
                if matcher(current, candidate):
                    entity_cluster.add(candidate)
                    matches_set.add((current, candidate))
                    matches_set.add((candidate, current))
                    to_analyze.append(candidate)
                else:
                    not_matches_set.add((current, candidate))
                    not_matches_set.add((candidate, current))

        seen.update(entity_cluster)
        result.append(entity_cluster)
    return result, comparisons

In [46]:
# Run batch processing for comparison.
# perf_counter is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() follows the wall clock and can jump if it is adjusted.
start = time.perf_counter()

results, batch_comparisons = match_batch(candidates)

batch_elapsed_time = time.perf_counter() - start

In [47]:
# Fuse every cluster into one entity row (row i corresponds to results[i]).
# Change "_id" if your id column is different.
resolved = pd.DataFrame(
    [resolve_batch(data[data["_id"].isin(cluster)]) for cluster in results]
)
# Change to the where query you used on BrewER, and the order by
batch_filtered = resolved.loc[resolved["brand"] == "sony"].sort_values(
    by="mp", ascending=False
)
# Every unordered pair of record ids inside a filtered cluster, stored as a
# sorted tuple so that (a, b) and (b, a) collapse into one entry.
batch_filtered_pairs = set()
for position in batch_filtered.index:
    members = results[position]
    if len(members) <= 1:
        continue
    for pair in combinations(members, 2):
        batch_filtered_pairs.add(tuple(sorted(pair)))

In [48]:
# Show the unordered record-id pairs derived from the filtered batch clusters;
# the bare expression on the last line is rendered as the cell output.
print("Batch filtered pairs:")
batch_filtered_pairs

Batch filtered pairs:


{('buy.net//6510', 'www.gosale.com//394'),
 ('www.ebay.com//24233', 'www.ebay.com//46633'),
 ('www.ebay.com//42799', 'www.ebay.com//60261'),
 ('www.ebay.com//24694', 'www.ebay.com//46395'),
 ('www.ebay.com//44209', 'www.ebay.com//55392'),
 ('www.ebay.com//45040', 'www.ebay.com//46256'),
 ('www.ebay.com//54873', 'www.ebay.com//59883'),
 ('www.ebay.com//45418', 'www.ebay.com//48532'),
 ('www.ebay.com//52701', 'www.ebay.com//54036'),
 ('www.ebay.com//59331', 'www.ebay.com//60845'),
 ('www.ebay.com//59331', 'www.ebay.com//59645'),
 ('www.ebay.com//46633', 'www.ebay.com//60259'),
 ('www.ebay.com//42093', 'www.ebay.com//52701'),
 ('www.ebay.com//47857', 'www.ebay.com//56042'),
 ('www.ebay.com//56849', 'www.ebay.com//58307'),
 ('www.ebay.com//46970', 'www.ebay.com//55524'),
 ('www.ebay.com//44753', 'www.ebay.com//57755'),
 ('buy.net//6417', 'www.ebay.com//47392'),
 ('www.gosale.com//1395', 'www.henrys.com//26'),
 ('www.ebay.com//57664', 'www.gosale.com//1325'),
 ('www.ebay.com//56737', 'www.e

In [49]:
# Batch baseline summary: matcher calls, elapsed seconds, and the number of
# resolved entities that passed the filter.
print("Batch comparisons:", batch_comparisons)
print("Batch elapsed time:", batch_elapsed_time)
print("Batch emitted:", len(batch_filtered))

Batch comparisons: 22073
Batch elapsed time: 63.070372104644775
Batch emitted: 206


In [50]:
# Prepare gold standard for BrewER matching function:
# every matching pair is stored in both orientations.
gold = set()
for left_id, right_id in zip(matches["l_id"], matches["r_id"]):
    gold.update([(left_id, right_id), (right_id, left_id)])

In [51]:
# Initialize BrewER results tracking
brewer_start = None
brewer_results = []
brewer_entities = []

def track_brewer_results(entity):
    """Record one entity emitted by BrewER and score it against the batch baseline.

    The entity is appended to `brewer_entities`, and a metrics row is appended
    to `brewer_results`.

    Args:
        entity: dict with at least `mp`, and either `matches` (the record ids
            of the cluster) or `_id`. `time` and `comparisons` are used when
            present.

    A pair of record ids from the cluster counts as a true positive when it is
    in `batch_filtered_pairs` and as a false positive otherwise. When the
    entity has no `time`, the elapsed time is measured from `brewer_start`,
    which must have been set beforehand.
    """
    brewer_entities.append(entity)

    # Fall back to a singleton cluster only when "matches" is really absent.
    if "matches" in entity:
        cluster = entity["matches"]
    else:
        cluster = [entity["_id"]]

    if isinstance(cluster, (set, list)) and len(cluster) > 1:
        cluster_pairs = set(tuple(sorted(p)) for p in combinations(cluster, 2))
    else:
        cluster_pairs = set()

    tp = len(cluster_pairs & batch_filtered_pairs)
    fp = len(cluster_pairs - batch_filtered_pairs)

    # Compute the fallback lazily: as a dict.get() default it would be
    # evaluated on every call and fail while brewer_start is still None,
    # even for entities that carry their own "time".
    if "time" in entity:
        elapsed_time = entity["time"]
    else:
        elapsed_time = time.time() - brewer_start

    brewer_results.append({
        "elapsed_time": elapsed_time,
        "comparisons": entity.get("comparisons", 0),
        "order": entity["mp"],
        "tp": tp,
        "fp": fp,
    })


In [52]:
# Configure BrewER task
# Create a SQL query for BrewER - must follow the exact syntax expected by the parser
# This replicates the same query from the BlendER experiments.ipynb:
# - SELECT: VOTE(_id), VOTE(description), VOTE(brand), MIN(price), AVG(mp)
# - HAVING: VOTE(brand) = "sony" (the filter is written as a HAVING clause on the voted brand)
# - ORDER BY: AVG(mp) DESC
# The query has no LIMIT clause.
sql_query = '''
SELECT VOTE(_id), VOTE(description), VOTE(brand), MIN(price), AVG(mp)
FROM alaska_cameras_small
GROUP BY ENTITY WITH MATCHER bf_sparker_mf_gt
HAVING VOTE(brand) = "sony"
ORDER BY AVG(mp) DESC
'''

print("Configuring BrewER task...")

# Create task object
class SimpleSQL:
    """Minimal holder exposing the query text through a `value` attribute.

    NOTE(review): presumably mirrors the object the parser normally receives
    (it is handed to `parser` below) - confirm against the brewer module.
    """

    def __init__(self, value):
        self.value = value

sql_obj = SimpleSQL(sql_query)
parsed_query = parser(sql_obj)

# Report the failure first; otherwise build the task and echo its settings.
if not parsed_query["complete"]:
    print("Error: SQL query parsing failed!")
    print("Parsed query:", parsed_query)
else:
    task = Task(parsed_query)
    print("Task created successfully!")
    print(f"Dataset: {task.ds}")
    print(f"Blocking function: {task.blocking_function}")
    print(f"Matching function: {task.matching_function}")
    print(f"Ordering key: {task.ordering_key}")
    print(f"Ordering mode: {task.ordering_mode}")
    print(f"Aggregation functions: {task.aggregation_functions}")

Configuring BrewER task...
Task created successfully!
Dataset: alaska_cameras_small
Blocking function: SparkER (Meta-Blocking)
Matching function: Ground Truth
Ordering key: mp
Ordering mode: desc
Aggregation functions: {'_id': 'vote', 'description': 'vote', 'brand': 'vote', 'price': 'min', 'mp': 'avg'}


In [53]:
# Run BrewER algorithm (reusing the matcher function from the batch baseline)
if 'task' in locals() and parsed_query["complete"]:
    # Reset the tracking state so that re-running this cell does not
    # accumulate rows from a previous run.
    brewer_start = time.time()
    brewer_results = []
    brewer_entities = []

    # gold is not used here; the batch matcher function is called directly instead
    gold = []  # placeholder only

    # Candidates
    candidates_set = set(candidates)

    try:
        # Reload the module first, then patch it: the order matters because
        # reloading re-executes the module and would undo an earlier patch.
        import importlib, brewer as brewer_mod
        importlib.reload(brewer_mod)

        # BrewER's matching delegates to the batch matcher
        # NOTE(review): assumes brewer() looks up `matching` in its own module
        # namespace at call time - confirm against the brewer module.
        def _my_matching(left_id, right_id, _gold_ignored):
            return matcher(left_id, right_id)  # uses the SAME logic as the batch baseline
        brewer_mod.matching = _my_matching

        results_df = brewer_mod.brewer(
            task, data, gold, candidates_set,
            demo=False, mode="scratch", results=[]
        )

        print(f"BrewER completed successfully! Emitted {len(results_df)} entities.")

        # Score every emitted entity, in emission order, against the batch baseline.
        for _, entity in results_df.iterrows():
            track_brewer_results(entity.to_dict())

    except Exception as e:
        # Best effort: report the error and continue with an empty result so
        # the following cells still run. Only the message is printed, not the traceback.
        print(f"Error running BrewER: {e}")
        results_df = pd.DataFrame()

else:
    print("Cannot run BrewER: task creation failed or query parsing failed.")
    results_df = pd.DataFrame()



BrewER is running: setup started.

Setup completed... let's go!

{'_id': 0,
 'brand': 'sony',
 'comparisons': 2,
 'description': 'sony a7r black interchangeable lens digital slr camera body '
                'only (36.4 mp, memory stick/sd card slot) price comparison at '
                'buy.net',
 'matches': {'www.gosale.com//1215', 'www.ebay.com//53668', 'buy.net//5881'},
 'mp': np.float64(36.4),
 'price': np.float64(1586.99),
 'time': 1.4259588718414307}
{'_id': 1,
 'brand': 'sony',
 'comparisons': 416,
 'description': 'sony alpha ilce 6000l 24 3mp slr digital camera with e pz 16 '
                '50mm lens black | ebay',
 'matches': {'www.ebay.com//46949'},
 'mp': np.float64(24.3),
 'price': np.float64(nan),
 'time': 2.5894253253936768}
{'_id': 2,
 'brand': 'sony',
 'comparisons': 416,
 'description': 'sony a alpha nex 7 24 3 mp digital camera black body only | '
                'ebay',
 'matches': {'www.ebay.com//42034',
             'www.ebay.com//54868',
             'www.eba

In [54]:
# Inspect the columns of the BrewER output and preview the cluster members,
# entity id and ordering value of the first rows.
print(results_df.columns.tolist())
print(results_df[["matches", "_id", "mp"]].head())


['_id', 'description', 'brand', 'price', 'mp', 'matches', 'comparisons', 'time']
                                             matches  _id    mp
0  {www.gosale.com//1215, www.ebay.com//53668, bu...    0  36.4
1                              {www.ebay.com//46949}    1  24.3
2  {www.gosale.com//531, www.ebay.com//60426, www...    2  24.3
3                              {www.ebay.com//42367}    3  24.3
4                              {www.ebay.com//42416}    4  24.3


In [None]:
# Create evaluation dataframe: one row per emitted entity, in emission order.
brewer_dataframe = pd.DataFrame(brewer_results)

if len(brewer_dataframe) > 0:
    prev = brewer_dataframe["order"].shift(1)
    # An emission is correctly placed when its ordering value is strictly
    # below the previous one, or equal to it within a relative tolerance.
    # CHANGE < TO > IF ASC
    brewer_dataframe["correct_order"] = (brewer_dataframe["order"] < prev) | np.isclose(
        brewer_dataframe["order"], prev, rtol=1e-8
    )
    # The first row has no predecessor, so it is correct by definition.
    brewer_dataframe.loc[0, "correct_order"] = True

    brewer_dataframe["running_tp"] = brewer_dataframe["tp"].cumsum()
    brewer_dataframe["running_fp"] = brewer_dataframe["fp"].cumsum()
    brewer_dataframe["running_fn"] = len(batch_filtered_pairs) - brewer_dataframe["running_tp"]
    brewer_dataframe["running_correctness"] = brewer_dataframe["correct_order"].cumsum()

    # 0/0 yields NaN; report those undefined ratios as 0.
    brewer_dataframe["recall"] = (brewer_dataframe["running_tp"] / (brewer_dataframe["running_tp"] + brewer_dataframe["running_fn"])).replace(np.nan, 0)
    brewer_dataframe["precision"] = (brewer_dataframe["running_tp"] / (brewer_dataframe["running_tp"] + brewer_dataframe["running_fp"])).replace(np.nan, 0)
    brewer_dataframe["correctness"] = brewer_dataframe["running_correctness"] / (brewer_dataframe.index + 1)
    # Same treatment for F1: while precision and recall are both 0 the ratio
    # is 0/0, which would otherwise leave NaN in the column.
    brewer_dataframe["f1"] = (
        2
        * (brewer_dataframe["precision"] * brewer_dataframe["recall"])
        / (brewer_dataframe["precision"] + brewer_dataframe["recall"])
    ).replace(np.nan, 0)

    brewer_dataframe.to_csv("output/brewer_results.csv", index=False)
else:
    print("No results from BrewER to evaluate.")

In [82]:
# Render the per-emission evaluation metrics as the cell output.
brewer_dataframe

Unnamed: 0,elapsed_time,comparisons,order,tp,fp,correct_order,running_tp,running_fp,running_fn,running_correctness,recall,precision,correctness,f1
0,1.425959,2,36.4,3,0,True,3,0,3359,1,0.000892,1.0,1.0,0.001783
1,2.589425,416,24.3,0,0,True,3,0,3359,2,0.000892,1.0,1.0,0.001783
2,2.592755,416,24.3,10,0,True,13,0,3349,3,0.003867,1.0,1.0,0.007704
3,2.595556,416,24.3,0,0,True,13,0,3349,4,0.003867,1.0,1.0,0.007704
4,2.598356,416,24.3,0,0,True,13,0,3349,5,0.003867,1.0,1.0,0.007704
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
558,15.627621,4624,2.0,0,0,True,237,0,3125,559,0.070494,1.0,1.0,0.131703
559,15.630010,4624,2.0,0,0,True,237,0,3125,560,0.070494,1.0,1.0,0.131703
560,15.632450,4624,2.0,0,0,True,237,0,3125,561,0.070494,1.0,1.0,0.131703
561,15.634945,4624,2.0,0,0,True,237,0,3125,562,0.070494,1.0,1.0,0.131703


In [None]:
# Display BrewER results: the metrics table, the metrics of the last emission,
# and a preview of the emitted entities (also written to CSV).
print("\n=== BrewER RESULTS ===")

if len(brewer_dataframe) == 0:
    print("No performance metrics available from BrewER.")
else:
    print("BrewER Performance Metrics:")
    print(brewer_dataframe)
    print(f"\nFinal Metrics:")
    print(f"- Final Precision: {brewer_dataframe['precision'].iloc[-1]:.4f}")
    print(f"- Final Recall: {brewer_dataframe['recall'].iloc[-1]:.4f}")
    print(f"- Final Correctness: {brewer_dataframe['correctness'].iloc[-1]:.4f}")

if 'results_df' not in locals() or len(results_df) == 0:
    print("No entities were emitted by BrewER.")
else:
    print(f"\nBrewER entities emitted ({len(results_df)}):")
    print(results_df.head(10))
    results_df.to_csv("output/brewer_entities.csv", index=False)


=== BrewER RESULTS ===
BrewER Performance Metrics:
     elapsed_time  comparisons  order  tp  fp  correct_order  running_tp  \
0        1.425959            2   36.4   3   0           True           3   
1        2.589425          416   24.3   0   0          False           3   
2        2.592755          416   24.3  10   0           True          13   
3        2.595556          416   24.3   0   0           True          13   
4        2.598356          416   24.3   0   0           True          13   
..            ...          ...    ...  ..  ..            ...         ...   
558     15.627621         4624    2.0   0   0           True         237   
559     15.630010         4624    2.0   0   0           True         237   
560     15.632450         4624    2.0   0   0           True         237   
561     15.634945         4624    2.0   0   0           True         237   
562     15.637781         4624    1.3   0   0          False         237   

     running_fp  running_fn  runnin

KeyError: 'accuracy'

In [None]:
# Comparison summary: batch baseline versus BrewER, followed by the query
# that both of them answer.
print("\n" + "="*50)
print("COMPARISON SUMMARY: Batch vs BrewER")
print("="*50)

print(f"\nBatch Processing (Baseline):")
print(f"  - Comparisons: {batch_comparisons:,}")
print(f"  - Elapsed time: {batch_elapsed_time:.4f}s")
print(f"  - Entities emitted: {len(batch_filtered)}")

if len(brewer_dataframe) > 0:
    # The comparison counter is read from the last emission; the elapsed time
    # is the largest value recorded across all emissions.
    total_brewer_comparisons = brewer_dataframe["comparisons"].iloc[-1] if "comparisons" in brewer_dataframe.columns else 0
    total_brewer_time = brewer_dataframe["elapsed_time"].max() if "elapsed_time" in brewer_dataframe.columns else 0
    print(f"\nBrewER Processing (On-Demand):")
    print(f"  - Comparisons: {total_brewer_comparisons:,}")
    print(f"  - Elapsed time: {total_brewer_time:.4f}s")
    print(f"  - Entities emitted: {len(brewer_dataframe)}")

    if total_brewer_comparisons > 0 and batch_comparisons > 0:
        comp_reduction = ((batch_comparisons - total_brewer_comparisons) / batch_comparisons * 100)
        time_reduction = ((batch_elapsed_time - total_brewer_time) / batch_elapsed_time * 100)
        print(f"\nEfficiency Gains:")
        print(f"  - Comparison reduction: {comp_reduction:.2f}%")
        print(f"  - Time reduction: {time_reduction:.2f}%")
        # The speedup is undefined for a zero BrewER time; skip the line
        # instead of printing an empty one.
        if total_brewer_time > 0:
            print(f"  - Speedup factor: {batch_elapsed_time/total_brewer_time:.2f}x")
else:
    print(f"\nBrewER Processing: No results to compare")

# Keep this description in sync with sql_query: the filter is a HAVING clause
# on the voted brand, and the query has no LIMIT.
print("\n" + "="*50)
print("Query Configuration (same as BlendER experiments):")
print("  - SELECT: VOTE(_id), VOTE(description), VOTE(brand), MIN(price), AVG(mp)")
print("  - HAVING: VOTE(brand) = 'sony'")
print("  - ORDER BY: mp DESC")
print("  - LIMIT: none")
print("="*50)


COMPARISON SUMMARY: Batch vs BrewER

Batch Processing (Baseline):
  - Comparisons: 22,073
  - Elapsed time: 57.3694s
  - Entities emitted: 206

BrewER Processing (On-Demand):
  - Comparisons: 4,624
  - Elapsed time: 15.1841s
  - Entities emitted: 563

Efficiency Gains:
  - Comparison reduction: 79.05%
  - Time reduction: 73.53%
  - Speedup factor: 3.78x

Query Configuration (same as BlendER experiments):
  - SELECT: VOTE(_id), VOTE(description), VOTE(brand), MIN(price), AVG(mp)
  - WHERE: brand = 'sony'
  - ORDER BY: mp DESC
  - LIMIT: TOP 10


In [None]:
# Side-by-side comparison of results: first ten rows of each approach.
print("\n" + "=" * 80)
print("SIDE-BY-SIDE RESULTS COMPARISON")
print("=" * 80)

print("\nBatch Processing Results (sorted by mp DESC):")
if len(batch_filtered) == 0:
    print("No batch results to display")
else:
    # The price column is shown only when the resolved frame has one.
    display_cols = ['_id', 'brand', 'mp']
    if 'price' in batch_filtered.columns:
        display_cols.append('price')
    print(batch_filtered[display_cols].head(10).to_string(index=False))

print(f"\nBrewER On-Demand Results:")
if 'results_df' not in locals() or len(results_df) == 0:
    print("No BrewER results to display")
else:
    # Keep only the columns that BrewER actually returned.
    available_cols = [
        col
        for col in ('_id', 'brand', 'mp', 'price', 'comparisons', 'time')
        if col in results_df.columns
    ]
    print(results_df[available_cols].head(10).to_string(index=False))

print("\n" + "=" * 80)


SIDE-BY-SIDE RESULTS COMPARISON

Batch Processing Results (sorted by mp DESC):
                _id brand   mp   price
      buy.net//5881  sony 36.4 1586.99
      buy.net//4330  sony 24.3     NaN
      buy.net//6531  sony 24.3 1172.99
www.ebay.com//43490  sony 24.3     NaN
www.ebay.com//42034  sony 24.3  827.99
  cammarkt.com//229  sony 24.3     NaN
www.ebay.com//42742  sony 24.3 1655.97
www.gosale.com//540  sony 24.3 1379.99
www.ebay.com//54853  sony 24.3     NaN
www.ebay.com//46677  sony 24.3     NaN

BrewER On-Demand Results:
 _id brand   mp   price  comparisons     time
   0  sony 36.4 1586.99            2 1.303157
   1  sony 24.3     NaN          416 2.392973
   2  sony 24.3  827.99          416 2.396129
   3  sony 24.3     NaN          416 2.399670
   4  sony 24.3     NaN          416 2.402486
   5  sony 24.3     NaN          416 2.405061
   6  sony 24.3     NaN          416 2.408036
   7  sony 24.3     NaN          416 2.410732
   8  sony 24.3     NaN          416 2.413642
   9

In [None]:
from itertools import combinations

def pairs_of(ids):
    """Return every unordered pair of `ids` as a set of sorted 2-tuples."""
    unordered_pairs = set()
    for pair in combinations(ids, 2):
        unordered_pairs.add(tuple(sorted(pair)))
    return unordered_pairs

# Union of the unordered pairs implied by every cluster BrewER emitted.
if "matches" in results_df.columns:
    pred_pairs = set().union(
        *(pairs_of(cluster) for cluster in results_df["matches"])
    )
else:
    pred_pairs = set()

print("pares preditos:", len(pred_pairs))
print("pares baseline:", len(batch_filtered_pairs))
print("TP:", len(pred_pairs.intersection(batch_filtered_pairs)))
print("FP:", len(pred_pairs.difference(batch_filtered_pairs)))  # ideally 0
print("FN:", len(batch_filtered_pairs.difference(pred_pairs)))  # if > 0, the gap comes from blocking/HAVING


pares preditos: 237
pares baseline: 3362
TP: 237
FP: 0
FN: 3125
