In [82]:
import json
import math
import networkx as nx
import numpy as np
import os
import pandas as pd
import pickle as pk
import random
import time

In [83]:
def blocking(task, ds, gold):
    """Load the candidate pairs produced by blocking and log their quality against the ground truth.

    Parameters
    ----------
    task : object
        Must expose `candidates` (path of the pickled candidate pairs) and `query_output`
        (path of the text log, opened in append mode).
    ds : object
        Not used by this function; kept so that existing callers keep working.
    gold : set
        Ground truth, as a set of matching pairs.

    Returns
    -------
    set
        The candidate pairs read from `task.candidates`.
    """

    # Load the candidate pairs (the file is closed as soon as it has been read)
    # NOTE(review): unpickling can execute arbitrary code - only load candidate files from a trusted source
    with open(task.candidates, 'rb') as candidates_file:
        candidates = set(pk.load(candidates_file))

    # True positives: candidate pairs that are real matches
    tp = gold.intersection(candidates)
    # False positives: candidate pairs that are not matches; false negatives: matches missed by the blocking
    fp = len(candidates) - len(tp)
    fn = len(gold) - len(tp)

    # Recall and precision are undefined (NaN) when their denominator is empty
    recall = len(tp) / len(gold) if len(gold) > 0 else float('nan')
    precision = len(tp) / len(candidates) if len(candidates) > 0 else float('nan')

    with open(task.query_output, "a") as query_output:
        print("Number of candidate pairs generated by blocking: " + str(len(candidates)) + '\n', file=query_output)
        print("Quality of the blocking:", file=query_output)
        print("TP: " + str(len(tp)) + ", FP: " + str(fp) + ", FN: " + str(fn) + " => R: " + str(recall) +
              ", P: " + str(precision) + '\n', file=query_output)

    return candidates

In [84]:
def resolution(ds, cluster, aggregations):
    """Fuse the records of a cluster into a single entity.

    Parameters
    ----------
    ds : pandas.DataFrame
        Dataset; must contain an 'id' column and every attribute named in `aggregations`.
    cluster : iterable
        Identifiers of the matching records.
    aggregations : dict
        Maps an attribute name to the aggregation function used to fuse its values:
        'min', 'max', 'avg', 'sum', 'vote', 'random' or 'concat'.
        Attributes mapped to any other value are left out of the entity.

    Returns
    -------
    dict
        The entity, with one key per aggregated attribute (in the order of `aggregations`).
    """

    # Find the matching elements (contained in the cluster) in the dataset
    matches = ds.loc[ds['id'].isin(cluster)]

    # Create the entity (as a dictionary) according to the specified aggregation functions
    entity = dict()
    for attribute, function in aggregations.items():
        values = matches[attribute]
        if function == 'min':
            entity[attribute] = values.min()
        elif function == 'max':
            entity[attribute] = values.max()
        elif function == 'avg':
            entity[attribute] = round(values.mean(), 2)
        elif function == 'sum':
            entity[attribute] = round(values.sum(), 2)
        elif function == 'vote':
            # Most frequent value (missing values take part in the vote); fall back on the minimum
            try:
                entity[attribute] = values.mode(dropna=False).iloc[0]
            except ValueError:
                entity[attribute] = values.min()
        elif function == 'random':
            entity[attribute] = np.random.choice(values)
        elif function == 'concat':
            # Cast to string first, so that non-textual attributes can be concatenated as well
            entity[attribute] = ' ; '.join(map(str, values))

    return entity

