In [None]:
import pandas as pd
import numpy as np

import snowflake.core
from snowflake.snowpark import Session
from snowflake.core import Root
import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import complete

from typing import List
import os
import sys
import json
import time
import requests
from bs4 import BeautifulSoup

# Set up the Snowflake session objects and environment variables used by the
# rest of the notebook.
session = get_active_session()
# Root is built from the session; the retriever cell navigates from it to the
# Cortex Search service.
root = Root(session)

# NOTE(review): presumably this has to be set before the trulens modules are
# imported further down for OTEL tracing to take effect — confirm.
os.environ["TRULENS_OTEL_TRACING"] = "1"

In [None]:
# Names of the Snowflake objects this notebook works with.
# DB_NAME and SCHEMA_NAME are used to locate the Cortex Search service;
# STAGE_NAME and WH_NAME are not referenced by the cells visible in this notebook.
DB_NAME = "SUMMIT_25_AI_OBS_DEMO"
SCHEMA_NAME = "DATA"
STAGE_NAME = "DOCS"
WH_NAME = "COMPUTE_WH"

In [None]:
# Smoke-test the Cortex Search retriever that was built in the first notebook.
test_query = "What is the performance of Cortex Search?"

# Navigate database -> schema -> search service, starting from the API root.
_search_schema = root.databases[DB_NAME].schemas[SCHEMA_NAME]
cortex_search_service = _search_schema.cortex_search_services["SNOWFLAKE_BLOG_RETRIEVAL"]

# Ask for the ten best hits and request confidence scores with each result.
resp = cortex_search_service.search(
    query=test_query,
    columns=["SEARCH_COL"],
    limit=10,
    experimental={"returnConfidenceScores": True},
)

# Keep only the text column; fall back to an empty list when nothing came back.
if resp.results:
    search_results = [row["SEARCH_COL"] for row in resp.results]
else:
    search_results = []

search_results

In [None]:
def search_snow_docs(query, timeout=10):
    """Search docs.snowflake.com and return the first result page as Markdown-like text.

    The search results page is fetched, its absolute ``https://`` links are
    collected (minus the site-navigation links that are on every results page),
    and the first remaining link is downloaded and flattened into text:
    the page title as ``# title``, every heading as ``## heading`` and every
    paragraph / pre / code / table element as plain text.

    Args:
        query: Free-text search query. It is URL-encoded by ``requests``.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        str: The flattened page text, or ``"No web page content found!"`` when
            there is no usable result link or the result page cannot be
            fetched or parsed.
        list: An empty list when the search request itself fails (kept for
            backward compatibility with existing callers).
    """
    # Links that appear on the results page but are never search hits.
    boilerplate_links = (
        'https://docs.snowflake.com',
        'https://status.snowflake.com',
        'https://other-docs.snowflake.com/en/opencatalog/overview',
    )

    try:
        # Pass the query through `params` so spaces, '&', '#', '?' etc. are
        # encoded instead of corrupting the URL.
        response = requests.get(
            "https://docs.snowflake.com/search",
            params={"q": query},
            timeout=timeout,
        )
        response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching URL: {e}")
        return []

    # Collect every absolute link on the results page, dropping the boilerplate
    # ones independently of each other (a missing one must not keep the rest).
    link_soup = BeautifulSoup(response.text, 'html.parser')
    hrefs = (a.get('href') for a in link_soup.find_all('a', href=True))
    links = [
        href for href in hrefs
        if href.startswith("https://") and href not in boilerplate_links
    ]

    if not links:
        return "No web page content found!"

    try:
        # Get content from the first web page in the list; an error status must
        # not be mistaken for documentation content.
        page = requests.get(links[0], timeout=timeout)
        page.raise_for_status()
        web_page_soup = BeautifulSoup(page.text, 'html.parser')

        # Extract the title (a <title> with nested markup has no .string).
        title_tag = web_page_soup.title
        title = title_tag.string if title_tag and title_tag.string else "No Title Found"

        # Walk headings and content elements in document order.
        elements = web_page_soup.find_all(
            ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'pre', 'code', 'table']
        )
        sections = [f"# {title}\n\n"]
        for element in elements:
            text = element.get_text()
            if element.name.startswith('h'):  # h1..h6 are the only 'h*' tags requested
                sections.append(f"## {text}\n\n")
            else:  # paragraph, preformatted text, code or table
                sections.append(f"{text}\n\n")
        return "".join(sections)
    except Exception:
        # Best-effort fallback, but let KeyboardInterrupt / SystemExit through.
        return "No web page content found!"

