In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pyspark.sql.types as T
from pyspark.storagelevel import StorageLevel
from pyspark.sql import SparkSession

import jellyfish
from elasticsearch import Elasticsearch
import json
from datetime import datetime
import time



In [2]:
import pandas as pd
pd.set_option("display.max_rows", 999)
pd.set_option("display.max_columns", 999)

In [3]:
start = time.time()

In [4]:
# Build (or reuse) the Spark session used by the whole linkage run.
# spark.jars.packages pulls in the elasticsearch-spark connector, and the spark.es.*
# options tell it which Elasticsearch node/port and default resource ("dbb2") to use.
spark = SparkSession.builder \
    .appName("CIDACSRL") \
    .master("spark://barravento:7077") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.1.3") \
    .config("spark.es.nodes", "barravento") \
    .config("spark.es.port", "9200") \
    .config("spark.es.nodes.wan.only", "false") \
    .config("spark.es.resource", "dbb2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", 16) \
    .config("spark.sql.files.maxPartitionBytes", "256m") \
    .getOrCreate()

sc = spark.sparkContext
# optimize_with_checkpoint() calls DataFrame.checkpoint() in its 'durable' mode, which
# needs a checkpoint directory; set it once here for the whole session.
sc.setCheckpointDir("hdfs://barravento:9000/spark-checkpoints")

# ES functions

#### auxiliary functions

In [5]:
def get_match_cols_and_values(vars_col, query_type, add_id_col):
    """
    Pair the indexed column names with the values of one record to be linked.

    The keys of the returned dict say which Elasticsearch field is searched and the
    values are the search content taken from the record being linked.

    Parameters
    ----------
    vars_col : sequence
        Values of the record being linked, positionally aligned with
        config['datasets_info']['indexed_dataset']['columns'] (they are zipped together).
    query_type : str
        'general' keeps every indexed column.
        'exact' keeps only the indexed id column plus the columns whose comparison
        entry has must_match == 'true'.
    add_id_col : bool
        When False, the indexed id column is removed from the result.

    Returns
    -------
    dict
        {indexed column name: value from vars_col}, in the order of the indexed columns.

    Raises
    ------
    ValueError
        If query_type is neither 'general' nor 'exact'. (Previously the function
        printed a message and returned a tuple, which callers cannot use as a dict.)

    Notes
    -----
    The configuration is read from the broadcast variable ``config_bc``.
    """
    if query_type not in ('general', 'exact'):
        raise ValueError(
            "Please use 'general' or 'exact' as query_type input (got {!r})".format(query_type)
        )

    config_ = config_bc.value
    indexed_dataset = config_['datasets_info']['indexed_dataset']
    indexed_id_column = indexed_dataset['id_column_name']
    indexed_cols = indexed_dataset['columns']

    # Indexed column names become the keys (fields fetched on ES), the tolink values
    # become the search content.
    tolink_cols_dict = dict(zip(indexed_cols, vars_col))

    # Equality (not truthiness) on purpose: only False/0 drops the id column, as before.
    if add_id_col == False:  # noqa: E712
        tolink_cols_dict.pop(indexed_id_column, None)

    if query_type == 'general':
        return tolink_cols_dict

    # 'exact': keep only the id column and the columns flagged as must_match.
    exact_match_cols = {indexed_id_column}
    exact_match_cols.update(
        comparison['indexed_col']
        for comparison in config_['comparisons'].values()
        if comparison['must_match'] == 'true'
    )
    return {col: value for col, value in tolink_cols_dict.items() if col in exact_match_cols}

def _resolve_storage_level(storage_level, prefix_sl=None):
    """
    Resolve a StorageLevel from either a string or a StorageLevel object.

    Parameters
    ----------
    storage_level : Union[str, StorageLevel]
        A StorageLevel object is returned unchanged.
        A string is looked up as an attribute of the StorageLevel class. Both bare
        names ("MEMORY_AND_DISK", "DISK_ONLY") and dotted spellings
        ("StorageLevel.MEMORY_AND_DISK") are accepted; only the part after the last
        dot is used for the lookup.
    prefix_sl : Optional[str]
        Backward-compat helper for legacy callers that pass a prefix such as
        "StorageLevel." separately. It is concatenated in front of `storage_level`
        before the name is extracted. No code is evaluated: the previous
        eval(prefix_sl + storage_level) fallback was removed because it executed
        arbitrary configuration text.

    Returns
    -------
    StorageLevel
        The resolved level. Any input that is neither a str nor a StorageLevel
        (e.g. None) falls back to StorageLevel.MEMORY_AND_DISK.

    Raises
    ------
    ValueError
        If the string does not name a StorageLevel constant.
    """
    if isinstance(storage_level, StorageLevel):
        return storage_level

    if isinstance(storage_level, str):
        qualified_name = (prefix_sl or '') + storage_level.strip()
        level_name = qualified_name.rsplit('.', 1)[-1]
        resolved = getattr(StorageLevel, level_name, None)
        # getattr alone would also hand back methods/dunders; accept real levels only.
        if isinstance(resolved, StorageLevel):
            return resolved
        raise ValueError('Unknown StorageLevel string: {}'.format(storage_level))

    # Default fallback
    return StorageLevel.MEMORY_AND_DISK


