In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# Snowpark is available as well: attach to the session this notebook already runs in,
# so later cells can issue SQL and create tables through `session`.
from snowflake.snowpark.context import get_active_session

session = get_active_session()

In [None]:
def load_csv_to_table(csv_path, table_name, encoding='unicode_escape'):
    """Read a CSV with pandas and (re)create a Snowflake table from it.

    Column names are upper-cased so the SQL cells can reference them unquoted.
    The target table is overwritten on every run, so the cell is safe to re-run.

    Args:
        csv_path: path of the CSV file to read.
        table_name: name of the Snowflake table to create or replace.
        encoding: codec passed to ``pandas.read_csv``.

    Returns:
        The Snowpark DataFrame that was written to ``table_name``.
    """
    # NOTE(review): 'unicode_escape' also interprets backslash sequences inside the
    # data itself; 'latin-1' may be the safer choice for non-UTF-8 files — confirm
    # against the source CSVs before changing the default.
    frame = pd.read_csv(csv_path, encoding=encoding).rename(columns=str.upper)
    snow_df = session.create_dataframe(frame)
    snow_df.write.mode("overwrite").save_as_table(table_name)
    return snow_df


amazon = load_csv_to_table('Amazon.csv', "amazon_items")
google = load_csv_to_table('GoogleProducts.csv', "google_items")

First, create a lookup table representing the largest table. It needs two columns: an ID column and an ITEM column that combines all the information about that product (name, description, manufacturer and price).

In [None]:
-- Build the Amazon lookup table that the Cortex Search service indexes: one row per
-- product with its ID and a single ITEM text concatenating every descriptive field.
-- `||` returns NULL if any operand is NULL, so every field (PRICE included) is wrapped
-- in COALESCE to render missing values as 'NA'. TO_VARCHAR lets that COALESCE work
-- whether PRICE was loaded as a number or as text.
-- This is done inline rather than by UPDATE-ing amazon_items, so the raw table is
-- left as loaded and the cell can be re-run safely.
create or replace table amazon_lookup as
select
    ID,
    'Name: ' || COALESCE(title, 'NA')
        || ', Description: ' || COALESCE(DESCRIPTION, 'NA')
        || ', Manufacturer: ' || COALESCE(manufacturer, 'NA')
        || ', Price: ' || COALESCE(TO_VARCHAR(Price), 'NA') as ITEM
from amazon_items;
     

In [None]:
# Capture the session's current warehouse / database / schema once so later cells
# can interpolate them (e.g. the warehouse that refreshes the search service).
current_wh, current_db, current_schema = (
    session.get_current_warehouse(),
    session.get_current_database(),
    session.get_current_schema(),
)

In [None]:
# Create (or replace) the Cortex Search service over the Amazon lookup table.
# ITEM is the text column that is searched; ID is declared as an attribute column.
# The service uses the session's current warehouse and a one-day target lag.
# session.sql() only builds a lazy Snowpark DataFrame; .collect() executes the DDL
# here explicitly instead of relying on the notebook rendering the cell's last value.
session.sql(f'''CREATE OR REPLACE CORTEX SEARCH SERVICE PRODUCT_LOOKUP
  ON ITEM
  ATTRIBUTES ID
  WAREHOUSE = {current_wh}
  TARGET_LAG = '1 day'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
  AS (
    SELECT
        ID,
        ITEM
    FROM amazon_lookup)''').collect()

In [None]:
-- Stored procedure: runs many queries against one Cortex Search service through the
-- Cortex Search REST API and appends the top hit(s) for each query to the
-- BATCH_SEARCH_RESULTS table (columns QUERY, RESULTS as JSON text).
-- Arguments:
--   db_name, schema_name, service_name: location of the search service; interpolated
--     into the REST URL path.
--   queries: array of query strings; must be a non-empty array or main() raises ValueError.
--   columns: service columns to return for each hit (e.g. ID, ITEM).
--   n_jobs: thread count passed to joblib.Parallel for each batch (joblib: -1 = all CPUs).
-- Returns a VARIANT status object {"status": "Success", "message": ...}.
CREATE OR REPLACE PROCEDURE batch_cortex_search(
    db_name STRING, 
    schema_name STRING, 
    service_name STRING, 
    queries ARRAY, 
    columns ARRAY, 
    n_jobs INTEGER DEFAULT -1
)
RETURNS VARIANT
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python==1.9.0', 'joblib==1.4.2', 'backoff==2.2.1')
RUNTIME_VERSION = '3.10'
HANDLER = 'main'
AS
$$
import _snowflake
import json
from joblib import Parallel, delayed
import backoff
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col  # NOTE(review): not used by this handler