In [None]:
def context_retrieval(user_query, confidence_score_threshold):
    """Retrieve context chunks for a query, falling back to a docs web search.

    The Cortex Search service is queried first and only results whose
    ``@CONFIDENCE_SCORE`` (converted with ``int``) is at least
    ``confidence_score_threshold`` are kept. When nothing qualifies, the
    Snowflake docs are searched instead and the page text is truncated to
    10000 characters.

    Args:
        user_query: The user's question.
        confidence_score_threshold: Minimum confidence score a search result
            must have to be used as context.

    Returns:
        list[str]: The selected context chunks; empty when neither source
            produced any text.
    """
    # First call cortex search service on knowledgebase!
    css_response = cortex_search_service.search(
        query=user_query,
        columns=["SEARCH_COL"],
        limit=10,
        experimental={"returnConfidenceScores": True})

    # Treat a missing/empty result set as "no hits" instead of failing on it.
    context_chunks = [
        result['SEARCH_COL']
        for result in (css_response.results or [])
        if int(result['@CONFIDENCE_SCORE']) >= confidence_score_threshold
    ]

    # If no results from knowledgebase, do a websearch on snowflake docs and truncate results to 10000 chars
    if len(context_chunks) == 0:
        print("No results found in knowledgebase! Performing search on docs.snowflake.com...")
        web_content = search_snow_docs(user_query)
        # search_snow_docs returns [] when its search request fails; appending
        # that would put a list inside the list of text chunks.
        if isinstance(web_content, str) and web_content:
            context_chunks.append(web_content[0:10000])
    else:
        print(f"Found {len(context_chunks)} relevant context chunks in the knowledgebase!")
    return context_chunks

In [None]:
# Exercise the retrieval helper end to end with a sample question.
sample_question = "Are there any Cortex Search customers?"
test_chunks = context_retrieval(
    user_query=sample_question,
    confidence_score_threshold=3,
)

test_chunks

In [None]:
# Inspect which hits survive a cut-off on the unrounded confidence score that
# is carried in each result's '@DEBUG_PER_RESULT' JSON payload.
q = "Who are some cortex analyst customers"

css_response = cortex_search_service.search(
    query=q,
    columns=["SEARCH_COL"],
    limit=10,
    experimental={"returnConfidenceScores": True},
)

filtered_results = [
    hit
    for hit in css_response.results
    if json.loads(hit['@DEBUG_PER_RESULT'])['ConfidenceScoreUnrounded'] >= 1.8
]

filtered_results

In [None]:
# Create the RAGWithObservability class to structure the RAG pipeline
from snowflake.cortex import complete
from trulens.core.otel.instrument import instrument
from trulens.otel.semconv.trace import SpanAttributes