In [85]:
def batch_er(task, ds, candidates, gold):
    """Run batch entity resolution and return the dataset with the duplicates replaced by entities.

    Parameters
    ----------
    task : object
        Must expose `query_output` (path of the text log) and `aggregations` (passed to `resolution`).
    ds : pandas.DataFrame
        Dataset to be cleaned; must contain an 'id' column.
    candidates : set
        Candidate pairs produced by blocking.
    gold : set
        Ground truth (matching pairs), used here as the matching function.

    Returns
    -------
    pandas.DataFrame
        The records that have no match, followed by one row per resolved entity.
    """

    # Close the log right after writing, instead of leaking the handle
    with open(task.query_output, "a") as query_output:
        print("BATCH ENTITY RESOLUTION ALGORITHM\n", file=query_output)

    # Create an empty graph
    graph = nx.Graph()

    # Apply matching function on candidate pairs (verify their presence in gold) and add matches to the graph as edges
    for match in gold.intersection(candidates):
        graph.add_edge(match[0], match[1])

    # Detect clusters (connected components) as sets of nodes and call resolution function on them
    entities = [resolution(ds, cluster, task.aggregations) for cluster in nx.connected_components(graph)]

    # Create a new dataset without duplicates: every matched record now appears twice in the concatenation,
    # so dropping all the repeated identifiers (keep=False) removes both copies
    duplicates = ds.loc[ds['id'].isin(list(graph.nodes))]
    ds = pd.concat([ds, duplicates], ignore_index=True).drop_duplicates(subset=['id'], keep=False)

    # Return the clean dataset obtained by replacing the removed duplicates with the solved entities
    return pd.concat([ds, pd.DataFrame(entities)], ignore_index=True).drop_duplicates(subset=['id'], keep=False)

In [86]:
def find_matching_neighbours(current_id, neighbourhood, neighbours, matches, done, compared, count, gold_pairs=None):
    """Recursively collect the records matching `current_id`, following the neighbours of each new match.

    Parameters
    ----------
    current_id : hashable
        Identifier of the record whose neighbours are compared.
    neighbourhood : iterable
        Identifiers to be compared with `current_id`.
    neighbours : dict
        Maps an identifier to a two-element list of sets of neighbour identifiers; both sets are
        explored when the search is extended to a newly found match.
    matches : set
        Identifiers already known to match; updated in place.
    done : set
        Identifiers already inserted in a solved entity (never compared again).
    compared : dict
        Maps an identifier to True once it has been compared; updated in place.
    count : int
        Number of comparisons performed so far.
    gold_pairs : set, optional
        Matching pairs used as matching function. When omitted, the notebook-level variable `gold`
        is used, as it was before this parameter existed.

    Returns
    -------
    tuple
        `(matches, compared, count)` after the exploration.
    """

    # Fall back on the notebook-level ground truth only when no explicit one is supplied
    truth = gold if gold_pairs is None else gold_pairs

    # Look for the matches among the neighbours
    for n in neighbourhood:

        # Do not compare with itself and with elements already inserted in a solved entity or already compared
        if n in matches or n in done or compared[n]:
            continue

        # Increment the counter of comparisons and register the candidate as compared
        count = count + 1
        compared[n] = True

        # Apply the matching function (pairs may be stored in either order)
        if (current_id, n) in truth or (n, current_id) in truth:
            matches.add(n)
            matches, compared, count = find_matching_neighbours(n, neighbours[n][0].union(neighbours[n][1]),
                                                                neighbours, matches, done, compared, count, truth)

    return matches, compared, count