def optimize_with_checkpoint(df,
                             parallelism,
                             storage_level='MEMORY_AND_DISK',
                             checkpoint_mode='durable',
                             eager=True,
                             materialize=True,
                             materialize_action='count',
                             prefix_sl=None,
                             unpersist_prev=True):
    """
    Repartition -> Persist -> (Optional) Materialize -> (Local)Checkpoint -> Unpersist.

    WHY:
      - Long Spark pipelines accumulate lineage; actions later can trigger a huge DAG,
        many tasks, and repeated work. Writing/reading Parquet to "reset" the DAG works,
        but it's expensive.
      - checkpoint() / localCheckpoint() return a DataFrame whose lineage is truncated.

    PARAMETERS
    ----------
    df : pyspark.sql.DataFrame
        The DataFrame you want to stabilize.
    parallelism : int
        Number of partitions passed to repartition().
    storage_level : Union[str, StorageLevel], default "MEMORY_AND_DISK"
        Storage level used for persist(); resolved by _resolve_storage_level.
    checkpoint_mode : {"durable", "local"}, default "durable"
        "local"  -> df.localCheckpoint(eager=...)
        any other value (including "durable") -> df.checkpoint(eager=...), which
        requires a checkpoint directory to have been set on the SparkContext.
    eager : bool, default True
        Forwarded to checkpoint()/localCheckpoint().
    materialize : bool, default True
        If True, runs an action right after persist() and before checkpointing.
    materialize_action : {"count", "take"}, default "count"
        "take" runs df.take(1); any other value runs df.count().
        NOTE: take(1) is not guaranteed to compute every partition, so it may only
        partially populate the cache; "count" scans the whole DataFrame.
    prefix_sl : Optional[str]
        Legacy helper forwarded to _resolve_storage_level.
    unpersist_prev : bool, default True
        If True, unpersist() is called (non-blocking) on the input DataFrame and,
        when `eager` is True, also on the intermediate persisted DataFrame created
        here. That intermediate cache was previously never released. With
        eager=False the intermediate cache is kept, because the checkpoint has not
        been computed yet and will still want to read from it.

    RETURNS
    -------
    pyspark.sql.DataFrame
        The DataFrame returned by checkpoint()/localCheckpoint().

    NOTES
    -----
    Failures of the materialization action or of unpersist() do not abort the flow
    (best effort, as before), but they are now reported through warnings.warn
    instead of being silently discarded.

    EXAMPLE
    -------
        # 1) Ensure a durable checkpoint directory (once at session init):
        # spark.sparkContext.setCheckpointDir("hdfs://barravento:9000/spark-checkpoints")

        # 2) Use in your pipeline:
        df = optimize_with_checkpoint(
                df,
                parallelism=paralelism,
                storage_level="MEMORY_AND_DISK",
                checkpoint_mode="durable",   # or "local"
                eager=True,
                materialize=True,
                materialize_action="count"
             )
    """
    import warnings

    sl = _resolve_storage_level(storage_level, prefix_sl)

    # Keep a reference to the input so it can be unpersisted at the end.
    prev_df = df

    # (1) Repartition once, then persist with the requested StorageLevel.
    cached_df = df.repartition(parallelism).persist(sl)

    # (2) Optionally run an action now so the cache is populated before checkpointing.
    if materialize:
        try:
            if materialize_action == 'take':
                cached_df.take(1)
            else:
                cached_df.count()
        except Exception as exc:
            # Best effort: the checkpoint below will surface any real failure.
            warnings.warn(
                'optimize_with_checkpoint: materialization ({}) failed: {!r}'.format(
                    materialize_action, exc),
                RuntimeWarning,
            )

    # (3) Truncate lineage via (local)checkpoint.
    if checkpoint_mode == 'local':
        checkpointed_df = cached_df.localCheckpoint(eager=eager)
    else:
        checkpointed_df = cached_df.checkpoint(eager=eager)

    # (4) Release caches that are no longer needed.
    if unpersist_prev:
        stale_dfs = [prev_df]
        if eager:
            # The checkpoint is already computed, so the intermediate cache is dead weight.
            stale_dfs.append(cached_df)
        for stale_df in stale_dfs:
            if stale_df is None:
                continue
            try:
                stale_df.unpersist(blocking=False)
            except Exception as exc:
                warnings.warn(
                    'optimize_with_checkpoint: unpersist failed: {!r}'.format(exc),
                    RuntimeWarning,
                )

    return checkpointed_df

#### indexing

In [6]:
def index_dataframe(dataframe, es_index_name):
    """
    Write `dataframe` through the elasticsearch-spark data source into the
    resource named `es_index_name`, using save mode 'overwrite'.
    """
    es_writer = dataframe.write.format("org.elasticsearch.spark.sql")
    es_writer = es_writer.option("es.resource", es_index_name)
    es_writer.mode('overwrite').save()

#### query building

In [7]:
def build_exact_queries(vars_col):
    """
    Build the Elasticsearch 'must' (exact-match) query for one record, as a string.

    Let us suppose the following values:
        vars_col = ['ROBESPIERRE PITA', '19870505', '1', 'Mari Santos']
        indexed_cols = ['name', 'birthdate', 'sex', 'mothers_name']
    and only the first two attributes are assigned to exact match.
    The resulting query string is then:
        {"bool": {"must": [{"match": {"name": "ROBESPIERRE PITA"}},
                           {"match": {"birthdate": "19870505"}}]}}

    The columns are selected by get_match_cols_and_values(vars_col, 'exact', False),
    i.e. the must_match columns without the id column. Every value is converted
    with str() before being placed in the query.

    The string is produced with json.dumps, so quotes and backslashes inside values
    are escaped (the former hand-built string broke on such values). The output is
    valid JSON and can also be parsed with ast.literal_eval, which is what the
    search functions in this notebook use. ensure_ascii=False keeps non-ASCII
    characters as-is instead of emitting \\u escapes.

    Only the query body is returned (no "size"/"query" wrapper): with the ES 8.x
    Python client, size is passed as a separate argument to es.search().
    See: https://github.com/elastic/elasticsearch-py/issues/1698
         https://www.elastic.co/guide/en/elasticsearch/client/python-api/8.1/examples.html

    Requirements:
    - Dates must already be in the format stored on the index (e.g. 19870505).
    - The config json must be available as a broadcast (config_bc), because
      get_match_cols_and_values reads it.
    """
    tolink_cols_dict = get_match_cols_and_values(vars_col, 'exact', False)

    must_clauses = [
        {"match": {str(col): str(value)}}
        for col, value in tolink_cols_dict.items()
    ]

    return json.dumps({"bool": {"must": must_clauses}}, ensure_ascii=False)
