# ❄️ Hybrid Entity Matching with AI_CLASSIFY

**Purpose**: Advanced entity matching using a two-stage approach that combines vector similarity with AI_CLASSIFY for optimal performance and cost efficiency.

**Key Innovation**: This hybrid approach uses vector cosine similarity for high-confidence matches (≥0.8) and AI_CLASSIFY for medium-confidence cases, providing an amazing balance between speed, accuracy, and cost.

**Why This Approach Works**:
- **Fast Processing**: High-confidence matches are resolved quickly with vector similarity
- **Intelligent Fallback**: AI_CLASSIFY handles ambiguous cases where vector similarity alone isn't sufficient
- **Cost Optimization**: AI_CLASSIFY is only used when there are multiple candidates, reducing unnecessary API calls
- **Proven Pattern**: Based on the successful AI_SIMILARITY + AI_CLASSIFY workflow from Snowflake's AI SQL guide

---


## ℹ️  Configuration Setup and Environment Preparation

**What This Cell Does**: This interactive configuration cell allows you to select your harmonization audit tables and entity columns for the hybrid matching process. It dynamically discovers available databases, schemas, and audit tables, then lets you choose which entity columns to use for matching. Once validated, it automatically sets up the Snowflake environment, prepares data structures, and extracts table/column information from the audit table.

**Why This Is Important**: The hybrid approach needs to know which tables and columns contain your entity data to perform the two-stage matching process. This combined configuration and setup step ensures we're working with the right data sources, establishes the database context, and prepares all necessary variables for the feature engineering step. This streamlined approach eliminates an extra manual step and ensures the environment is ready immediately after configuration.


In [None]:
# Harmonization Table Configuration
# Interactive selection of audit tables, harmonization tables, and entity columns

import streamlit as st
import pandas as pd
from snowflake.snowpark.context import get_active_session

# Get active session
session = get_active_session()

st.title("🔧 Hybrid Entity Matching Configuration")
st.markdown("Configure your harmonization tables and entity columns for hybrid matching analysis.")

# Step 1: Database and Schema Selection
st.header("1. Select Target Database and Schema")

col1, col2 = st.columns(2)

def find_name_column(df, context="general"):
    """Find the name column handling quoted column names"""
    name_column = None
    # Remove quotes from column names for comparison
    clean_columns = {col.strip('"\'\\').upper(): col for col in df.columns}
    
    # Look for common name column patterns
    patterns = ['NAME', 'DATABASE_NAME', 'DB_NAME', 'SCHEMA_NAME', 'TABLE_NAME']
    
    for pattern in patterns:
        if pattern in clean_columns:
            name_column = clean_columns[pattern]
            break
    
    # If no standard patterns found, try to find any column with 'name' in it
    if not name_column:
        for clean_name, original_col in clean_columns.items():
            if 'NAME' in clean_name:
                name_column = original_col
                break
    
    return name_column

with col1:
    try:
        # Get available databases
        databases_df = session.sql("SHOW DATABASES").to_pandas()
        
        if not databases_df.empty:
            name_column = find_name_column(databases_df, "database")
            
            if name_column:
                available_dbs = databases_df[name_column].tolist()
                selected_db = st.selectbox("Database:", available_dbs, index=0)
            else:
                st.error("❌ Could not find database name column. Available columns: " + str(list(databases_df.columns)))
                st.info("🔍 Falling back to manual entry")
                selected_db = st.text_input("Database Name:", value="ABT_BEST_BUY")
        else:
            st.warning("⚠️ No databases found. Please check your permissions.")
            selected_db = None
    except Exception as e:
        st.error(f"❌ An error occurred while fetching databases: {str(e)}")
        st.info("🔍 Falling back to manual entry")
        selected_db = st.text_input("Database Name:", value="ABT_BEST_BUY")

with col2:
    selected_schema = None
    if selected_db:
        try:
            # Get schemas for selected database
            schemas_df = session.sql(f"SHOW SCHEMAS IN DATABASE {selected_db}").to_pandas()
            
            if not schemas_df.empty:
                name_column = find_name_column(schemas_df, "schema")
                
                if name_column:
                    available_schemas = schemas_df[name_column].tolist()
                    if available_schemas:
                        selected_schema = st.selectbox("Schema:", available_schemas, index=0)
                    else:
                        st.warning("⚠️ No schemas found in the selected database.")
                else:
                    st.error("❌ Could not find schema name column. Available columns: " + str(list(schemas_df.columns)))
                    selected_schema = st.text_input("Schema Name:", value="PUBLIC")
            else:
                st.warning("⚠️ No schemas found in the selected database.")
                selected_schema = st.text_input("Schema Name:", value="PUBLIC")
        except Exception as e:
            st.error(f"❌ An error occurred while fetching schemas: {str(e)}")
            st.info("🔍 Falling back to manual entry")
            selected_schema = st.text_input("Schema Name:", value="PUBLIC")