In [87]:
def brewer_preliminary_filtering(task, ds, candidates, ol, optimize):
    """Filter the transitively closed blocks and initialize the ordering list (OL).

    Parameters
    ----------
    task : object
        Must expose `brewer_pre_filtering(records, solved)`, `ignore_null` and `ordering_key`.
    ds : pandas.DataFrame
        Dataset; must contain an 'id' column and the ordering key column.
    candidates : iterable
        Candidate pairs, used as edges to build the blocks.
    ol : list
        Ordering list; the new elements are appended to it in place.
    optimize : bool
        If True, only the seed records of a surviving block are inserted in OL; otherwise the whole block.

    Returns
    -------
    tuple
        `(ol, seeds, filtered)`: the ordering list, the identifiers of the seed records and the
        identifiers of all the records whose block passes the filtering (both as lists).
    """

    # Keep a list for the seed records and one for all the records whose block passes the filtering
    seeds = list()
    filtered = list()

    # From the list of candidate pairs, create the transitively closed blocks
    g = nx.Graph()
    g.add_nodes_from(list(ds['id']))
    g.add_edges_from(candidates)
    blocks = [list(cc) for cc in nx.connected_components(g)]

    # Perform the preliminary filtering of the transitively closed blocks
    for block in blocks:
        block_records = ds.loc[ds['id'].isin(block)]

        # A block composed by a single record needs no comparison: it is already solved
        solved = len(block) == 1

        # Perform preliminary filtering on the records of the block
        seed_records = task.brewer_pre_filtering(block_records, solved)

        # Check if the block overcomes the filtering (i.e., if the list of seed records is not empty)
        if len(seed_records.index) == 0:
            continue

        # If 'ignore_null' option is set, check that at least one record in the block has a not null ordering key value
        if task.ignore_null and len(block_records[block_records[task.ordering_key].notnull()]) == 0:
            continue

        # Records of the block that are not seeds (identifiers appearing only once in the concatenation)
        no_seed_records = pd.concat([block_records, seed_records], ignore_index=True).drop_duplicates(subset=['id'], keep=False)

        # Extend the lists in place instead of rebuilding them at every block
        seeds.extend(seed_records['id'])
        filtered.extend(seed_records['id'])
        filtered.extend(no_seed_records['id'])

        # If the scenario fits the optimization, insert in OL each record that survives the filtering (seed record)...
        # ...otherwise, insert in OL the whole block
        if optimize:
            block_records = seed_records
        elif task.ignore_null:
            # If 'ignore_null' option is set, insert the records with null ordering key only as neighbours
            # (valid only in general case, not if we apply the optimization)
            block_records = block_records[block_records[task.ordering_key].notnull()]

        for index, row in block_records.iterrows():

            # Element to be inserted in OL
            element = dict()
            element['id'] = row['id']

            # Its attribute 'matches' is a set of identifiers, containing at the moment only the one of the record
            element['matches'] = {row['id']}

            # Its attribute 'ordering_key' must be a numeric value (forced cast to float)
            element['ordering_key'] = float(row[task.ordering_key])

            element['solved'] = solved

            ol.append(element)

    return ol, seeds, filtered