# Spark UDF wrapper around build_exact_queries; the query is returned as a string column.
udf_build_exact_queries = F.udf(build_exact_queries, StringType()) 

def build_non_exact_queries(vars_col):
    """
    Build the Elasticsearch 'should' (non-exact) query for one record, as a string.

    For every selected column whose comparison entry has should_match == 'true',
    one clause is produced, keyed by the comparison's 'query_type':

    - is_fuzzy == 'true':
        {"<query_type>": {"<col>": {"query": "<value>", "fuzziness": "AUTO",
                                    "operator": "or", "boost": "<boost>"}}}
    - is_fuzzy == 'false':
        {"<query_type>": {"<col>": "<value>"}}

    and the result is:
        {"bool": {"should": [<clause>, <clause>, ...]}}

    Columns with should_match != 'true', with an unrecognised is_fuzzy value, or
    without a comparison entry contribute no clause. (Previously such columns
    either raised NameError or re-appended the previous column's clause.)

    The string is produced with json.dumps, so quotes and backslashes inside values
    are escaped. The output is valid JSON and can also be parsed with
    ast.literal_eval, which is what the search functions in this notebook use.

    Only the query body is returned (no "size"/"query" wrapper): with the ES 8.x
    Python client, size is passed as a separate argument to es.search().
    See: https://github.com/elastic/elasticsearch-py/issues/1698
         https://www.elastic.co/guide/en/elasticsearch/client/python-api/8.1/examples.html

    Requirements:
    - Every value is converted with str() before being placed in the query.
    - Dates must already be in the format stored on the index (e.g. 19870505).
    - The config json must be available as a broadcast (config_bc).
    """
    config_ = config_bc.value

    # NOTE(review): the columns are selected with query_type 'exact', so only the
    # must_match columns can ever produce a "should" clause. That is the existing
    # behaviour and is kept here; confirm whether 'general' was intended.
    tolink_cols_dict = get_match_cols_and_values(vars_col, 'exact', False)

    comparisons = list(config_['comparisons'].values())

    should_clauses = []
    for col, value in tolink_cols_dict.items():
        instructions = next((c for c in comparisons if c['indexed_col'] == col), None)
        if instructions is None or instructions['should_match'] != 'true':
            continue

        if instructions['is_fuzzy'] == 'true':
            clause_body = {
                "query": str(value),
                "fuzziness": "AUTO",
                "operator": "or",
                "boost": str(instructions['boost']),
            }
        elif instructions['is_fuzzy'] == 'false':
            clause_body = str(value)
        else:
            # Neither 'true' nor 'false': no clause can be built for this column.
            continue

        should_clauses.append({str(instructions['query_type']): {str(col): clause_body}})

    return json.dumps({"bool": {"should": should_clauses}}, ensure_ascii=False)
# Spark UDF wrapper around build_non_exact_queries; the query is returned as a string column.
udf_build_non_exact_queries = F.udf(build_non_exact_queries, StringType())

#### finding matches

In [8]:
def find_elasticsearch_exact_best_candidate(vars_col, exact_queries_col):
    """
    Run the exact-match query of one record against Elasticsearch and pick the
    best-scoring candidate.

    `exact_queries_col` is the query body as a string, for example:

        '{ "bool": { "must": [
                        {"match": {"name":"ROBESPIERRE PITA"}},
                        {"match": {"birthdate":"19870505"}}] } }'

    It is parsed with ast.literal_eval into a dict, because the ES 8.x client
    expects a mapping for `query=` (a plain string raises
    BadRequestError 400 'Unknown key for a VALUE_STRING in [query]'), and the
    number of hits is limited with `size=config['query_size']` (the 8.x client no
    longer accepts a string `body=`).

    Each hit looks like:
        {'_index': 'test', '_id': 'aaabbbccc', '_score': 43.9280841,
         '_source': {'name': 'ROBESPIERRE PITA', 'birthdate': '19870505', ...}}

    Returns a Row with the fields best_candidate_exact, sim_best_candidate_exact
    and similarity_exact_candidates:
    - when there are hits and the best score reaches config['cutoff_exact_match'],
      the three values returned by find_best_candidates;
    - otherwise (no hits, or best score below the cutoff) the string 'null' in
      all three fields.
    """
    from elasticsearch import Elasticsearch
    from ast import literal_eval

    settings = config_bc.value

    connect_string = settings['es_connect_string']
    index_name = settings['es_index_name']
    max_hits = settings['query_size']

    client = Elasticsearch(connect_string)

    parsed_query = literal_eval(exact_queries_col)
    response = client.search(index=index_name, query=parsed_query, size=max_hits)
    hits = response['hits']['hits']

    def as_row(candidate_id, candidate_score, all_scores):
        # Same Row layout for every outcome.
        row_factory = T.Row('best_candidate_exact', 'sim_best_candidate_exact', 'similarity_exact_candidates')
        return row_factory(candidate_id, candidate_score, all_scores)

    if not hits:
        return as_row('null', 'null', 'null')

    record = get_match_cols_and_values(vars_col, 'general', True)
    winner_id, winner_score, all_scores = find_best_candidates(record, hits)

    reaches_cutoff = float(winner_score) >= float(settings['cutoff_exact_match'])
    if reaches_cutoff:
        return as_row(winner_id, winner_score, all_scores)
    return as_row('null', 'null', 'null')

# Return type of the exact-phase UDF: a struct with three string fields, all declared
# non-nullable (third StructField argument is False).
schema = StructType([StructField("best_candidate_exact", StringType(), False), 
                     StructField("sim_best_candidate_exact", StringType(), False), 
                     StructField("similarity_exact_candidates", StringType(), False)])
# Spark UDF wrapper around find_elasticsearch_exact_best_candidate.
udf_find_elasticsearch_exact_best_candidate = F.udf(find_elasticsearch_exact_best_candidate, schema)