# Helper function to call the API with retry logic.
# Retries with exponential backoff (at most 5 attempts) ONLY when the raised exception's
# first arg is a dict with "status" == 429, i.e. the rate-limit exception raised in
# call_api below; for any other exception `giveup` is true and it propagates immediately.
@backoff.on_exception(backoff.expo, Exception, max_tries=5, giveup=lambda e: not (isinstance(e, Exception) and hasattr(e, "args") and len(e.args) > 0 and isinstance(e.args[0], dict) and e.args[0].get("status") == 429))
def call_api(db_name, schema_name, service_name, request_body):
    """POST one query to the Cortex Search REST API.

    Raises Exception({"status": 429, "content": ...}) on HTTP 429 so the backoff
    decorator retries it; every other status is returned to the caller unchanged.

    Returns:
        The response dict from _snowflake.send_snow_api_request; callers read its
        "status" and "content" keys.
    """
    resp = _snowflake.send_snow_api_request(
        "POST",
        f"/api/v2/databases/{db_name}/schemas/{schema_name}/cortex-search-services/{service_name}:query",
        {},
        {},
        request_body,
        {},
        30000,  # NOTE(review): presumably a timeout in milliseconds — confirm against the _snowflake API
    )
    if resp["status"] == 429:
        # Rate limited: raise in the shape the backoff `giveup` predicate recognises.
        raise Exception({"status": resp["status"], "content": resp["content"]})
    return resp

# Function to call the API for a single query
def search(db_name, schema_name, service_name, query, columns):
    """Run one query and return {"query": query, "results": ...}.

    On success (status < 400) "results" is the list under the response's "results"
    key. On an HTTP error status, or any exception (including 429 retries being
    exhausted), "results" is an error string instead, so one failing query does not
    abort the whole batch.
    """
    
    request_body = {
        "query": query,
        "columns": columns,
        "limit": 1, # You can adjust this limit if needed
    }
    try:
        resp = call_api(db_name, schema_name, service_name, request_body)
        if resp["status"] < 400:
            response_content = json.loads(resp["content"])
            results = response_content.get("results", [])
            return {"query": query, "results": results}
        else:
            return {"query": query, "results": f"Failed request with status {resp['status']}: {resp}"}
    except Exception as e:
        return {"query": query, "results": f"API Error: {e}"}

# Function to process queries concurrently using batch size
def concurrent_searches(db_name, schema_name, service_name, queries, columns, n_jobs):
    """Run `search` for every query using joblib's threading backend.

    Returns the per-query result dicts in the same order as `queries`.
    """
    results = Parallel(n_jobs=n_jobs, backend='threading')(
        delayed(search)(db_name, schema_name, service_name, q, columns) for q in queries
    )
    return results

# Main function to handle batching and inserting results
def main(session: Session, db_name, schema_name, service_name, queries, columns, n_jobs):
    """Procedure handler: search all queries in batches and append results to a table.

    Each batch of up to 500 queries is searched concurrently, then written to
    BATCH_SEARCH_RESULTS (QUERY, RESULTS = json.dumps of the hit list or error string)
    before the next batch starts.

    Raises:
        ValueError: if `queries` is not a non-empty list.
    """
    if isinstance(queries, list) and len(queries) > 0:
        # Split queries into batches of 500
        batch_size = 500
        total_queries = len(queries)
        batches = [queries[i:i + batch_size] for i in range(0, total_queries, batch_size)]
        
        # Loop through each batch and process
        for batch in batches:
            # Perform concurrent searches for this batch
            batch_results = concurrent_searches(db_name, schema_name, service_name, batch, columns, n_jobs)
            
            # Create a list to hold the results for batch insertion
            insert_values = []

            # Prepare the results for batch insert
            for result in batch_results:
                insert_values.append({
                    "QUERY": result["query"],
                    # Serialised to JSON text; the notebook later PARSE_JSONs this column.
                    "RESULTS": json.dumps(result["results"])
                })

            # Insert the results into the BATCH_SEARCH_RESULTS table using Snowpark
            if insert_values:
                # Create a DataFrame with the results
                df = session.create_dataframe(insert_values)

                # Perform the batch insert into BATCH_SEARCH_RESULTS
                df.write.mode("append").save_as_table("BATCH_SEARCH_RESULTS")

        return {"status": "Success", "message": f"{total_queries} queries processed in batches."}
    else:
        raise ValueError("Queries must be an array of query text")
$$;
     

In [None]:
-- Output table for batch_cortex_search: one row per query, with that query's search
-- hits stored as JSON text. Recreated empty on each run so the procedure's appends
-- start from a clean table.
CREATE OR REPLACE TABLE batch_search_results (
    query   VARCHAR,
    results VARCHAR
);
     

In [None]:

-- Build the Google lookup table: one row per product with its ID and a single ITEM
-- text concatenating every descriptive field. These ITEM strings are later sent as
-- search queries, so a NULL ITEM would be dropped silently by ARRAY_AGG.
-- `||` returns NULL if any operand is NULL, so every field (PRICE included) is wrapped
-- in COALESCE to render missing values as 'NA'. TO_VARCHAR lets that COALESCE work
-- whether PRICE was loaded as a number or as text.
-- This is done inline rather than by UPDATE-ing google_items, so the raw table is
-- left as loaded and the cell can be re-run safely.
create or replace table google_lookup as
select
    ID,
    'Name: ' || COALESCE(name, 'NA')
        || ', Description: ' || COALESCE(DESCRIPTION, 'NA')
        || ', Manufacturer: ' || COALESCE(manufacturer, 'NA')
        || ', Price: ' || COALESCE(TO_VARCHAR(Price), 'NA') as ITEM