class RAGWithObservability():
    """Retrieval-augmented generation pipeline with TruLens-instrumented stages.

    Every stage (web-search fallback, retrieval, prompt augmentation,
    completion and the ``query_app`` entry point) is wrapped with
    ``@instrument`` so it is traced as its own span. Retrieval uses the
    module-level ``cortex_search_service`` rather than an injected retriever.
    """

    def __init__(self, llm_model):
        """Remember the model name that is passed to ``complete`` for generation."""
        self.llm_model = llm_model

    # Here we're using the @instrument decorator to trace various stages of our RAG application

    # WEB SEARCH FUNCTION
    @instrument()
    def search_snow_docs(self, query, timeout=10):
        """Search docs.snowflake.com and return the first result page as Markdown-like text.

        Args:
            query: Free-text search query. It is URL-encoded by ``requests``.
            timeout: Seconds to wait for each HTTP request before giving up.

        Returns:
            str: The page title as ``# title``, headings as ``## heading`` and
                paragraph / pre / code / table elements as plain text, or
                ``"No web page content found!"`` when there is no usable
                result link or the page cannot be fetched or parsed.
            list: An empty list when the search request itself fails (kept for
                backward compatibility with existing callers).
        """
        # Links that appear on the results page but are never search hits.
        boilerplate_links = (
            'https://docs.snowflake.com',
            'https://status.snowflake.com',
            'https://other-docs.snowflake.com/en/opencatalog/overview',
        )

        try:
            # Pass the query through `params` so spaces, '&', '#', '?' etc. are
            # encoded instead of corrupting the URL.
            response = requests.get(
                "https://docs.snowflake.com/search",
                params={"q": query},
                timeout=timeout,
            )
            response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
        except requests.exceptions.RequestException as e:
            print(f"Error fetching URL: {e}")
            return []

        # Collect every absolute link on the results page, dropping the
        # boilerplate ones independently of each other.
        link_soup = BeautifulSoup(response.text, 'html.parser')
        hrefs = (a.get('href') for a in link_soup.find_all('a', href=True))
        links = [
            href for href in hrefs
            if href.startswith("https://") and href not in boilerplate_links
        ]

        if not links:
            return "No web page content found!"

        try:
            # Get content from the first web page in the list; an error status
            # must not be mistaken for documentation content.
            page = requests.get(links[0], timeout=timeout)
            page.raise_for_status()
            web_page_soup = BeautifulSoup(page.text, 'html.parser')

            # Extract the title (a <title> with nested markup has no .string).
            title_tag = web_page_soup.title
            title = title_tag.string if title_tag and title_tag.string else "No Title Found"

            # Walk headings and content elements in document order.
            elements = web_page_soup.find_all(
                ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'pre', 'code', 'table']
            )
            sections = [f"# {title}\n\n"]
            for element in elements:
                text = element.get_text()
                if element.name.startswith('h'):  # h1..h6 are the only 'h*' tags requested
                    sections.append(f"## {text}\n\n")
                else:  # paragraph, preformatted text, code or table
                    sections.append(f"{text}\n\n")
            return "".join(sections)
        except Exception:
            # Best-effort fallback, but let KeyboardInterrupt / SystemExit through.
            return "No web page content found!"

    # RETRIEVAL FUNCTION
    @instrument(
        span_type=SpanAttributes.SpanType.RETRIEVAL,
        attributes={
            SpanAttributes.RETRIEVAL.QUERY_TEXT: "query",
            SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: "return",
        })
    def retrieve_context(self, query: str, confidence_score_threshold=2):
        """Retrieve context chunks for a query, falling back to a docs web search.

        Args:
            query: The user's question.
            confidence_score_threshold: Minimum ``@CONFIDENCE_SCORE`` (converted
                with ``int``) a search result must have to be used as context.

        Returns:
            list[str]: The selected context chunks. When the knowledge base
                yields nothing, the list holds at most one chunk: the docs
                page text truncated to 10000 characters.
        """
        # First call cortex search service on knowledgebase!
        css_response = cortex_search_service.search(
            query=query,
            columns=["SEARCH_COL"],
            limit=10,
            experimental={"returnConfidenceScores": True})

        # Treat a missing/empty result set as "no hits" instead of failing on it.
        context_chunks = [
            result['SEARCH_COL']
            for result in (css_response.results or [])
            if int(result['@CONFIDENCE_SCORE']) >= confidence_score_threshold
        ]

        # If no results from knowledgebase, do a websearch on snowflake docs and truncate results to 10000 chars
        if len(context_chunks) == 0:
            print("No results found in knowledgebase! Performing search on docs.snowflake.com...")
            web_content = self.search_snow_docs(query)
            # search_snow_docs returns [] when its search request fails; appending
            # that would later break ' '.join(contexts) in augment_prompt.
            if isinstance(web_content, str) and web_content:
                context_chunks.append(web_content[0:10000])
        else:
            print(f"Found {len(context_chunks)} relevant context chunks in the knowledgebase!")
        return context_chunks

    # PROMPT AUGMENTATION FUNCTION
    @instrument()
    def augment_prompt(self, query: str, contexts: list) -> str:
        """Build the LLM prompt from the question and the retrieved context chunks.

        Args:
            query: The user's question.
            contexts: Context strings; they are joined with single spaces.

        Returns:
            The full prompt text.
        """
        prompt = f"""
        You are an expert assistant extracting information from context provided.
        Answer the question based on the context. Be concise and do not hallucinate.
        If you don't have the information, just say so.
        Context: {' '.join(contexts)}
        Question: {query}
        Answer:
        """
        return prompt

    # COMPLETION FUNCTION
    @instrument(span_type=SpanAttributes.SpanType.GENERATION)
    def generate_completion(self, query: str):
        """Send text to the configured model and return whatever ``complete`` returns.

        Args:
            query: The text sent to the model; ``query_app`` passes the
                augmented prompt here.
        """
        df_response = complete(self.llm_model, query)
        return df_response

    # ROOT FUNCTION
    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "query",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return",
        })
    def query_app(self, query: str) -> str:
        """Run the whole pipeline: retrieve context, build the prompt, generate the answer."""
        contexts = self.retrieve_context(query)
        prompt = self.augment_prompt(query, contexts)
        final_response = self.generate_completion(prompt)
        return final_response