def find_elasticsearch_non_exact_best_candidate(vars_col, non_exact_queries_col):
    """
    Run the non-exact ('should') query of one record against Elasticsearch and
    pick the best-scoring candidate.

    `non_exact_queries_col` is the query body as a string, for example:

        '{ "bool": { "should": [
              {"match": {"nome_a": {"query": "ROBESPIERRE PITA", "fuzziness": "AUTO",
                                    "operator": "or", "boost": "3.0"}}},
              {"match": {"birthdate":"19870505"}} ] } }'

    It is parsed with ast.literal_eval into a dict, because the ES 8.x client
    expects a mapping for `query=` (a plain string raises
    BadRequestError 400 'Unknown key for a VALUE_STRING in [query]'), and the
    number of hits is limited with `size=config['query_size']` (the 8.x client no
    longer accepts a string `body=`).

    Each hit looks like:
        {'_index': 'test', '_id': 'aaabbbccc', '_score': 43.9280841,
         '_source': {'name': 'ROBESPIERRE PITA', 'birthdate': '19870505', ...}}

    Returns a Row with the fields best_candidate_non_exact,
    sim_best_candidate_non_exact and similarity_non_exact_candidates:
    - when there are hits, the three values returned by find_best_candidates
      (no cutoff is applied in this phase);
    - when there are no hits, the string 'null' in all three fields.
    """
    from elasticsearch import Elasticsearch
    from ast import literal_eval

    settings = config_bc.value

    connect_string = settings['es_connect_string']
    index_name = settings['es_index_name']
    max_hits = settings['query_size']

    client = Elasticsearch(connect_string)

    parsed_query = literal_eval(non_exact_queries_col)
    response = client.search(index=index_name, query=parsed_query, size=max_hits)
    hits = response['hits']['hits']

    if hits:
        record = get_match_cols_and_values(vars_col, 'general', True)
        outcome = find_best_candidates(record, hits)
    else:
        outcome = ('null', 'null', 'null')

    winner_id, winner_score, all_scores = outcome
    row_factory = T.Row('best_candidate_non_exact', 'sim_best_candidate_non_exact', 'similarity_non_exact_candidates')
    return row_factory(winner_id, winner_score, all_scores)
        
# Return type of the non-exact-phase UDF: a struct with three string fields, all declared
# non-nullable. This rebinds the module-level name `schema`; the exact-phase UDF was
# already created with its own struct object, so it is not affected by the rebinding.
schema = StructType([StructField("best_candidate_non_exact", StringType(), False), 
                     StructField("sim_best_candidate_non_exact", StringType(), False), 
                     StructField("similarity_non_exact_candidates", StringType(), False)])
# Spark UDF wrapper around find_elasticsearch_non_exact_best_candidate.
udf_find_elasticsearch_non_exact_best_candidate = F.udf(find_elasticsearch_non_exact_best_candidate, schema)



def find_best_candidates(cols_and_values, candidates):
    """
    Score every Elasticsearch hit against one record and return the best one.

    Parameters
    ----------
    cols_and_values : dict
        {indexed column name: value of the record being linked}. The indexed id
        column, if present, is ignored for scoring.
    candidates : list of dict
        Elasticsearch hits; each must have a '_source' dict containing the indexed
        id column. Other fields may be absent from '_source': an absent field is
        passed to similarity_hub as None, which treats it as a missing value
        (previously an absent field raised KeyError).

    Returns
    -------
    tuple
        (best_score_id, best_score_value, scores) where `scores` maps each
        candidate id to sum(column similarities) / sum(comparison weights), and
        the best candidate is the first one with the maximum score.
        With no candidates the legacy sentinel ('null', '0.0', '{}') is returned.

    Notes
    -----
    - The configuration is read from the broadcast variable ``config_bc``.
    - If the comparison weights sum to zero, every score is 0.0 (instead of a
      ZeroDivisionError).
    - Every scored column must have a comparison entry with a matching
      'indexed_col'; otherwise a KeyError is raised.
    """
    config_ = config_bc.value
    comparisons = config_['comparisons']
    indexed_id_col = config_['datasets_info']['indexed_dataset']['id_column_name']

    # Loop-invariant values, computed once instead of once per candidate/column.
    n_comparisons = len(comparisons)
    score_max = sum(float(comparison['weight']) for comparison in comparisons.values())
    comparison_by_col = {}
    for comparison in comparisons.values():
        # setdefault keeps the first entry for a column, like the former "[...][0]".
        comparison_by_col.setdefault(comparison['indexed_col'], comparison)

    scores = {}
    for candidate in candidates:
        source = candidate['_source']
        candidate_id = source[indexed_id_col]

        total = 0.0
        for col, value in cols_and_values.items():
            if col == indexed_id_col:
                continue
            total += similarity_hub(n_comparisons, comparison_by_col[col], value, source.get(col))

        scores[candidate_id] = total / score_max if score_max else 0.0

    if len(scores) > 0:
        best_score_id = max(scores, key=scores.get)
        best_score_value = scores[best_score_id]
    else:
        best_score_id = 'null'
        best_score_value = '0.0'
        scores = '{}'
    return best_score_id, best_score_value, scores
    
    