if selected_db and selected_schema:
    # Set context
    session.sql(f"USE DATABASE {selected_db}").collect()
    session.sql(f"USE SCHEMA {selected_schema}").collect()
    
    st.success(f"✅ Context set to: {selected_db}.{selected_schema}")
    
    # Step 2: Find Audit Tables
    st.header("2. Select Harmonization Audit Table")
    
    try:
        # Look for audit harmonization tables
        audit_tables_df = session.sql(
            "SHOW TABLES LIKE 'AUDIT_HARMONIZATION_%'"
        ).to_pandas()
        
        if not audit_tables_df.empty:
            name_column = find_name_column(audit_tables_df, "table")
            
            if name_column:
                audit_table_names = audit_tables_df[name_column].tolist()
                
                # Always show as dropdown, even if only one option
                selected_audit_table = st.selectbox(
                    "Audit Table:", 
                    audit_table_names,
                    index=0,
                    help="Select the audit table from your harmonization process"
                )
            else:
                st.error("❌ Could not find table name column. Available columns: " + str(list(audit_tables_df.columns)))
                selected_audit_table = None
            
            if selected_audit_table:
                # Step 3: Load and Parse Audit Table
                st.header("3. Configure Entity Matching Columns")
                
                # Load audit table data
                audit_data_df = session.sql(f"SELECT * FROM {selected_audit_table}").to_pandas()
                
                # Handle case-insensitive column matching for audit data
                col_mapping = {}
                for col in audit_data_df.columns:
                    col_clean = col.strip('"\'\\').upper()
                    if col_clean == 'MATCHED_IN_OUTPUT':
                        col_mapping['matched_in_output'] = col
                    elif col_clean == 'REFERENCE_DATASET_COLUMN':
                        col_mapping['reference_dataset_column'] = col
                    elif col_clean == 'INPUT_DATASET_COLUMN':
                        col_mapping['input_dataset_column'] = col
                    elif col_clean == 'REFERENCE_DATASET':
                        col_mapping['reference_dataset'] = col
                    elif col_clean == 'INPUT_DATASET':
                        col_mapping['input_dataset'] = col
                    elif col_clean == 'REFERENCE_OUTPUT_TABLE':
                        col_mapping['reference_output_table'] = col
                    elif col_clean == 'INPUT_DATASET_TABLE':
                        col_mapping['input_dataset_table'] = col
                
                # Check if we have all required columns
                required_cols = ['matched_in_output', 'reference_dataset_column', 'input_dataset_column', 
                               'reference_output_table', 'input_dataset_table']
                missing_cols = [col for col in required_cols if col not in col_mapping]
                
                if missing_cols:
                    st.error(f"❌ Missing required columns in audit table: {missing_cols}")
                    st.info(f"Available columns: {list(audit_data_df.columns)}")
                else:
                    # Filter only matched columns (case-insensitive)
                    matched_filter = audit_data_df[col_mapping['matched_in_output']].str.upper() == 'YES'
                    matched_columns = audit_data_df[matched_filter].copy()
                    
                    if not matched_columns.empty:
                        st.subheader("Available Matched Columns:")
                        
                        # Display matched columns in a nice format
                        display_df = matched_columns[[
                            col_mapping['reference_dataset_column'], 
                            col_mapping['input_dataset_column'],
                            col_mapping.get('reference_dataset', col_mapping['reference_dataset_column']),
                            col_mapping.get('input_dataset', col_mapping['input_dataset_column'])
                        ]].copy()
                        
                        display_df.columns = ['Reference Column', 'Input Column', 'Reference Source', 'Input Source']
                        st.dataframe(display_df, use_container_width=True)
                        
                        # Step 4: Select Entity Column for Analysis
                        st.subheader("Select Primary Entity Column for Matching:")
                        
                        # Create options showing both reference and input columns
                        column_options = []
                        for idx, row in matched_columns.iterrows():
                            ref_col = row[col_mapping['reference_dataset_column']]
                            inp_col = row[col_mapping['input_dataset_column']]
                            option_label = f"{ref_col} ↔ {inp_col}"
                            column_options.append((option_label, ref_col, inp_col))
                        
                        if len(column_options) == 1:
                            selected_option = column_options[0]
                            st.info(f"📝 Auto-selected entity columns: {selected_option[0]}")
                        else:
                            option_labels = [opt[0] for opt in column_options]
                            selected_idx = st.selectbox(
                                "Entity Column Pair:",
                                range(len(option_labels)),
                                format_func=lambda x: option_labels[x],
                                help="Select the column pair that contains the primary entity names/labels for matching"
                            )
                            selected_option = column_options[selected_idx]
                        
                        # Helper function to properly quote table identifiers
                        def quote_table_identifier(table_path):
                            """Ensures table path components are properly quoted for case-sensitive names"""
                            if not table_path:
                                return table_path
                            
                            # Remove existing quotes first
                            table_path_clean = table_path.replace('"', '')
                            
                            # Split into parts (database.schema.table)
                            parts = table_path_clean.split('.')
                            
                            # Quote each part that needs it (contains lowercase, special chars, or starts with digit)
                            quoted_parts = []
                            for part in parts:
                                part = part.strip()
                                # Check if quoting is needed (has lowercase letters or special chars)
                                needs_quotes = any(c.islower() or not (c.isalnum() or c == '_') for c in part)
                                if needs_quotes:
                                    quoted_parts.append(f'"{part}"')
                                else:
                                    quoted_parts.append(part)
                            
                            return '.'.join(quoted_parts)
                        
                        # Get table information from audit table
                        ref_table_from_audit = matched_columns.iloc[0][col_mapping['reference_output_table']]
                        inp_table_from_audit = matched_columns.iloc[0][col_mapping['input_dataset_table']]
                        
                        # Properly quote the table names
                        ref_table_from_audit = quote_table_identifier(ref_table_from_audit)
                        inp_table_from_audit = quote_table_identifier(inp_table_from_audit)
                        
                        # Step 5: Verify and Edit Table Names
                        st.subheader("Verify Source Table Names:")
                        st.caption("⚠️ Confirm these table names are correct. Edit them if needed before saving configuration.")
                        
                        col_ref, col_inp = st.columns(2)
                        
                        with col_ref:
                            ref_table_input = st.text_input(
                                "Reference Table (Fully Qualified):",
                                value=ref_table_from_audit,
                                help="Full table path: DATABASE.SCHEMA.TABLE_NAME (quotes added automatically for mixed-case names)"
                            )
                        
                        with col_inp:
                            inp_table_input = st.text_input(
                                "Input Table (Fully Qualified):",
                                value=inp_table_from_audit,
                                help="Full table path: DATABASE.SCHEMA.TABLE_NAME (quotes added automatically for mixed-case names)"
                            )
                        
                        # Show warning if tables were edited
                        if ref_table_input != ref_table_from_audit or inp_table_input != inp_table_from_audit:
                            st.warning("⚠️ Table names have been modified from the audit table values. Make sure they are correct!")
                        
                        # Validate that tables exist
                        if st.button("✅ Validate, Save Configuration, and Set Up Environment"):
                            errors = []
                            
                            # Ensure input table names are properly quoted
                            ref_table_quoted = quote_table_identifier(ref_table_input)
                            inp_table_quoted = quote_table_identifier(inp_table_input)
                            
                            # Try to query the tables to verify they exist
                            try:
                                session.sql(f"SELECT 1 FROM {ref_table_quoted} LIMIT 1").collect()
                                st.success(f"✅ Reference table validated: {ref_table_quoted}")
                            except Exception as e:
                                errors.append(f"❌ Reference table not found: {ref_table_quoted}")
                                errors.append(f"   Error details: {str(e)}")
                            
                            try:
                                session.sql(f"SELECT 1 FROM {inp_table_quoted} LIMIT 1").collect()
                                st.success(f"✅ Input table validated: {inp_table_quoted}")
                            except Exception as e:
                                errors.append(f"❌ Input table not found: {inp_table_quoted}")
                                errors.append(f"   Error details: {str(e)}")
                            
                            if errors:
                                for error in errors:
                                    st.error(error)
                                st.info("💡 Please correct the table names and try again. Make sure the tables exist and you have access permissions.")
                            else:
                                # Store configuration with properly quoted table names
                                config = {
                                    'database': selected_db,
                                    'schema': selected_schema,
                                    'audit_table': selected_audit_table,
                                    'reference_table': ref_table_quoted,
                                    'input_table': inp_table_quoted,
                                    'reference_entity_column': selected_option[1],
                                    'input_entity_column': selected_option[2],
                                    'matched_columns_count': len(matched_columns)
                                }
                                
                                # Store in a TEMPORARY table (auto-drops at session end) for use by subsequent cells
                                config_df = pd.DataFrame([config])
                                
                                # Create temporary table using Snowpark DataFrame API
                                session.create_dataframe(config_df).write.save_as_table(
                                    'TEMP_HYBRID_CONFIG',
                                    mode='overwrite',
                                    table_type='temporary'  # Creates a true TEMPORARY table that auto-drops
                                )
                                
                                st.success("✅ Configuration validated and saved!")
                                st.markdown(f"""
                                **Saved Configuration:**
                                - Reference Table: `{ref_table_quoted}`
                                - Input Table: `{inp_table_quoted}`
                                - Reference Entity Column: `{selected_option[1]}`
                                - Input Entity Column: `{selected_option[2]}`
                                """)
                                
                                # --- BEGIN ENVIRONMENT SETUP (formerly r_Environment_Set_Up cell) ---
                                st.info("🔧 Setting up environment and preparing data structures...")
                                
                                try:
                                    # Clear all caches and ensure fresh start
                                    session.sql("ALTER SESSION SET USE_CACHED_RESULT = FALSE").collect()
                                    session.sql("ALTER SESSION SET QUERY_TAG = 'HYBRID_ENTITY_MATCHING_WITH_AI_CLASSIFY'").collect()
                                    
                                    # Drop any existing temporary tables to avoid conflicts
                                    session.sql("DROP TABLE IF EXISTS config_data_hybrid").collect()
                                    session.sql("DROP TABLE IF EXISTS audit_info_hybrid").collect()
                                    session.sql("DROP TABLE IF EXISTS hybrid_features").collect()
                                    session.sql("DROP TABLE IF EXISTS hybrid_final_results").collect()
                                    
                                    # Get the configuration from the temporary table
                                    session.sql("""
                                        CREATE OR REPLACE TEMPORARY TABLE config_data_hybrid AS
                                        SELECT * FROM TEMP_HYBRID_CONFIG LIMIT 1
                                    """).collect()
                                    
                                    # Set context based on configuration
                                    session.sql(f"""
                                        SET (db_name, schema_name) = (SELECT "database", "schema" FROM config_data_hybrid)
                                    """).collect()
                                    session.sql("USE DATABASE IDENTIFIER($db_name)").collect()
                                    session.sql("USE SCHEMA IDENTIFIER($schema_name)").collect()
                                    
                                    # Get table and column information
                                    session.sql("""
                                        SET (ref_entity_col, inp_entity_col, audit_table_name) = (
                                            SELECT 
                                                "reference_entity_column",
                                                "input_entity_column",
                                                "audit_table"
                                            FROM config_data_hybrid
                                        )
                                    """).collect()
                                    
                                    # Get the matched columns from the audit table
                                    session.sql("""
                                        CREATE OR REPLACE TEMPORARY TABLE column_mappings_hybrid AS
                                        SELECT 
                                            REFERENCE_DATASET_COLUMN as ref_col,
                                            INPUT_DATASET_COLUMN as inp_col
                                        FROM IDENTIFIER($audit_table_name)
                                        WHERE UPPER(MATCHED_IN_OUTPUT) = 'YES'
                                    """).collect()
                                    
                                    # Build dynamic column lists
                                    session.sql("""
                                        CREATE OR REPLACE TEMPORARY TABLE ref_columns_hybrid AS
                                        SELECT LISTAGG(ref_col, ', ') as ref_select_list
                                        FROM column_mappings_hybrid
                                        WHERE ref_col != 'COLLATED_COLUMN'
                                    """).collect()
                                    
                                    session.sql("""
                                        CREATE OR REPLACE TEMPORARY TABLE inp_columns_hybrid AS  
                                        SELECT LISTAGG(inp_col, ', ') as inp_select_list
                                        FROM column_mappings_hybrid
                                        WHERE inp_col != 'COLLATED_COLUMN'
                                    """).collect()
                                    
                                    # Get all matched columns and table names dynamically
                                    session.sql("""
                                        CREATE OR REPLACE TEMPORARY TABLE audit_info_hybrid AS
                                        SELECT 
                                            REFERENCE_DATASET_COLUMN as ref_col,
                                            INPUT_DATASET_COLUMN as inp_col,
                                            CASE 
                                                WHEN REFERENCE_OUTPUT_TABLE LIKE '%.%.%' THEN SPLIT_PART(REFERENCE_OUTPUT_TABLE, '.', 3)
                                                WHEN REFERENCE_OUTPUT_TABLE LIKE '%.%' THEN SPLIT_PART(REFERENCE_OUTPUT_TABLE, '.', 2)
                                                ELSE REFERENCE_OUTPUT_TABLE
                                            END as ref_table,
                                            CASE 
                                                WHEN INPUT_DATASET_TABLE LIKE '%.%.%' THEN SPLIT_PART(INPUT_DATASET_TABLE, '.', 3)
                                                WHEN INPUT_DATASET_TABLE LIKE '%.%' THEN SPLIT_PART(INPUT_DATASET_TABLE, '.', 2)
                                                ELSE INPUT_DATASET_TABLE
                                            END as inp_table,
                                            REFERENCE_OUTPUT_TABLE as ref_table_full,
                                            INPUT_DATASET_TABLE as inp_table_full,
                                            ROW_NUMBER() OVER (ORDER BY REFERENCE_DATASET_COLUMN) as rn
                                        FROM IDENTIFIER($audit_table_name) 
                                        WHERE UPPER(MATCHED_IN_OUTPUT) = 'YES'
                                    """).collect()
                                    
                                    # Set dynamic variables for table names and columns
                                    session.sql("""
                                        SET (ref_table_name, inp_table_name, ref_col1, inp_col1, ref_col2, inp_col2, ref_col3, inp_col3, ref_table_full_name, inp_table_full_name) = (
                                            SELECT 
                                                MAX(CASE WHEN rn = 1 THEN ref_table END),
                                                MAX(CASE WHEN rn = 1 THEN inp_table END),
                                                MAX(CASE WHEN rn = 1 THEN ref_col END),
                                                MAX(CASE WHEN rn = 1 THEN inp_col END),
                                                MAX(CASE WHEN rn = 2 THEN ref_col END),
                                                MAX(CASE WHEN rn = 2 THEN inp_col END),
                                                MAX(CASE WHEN rn = 3 THEN ref_col END),
                                                MAX(CASE WHEN rn = 3 THEN inp_col END),
                                                (SELECT "reference_table" FROM config_data_hybrid),
                                                (SELECT "input_table" FROM config_data_hybrid)
                                            FROM audit_info_hybrid
                                        )
                                    """).collect()
                                    
                                    # Display configuration summary
                                    summary_df = session.sql("""
                                        SELECT 
                                            'SETUP COMPLETE' as status,
                                            $ref_table_full_name as reference_table_used,
                                            $inp_table_full_name as input_table_used,
                                            $ref_entity_col as reference_entity_column,
                                            $inp_entity_col as input_entity_column,
                                            'Ready for hybrid feature engineering' as next_step
                                    """).to_pandas()
                                    
                                    st.success("✅ Environment setup complete!")
                                    st.dataframe(summary_df, use_container_width=True)
                                    st.info("✨ You can now proceed to the Vector Feature Engineering cell.")
                                    
                                except Exception as setup_error:
                                    st.error(f"❌ Error during environment setup: {str(setup_error)}")
                                    st.info("Configuration was saved, but environment setup failed. You may need to run the setup manually.")
                                # --- END ENVIRONMENT SETUP ---
                        
                    
                        
                    else:
                        st.warning("⚠️ No matched columns found in the audit table. Make sure 'MATCHED_IN_OUTPUT' column contains 'Yes' values.")
        else:
            st.warning(f"⚠️ No audit tables found matching pattern 'AUDIT_HARMONIZATION_%' in {selected_db}.{selected_schema}")
            st.info("💡 Make sure you've run the Snowflake Data Harmonization Streamlit app first to generate audit tables.")
    
    except Exception as e:
        st.error(f"❌ Error accessing audit tables: {str(e)}")
        st.info("🔍 Debug info: Check if the audit table exists and you have proper permissions")