In [88]:
def brewer(task, ds, gold, candidates):
    """Perform progressive entity resolution (BrewER) and return the emitted entities in emission order.

    Parameters
    ----------
    task : object
        Query settings; this function reads `aggregations`, `ordering_key`, `ordering_mode`, `top_k`,
        `query_details`, `query_output` and calls `brewer_post_filtering` (`brewer_pre_filtering` is
        used by the preliminary filtering).
    ds : pandas.DataFrame
        Dataset; must contain an 'id' column.
    gold : set
        Ground truth. It is not read directly here: the comparisons are delegated to
        `find_matching_neighbours`, which is called without an explicit ground truth.
    candidates : iterable
        Candidate pairs produced by blocking.

    Returns
    -------
    pandas.DataFrame
        One row per emitted entity, with an extra 'comparisons' column storing the number of
        comparisons performed before its emission.

    Notes
    -----
    The size of OL is appended to `task.query_details` before the resolution starts; the number of
    comparisons and of emitted entities are appended at the end, also when a Top-K query stops early.
    """

    # Understand if the current scenario can be optimized or not
    ok_aggregation = task.aggregations[task.ordering_key]
    optimize = (ok_aggregation == 'max' and task.ordering_mode == 'asc') or \
               (ok_aggregation == 'min' and task.ordering_mode == 'desc')

    # Create the ordering list OL
    ol = list()

    # Create a dictionary to keep track of all the neighbourhoods
    neighbours = dict()

    # Keep a set for the already solved records (entities)
    done = set()

    # Initialize OL
    ol, seeds, filtered = brewer_preliminary_filtering(task, ds, candidates, ol, optimize)

    seeds = set(seeds)
    filtered = set(filtered)

    # Define the neighbours from the list of candidate pairs
    for pair in candidates:
        if pair[0] in filtered and pair[1] in filtered:

            # If the records are not in neighbours, they must be inserted (a set for seeds, a set for non-seeds)
            for record in (pair[0], pair[1]):
                neighbours.setdefault(record, [set(), set()])

            # Each record of the pair is registered, in both neighbourhoods, among the seeds (0) or the non-seeds (1)
            for record in (pair[0], pair[1]):
                bucket = 0 if record in seeds else 1
                neighbours[pair[0]][bucket].add(record)
                neighbours[pair[1]][bucket].add(record)

    with open(task.query_details, 'a') as query_details:
        query_details.write(',' + str(len(ol)))

    # Perform progressive entity resolution and count the number of comparisons before each emission

    # Number of comparisons
    count = 0

    # List of emitted entities
    results = list()

    # Number of emitted entities (for Top-K query case)
    top_k_now = 0

    # Initialize a dictionary to keep track of the performed comparisons (for a transitively closed matcher)
    compared = {n: False for n in neighbours}

    # Elements with a null ordering key are always pushed to the end of OL
    descending = task.ordering_mode != 'asc'
    null_rank = float('-inf') if descending else float('inf')

    # At each iteration, order OL and check its first element (priority)
    while len(ol) > 0:

        # OL is always kept sorted on the ordering key (according to the specified ordering mode)
        ol = sorted(ol, key=lambda x: x['ordering_key'] if not math.isnan(x['ordering_key']) else null_rank,
                    reverse=descending)
        head = ol[0]

        # If the first element of OL is already solved: perform ER, check HAVING clauses on it...
        # ...and eventually emit the entity
        if head['solved']:

            # Perform ER on the records represented by the element (identifiers)
            entity = resolution(ds, head['matches'], task.aggregations)

            # Check HAVING clauses on the entity
            if task.brewer_post_filtering(entity):

                # Emit the entity keeping in memory the number of comparisons performed before its emission
                entity['comparisons'] = count
                results.append(entity)

                # Increment the emitted entities counter and check if it fits the (eventual) K value (Top-K query)...
                # ...if it is equal to K, stop in advance (leaving the loop, so that the final summary is still written)
                top_k_now = top_k_now + 1
                if top_k_now == task.top_k:
                    break

            # Remove the considered element from OL
            ol.pop(0)

        # If the first element of OL is not solved yet, find the matching neighbours...
        # ...and insert in OL a new element representing them
        else:
            current_id = head['id']

            # Set all the elements in compared to False
            compared = dict.fromkeys(compared, False)

            # Look for the matches among the seeds
            head['matches'], compared, count = find_matching_neighbours(current_id, neighbours[current_id][0],
                                                                        neighbours, head['matches'], done,
                                                                        compared, count)

            # Check the presence of at least a seed record among the matches...
            if len(head['matches'].intersection(neighbours[current_id][0])) > 0:

                # In this case, look for the matches also among the non-seeds...
                head['matches'], compared, count = find_matching_neighbours(current_id, neighbours[current_id][1],
                                                                            neighbours, head['matches'], done,
                                                                            compared, count)

                # ...and create the representative record:
                # The ordering key of the new element is the aggregation of the ones of the matches
                key_aggregation = {task.ordering_key: task.aggregations[task.ordering_key]}
                entity = resolution(ds, head['matches'], key_aggregation)

                # Define the new element of OL representing the group of matching elements
                solved = dict()
                solved['id'] = current_id
                solved['matches'] = head['matches']
                solved['ordering_key'] = float(entity[task.ordering_key])
                del neighbours[current_id]
                solved['solved'] = True
                solved['seed'] = True

                # Insert the matching elements in the list of solved records
                done.update(head['matches'])

                # Delete the matching elements from OL
                ol = [item for item in ol if item['id'] not in head['matches']]

                # Insert in OL the new element representing them
                ol.append(solved)

            # ...if no seed record is present, delete the current element from OL and insert it in the list of solved records
            else:
                done.update(head['matches'])
                ol = [item for item in ol if item['id'] not in head['matches']]

    # Close the log right after writing, instead of leaking the handle
    with open(task.query_output, "a") as query_output:
        print("Total number of performed comparisons: " + str(count) + '\n', file=query_output)

    with open(task.query_details, 'a') as query_details:
        query_details.write(',' + str(count) + ',' + str(len(results)))

    return pd.DataFrame(results)