def similarity_hub(n_comparisons, comparison_info, col_and_value, candidate):
    """
    Compute the weighted similarity between one value of the record being linked
    and the corresponding value of a candidate.

    Currently the CIDACS-RL uses overlap for categorical data, jaro_winkler for
    names and hamming for dates.

    Parameters
    ----------
    n_comparisons : int
        Not used by the computation; kept for interface compatibility.
    comparison_info : dict
        Comparison entry from the config; 'weight', 'penalty' and 'similarity' are read.
    col_and_value : str
        Value from the record being linked.
    candidate : str
        Value from the candidate record.

    Returns
    -------
    float
        - missing value on either side (None, "" or config['null_value']): -penalty
        - 'overlap':      weight if the values are equal, else 0.0
        - 'jaro_winkler': jaro-winkler similarity * weight
        - 'hamming':      (1 - hamming_distance / longest length) * weight
        - any other similarity name: 0.0 (a message is printed)

    Notes
    -----
    - The hamming branch used to evaluate ``1.0 - (distance / size) * weight``
      because of operator precedence, which is only correct for weight == 1.
    - jellyfish renamed ``jaro_winkler`` to ``jaro_winkler_similarity``; the new
      name is used when available, with a fallback to the old one.
    - The configuration is read from the broadcast variable ``config_bc``.
    """
    import jellyfish

    config_ = config_bc.value
    weight = float(comparison_info['weight'])
    penalty = float(comparison_info['penalty'])
    null_value = config_['null_value']

    def is_missing(value):
        # A value counts as missing when it is None, empty, or the configured null marker.
        return value is None or value == "" or value == null_value

    if is_missing(candidate) or is_missing(col_and_value):
        return 0.0 - penalty

    sim_type = comparison_info['similarity']

    if sim_type == 'overlap':
        return weight if col_and_value == candidate else 0.0

    if sim_type == 'jaro_winkler':
        jaro_winkler = getattr(jellyfish, 'jaro_winkler_similarity', None) or jellyfish.jaro_winkler
        return jaro_winkler(col_and_value, candidate) * weight

    if sim_type == 'hamming':
        # Both values are non-empty here, so max_size is at least 1.
        max_size = max(len(col_and_value), len(candidate))
        distance = jellyfish.hamming_distance(col_and_value, candidate)
        return (1.0 - float(distance) / max_size) * weight

    print('Please inform valid similarities for cidacs-rl')
    return 0.0

In [9]:
jellyfish.jaro_winkler("Robespierre", "Robespierre")

1.0

#### main functions

In [10]:
def cidacs_rl_exact_phase(tolink_dataset):
    """
    Link `tolink_dataset` against the dataset indexed on Elasticsearch using the
    exact-match queries.

    It consists of three main steps:
        1) Create an array column ('vars') from the columns used on the integration.

        input:
        +-----------+--------------------+------+
        |id_cidacs_b|                nome|  sexo|
        +-----------+--------------------+------+
        |        614|    ROBESPIERRE PITA|     1|
        +-----------+--------------------+------+

        output:
        +-----------+--------------------+------+--------------------------+
        |id_cidacs_b|                nome|  sexo|                      vars|
        +-----------+--------------------+------+--------------------------+
        |        614|    ROBESPIERRE PITA|     1|[614, ROBESPIERRE PITA, 1]|
        +-----------+--------------------+------+--------------------------+

        2) Build the exact query for each row from 'vars' ('exact_queries' column),
           with udf_build_exact_queries. The query is a bool/must body such as:

           { "bool": { "must": [
                 {"match": {"nome_a":"ROBESPIERRE PITA"}},
                 {"match": {"sexo_a":"1"}} ] } }

           (the result size is not part of the query; it is passed to es.search()
           by the search UDF).

        3) Run udf_find_elasticsearch_exact_best_candidate, which yields a struct
           that is then split into three columns:

        +--------------------+------------------------+---------------------------+
        |best_candidate_exact|sim_best_candidate_exact|similarity_exact_candidates|
        +--------------------+------------------------+---------------------------+
        |                 614|                     1.0|        {614: 1, 34: 0.8...|
        +--------------------+------------------------+---------------------------+

           'sim_best_candidate_exact' is cast to float.

    When config['write_checkpoint'] == 'true', the DataFrame is passed through
    optimize_with_checkpoint right after the search column is added.

    Returns the selected tolink columns plus 'vars', 'exact_queries' and the three
    result columns above. The elapsed time is printed.

    The configuration is read from the broadcast variable ``config_bc``. Only the
    keys actually used are read; the former unused lookups of the tolink id column
    name and of 'temp_dir' were removed, so those keys are no longer required here.
    """
    start = time.time()

    # ------------------------------------ #
    # getting relevant values from config
    # ------------------------------------ #
    config_ = config_bc.value

    tolink_columns = config_['datasets_info']['tolink_dataset']['columns']
    paralelism = int(config_['datasets_info']['indexed_dataset']['default_paralelism'])
    storage_level = config_['datasets_info']['indexed_dataset']['storage_level']
    write_checkpoint = config_['write_checkpoint']

    # ------------------------------------ #
    # preparing exact search
    # ------------------------------------ #
    # selecting columns
    tolink_dataset = tolink_dataset.select(tolink_columns)
    # building array of variable values
    tolink_dataset = tolink_dataset.withColumn('vars', F.array(tolink_columns))
    # building exact queries
    tolink_dataset = tolink_dataset.withColumn('exact_queries', udf_build_exact_queries(F.col('vars')))
    # finding the best candidate for each tolink record
    # NOTE(review): the explode(array(...)) wrapper is presumably there to keep Spark
    # from re-evaluating the UDF once per extracted struct field - confirm before removing.
    tolink_dataset = tolink_dataset.withColumn(
        'result_exact_search',
        F.explode(F.array(udf_find_elasticsearch_exact_best_candidate(F.col('vars'), F.col('exact_queries'))))
    )

    if write_checkpoint == 'true':
        tolink_dataset = optimize_with_checkpoint(
            df=tolink_dataset,
            parallelism=paralelism,
            storage_level=storage_level,      # e.g. "MEMORY_AND_DISK" or StorageLevel.MEMORY_AND_DISK
            checkpoint_mode='durable',        # or 'local' for more speed
            eager=True,
            materialize=True,
            materialize_action='count',
            prefix_sl="StorageLevel.",        # legacy prefix for string storage levels
            unpersist_prev=True
        )

    # splitting the struct column into 3 atomic cols (similarity cast to float)
    search_result = F.col('result_exact_search')
    tolink_dataset = tolink_dataset.withColumn('best_candidate_exact', search_result['best_candidate_exact'])
    tolink_dataset = tolink_dataset.withColumn('sim_best_candidate_exact', search_result['sim_best_candidate_exact'].cast('float'))
    tolink_dataset = tolink_dataset.withColumn('similarity_exact_candidates', search_result['similarity_exact_candidates'])

    # dropping the struct column
    tolink_dataset = tolink_dataset.drop('result_exact_search')

    print("\t[CIDACS-RL] time for exact phase: {} secs".format(time.time()-start))
    return tolink_dataset