else:
    st.info("👆 Please select a database and schema to continue.")


## ℹ️  Vector Feature Engineering

**What This Cell Does**: This cell creates the foundational features needed for the hybrid matching approach. It generates vector embeddings for both reference and input entity names using Snowflake's CORTEX.EMBED_TEXT_1024 function, performs basic text cleaning and standardization, and creates a cross-join of all possible entity pairs with their similarity scores.

**Why This Is Important**: The hybrid approach relies on vector similarity as its primary matching mechanism. This feature engineering step creates the vector embeddings that will be used in both stages of the matching process - for high-confidence direct matches and as input to AI_CLASSIFY for medium-confidence cases. The text cleaning ensures consistent comparison, and the similarity filtering reduces the search space for more efficient processing.


In [None]:
-- Vector Feature Engineering for Hybrid Matching
-- Creates vector embeddings and similarity scores for the two-stage matching process

-- Create the hybrid_features table with vector embeddings
CREATE OR REPLACE TABLE hybrid_features AS
WITH reference_features AS (
    SELECT 
        -- Core entity matching columns (dynamically selected)
        IDENTIFIER($ref_col1) as REF_COL1,
        IDENTIFIER($ref_col2) as REF_COL2,
        IDENTIFIER($ref_col3) as REF_COL3,
        -- Entity name and cleaned version for matching
        IDENTIFIER($ref_entity_col) as REF_RAW_NAME,
        -- Pre-compute vector embedding for performance
        SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multilingual-2', IDENTIFIER($ref_entity_col)) as REF_EMBEDDING
    FROM IDENTIFIER($ref_table_full_name)
),
input_features AS (
    SELECT 
        -- Core entity matching columns (dynamically selected) 
        IDENTIFIER($inp_col1) as INP_COL1,
        IDENTIFIER($inp_col2) as INP_COL2,
        IDENTIFIER($inp_col3) as INP_COL3,
        -- Entity name and cleaned version for matching
        IDENTIFIER($inp_entity_col) as INP_RAW_NAME,
       -- Pre-compute vector embedding for performance
        SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multilingual-2', IDENTIFIER($inp_entity_col)) as INP_EMBEDDING
    FROM IDENTIFIER($inp_table_full_name)
)
SELECT 
    -- Reference table columns with REF_ prefix
    ref.REF_COL1,
    ref.REF_COL2,
    ref.REF_COL3,
    ref.REF_RAW_NAME,
    ref.REF_EMBEDDING,
    -- Input table columns with INP_ prefix  
    inp.INP_COL1,
    inp.INP_COL2,
    inp.INP_COL3,
    inp.INP_RAW_NAME,
    inp.INP_EMBEDDING,
    -- Pre-compute vector similarity for efficiency
    VECTOR_COSINE_SIMILARITY(ref.REF_EMBEDDING, inp.INP_EMBEDDING) as vector_similarity