In [None]:
import streamlit as st

# Question used to compare the three models side by side.
test_query = "Is cortex analyst public preview or GA?"

# One pipeline instance per model; only the model name differs.
# llama_rag and mistral7b_rag are wrapped in TruApp objects in a later cell.
llama_rag = RAGWithObservability('llama4-maverick')
mistral7b_rag = RAGWithObservability('mistral-7b')
deepseek_rag = RAGWithObservability('deepseek-r1')

# Print the query
print(f"Query: {test_query}")

# Run each pipeline and render its answer right away, so the retrieval
# messages printed by each query_app call stay next to that model's answer.
llama_response = llama_rag.query_app(test_query)
st.write(f"**Llama response** -  {llama_response} \n")

mistral_response = mistral7b_rag.query_app(test_query)
st.write(f"**Mistral-7b response** - {mistral_response} \n")

deepseek_response = deepseek_rag.query_app(test_query)
st.write(f"**Deepseek response** -  {deepseek_response} \n")

In [None]:
# from trulens.core import TruSession
from trulens.apps.app import TruApp
from trulens.connectors.snowflake import SnowflakeConnector

# A single Snowflake-backed connector is shared by both registered versions.
tru_snowflake_connector = SnowflakeConnector(snowpark_session=session)

app_name = "DEV_SUMMIT_AI_OPS"
version_num = 'v0'

# Settings common to both registrations; only the wrapped pipeline and the
# version label differ between them.
_shared_app_settings = {
    "app_name": app_name,
    "connector": tru_snowflake_connector,
}

tru_rag_mistral = TruApp(
    mistral7b_rag,
    app_version=f"MISTRAL_{version_num}",
    **_shared_app_settings,
)

tru_rag_llama = TruApp(
    llama_rag,
    app_version=f"LLAMA_{version_num}",
    **_shared_app_settings,
)

In [None]:
import pandas as pd

# Questions that make up the evaluation batch, one per row of the input frame.
prompts = [
    "How does Cortex Search work?",
    "What components of Cortex Analyst are in Preview vs GA?",
    "Can I have a multiturn conversation with Cortex?",
    "What are some best practices to consider using Custom Instructions in Cortex Analayst?",
    "How does Markaasz benefit from Cortex Search?",
    "Who uses DocAI?",
    "Can you help me purchase a new refridgerator?",
    "Who is the product manager",
]

# Single-column frame; the run configs map the QUERY column to the app input.
batch_data = pd.DataFrame(prompts, columns=["QUERY"])
batch_data

In [None]:
from trulens.core.run import Run
from trulens.core.run import RunConfig

# Run configuration for the Mistral pipeline. dataset_spec maps the app's
# RECORD_ROOT.INPUT to the QUERY column of the input DataFrame.
mistral_run_config = RunConfig(
    run_name=f"mistral_exp_{version_num}",
    description="questions about snowflake AI cababilities",
    dataset_name="SNOW_RAG_DF1",
    source_type="DATAFRAME",
    label="MISTRAL",
    llm_judge_name = "llama3.1-70b",
    dataset_spec={
        "RECORD_ROOT.INPUT": "QUERY",
    },
)