def cidacs_rl_non_exact_phase(tolink_dataset):
    """
    Take the dataframe produced by the exact match phase and submit it to the non-exact search.

    cidacs_rl_non_exact_phase(tolink_dataset) input:

    +--------------------------+--------------------+------------------------+---------------------------+
    |                      vars|best_candidate_exact|sim_best_candidate_exact|similarity_exact_candidates|
    +--------------------------+--------------------+------------------------+---------------------------+
    |       [2, SAMILA SENA, 2]|                null|                    null|                       null|
    +--------------------------+--------------------+------------------------+---------------------------+

    cidacs_rl_non_exact_phase(tolink_dataset) output (columns added by this phase):

        +------------------------+----------------------------+-------------------------------+
        |best_candidate_non_exact|sim_best_candidate_non_exact|similarity_non_exact_candidates|
        +------------------------+----------------------------+-------------------------------+
        |                       7|                        0.94|            {7: 0.94, 3: 0.9...|
        +------------------------+----------------------------+-------------------------------+

    Besides the three columns above, the returned dataframe also carries:
      * linked_from: 'exact_match' when sim_best_candidate_exact is not null,
        'non_exact_match' otherwise;
      * non_exact_queries: the query built for the non-exact search.

    When config['debug'] == 'false' only the rows flagged 'non_exact_match' are searched
    and the 'exact_match' rows are appended back with nulls in the new columns.
    For any other value of config['debug'] every row is searched.

    Reads the broadcast `config_bc` and returns the tolink dataframe with all these columns.
    """
    # ------------------------------------ #
    # getting relevant values from config
    # ------------------------------------ #

    start = time.time()
    # collecting config json from broadcasted variable
    config_ = config_bc.value

    is_debug = config_['debug']
    write_checkpoint = config_['write_checkpoint']

    # NOTE(review): parallelism and storage level are read from the 'indexed_dataset'
    # section although this phase processes the tolink dataset -- confirm this is intended.
    paralelism = int(config_['datasets_info']['indexed_dataset']['default_paralelism'])
    prefix_sl = "StorageLevel."
    storage_level = config_['datasets_info']['indexed_dataset']['storage_level']

    # ------------------------------------ #
    # preparing non exact search
    # ------------------------------------ #

    # building linked_from column. Non-null values on sim_best_candidate_exact must be filled
    # as 'exact_match', otherwise as 'non_exact_match'.
    filter_isnull = F.col('sim_best_candidate_exact').isNull()
    tolink_dataset = tolink_dataset.withColumn(
        'linked_from',
        F.when(filter_isnull, 'non_exact_match').otherwise('exact_match')
    )

    # preparing filters for debug and non-debug executions
    filter_exact = F.col('linked_from') == 'exact_match'
    filter_non_exact = F.col('linked_from') == 'non_exact_match'

    if is_debug == 'false':
        # rows that still need a candidate go through the non-exact search
        tolink_dataset_ = tolink_dataset.filter(filter_non_exact)
        # rows already linked by the exact phase are set aside
        tolink_dataset = tolink_dataset.filter(filter_exact)

        # creating, for the remainder dataframe, the cols created in this function to ensure union
        for null_col in ('best_candidate_non_exact',
                         'sim_best_candidate_non_exact',
                         'similarity_non_exact_candidates',
                         'non_exact_queries'):
            tolink_dataset = tolink_dataset.withColumn(null_col, F.lit(None))
    else:
        # the searched dataframe receives the input integrally
        tolink_dataset_ = tolink_dataset

    tolink_dataset_ = tolink_dataset_.withColumn(
        'non_exact_queries',
        udf_build_non_exact_queries(F.col('vars'))
    )

    # the UDF result is wrapped in a one-element array and exploded into 'result_non_exact_search'
    tolink_dataset_ = tolink_dataset_.withColumn(
        'result_non_exact_search',
        F.explode(F.array(udf_find_elasticsearch_non_exact_best_candidate(F.col('vars'), F.col('non_exact_queries'))))
    )

    if write_checkpoint == 'true':
        tolink_dataset_ = optimize_with_checkpoint(
            df=tolink_dataset_,
            parallelism=paralelism,
            storage_level=storage_level,
            checkpoint_mode='durable',   # or 'local'
            eager=True,
            materialize=True,
            materialize_action='count',
            prefix_sl=prefix_sl,
            unpersist_prev=True
        )

    # unpacking the search result into atomic columns
    tolink_dataset_ = tolink_dataset_.withColumn('best_candidate_non_exact', tolink_dataset_.result_non_exact_search['best_candidate_non_exact'])
    tolink_dataset_ = tolink_dataset_.withColumn('sim_best_candidate_non_exact', tolink_dataset_.result_non_exact_search['sim_best_candidate_non_exact'])
    tolink_dataset_ = tolink_dataset_.withColumn('similarity_non_exact_candidates', tolink_dataset_.result_non_exact_search['similarity_non_exact_candidates'])

    tolink_dataset_ = tolink_dataset_.withColumn('sim_best_candidate_non_exact', F.col('sim_best_candidate_non_exact').cast('float'))

    # dropping the intermediate result column
    cols_to_drop = ['result_non_exact_search']
    tolink_dataset_ = tolink_dataset_.drop(*cols_to_drop)

    if is_debug == 'false':
        # The two frames gain the non-exact columns in different orders
        # ('non_exact_queries' is added first on one side and last on the other),
        # so they are aligned by column name instead of by position.
        tolink_dataset_ = tolink_dataset_.unionByName(tolink_dataset)
    print("\t[CIDACS-RL] time for non-exact phase: {} secs".format(time.time()-start))
    return tolink_dataset_