FROM reference_features ref
CROSS JOIN input_features inp
-- Filter out very low similarity pairs to improve performance
WHERE VECTOR_COSINE_SIMILARITY(ref.REF_EMBEDDING, inp.INP_EMBEDDING) > 0.2;

-- Show the performance and configuration used
SELECT 
    'HYBRID VECTOR FEATURES ENGINEERING COMPLETE' as method,
    COUNT(*) as total_comparisons,
    COUNT(DISTINCT REF_RAW_NAME) as reference_products,
    COUNT(DISTINCT INP_RAW_NAME) as input_products,
    ROUND(COUNT(*) * 1.0 / COUNT(DISTINCT REF_RAW_NAME), 1) as avg_comparisons_per_reference_product,
    ROUND(AVG(vector_similarity), 3) as avg_vector_similarity,
    $ref_table_full_name as reference_table_used,
    $inp_table_full_name as input_table_used,
    $ref_entity_col as reference_entity_column,
    $inp_entity_col as input_entity_column,
    'Vector embeddings ready for hybrid matching process' as note
FROM hybrid_features;


## ℹ️  Hybrid Matching: Vector Similarity + AI_CLASSIFY

**What This Cell Does**: This is the core of the hybrid approach, implementing the two-stage matching process. First, it identifies high-confidence matches (≥0.9 vector similarity) that can be resolved immediately. Then, for medium-confidence cases, it uses AI_CLASSIFY to select the best match from the top-5 candidates. This approach combines the speed of vector similarity with the intelligence of AI_CLASSIFY for optimal performance.