# Run configuration for the Llama pipeline; same dataset and column mapping.
# NOTE(review): unlike mistral_run_config, no llm_judge_name is passed here —
# confirm both runs end up being scored by the same judge model.
llama_run_config = RunConfig(
    run_name=f"llama_exp_{version_num}",
    description="questions about snowflake AI cababilities",
    dataset_name="SNOW_RAG_DF1",
    source_type="DATAFRAME",
    label="LLAMA",
    dataset_spec={
        "RECORD_ROOT.INPUT": "QUERY",
    },
    
)

In [None]:
# Attach one run to each registered app using its run configuration. The
# returned run objects are what the following cells start, poll and evaluate.
mistral_run = tru_rag_mistral.add_run(run_config=mistral_run_config)

llama_run = tru_rag_llama.add_run(run_config=llama_run_config)

In [None]:
# Feed the batch of questions to the Mistral run.
# NOTE(review): the message below is printed as soon as start() returns —
# presumably that means all rows were processed; confirm start() is blocking.
mistral_run.start(input_df=batch_data)
print("Finished mistral run")

In [None]:
# Feed the same batch of questions to the Llama run.
# NOTE(review): the message below is printed as soon as start() returns —
# presumably that means all rows were processed; confirm start() is blocking.
llama_run.start(input_df=batch_data)
print("Finished Llama run")

In [None]:
# Report the status of both runs before any metrics are computed.
print(f"Mistral: {mistral_run.get_status()}")
print(f"Llama: {llama_run.get_status()}")

In [None]:
# The following code kicks off LLM-as-a-Judge evals for several metrics
# on the Mistral run. The same four metrics are requested for the Llama run
# so the two models can be compared.

mistral_run.compute_metrics([
    "coherence",
    "answer_relevance",
    "context_relevance",
    "groundedness",
])

In [None]:
# The following code kicks off LLM-as-a-Judge evals for several metrics
# on the Llama run. The same four metrics are requested for the Mistral run
# so the two models can be compared.

llama_run.compute_metrics([
    "coherence",
    "answer_relevance",
    "context_relevance",
    "groundedness",
])

In [None]:
# Report the status of both runs again after the metric computations were requested.
print(f"Mistral: {mistral_run.get_status()}")
print(f"Llama: {llama_run.get_status()}")

In [None]:
import streamlit as st

def _scalar(sql: str):
    """Run a query on the active session and return the first column of its first row."""
    return session.sql(sql).collect()[0][0]


# Look up the identifiers that make up the link to the evaluations page.
org_name = _scalar('SELECT CURRENT_ORGANIZATION_NAME()')
account_name = _scalar('SELECT CURRENT_ACCOUNT_NAME()')
db_name = _scalar('SELECT CURRENT_DATABASE()')
schema_name = _scalar('SELECT CURRENT_SCHEMA()')

# Render the link for the application registered above (name upper-cased).
st.write(f'https://app.snowflake.com/{org_name}/{account_name}/#/ai-evaluations/databases/{db_name}/schemas/{schema_name}/applications/{app_name.upper()}')

# ARCHIVE BELOW

In [None]:
# Timing scaffold: put the code to be measured between the two timestamps.
# As written nothing runs in between, so the printed value is only the gap
# between two consecutive time.time() calls.
t1 = time.time()

t2 = time.time()

# time.time() differences are in seconds; convert to milliseconds.
f_time = 1000*(t2 - t1)
print(f"Execution time: {f_time:.2f} milliseconds")

In [None]:
# Drop results from the smoke-test search (`resp`) whose confidence score is
# below the cut-off, then keep only their text.
confidence_score_threshold = 3

filtered_results = [
    hit
    for hit in resp.results
    if int(hit['@CONFIDENCE_SCORE']) >= confidence_score_threshold
]
context_chunks = [hit['SEARCH_COL'] for hit in filtered_results]

context_chunks

# An earlier note on this cell reported a one-pass variant as much slower in an
# ad-hoc timing (about 1 ms versus 0.02 ms) though a little cleaner; that
# measurement has not been re-checked:
#
#context_chunks_list= [d['SEARCH_COL'] for d in resp.results if int(d['@CONFIDENCE_SCORE']) >= 2]