from google_items;

In [None]:
# Search every Google product description against the Amazon Cortex Search service.
# The database and schema come from the session's current context
# (CURRENT_DATABASE()/CURRENT_SCHEMA()). The unqualified PRODUCT_LOOKUP service is
# created in that context, so nothing needs to be edited by hand.
# session.sql() only builds a lazy DataFrame; .collect() executes the CALL explicitly.
session.sql('''
CALL batch_cortex_search(
    CURRENT_DATABASE(),
    CURRENT_SCHEMA(),
    'PRODUCT_LOOKUP',
    (SELECT ARRAY_AGG(ITEM) FROM google_lookup),
    ARRAY_CONSTRUCT('ID','ITEM'),
    -1
)
''').collect()

In [None]:
SELECT query, results FROM batch_search_results;

In [None]:
-- Explode each batch-search row. RESULTS holds the JSON list of hits for one Google
-- query; keep the matched Amazon item text, its ID and the cosine-similarity score.
CREATE OR REPLACE TABLE google_amazon_items AS
SELECT
    r.query,
    f.value:ITEM::VARCHAR                      AS matched_item_info,
    f.value:ID::VARCHAR                        AS amazon_id,
    f.value:"@scores":cosine_similarity::FLOAT AS score
FROM batch_search_results r,
     LATERAL FLATTEN(INPUT => PARSE_JSON(r.results)) f;

-- Recover the Google product ID by joining the query text back to google_lookup.
CREATE OR REPLACE TABLE google_amazon_matches AS
SELECT
    a.amazon_id,
    g.id                AS google_id,
    a.query             AS google_desc,
    a.matched_item_info AS amazon_desc,
    a.score
FROM google_amazon_items a
LEFT JOIN google_lookup g
    ON a.query = g.item;

SELECT * FROM google_amazon_matches LIMIT 10;

In [None]:
-- High-confidence pairs: a Cortex Search similarity of 0.8 or more is accepted as a
-- match without further review.
CREATE OR REPLACE TABLE snowflake_matches AS
SELECT amazon_id, google_id
FROM google_amazon_matches
WHERE score >= 0.8;

In [None]:
-- Ask an LLM (Cortex COMPLETE) to adjudicate lower-confidence candidates (score < 0.8).
-- The candidate sample is selected first in a CTE with a deterministic ORDER BY
-- (highest scores first), so re-runs send the same 100 pairs and only those rows
-- reach COMPLETE. Newlines separate the descriptions from the instructions so the
-- Google description is not glued onto "Respond only...".
-- The raw model reply is stored in MATCH and parsed in a later cell.
create or replace table matches_w_claude as
with candidates as (
    select amazon_id, google_id, amazon_desc, google_desc, score
    from google_amazon_matches
    where score < 0.8
    order by score desc, amazon_id, google_id
    limit 100
)
SELECT amazon_id, google_id, amazon_desc, google_desc, score, SNOWFLAKE.CORTEX.COMPLETE(
    'claude-3-7-sonnet',
        CONCAT('You are responsible for identifying if two products are the same product even though there may be some slight differences since they are sold on two different websites.
Given the descriptions of the two products, return a 1 if they are likely the same product, 0 if they are not.
Item 1 Description: ', AMAZON_DESC, '
Item 2 Description: ', GOOGLE_DESC, '
Respond only with a JSON object (no markdown code fences) in the following format: {
  "Match": 1,
  "Reasoning": "Concise explanation here"
}')
) as match
from candidates;

In [None]:
SELECT amazon_id, google_id, amazon_desc, google_desc, score, match FROM matches_w_claude LIMIT 10;

In [None]:
-- Keep only the pairs the LLM judged to be the same product.
-- The reply is parsed once in a CTE. REGEXP_SUBSTR extracts the {...} object first,
-- so a reply wrapped in markdown fences or surrounding prose still parses. Calling
-- TRY_PARSE_JSON on the raw text would return NULL and silently drop such rows.
-- The 's' flag lets '.' match newlines inside the multi-line JSON.
create or replace table snowflake_matches_claude
as
with parsed as (
    select
        amazon_id,
        google_id,
        amazon_desc,
        google_desc,
        try_parse_json(regexp_substr(match, '[{].*[}]', 1, 1, 's')) as reply
    from matches_w_claude
)
select
    amazon_id,
    google_id,
    amazon_desc,
    google_desc,
    reply:Match::int as match,
    reply:Reasoning::varchar as reasoning
from parsed
where reply:Match::int = 1;

In [None]:

SELECT amazon_id, google_id, amazon_desc, google_desc, match, reasoning FROM snowflake_matches_claude;