**Why This Is Important**: This hybrid approach provides the best balance between speed, accuracy, and cost. High-confidence matches are resolved quickly without expensive AI_CLASSIFY calls, while medium-confidence cases benefit from AI_CLASSIFY's ability to understand context and make intelligent decisions. This pattern is inspired by the successful AI_SIMILARITY + AI_CLASSIFY workflow and ensures that AI_CLASSIFY is only used when there are multiple viable candidates.


In [None]:
-- Hybrid Matching: Vector Similarity + AI_CLASSIFY
-- Implements the two-stage matching process for optimal performance and cost efficiency

CREATE OR REPLACE TABLE hybrid_final_results AS
WITH top_k_candidates AS (
    -- Get top 15 candidates per reference product for AI_CLASSIFY
    SELECT 
      
        hf.REF_RAW_NAME,
        hf.INP_RAW_NAME,
        hf.REF_COL1,
        hf.INP_COL1,
        hf.REF_COL2,
        hf.INP_COL2,
        hf.REF_COL3,
        hf.INP_COL3,
        hf.vector_similarity,
        ROW_NUMBER() OVER (PARTITION BY hf.REF_RAW_NAME ORDER BY hf.vector_similarity DESC) as rank
    FROM hybrid_features hf
    WHERE hf.vector_similarity > 0.3  -- Basic threshold for candidate selection
),

-- Aggregate top K candidates for AI_CLASSIFY
agg_candidates AS (
    SELECT 
       
        REF_RAW_NAME,
        ARRAY_AGG(INP_RAW_NAME) WITHIN GROUP (ORDER BY rank) as candidate_names,
        COUNT(*) as candidate_count
    FROM top_k_candidates
    WHERE rank <= 15  -- Top 15 candidates for AI_CLASSIFY
    GROUP BY  REF_RAW_NAME
),

-- Apply AI_CLASSIFY for medium confidence matches
ai_classified AS (
    SELECT 
     
        ac.REF_RAW_NAME,
        ac.candidate_names,
        ac.candidate_count,
        -- Apply AI_CLASSIFY only if we have multiple candidates
        CASE 
            WHEN ac.candidate_count >= 2 THEN
                GET(AI_CLASSIFY(ac.REF_RAW_NAME, ac.candidate_names):"labels", 0)::string
            ELSE ac.candidate_names[0]::string
        END as ai_selected_match,
        -- Mark if AI_CLASSIFY was used
        CASE WHEN ac.candidate_count >= 2 THEN 1 ELSE 0 END as ai_classify_used
    FROM agg_candidates ac
),

-- Get the final results with confidence scoring
final_scoring AS (
    SELECT 
        tc.REF_RAW_NAME,
        tc.INP_RAW_NAME,
        tc.REF_COL1,
        tc.INP_COL1,
        tc.REF_COL2,
        tc.INP_COL2,
        tc.REF_COL3,
        tc.INP_COL3,
        tc.vector_similarity,
        tc.rank,
        ac.ai_selected_match,
        ac.ai_classify_used,
        -- Use AI_CLASSIFY result if it matches, otherwise use top vector match
        CASE 
            WHEN ac.ai_selected_match = tc.INP_RAW_NAME THEN ac.ai_selected_match
            WHEN tc.rank = 1 THEN tc.INP_RAW_NAME
            ELSE NULL
        END as final_match,
        -- Enhanced confidence: boost AI_CLASSIFY matches slightly
        CASE 
            WHEN ac.ai_selected_match = tc.INP_RAW_NAME THEN 
                LEAST(1.0, tc.vector_similarity + 0.1)
            ELSE tc.vector_similarity
        END as enhanced_confidence
    FROM top_k_candidates tc
    LEFT JOIN ai_classified ac ON tc.REF_RAW_NAME = ac.REF_RAW_NAME
),

-- Get the best match per reference product
best_matches AS (
    SELECT 
        
        final_match,
        enhanced_confidence,
        REF_RAW_NAME as reference_entity_name,
        INP_RAW_NAME as input_entity_name,
        vector_similarity,
        ai_classify_used,
        -- Determine match method
        CASE 
            WHEN ai_classify_used = 1 AND final_match = ai_selected_match THEN 'AI_CLASSIFY'
            WHEN vector_similarity >= 0.8 THEN 'HIGH_CONFIDENCE_VECTOR'
            ELSE 'MEDIUM_CONFIDENCE_VECTOR'
        END as match_method,
        -- Include key identifier columns for evaluation
        REF_COL1 as reference_id_column,
        INP_COL1 as input_id_column,
        REF_COL2 as reference_label_column,
        INP_COL2 as input_label_column,
        REF_COL3 as reference_detail_column,
        INP_COL3 as input_detail_column,
        rank,
        -- No brand/model matching in hybrid version
        0 as brand_exact_match,
        0 as model_exact_match
    FROM final_scoring
    WHERE final_match IS NOT NULL
    QUALIFY ROW_NUMBER() OVER (PARTITION BY REF_RAW_NAME ORDER BY enhanced_confidence DESC) = 1
)

SELECT * FROM best_matches;

-- Performance summary
SELECT 
    'HYBRID VECTOR + AI_CLASSIFY MATCHING COMPLETE' as method,
    COUNT(*) as total_matches,
    ROUND(AVG(enhanced_confidence), 3) as avg_confidence,
    ROUND(AVG(vector_similarity), 3) as avg_vector_similarity,
    COUNT(CASE WHEN enhanced_confidence >= 0.8 THEN 1 END) as high_confidence_matches,
    SUM(ai_classify_used) as ai_classify_used_count,
    ROUND(SUM(ai_classify_used) * 100.0 / COUNT(*), 2) as ai_classify_usage_percent,
    COUNT(CASE WHEN match_method = 'AI_CLASSIFY' THEN 1 END) as ai_classify_matches,
    COUNT(CASE WHEN match_method = 'HIGH_CONFIDENCE_VECTOR' THEN 1 END) as high_confidence_vector_matches,
    COUNT(CASE WHEN match_method = 'MEDIUM_CONFIDENCE_VECTOR' THEN 1 END) as medium_confidence_vector_matches