def _read_dataset(dataset_info):
    """
    Read one dataset described by an entry of config['datasets_info'].

    `dataset_info` must provide 'extension' ('csv' or 'parquet'), 'path',
    'default_paralelism' and 'storage_level' (the name of a StorageLevel attribute,
    e.g. 'MEMORY_ONLY'). The dataframe is repartitioned to the configured
    parallelism and persisted with the configured storage level.

    Raises ValueError when the extension is neither 'csv' nor 'parquet'.
    """
    data_ext = dataset_info['extension']
    data_path = dataset_info['path']
    paralelism = int(dataset_info['default_paralelism'])
    # Attribute lookup instead of eval(): a config value is never executed as code.
    storage_level = getattr(StorageLevel, dataset_info['storage_level'])

    # test the extension of the dataset to properly read it
    if data_ext == 'csv':
        dataset = spark.read.csv(data_path, header=True)
    elif data_ext == 'parquet':
        dataset = spark.read.parquet(data_path)
    else:
        # Fail here with a clear message rather than later on an unbound variable.
        raise ValueError(
            "Please make sure the extension for this dataset is set as 'csv' or 'parquet' "
            "(got {!r} for {})".format(data_ext, data_path)
        )

    return dataset.repartition(paralelism).persist(storage_level)


def cidacsrl():
    """
    Run the CIDACS-RL linkage pipeline driven by the broadcast configuration `config_bc`.

    Steps:
      1. when config['index_data'] == 'yes', read the indexed dataset and index it
         under config['es_index_name'] through `index_dataframe`;
      2. read the tolink dataset;
      3. run the exact phase followed by the non-exact phase;
      4. add 'final_cidacs_rl_score' and 'final_cidacs_rl_id', taken from the exact
         columns when linked_from == 'exact_match' and from the non-exact columns otherwise.

    Returns the resulting tolink dataframe.
    Raises ValueError when a dataset extension is neither 'csv' nor 'parquet'.
    """
    now = datetime.now()
    dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
    print("[CIDACS-RL] starting at {}".format(dt_string))
    start = time.time()

    config_ = config_bc.value
    datasets_info = config_['datasets_info']

    index_df_response = config_['index_data']
    index_name = config_['es_index_name']

    if index_df_response == 'yes':
        start_ = time.time()
        indexed_dataset = _read_dataset(datasets_info['indexed_dataset'])

        # indexing, at last
        index_dataframe(indexed_dataset, index_name)
        print("[CIDACS-RL] indexing on, it took {} secs".format(time.time()-start_))

    tolink_dataset = _read_dataset(datasets_info['tolink_dataset'])

    tolink_dataset = cidacs_rl_exact_phase(tolink_dataset)

    tolink_dataset = cidacs_rl_non_exact_phase(tolink_dataset)

    # consolidating score and id from whichever phase linked the record
    is_exact = F.col('linked_from') == 'exact_match'

    tolink_dataset = tolink_dataset.withColumn('final_cidacs_rl_score',
                                               F.when(is_exact, F.col('sim_best_candidate_exact'))
                                                .otherwise(F.col('sim_best_candidate_non_exact')))

    tolink_dataset = tolink_dataset.withColumn('final_cidacs_rl_id',
                                               F.when(is_exact, F.col('best_candidate_exact'))
                                                .otherwise(F.col('best_candidate_non_exact')))

    now = datetime.now()
    dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
    print("[CIDACS-RL] finished at {}".format(dt_string))
    print("[CIDACS-RL] total time elapsed: {} secs".format(time.time()-start))
    return tolink_dataset

# Running CIDACS-RL

In [11]:
config_file = 'config.json'
# Context manager so the file handle is closed as soon as the JSON is parsed.
with open(config_file) as f:
    config = json.load(f)

# broadcasting config; the pipeline functions read it back through config_bc.value
config_bc = sc.broadcast(config)

In [12]:
# Show the loaded configuration so the settings of this run are visible in the notebook output.
config

