In [1]:
## Copied from pipeline.prediction.components

from typing import Dict, List, NamedTuple, Optional

import logging

# Configure the root logger at INFO level so that the log.info(...) messages
# emitted by generate_candidates below are shown in the cell output.
logging.basicConfig(level=logging.INFO)  # INFO and above are emitted

def generate_candidates(
    global_entity_id: str,
    left_processed_table: str,
    right_processed_table: str,
    lead_source: str,
    run_type: str,
    stage: str,
    shard: Optional[int],
    environment: str,
    region: str,
    version: str,
    dry_run: bool = False,
    hyperparams: Optional[Dict] = None,
    additional_labels: Optional[str] = None,
) -> str:
    """Run one candidate-generation stage and return the output table id.

    Looks up the ``prediction.candidate_generation`` section of the pipeline
    runtime config, builds a ``GenerateCandidates`` job from it and the given
    input tables, and runs the requested stage.

    Args:
        global_entity_id: Entity identifier; used for the labels and for the
            runtime-config lookup.
        left_processed_table: Table id passed to the job as ``bq_input_left``.
        right_processed_table: Table id passed to the job as
            ``bq_input_right``.
        lead_source: Forwarded unchanged to ``GenerateCandidates``.
        run_type: Forwarded to ``PipelineRuntimeConfigFactory``.
        stage: Stage name. Parsed with ``GenCanStage.from_string`` and also
            used as the key into the config's ``shard_total`` mapping.
        shard: Shard value forwarded to ``GenerateCandidates``; may be None.
        environment: Forwarded to ``PipelineRuntimeConfigFactory``.
        region: Forwarded to ``PipelineRuntimeConfigFactory``.
        version: Forwarded to ``PipelineRuntimeConfigFactory``.
        dry_run: Forwarded to ``GenerateCandidates.generate``.
        hyperparams: Not read by this function; the h3 / tokenset settings are
            taken from the runtime config instead. The parameter is kept only
            so existing callers keep working, and defaults to None rather
            than a dict literal to avoid a shared mutable default.
        additional_labels: Optional raw labels string. When truthy it is
            parsed by ``extract_additional_labels`` and the result is passed
            as extra keyword arguments to ``patch_labels``.

    Returns:
        The ``output_table_id`` taken from the candidate-generation config.
    """
    # Project imports are kept local to the function, as in the original.
    from match.achilles import GenCanStage, GenerateCandidates
    from match.achilles.io import extract_additional_labels, patch_labels
    from match.achilles.vertex.config import PipelineRuntimeConfigFactory

    labels = (
        extract_additional_labels(additional_labels)
        if additional_labels
        else {}
    )
    patch_labels(
        global_entity_id=global_entity_id,
        pipeline_name="vendor_match",
        pipeline_step="generate_candidates",
        **labels,
    )

    log = logging.getLogger(__name__)
    candidate_gen_cfg = (
        PipelineRuntimeConfigFactory(
            environment, region, global_entity_id, run_type, version
        )
        .get()
        .prediction.candidate_generation
    )
    log.info(f"Candidates generation config: {candidate_gen_cfg}")

    output_table = candidate_gen_cfg.output_table_id
    # Named `generator` rather than `gc` so it does not read like the stdlib
    # `gc` module.
    generator = GenerateCandidates(
        bq_input_left=left_processed_table,
        bq_input_right=right_processed_table,
        bq_output=output_table,
        lead_source=lead_source,
        h3_resolution=candidate_gen_cfg.h3_resolution,
        h3_kring=candidate_gen_cfg.h3_kring,
        city_h3_resolution=candidate_gen_cfg.city_h3_resolution,
        city_h3_kring=candidate_gen_cfg.city_h3_kring,
        tokenset_name_threshold=candidate_gen_cfg.tokenset_name_candidate_threshold,
        # NOTE(review): assumes shard_total is dict-like, so an unknown stage
        # yields None here instead of raising -- confirm against the config.
        shard_total=candidate_gen_cfg.shard_total.get(stage),
        shard=shard,
    )
    log.info(f"Running candidate generation query: {output_table}")
    generator.generate(
        stages_l=[GenCanStage.from_string(stage)], dry_run=dry_run
    )
    return output_table