FROM hybrid_final_results;


## ℹ️ Record Management and Threshold Selection

**What This Cell Does**: This interactive cell allows you to review the confidence distribution of your hybrid matching results and select a confidence threshold to separate records into MATCHED and UNMATCHED tables. You can also customize the table names before creation.

**Why This Is Important**: Not all matches require the same level of confidence for different business use cases. This cell gives you control over what gets accepted as a match versus what needs further review. The ability to customize table names ensures the output tables fit your naming conventions and workflow requirements. Using CREATE OR REPLACE ensures you can re-run the process with different thresholds without manual cleanup.


In [None]:
# Record Management - Confidence Threshold Selection and Table Creation
# Interactive Streamlit interface for selecting confidence thresholds and creating matched/unmatched tables

import streamlit as st
import pandas as pd
from snowflake.snowpark.context import get_active_session
import numpy as np
import altair as alt # Import Altair for advanced charting controls

# Get active session
session = get_active_session()

# Setting a cleaner title and description for the section
st.title("🔗 Hybrid Matching - Finalize Records")
st.markdown("Select a confidence threshold to separate records into **MATCHED** and **UNMATCHED** output tables.")

# --- Configuration Check ---
try:
    # Safely retrieve configuration from the temporary table
    config_df = session.sql("SELECT * FROM TEMP_HYBRID_CONFIG LIMIT 1").to_pandas()
    if config_df.empty:
        st.error("❌ Configuration not found. Please run the setup cell first.")
        st.stop()
    audit_table_name = config_df.iloc[0]['audit_table']
    results_table_name = "hybrid_final_results"

    # Quick check for results table existence (will throw error if not found)
    session.sql(f"SELECT 1 FROM {results_table_name} LIMIT 1").collect()

except Exception as e:
    st.error(f"❌ Error initializing. Ensure the configuration and hybrid matching cells were run successfully: {str(e)}")
    st.stop()


# --- 1. Confidence Distribution Visualization ---
st.header("1. Confidence Distribution Analysis")
st.markdown("Visualize the distribution of match quality before setting your threshold.")

try:
    # Fetch data for visualization and breakdown
    # Binning data. Store confidence as a float for charting/sorting.
    # Note: The SQL query still bins by 0.1 (10%), but Altair controls its display.
    confidence_data_sf = session.sql(f"""
        SELECT 
            FLOOR(enhanced_confidence * 10) / 10 AS confidence_bin_float,
            COUNT(*) AS record_count
        FROM {results_table_name}
        GROUP BY 1
        ORDER BY 1 DESC
    """).to_pandas()
    
    # Calculate percentage string for display in the table
    confidence_data_sf.columns = ['Confidence Bin Float', 'Record Count']
    confidence_data_sf['Confidence Bin Float'] = confidence_data_sf['Confidence Bin Float'].astype(float)

    col_chart, col_table = st.columns([7, 3])

    with col_chart:
        st.subheader("Match Count by Confidence Bin")
        
        # --- Altair Chart for Snappier Look and Percentage Formatting ---
        chart = alt.Chart(confidence_data_sf).mark_bar().encode(
            # X-axis: Use the float for binning, format label as percentage
            x=alt.X('Confidence Bin Float', 
                    title='Minimum Enhanced Confidence',
                    bin=alt.Bin(step=0.1), # Explicitly set bin step to 0.1 (10%)
                    axis=alt.Axis(format='.0%') 
                   ),
            y=alt.Y('Record Count', 
                    title='Record Count',
                    axis=alt.Axis(grid=False) # Remove horizontal gridlines
                   ),
            tooltip=[
                alt.Tooltip('Confidence Bin Float', title='Min Confidence', format='.1%'), 
                'Record Count'
            ]
        ).properties(
            height=300
        ).configure_view(
            stroke='transparent' # Remove outer border
        ).configure_axis(
            grid=False, # Remove vertical gridlines
            domainColor='gray',
            tickColor='gray',
            titleFontWeight='bold'
        )

        st.altair_chart(chart, use_container_width=True)
        # --- End Altair Chart ---

    with col_table:
        st.subheader("Summary Table")
        # Add Percentage of Total
        total_records = confidence_data_sf['Record Count'].sum()
        confidence_data_sf['% of Total'] = (confidence_data_sf['Record Count'] / total_records * 100).round(1).astype(str) + '%'
        
        # Calculate percentage string for display in the table
        display_df = confidence_data_sf.copy()
        # Display bin as a percentage (e.g., 0.9 -> 90%)
        display_df['Min Confidence'] = (display_df['Confidence Bin Float'] * 100).astype(int).astype(str) + '%'
        
        st.dataframe(
            display_df[['Min Confidence', 'Record Count', '% of Total']],
            use_container_width=True,
            hide_index=True
        )

except Exception as e:
    st.error(f"❌ Error generating confidence breakdown visualization: {str(e)}")
    # Continue execution to allow manual table creation if needed


# --- 2. Threshold Selection ---
st.header("2. Select Confidence Threshold")
st.caption("Move the slider to see how the matched/unmatched split changes instantly.")

# Use a slider in the 60-100% range for easier user input (step 5%)
selected_threshold_percent = st.slider(
    "Minimum Enhanced Confidence for MATCHED Records:",
    min_value=60, 
    max_value=100, 
    value=80, # Default to 80%
    step=5,
    format="≥ %d%%" # Show as percentage string
)

# Convert back to float 0.0-1.0 range for SQL query
selected_threshold = selected_threshold_percent / 100.0 

st.info(f"💡 All records with **Enhanced Confidence $\\geq$ {selected_threshold_percent:.0f}%** will be flagged as MATCHED.")

# Calculate split preview (executed after slider change)
split_preview = session.sql(f"""
    SELECT 
        SUM(CASE WHEN enhanced_confidence >= {selected_threshold} THEN 1 ELSE 0 END) as matched_count,
        SUM(CASE WHEN enhanced_confidence < {selected_threshold} THEN 1 ELSE 0 END) as unmatched_count,
        COUNT(*) as total_count
    FROM {results_table_name}
""").to_pandas()