{'index_data': 'no',
 'es_index_name': 'fd-cidacs-rl',
 'es_connect_string': 'http://localhost:9200',
 'query_size': 50,
 'cutoff_exact_match': '0.95',
 'null_value': '99',
 'temp_dir': 'hdfs://barravento:9000/data/temp_dataframe/',
 'debug': 'false',
 'write_checkpoint': 'false',
 'datasets_info': {'indexed_dataset': {'path': 'hdfs://barravento:9000/data/synthetic-dataset-A.parquet',
   'extension': 'parquet',
   'columns': ['id_cidacs_a', 'nome_a', 'nome_mae_a', 'dt_nasc_a', 'sexo_a'],
   'id_column_name': 'id_cidacs_a',
   'storage_level': 'MEMORY_ONLY',
   'default_paralelism': '16'},
  'tolink_dataset': {'path': 'hdfs://barravento:9000/data/synthetic-datasets-b-1000.parquet',
   'extension': 'parquet',
   'columns': ['id_cidacs_b', 'nome_b', 'nome_mae_b', 'dt_nasc_b', 'sexo_b'],
   'id_column_name': 'id_cidacs_b',
   'storage_level': 'MEMORY_ONLY',
   'default_paralelism': '16'},
  'result_dataset': {'path': 'hdfs://barravento:9000/data/result/'}},
 'comparisons': {'name': {'index

In [13]:
# Build the linked dataframe (the indexed dataset is indexed first when config['index_data'] == 'yes').
# NOTE(review): the per-phase timings printed by this call are sub-second, which suggests the
# Spark transformations are still lazy at this point and the real work runs at the first action -- confirm.
linked_data = cidacsrl()

[CIDACS-RL] starting at 14/08/2025 15:00:56
	[CIDACS-RL] time for exact phase: 0.4350156784057617 secs
	[CIDACS-RL] time for non-exact phase: 0.3547844886779785 secs
[CIDACS-RL] finished at 14/08/2025 15:01:00
[CIDACS-RL] total time elapsed: 3.851865530014038 secs


In [14]:
config_ = config_bc.value
result_info = config_['datasets_info']['result_dataset']

# Timestamp suffix, so each run gets its own result path under the configured prefix.
now = datetime.now()
dt_string = now.strftime("%Y-%m-%d_%H-%M-%S")

output_str_prefix = result_info['path']
output_str_all = "{}cidacsrl-result_{}.parquet".format(output_str_prefix, dt_string)

In [15]:
# Attach the attributes of the matched indexed record to every tolink record.
# The id column and the file format come from the config instead of being
# hard-coded, matching how cidacsrl() reads the same dataset.
indexed_info = config_['datasets_info']['indexed_dataset']
indexed_path = indexed_info['path']
indexed_id_column = indexed_info['id_column_name']

if indexed_info['extension'] == 'csv':
    indexed_df = spark.read.csv(indexed_path, header=True)
else:
    indexed_df = spark.read.parquet(indexed_path)

# NOTE: this cell rebinds `linked_data`; run it once per cidacsrl() call,
# otherwise the indexed columns are joined in a second time.
linked_data = linked_data.join(indexed_df,
                               linked_data.final_cidacs_rl_id == indexed_df[indexed_id_column],
                               "left")

In [16]:
columns_a = config_['datasets_info']['indexed_dataset']['columns']
columns_b = config_['datasets_info']['tolink_dataset']['columns']

# Columns currently available on the joined dataframe.
available_cols = set(linked_data.columns)

# Entity columns: indexed-side names first, then tolink-side names, each name
# kept once (first occurrence wins) and only if it exists on the dataframe.
merged_entity_cols = [
    name
    for name in dict.fromkeys(columns_a + columns_b)
    if name in available_cols
]

# Linkage columns, in the order they should appear in the output.
fixed_cols = [
    'vars',
    'exact_queries', 'best_candidate_exact', 'sim_best_candidate_exact', 'similarity_exact_candidates',
    'linked_from',
    'non_exact_queries', 'best_candidate_non_exact', 'sim_best_candidate_non_exact', 'similarity_non_exact_candidates',
    'final_cidacs_rl_score'
]

fixed_cols_present = [name for name in fixed_cols if name in available_cols]

select_cols = [*merged_entity_cols, *fixed_cols_present]

# Highest final score first.
linked_data = (
    linked_data
    .select(*select_cols)
    .orderBy(F.col('final_cidacs_rl_score').desc())
)
linked_data.limit(3).toPandas()

Unnamed: 0,id_cidacs_a,nome_a,nome_mae_a,dt_nasc_a,sexo_a,id_cidacs_b,nome_b,nome_mae_b,dt_nasc_b,sexo_b,vars,exact_queries,best_candidate_exact,sim_best_candidate_exact,similarity_exact_candidates,linked_from,non_exact_queries,best_candidate_non_exact,sim_best_candidate_non_exact,similarity_non_exact_candidates,final_cidacs_rl_score
0,14956,LUIZ GABRIEL SANTOS BASTOS,MARIA MARGARIDA SANTOS SILVA,20090627,1,14956,LUIZ GABRIEL SANTOS BASTOS,MARIA MARGARIDA SANTOS SILVA,20090627,1,"[14956, LUIZ GABRIEL SANTOS BASTOS, MARIA MARG...","{ ""bool"": { ""must"": [ {""match"": {""nome_a"":""LUI...",14956,1.0,"{750484=0.8130559916274203, 731128=0.914195526...",exact_match,,,,,1.0
1,454770,MARCOS ALEXANDRE DE CASTRO DIAS,GIUNY NOQUEIRA DE FIQUEIREDO,20090525,1,454770,MARCOS ALEXANDRE DE CASTRO DIAS,GIUNY NOQUEIRA DE FIQUEIREDO,20090525,1,"[454770, MARCOS ALEXANDRE DE CASTRO DIAS, GIUN...","{ ""bool"": { ""must"": [ {""match"": {""nome_a"":""MAR...",454770,1.0,"{880184=0.7065247653313523, 19260=0.7851056810...",exact_match,,,,,1.0
2,643154,VALETIM DE OLIVEIRA SANTOS,VARLUCE SANTOS RIBEIRO,20100104,1,643154,VALETIM DE OLIVEIRA SANTOS,VARLUCE SANTOS RIBEIRO,20100104,1,"[643154, VALETIM DE OLIVEIRA SANTOS, VARLUCE S...","{ ""bool"": { ""must"": [ {""match"": {""nome_a"":""VAL...",643154,1.0,"{283172=0.850605413105413, 556052=0.8301767676...",exact_match,,,,,1.0


In [17]:
# linked_data.filter(F.col('linked_from') == 'non_exact_match') \
#            .sort(F.desc('final_cidacs_rl_score')).limit(3).toPandas()

In [18]:
# Write the linkage result to the timestamped path built earlier;
# mode='overwrite' replaces any output already present at that path.
linked_data.write.parquet(output_str_all, mode='overwrite')

In [19]:
# Wall-clock seconds elapsed since `start` was captured in the timing cell near the top of the notebook.
print("Tempo total de execução: {} secs".format(time.time() - start))

Tempo total de execução: 159.88978672027588 secs


In [20]:
# Count the link types from the parquet written above rather than from `linked_data`:
# `linked_data` is not persisted in the cells of this section, so (unless checkpointing
# is enabled) another action on it would re-evaluate its lineage, Elasticsearch lookups included.
# Requires the write cell above to have run.
saved_links = spark.read.parquet(output_str_all)
saved_links.groupBy('linked_from').count().show()

+---------------+-----+
|    linked_from|count|
+---------------+-----+
|non_exact_match|  458|
|    exact_match|  542|
+---------------+-----+