In [89]:
class Task(object):
    """Settings of one query, expressed through the clauses of a SQL-like statement.

    The aggregation function of the ordering key and the two HAVING values of each query are read
    from a batch CSV file (columns 'ok_aggregation', 'cond1' and 'cond2'), using the row at position
    `index - 1`. The HAVING values are always matched as literal substrings (SQL LIKE '%value%').
    """

    # Dataset-specific settings. 'aggregations' lists every attribute except the ordering key, whose
    # aggregation function changes with the query and is therefore added (as last item) in __init__.
    # 'having_attribute' is the attribute on which both HAVING conditions are evaluated.
    _DATASET_SETTINGS = {
        'alaska_camera': {
            'aggregations': {'id': 'min', 'brand': 'vote', 'model': 'vote'},
            'attributes': ['brand', 'model', 'megapixels'],
            'ordering_key': 'megapixels',
            'having_attribute': 'brand',
        },
        'altosight': {
            'aggregations': {'id': 'min', 'name': 'vote', 'brand': 'vote', 'size': 'vote', 'size_num': 'max'},
            'attributes': ['name', 'brand', 'size', 'size_num', 'price'],
            'ordering_key': 'price',
            'having_attribute': 'brand',
        },
        'altosight_sigmod': {
            'aggregations': {'id': 'min', 'name': 'vote', 'brand': 'vote', 'size': 'vote', 'size_num': 'max'},
            'attributes': ['name', 'brand', 'size', 'size_num', 'price'],
            'ordering_key': 'price',
            'having_attribute': 'brand',
        },
        'funding': {
            'aggregations': {'id': 'min', 'legal_name': 'vote', 'address': 'vote', 'source': 'vote',
                             'council_member': 'vote'},
            'attributes': ['legal_name', 'address', 'source', 'council_member', 'amount'],
            'ordering_key': 'amount',
            'having_attribute': 'source',
        },
    }

    # Identify each instance (query) using an index
    def __init__(self, index, batch_path="results/Experiment 1 (Blocking)/Funding/OR/batch.csv"):
        """Build the settings of the query number `index` (1-based row of the batch file).

        Parameters
        ----------
        index : int
            Index of the query; row `index - 1` of the batch file provides its parameters.
        batch_path : str, optional
            Path of the batch CSV file (columns 'ok_aggregation', 'cond1', 'cond2').
        """
        self.counter = index
        batch = pd.read_csv(batch_path)

        # Choose the dataset to be used (among: alaska_camera, altosight, altosight_sigmod, funding)
        self.dataset = 'funding'
        settings = self._DATASET_SETTINGS[self.dataset]

        # Choose between the '_no_nan' version of the dataset files and the plain one
        self.remove_null = True
        if self.remove_null:
            self.ds_name = self.dataset + '_no_nan'
        else:
            self.ds_name = self.dataset

        # Define the path of the files to be used to save the results of the query
        self.folder_name = "results/prova"
        self.query_output = self.folder_name + "/" + str(self.counter) + "_query.txt"
        self.brewer_output = self.folder_name + "/" + str(self.counter) + ".csv"
        self.query_details = self.folder_name + "/" + "queries.csv"

        # Define if batch version is required
        self.batch = True

        # SELECT

        # Top-K query: define the number of entities (K) to be returned (if all the entities must be returned, set <= 0)
        self.top_k = 0

        # Indicate if the entities with null value of the ordering key must be returned or ignored
        self.ignore_null = True

        # Define the aggregation function to be used for each attribute (min, max, avg, sum, vote, random, concat)
        # For the ordering key only some aggregation functions (min, max, avg, vote) are supported
        # Define also the attributes to be shown for the resulting entities
        random_aggregation = batch['ok_aggregation'].to_list()[index - 1]
        self.ordering_key = settings['ordering_key']
        self.aggregations = dict(settings['aggregations'])
        self.aggregations[self.ordering_key] = random_aggregation
        self.attributes = list(settings['attributes'])

        # FROM

        # Define the path of the dataset: it must be a CSV file
        self.ds_path = "data/" + self.ds_name + "_dataset.csv"

        # Define the path of the ground truth (a CSV file containing the matching pairs - couples ordered by id)
        self.gold_path = "data/" + self.ds_name + "_gold.csv"

        # With blocking or without blocking?
        self.blocking = True
        self.candidates = "data/" + self.ds_name + "_candidates.pkl"

        # WHERE

        # HAVING

        # Define HAVING conditions as attribute-value pairs (for LIKE situation); both values come from the batch file
        first_value = str(batch['cond1'].to_list()[index - 1])
        second_value = str(batch['cond2'].to_list()[index - 1])
        self.having = [(settings['having_attribute'], first_value), (settings['having_attribute'], second_value)]

        # Define the logical operator to be used for HAVING conditions (and/or)
        self.operator = 'or'

        # ORDER BY

        # Define the ordering mode (asc or desc) for the numeric attribute used as ordering key (OK)
        if random_aggregation == 'max':
            self.ordering_mode = 'desc'
        else:
            self.ordering_mode = 'asc'

        # Get the query in SQL
        select_clause = "SELECT "
        if self.top_k > 0:
            select_clause = select_clause + "TOP(" + str(self.top_k) + ") "
        select_clause = select_clause + ", ".join(
            self.aggregations[attribute] + "(" + attribute + ")" for attribute in self.attributes) + "\n"
        from_clause = "FROM " + self.ds_name + "\n"
        group_by_clause = "GROUP BY _\n"
        having_clause = "HAVING " + (" " + self.operator + " ").join(
            self.aggregations[attribute] + "(" + str(attribute) + ") LIKE '%" + str(value) + "%'"
            for attribute, value in self.having) + "\n"
        order_by_clause = "ORDER BY " + self.aggregations[self.ordering_key] + "(" + self.ordering_key + \
                          ") " + self.ordering_mode + "\n"
        self.query = (select_clause + from_clause + group_by_clause + having_clause + order_by_clause).upper()

    # Evaluate a single HAVING condition on all the rows of a DataFrame
    def _having_mask(self, frame, position):
        """Return the boolean mask of the rows of `frame` satisfying the HAVING condition at `position`.

        The value is searched as a literal substring (regex=False), exactly as `brewer_post_filtering`
        does with the `in` operator; missing values never satisfy the condition.
        """
        attribute, value = self.having[position]
        return frame[attribute].str.contains(value, na=False, regex=False)

    # Define the query for batch ER algorithm
    # It is a post filtering (application of HAVING clauses on solved entity), so AND/OR are maintained
    def batch_query(self, entities):
        """Apply the HAVING conditions to `entities`, keep `self.attributes` and sort on the ordering key."""
        ascending = self.ordering_mode == 'asc'
        first = self._having_mask(entities, 0)
        second = self._having_mask(entities, 1)
        if self.operator == 'and':
            selected = first & second
        else:
            selected = first | second
        return entities.loc[selected, self.attributes].sort_values(by=[self.ordering_key], ascending=ascending)

    # Define the preliminary filtering for BrewER
    def brewer_pre_filtering(self, records, solved):
        """Return the seed records of a block, i.e. the ones that can make the block satisfy the query.

        An empty DataFrame means that the block cannot satisfy the HAVING conditions.
        """
        first = self._having_mask(records, 0)
        second = self._having_mask(records, 1)

        # To reduce the number of comparisons, if HAVING conditions are in AND...
        # ...we check that all conditions are separately satisfied by at least one record appearing in the block
        if self.operator == 'and':
            # If we consider already solved records (no neighbours), simply filter them in AND
            if solved:
                return records.loc[first & second]
            # Otherwise, check that all conditions are separately satisfied (if not, return an empty DataFrame)
            if not first.any():
                return records.loc[first]
            if not second.any():
                return records.loc[second]
            # If the conditions are all satisfied, proceed as in OR case

        # Otherwise, in OR case, check that at least one of the conditions is satisfied by the records of the block
        return records.loc[first | second]

    # Define the post filtering for BrewER (AND/OR are maintained)
    def brewer_post_filtering(self, entity):
        """Tell whether a solved entity (dictionary) satisfies the HAVING conditions of the query."""
        # With 'ignore_null', an entity with a null ordering key is never emitted
        if self.ignore_null and pd.isna(entity[self.ordering_key]):
            return False
        first = self.having[0][1] in str(entity[self.having[0][0]])
        second = self.having[1][1] in str(entity[self.having[1][0]])
        if self.operator == 'and':
            return first and second
        return first or second