row = split_preview.iloc[0]
total = int(row['TOTAL_COUNT'])
matched = int(row['MATCHED_COUNT'])
unmatched = int(row['UNMATCHED_COUNT'])

# Use st.columns and st.metric for clear, consistent KPI display
col1, col2, col3 = st.columns(3)
with col1:
    st.metric("Total Records", f"{total:,}")
with col2:
    # REMOVED DELTA/ARROW: Combined count and percentage into the primary value field
    matched_pct = round(matched / total * 100, 1) if total > 0 else 0
    st.metric("Matched Records", f"{matched:,} ({matched_pct}%)") 
with col3:
    # REMOVED DELTA/ARROW: Combined count and percentage into the primary value field
    unmatched_pct = round(unmatched / total * 100, 1) if total > 0 else 0
    st.metric("Unmatched Records", f"{unmatched:,} ({unmatched_pct}%)")


# --- 3. Sample Preview (Hidden by default) ---
# Update sample preview to show confidence as percent
with st.expander("👁️ View Sample Results (Top 20 by Confidence)"):
    sample_results = session.sql(f"""
        SELECT 
            reference_entity_name,
            input_entity_name,
            ROUND(enhanced_confidence * 100, 1) || '%' as enhanced_confidence_pct, -- Convert to percentage string
            ROUND(vector_similarity * 100, 1) || '%' as vector_similarity_pct,
            match_method,
            CASE WHEN enhanced_confidence >= {selected_threshold} THEN '✅ MATCHED' ELSE '⚠️ UNMATCHED' END as status
        FROM {results_table_name}
        ORDER BY enhanced_confidence DESC
        LIMIT 20
    """).to_pandas()
    
    # Rename columns for display
    sample_results.columns = [
        'Reference Entity', 
        'Input Entity', 
        'Enhanced Confidence', 
        'Vector Similarity', 
        'Match Method', 
        'Status'
    ]
    st.dataframe(sample_results, use_container_width=True)


# --- 4. Configure & Create Tables ---
st.header("4. Configure Output Table Names & Finalize")
# Generate sensible default names
matched_table_name = f"{audit_table_name}_HYBRID_MATCHED"
unmatched_table_name = f"{audit_table_name}_HYBRID_UNMATCHED"

col_matched, col_unmatched = st.columns(2)

with col_matched:
    matched_table_input = st.text_input(
        "Matched Records Table Name:",
        value=matched_table_name
    )

with col_unmatched:
    unmatched_table_input = st.text_input(
        "Unmatched Records Table Name:",
        value=unmatched_table_name
    )

if matched_table_input == unmatched_table_input:
    st.error("❌ Table names must be different!")
else:
    st.info("⚠️ Tables will be created using `CREATE OR REPLACE`. Existing tables will be overwritten.")
    
    # Use a primary button for the final, irreversible action
    if st.button("🚀 Finalize & Create Output Tables", type="primary"):
        with st.spinner("Creating tables..."):
            try:
                # Create matched records table (SQL remains the same)
                session.sql(f"""
                    CREATE OR REPLACE TABLE {matched_table_input} AS
                    SELECT *
                    FROM {results_table_name}
                    WHERE enhanced_confidence >= {selected_threshold}
                """).collect()
                
                # Create unmatched records table (SQL remains the same)
                session.sql(f"""
                    CREATE OR REPLACE TABLE {unmatched_table_input} AS
                    SELECT *
                    FROM {results_table_name}
                    WHERE enhanced_confidence < {selected_threshold}
                """).collect()
                
                matched_df = session.sql(f"SELECT COUNT(*) as cnt FROM {matched_table_input}").to_pandas()
                unmatched_df = session.sql(f"SELECT COUNT(*) as cnt FROM {unmatched_table_input}").to_pandas()
                
                # Handling case-insensitivity of SQL output column names
                matched_count = int(matched_df.iloc[0]['CNT'])
                unmatched_count = int(unmatched_df.iloc[0]['CNT'])
                
                # Create audit table to track table creation history
                # Use fixed audit table name for the schema (not tied to specific audit table)
                hybrid_audit_table = "AUDIT_HARMONIZATION_HYBRID_AUDIT"
                
                # Get reference and input table names from config
                ref_table_full = config_df.iloc[0]['reference_table']
                inp_table_full = config_df.iloc[0]['input_table']
                
                # Create audit table if it doesn't exist
                session.sql(f"""
                    CREATE TABLE IF NOT EXISTS {hybrid_audit_table} (
                        unmatched_table_name VARCHAR,
                        matched_table_name VARCHAR,
                        reference_table_name VARCHAR,
                        input_table_name VARCHAR,
                        confidence_threshold FLOAT,
                        matched_record_count INTEGER,
                        unmatched_record_count INTEGER,
                        created_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
                    )
                """).collect()
                
                # Insert audit record
                session.sql(f"""
                    INSERT INTO {hybrid_audit_table} (
                        unmatched_table_name,
                        matched_table_name,
                        reference_table_name,
                        input_table_name,
                        confidence_threshold,
                        matched_record_count,
                        unmatched_record_count
                    )
                    VALUES (
                        '{unmatched_table_input}',
                        '{matched_table_input}',
                        '{ref_table_full}',
                        '{inp_table_full}',
                        {selected_threshold},
                        {matched_count},
                        {unmatched_count}
                    )
                """).collect()
                
                # st.balloons() REMOVED per user request
                st.success(f"🎉 **Tables created successfully!**")
                # Update markdown to use percentage format
                st.markdown(f"**{matched_table_input}**: {matched_count:,} records (Confidence $\\geq$ {selected_threshold_percent:.0f}%)")
                st.markdown(f"**{unmatched_table_input}**: {unmatched_count:,} records (Confidence $<$ {selected_threshold_percent:.0f}%)")
                st.markdown(f"**{hybrid_audit_table}**: Audit record created with timestamp")
                st.info("You can now use these tables for further analysis or downstream processing.")
                
            except Exception as e:
                st.error(f"❌ Error creating tables: {str(e)}")


## ℹ️ Performance Evaluation and Analysis