In [7]:
# Dry-run the "union" candidate-generation stage for PO_FI (full run type).
CANDIDATE_GENERATION_STAGES = ["geo_geo", "geo_non", "non_all", "union"]

# Entity and input tables.
global_entity_id = "PO_FI"
left_processed_table = "dh-global-sales-data.leadgen_sf_match_vertex_raw.left_processed_PO_FI"
right_processed_table = "dh-global-sales-data.leadgen_sf_match_vertex_raw.right_processed_PO_FI"

# Run parameters.
lead_source = "facebook"
run_type = "full"
shard = 0
dry_run = True

# Values used to look up the pipeline runtime config.
environment = "dev"
region = "eur"
version = "latest"

# Kept as the last expression so the returned output table id is displayed.
generate_candidates(
    global_entity_id=global_entity_id,
    left_processed_table=left_processed_table,
    right_processed_table=right_processed_table,
    lead_source=lead_source,
    run_type=run_type,
    stage="union",  # one of CANDIDATE_GENERATION_STAGES
    shard=shard,
    environment=environment,
    region=region,
    version=version,
    dry_run=dry_run,
)

INFO:achilles_io:Labels after update: {'dh_app': 'vendor-matching', 'dh_cc_id': '1001025045', 'global_entity_id': 'po_fi', 'pipeline_name': 'vendor_match', 'pipeline_step': 'generate_candidates'}
INFO:__main__:Candidates generation config: context=Context(global_entity_id='PO_FI', run_type='full', version='latest') common=CommonConfig(project='dh-global-sales-data-dev', location='us-central1', bq_project='dh-global-sales-data-dev', bq_location='US', bq_dataset={'full': 'leadgen_sf_match_vertex_raw', 'delta': 'leadgen_sf_match_vertex_delta_raw', 'instant_match': 'instant_match_vertex'}, leads_input_data_table='dh-global-sales-data.leadgen_cl.vendor_complete', flexible_match_dataset='cl_saleslayer', flexible_match_table='leadgen_flexible_match', gar_docker_repo='gsd-vendor-match', docker_image='prediction', docker_image_tag_default='latest', gar_kfp_repo='gsd-vendor-match-pipeline', gcp_project='dh-global-sales-data-dev', network='projects/433987166697/global/networks/vpc-network') outpu

-- Unique combine all sub-tables.
CREATE TABLE IF NOT EXISTS `dh-global-sales-data-dev.leadgen_sf_match_vertex_raw.candidates_PO_FI`
(
country_iso STRING,
left_row_id STRING,
left_name STRING,
left_name_local STRING,
left_name_stop STRING,
left_name_stop_phonetic STRING,
left_name_local_transliterated STRING,
left_street STRING,
left_street_stop STRING,
left_street_stop_phonetic STRING,
left_phone_number STRING,
left_lat FLOAT64,
left_lng FLOAT64,
left_registration_number STRING,
right_row_id STRING,
right_name STRING,
right_name_local STRING,
right_name_legal STRING,
right_name_stop STRING,
right_name_stop_phonetic STRING,
right_name_local_transliterated STRING,
right_street STRING,
right_street_stop STRING,
right_street_stop_phonetic STRING,
right_phone_number STRING,
right_lat FLOAT64,
right_lng FLOAT64,
right_registration_number STRING,
haversine FLOAT64,
tokenset_name_stop FLOAT64,
tokenset_street_stop FLOAT64,
tokenset_name_local FLOAT64,
tokenset_name_local_transliterated FLOAT6

'dh-global-sales-data-dev.leadgen_sf_match_vertex_raw.candidates_PO_FI'