In [90]:
def append_to_file(path, *values, encoding=None):
    """Print `values` at the end of the file at `path`, closing the file right after writing."""
    with open(path, "a", encoding=encoding) as stream:
        print(*values, file=stream)


# Acquire the requirements of the task to be performed
for query_index in range(1, 2):
    task = Task(query_index)

    # Make sure that the folder collecting the results exists before writing into it
    os.makedirs(task.folder_name, exist_ok=True)

    # Save the query details in the apposite file (the header is written only when the file is created)
    if not os.path.isfile(task.query_details):
        with open(task.query_details, 'a') as query_details:
            query_details.write("index,ds_name,top_k,ok,ok_aggregation,ordering_mode,cond1,cond2,operator,"
                                "ol,tot,emitted")
    with open(task.query_details, 'a') as query_details:
        query_details.write('\n' + str(task.counter) + ',' + task.ds_name + ',' + str(task.top_k) + ',' +
                            task.ordering_key + ',' + task.aggregations[task.ordering_key] + ',' +
                            task.ordering_mode + ',' + str(task.having[0][1]) + ',' + str(task.having[1][1]) + ',' +
                            task.operator)

    # Print the query
    append_to_file(task.query_output, task.query)

    # Load the dataset in DataFrame format, creating also an alternative version as a list of dictionaries
    ds = pd.read_csv(task.ds_path)
    ds[task.ordering_key] = pd.to_numeric(ds[task.ordering_key], errors='coerce')
    # Missing values of the textual columns are replaced by the string 'NaN'
    for column in ds.columns:
        if ds[column].dtype == 'object':
            ds[column] = ds[column].fillna('NaN')
    ds_dict = ds.to_dict('records')
    append_to_file(task.query_output, "Number of records in the dataset: " + str(len(ds_dict)) + '\n')

    # Load the ground truth in DataFrame format and transform it into a set of tuples (matching pairs)
    gold = pd.read_csv(task.gold_path)
    gold = set(list(gold.itertuples(index=False, name=None)))
    append_to_file(task.query_output, "Number of matching pairs in ground truth: " + str(len(gold)) + '\n')

    # Perform blocking on the dataset
    candidates = blocking(task, ds, gold)

    # If required, perform batch ER on the candidate set to get the cleaned dataset (DataFrame composed by entities)
    # Then, perform the query on the clean dataset
    batch_results = pd.NA
    if task.batch:
        batch_entities = batch_er(task, ds, candidates, gold)
        # If 'ignore null' option is set, ignore the entities with null ordering key
        if task.ignore_null:
            batch_entities = batch_entities[batch_entities[task.ordering_key].notnull()]
        batch_results = task.batch_query(batch_entities)
        if len(batch_results.index) > 0:
            with pd.option_context('display.max_rows', None, 'display.max_columns', None):
                append_to_file(task.query_output, batch_results, encoding="utf-8")
        else:
            append_to_file(task.query_output, "No entities satisfied the query\n")

    # Perform progressive ER through BrewER on the dataset
    brewer_attributes = task.attributes + ['comparisons']
    brewer_results = brewer(task, ds, gold, candidates)

    if len(brewer_results.index) > 0:
        with pd.option_context('display.max_rows', None, 'display.max_columns', None):
            append_to_file(task.query_output, brewer_results.loc[:, brewer_attributes], encoding="utf-8")
            brewer_results.loc[:, brewer_attributes].to_csv(task.brewer_output, index=False)
    else:
        append_to_file(task.query_output, "No entities satisfied the query\n")

    if task.batch:
        # Verify that batch algorithm and BrewER produce the same entities
        if len(batch_results.index) > 0 and len(brewer_results.index) > 0:
            batch_results = list(batch_results.fillna(0).to_records(index=False))
            brewer_results = list(brewer_results[task.attributes].fillna(0).to_records(index=False))
            append_to_file(task.query_output,
                           "\nDifferences in the produced entities between batch algorithm and BrewER: ")
            # For Top-K query case, consider only the entities produced by BrewER (batch always exhaustive)
            if task.top_k > 0:
                differences = [item for item in brewer_results if item not in batch_results]
            else:
                differences = [item for item in batch_results if item not in brewer_results] + \
                              [item for item in brewer_results if item not in batch_results]
            append_to_file(task.query_output, differences, encoding="utf-8")
        else:
            append_to_file(task.query_output, "One of the two algorithms did not return any entity\n")