**What This Cell Does**: This cell evaluates the hybrid matching results against ground truth data to measure accuracy, precision, and recall. It compares the hybrid approach against other methods (if available) and provides detailed breakdowns by match method, confidence levels, and AI_CLASSIFY usage patterns.

**Why This Is Important**: Evaluation is crucial for understanding the effectiveness of the hybrid approach. This analysis shows how well the two-stage process performs, identifies where AI_CLASSIFY adds value, and provides insights into the cost-benefit trade-offs. The detailed breakdowns help optimize the approach and understand which cases benefit most from AI_CLASSIFY intervention.


In [None]:
-- Performance Evaluation and Analysis
-- Evaluates hybrid matching results against ground truth with confidence-based breakdowns

-- CTE to select a unique set of ground truth mappings
WITH unique_ground_truth AS (
    SELECT
        IDBUY,
        IDABT,
        ROW_NUMBER() OVER(PARTITION BY IDBUY ORDER BY IDABT) as rn
    FROM ABT_BEST_BUY.STRUCTURED.ABT_BEST_BUY_PERFECT_MAPPING
),

-- Hybrid evaluation with confidence buckets
hybrid_evaluation AS (
    SELECT
        hfr.input_detail_column,
        hfr.input_entity_name,
        hfr.reference_detail_column,
        hfr.enhanced_confidence,
        hfr.vector_similarity,
        hfr.match_method,
        gm.IDBUY,
        gm.IDABT,
        -- Match evaluation against ground truth
        (hfr.reference_detail_column = gm.IDABT) AS is_correct_match,
        -- Confidence buckets
        CASE 
            WHEN hfr.enhanced_confidence >= 1 THEN '1. Very High (≥1.0)'
            WHEN hfr.enhanced_confidence >= 0.9 THEN '2. High (0.9-1.0)'
            WHEN hfr.enhanced_confidence >= 0.8 THEN '3. Medium (0.8-0.9)'
            WHEN hfr.enhanced_confidence >= 0.7 THEN '4. Low (0.7-0.8)'
            WHEN hfr.enhanced_confidence >= 0.6 THEN '5. Very Low (0.6-0.7)'
            ELSE '6. Very Low (<0.6)'
        END as confidence_bucket
    FROM hybrid_final_results hfr
    INNER JOIN unique_ground_truth gm
        ON hfr.input_detail_column = gm.IDBUY
        
    WHERE gm.rn = 1
)

-- Overall performance summary
SELECT
    'HYBRID VECTOR + AI_CLASSIFY - TOTAL' as metric_category,
    COUNT(*) as total_evaluated,
    SUM(CASE WHEN is_correct_match THEN 1 ELSE 0 END) as correct_matches,
    ROUND(SUM(CASE WHEN is_correct_match THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as accuracy_percent,
    ROUND(AVG(enhanced_confidence), 4) as avg_confidence,
    ROUND(AVG(vector_similarity), 4) as avg_vector_similarity,
    ROUND(AVG(CASE WHEN is_correct_match THEN enhanced_confidence ELSE NULL END), 4) as avg_confidence_correct,
    ROUND(AVG(CASE WHEN NOT is_correct_match THEN enhanced_confidence ELSE NULL END), 4) as avg_confidence_incorrect
FROM hybrid_evaluation

UNION ALL

-- Breakdown by confidence buckets
SELECT
    confidence_bucket as metric_category,
    COUNT(*) as total_evaluated,
    SUM(CASE WHEN is_correct_match THEN 1 ELSE 0 END) as correct_matches,
    ROUND(SUM(CASE WHEN is_correct_match THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as accuracy_percent,
    ROUND(AVG(enhanced_confidence), 4) as avg_confidence,
    ROUND(AVG(vector_similarity), 4) as avg_vector_similarity,
    ROUND(AVG(CASE WHEN is_correct_match THEN enhanced_confidence ELSE NULL END), 4) as avg_confidence_correct,
    ROUND(AVG(CASE WHEN NOT is_correct_match THEN enhanced_confidence ELSE NULL END), 4) as avg_confidence_incorrect
FROM hybrid_evaluation
GROUP BY confidence_bucket


ORDER BY metric_category;


## 🔆 Summary and Key Insights

This hybrid entity matching approach successfully combines the speed of vector similarity with the intelligence of AI_CLASSIFY to deliver optimal performance and cost efficiency.

### ℹ️  Key Benefits:

1. **Optimal Performance**: High-confidence matches (≥0.8) are resolved quickly with vector similarity, while medium-confidence cases benefit from AI_CLASSIFY's contextual understanding.

2. **Cost Efficiency**: AI_CLASSIFY is only used when there are multiple viable candidates, reducing unnecessary API calls and costs.

3. **Proven Pattern**: Based on the successful AI_SIMILARITY + AI_CLASSIFY workflow from Snowflake's AI SQL guide, ensuring reliability and best practices.

4. **Comprehensive Evaluation**: Detailed breakdowns show exactly where AI_CLASSIFY adds value and how different match methods perform.

### 📈 Expected Results:

- **High Recall**: Combines vector similarity accuracy with AI_CLASSIFY intelligence
- **Balanced Cost**: AI_CLASSIFY usage only when needed, typically 20-40% of cases
- **Fast Processing**: Most matches resolved quickly with vector similarity
- **Intelligent Fallback**: AI_CLASSIFY handles ambiguous cases effectively

### 🔧 Usage Instructions:

1. **Configuration & Setup**: Run the configuration cell, select your harmonization tables and entity columns, then click the button to validate and automatically set up the environment (combines steps 1 & 2)
2. **Feature Engineering**: Run the vector feature engineering cell to create embeddings
3. **Hybrid Matching**: Run the hybrid matching cell to perform the two-stage matching process
4. **Record Management**: Select your confidence threshold and create matched/unmatched output tables
5. **Evaluation**: Run the evaluation cell to analyze performance and results

### 💡 Key Insights:

- **Match Method Distribution**: Shows how many matches use each method (AI_CLASSIFY, high-confidence vector, medium-confidence vector)
- **AI_CLASSIFY Usage**: Indicates the percentage of cases where AI_CLASSIFY was used
- **Performance Metrics**: Recall, precision, and confidence scores for each approach
- **Cost Analysis**: Understanding of when AI_CLASSIFY adds value vs. when vector similarity is sufficient

This hybrid approach provides the best balance between speed, accuracy, and cost for entity matching tasks, making it ideal for production environments where both performance and cost efficiency are critical.
