In [None]:
import pandas as pd
import numpy as np

import snowflake.core
from snowflake.snowpark import Session
from snowflake.core import Root
import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import complete

from typing import List
import os
import sys
import json
import time
import requests

# Turn on TruLens OpenTelemetry tracing. This is set here, before any of the
# trulens imports that appear in later cells.
os.environ["TRULENS_OTEL_TRACING"] = "1"

# Snowflake handles: the notebook's active Snowpark session, and a Root object
# used below to look up Cortex Search services by database/schema/name.
session = get_active_session()
root = Root(session)

In [None]:
# Snowflake object names for this demo; DB_NAME/SCHEMA_NAME locate the Cortex Search services.
DB_NAME, SCHEMA_NAME = "CHUNKING_EVAL_DEMO", "DATA"
STAGE_NAME, WH_NAME = "DOCS", "CHUNKING_EVAL_WAREHOUSE"

In [None]:
# Smoke-test the raw-text Cortex Search service (built in the 1st notebook):
# fetch the top 3 SEARCH_COL values for a sample question.
test_query = "How did the economic outlook change over the course of 2023?"

cortex_search_service = root.databases[DB_NAME].schemas[SCHEMA_NAME].cortex_search_services["FOMC_RAW_TEXT_RETRIEVAL"]
resp = cortex_search_service.search(query=test_query, columns=["SEARCH_COL"], limit=3)

if resp.results:
    search_results = [row["SEARCH_COL"] for row in resp.results]
else:
    search_results = []

search_results

In [None]:
# Smoke-test the tagged-chunk Cortex Search service with the same sample question,
# so its top 3 hits can be compared with the raw-text service above.
test_query = "How did the economic outlook change over the course of 2023?"

cortex_search_service = (
    root.databases[DB_NAME]
        .schemas[SCHEMA_NAME]
        .cortex_search_services["FOMC_TAGGED_CHUNK_RETRIEVAL"]
)
resp = cortex_search_service.search(
    query=test_query,
    columns=["SEARCH_COL"],
    limit=3,
)

# An empty or missing result set yields an empty list.
search_results = [row["SEARCH_COL"] for row in (resp.results or [])]

search_results

In [None]:
# Create the RAG class that structures the RAG pipeline; each stage is traced with TruLens @instrument.
import streamlit as st
from snowflake.cortex import complete
from trulens.core.otel.instrument import instrument
from trulens.otel.semconv.trace import SpanAttributes


class RAG:
    """Retrieve -> augment -> generate pipeline over a Cortex Search service.

    Each stage is decorated with TruLens ``@instrument`` so that its inputs and
    outputs are recorded as spans (retrieval, generation, record root).

    Args:
        llm_model: Model name passed to ``snowflake.cortex.complete``.
        cortex_search_service_name: Name of a Cortex Search service in
            ``DB_NAME.SCHEMA_NAME`` used for retrieval.
        num_results: Maximum number of chunks retrieved per query
            (defaults to 5, the previously hard-coded limit).
    """

    def __init__(self, llm_model, cortex_search_service_name, num_results=5):
        self.llm_model = llm_model
        self.cortex_search_service_name = cortex_search_service_name
        self.num_results = num_results

    # RETRIEVAL: the query text and the returned chunks are recorded on a RETRIEVAL span.
    @instrument(
        span_type=SpanAttributes.SpanType.RETRIEVAL,
        attributes={
            SpanAttributes.RETRIEVAL.QUERY_TEXT: "query",
            SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: "return",
        })
    def retrieve_context(self, query: str) -> list:
        """Search the configured Cortex Search service and return the SEARCH_COL values.

        Returns an empty list when the service returns no results.
        """
        cortex_search_service = (
            root
            .databases[DB_NAME]
            .schemas[SCHEMA_NAME]
            .cortex_search_services[self.cortex_search_service_name]
        )
        css_response = cortex_search_service.search(
            query=query,
            columns=["SEARCH_COL"],
            limit=self.num_results,
        )
        # Read from THIS call's response. (Previously this read the global `resp`
        # left over from an earlier cell, so every query got the same stale contexts.)
        return [row["SEARCH_COL"] for row in css_response.results] if css_response.results else []

    # PROMPT AUGMENTATION: build the LLM prompt from the question and the retrieved contexts.
    @instrument()
    def augment_prompt(self, query: str, contexts: list) -> str:
        """Return a prompt that embeds the space-joined ``contexts`` and the ``query``."""
        prompt = f"""
        You are an expert assistant extracting information from context provided on Federal Open Market Committee (FOMC)
        Meeting notes.
        Answer the question based on the context. Be sure to pay close attention to the dates in the user's query as well
        as in the retrieved contexts.
        Be concise and do not hallucinate.
        If you don't have the information, just say so.
        Context: {' '.join(contexts)}
        Question: {query}
        Answer:
        """
        return prompt

    # COMPLETION: send the prompt to the Cortex LLM; recorded on a GENERATION span.
    @instrument(span_type=SpanAttributes.SpanType.GENERATION)
    def generate_completion(self, query: str):
        """Call ``complete`` with ``self.llm_model``; ``query`` is the full augmented prompt."""
        df_response = complete(self.llm_model, query)
        return df_response

    # ROOT: the end-to-end entry point; its input/output become the record's input/output.
    @instrument(
        span_type=SpanAttributes.SpanType.RECORD_ROOT,
        attributes={
            SpanAttributes.RECORD_ROOT.INPUT: "query",
            SpanAttributes.RECORD_ROOT.OUTPUT: "return",
        })
    def query_app(self, query: str) -> str:
        """Answer ``query`` via retrieve -> augment -> generate, echoing query and answer with st.write."""
        st.write(query)
        contexts = self.retrieve_context(query)
        prompt = self.augment_prompt(query, contexts)
        final_response = self.generate_completion(prompt)
        st.write(final_response)
        return final_response

In [None]:
import streamlit as st

# Smoke-test both chunking strategies on one question before the batch evaluation.
test_query = "How did the economic outlook change over the course of 2023?"

# Both pipelines use the same LLM so that only the retrieval service differs.
LLM_MODEL = 'openai-gpt-4.1'
raw_chunk_rag = RAG(llm_model=LLM_MODEL, cortex_search_service_name='FOMC_RAW_TEXT_RETRIEVAL')
tagged_chunk_rag = RAG(llm_model=LLM_MODEL, cortex_search_service_name='FOMC_TAGGED_CHUNK_RETRIEVAL')

# query_app itself writes the query and the answer via st.write; these labels separate the two outputs.
st.write("RAW_CHUNK")
raw_chunk_response = raw_chunk_rag.query_app(test_query)

st.write("TAGGED_CHUNK")
tagged_chunk_response = tagged_chunk_rag.query_app(test_query)


In [None]:
from trulens.apps.app import TruApp
from trulens.connectors.snowflake import SnowflakeConnector

# Register both RAG variants as two versions of ONE TruLens app, so the
# chunking strategies can be compared side by side, persisting via the Snowpark session.
tru_snowflake_connector = SnowflakeConnector(snowpark_session=session)

app_name = "FOMC_RAG_CHUNKING_EVAL"
version_num = 'v0'

_tru_app_kwargs = {"app_name": app_name, "connector": tru_snowflake_connector}

tru_raw_chunk_rag = TruApp(raw_chunk_rag, app_version=f"OAI_RAW_CHUNKS_{version_num}", **_tru_app_kwargs)
tru_tagged_chunk_rag = TruApp(tagged_chunk_rag, app_version=f"OAI_TAGGED_CHUNKS_{version_num}", **_tru_app_kwargs)

In [None]:
# Evaluation questions: mostly about FOMC decisions, plus a few the meeting
# notes likely cannot answer (e.g. the Snowflake stock-price question).
prompts = [
    "What did the Fed decide about interest rates in June 2024?",
    "What is the Fed's balance sheet policy?",
    "Who is the Senior Vice President at the Federal Reserve Bank of Dallas?",
    "What factors often contribute to future rate decreases?",
    "Why did the FOMC keep rates unchanged despite strong economic growth in March 2023?",
    "What is the largest increase in interest rates seen in 2023-2024?",
    "What should Snowflake's target stock price be in 2025?",
    "How often does the Fed meet?",
    "Are interest rates just impactful for mortgages?",
    "How has Asian economic growth impacted the US economy?",
]

# One row per question in a single QUERY column (mapped to the app input by the RunConfig below).
batch_data = pd.DataFrame(prompts, columns=["QUERY"])
batch_data

In [None]:
from trulens.core.run import Run
from trulens.core.run import RunConfig

run_version = version_num

# Both runs MUST use the same LLM judge; otherwise metric differences could come
# from the judge rather than from the chunking strategy being compared.
LLM_JUDGE_NAME = "llama3.1-70b"


def _make_run_config(run_prefix):
    """Build the RunConfig shared by both runs; only the run name differs.

    The dataset_spec maps the input DataFrame's QUERY column to the app's root input.
    """
    return RunConfig(
        run_name=f"{run_prefix}_{run_version}",
        description="questions about FOMC meeting notes",
        dataset_name="SNOW_RAG_DF1",
        source_type="DATAFRAME",
        label="LOCAL",
        llm_judge_name=LLM_JUDGE_NAME,
        dataset_spec={
            "RECORD_ROOT.INPUT": "QUERY",
        },
    )


raw_chunk_run_config = _make_run_config("raw_chunk_run")
tagged_chunk_run_config = _make_run_config("tagged_chunk_run")

In [None]:
# Attach each RunConfig to its TruApp. The returned run objects are used in the
# cells below to start invocation, check status, and compute metrics.
raw_chunk_run = tru_raw_chunk_rag.add_run(run_config=raw_chunk_run_config)
tagged_chunk_run = tru_tagged_chunk_rag.add_run(run_config=tagged_chunk_run_config)


In [None]:
# Invoke the raw-chunk app over batch_data (presumably one record per row, using the
# QUERY column per the RunConfig's dataset_spec — confirm). The status may still report
# INVOCATION_IN_PROGRESS after this returns; the metrics cell below polls for completion.
raw_chunk_run.start(input_df=batch_data)
print("Finished raw chunk run")

In [None]:
# Invoke the tagged-chunk app over the same batch_data so both runs see identical questions.
# As with the raw run, the metrics cell below polls until invocation has completed.
tagged_chunk_run.start(input_df=batch_data)
print("Finished tagged_chunk run")

In [None]:
# Both runs, in a fixed order, so later cells can iterate over them.
run_list = [raw_chunk_run, tagged_chunk_run]

# Report each run's current status.
for run in run_list:
    print(f"{run.run_name} Run Status: {run.get_status()}")

In [None]:
# Kick off LLM-as-a-Judge metric computation for each run once its invocation has finished.
METRICS = ["coherence", "answer_relevance", "context_relevance", "groundedness"]
POLL_INTERVAL_S = 3
INVOCATION_TIMEOUT_S = 30 * 60  # stop waiting on a single run after 30 minutes

for run in run_list:
    # Re-read the status once per poll into a local variable, so every branch
    # below decides on the same snapshot rather than on separate get_status() calls.
    deadline = time.monotonic() + INVOCATION_TIMEOUT_S
    status = run.get_status()
    while status == "INVOCATION_IN_PROGRESS" and time.monotonic() < deadline:
        time.sleep(POLL_INTERVAL_S)
        status = run.get_status()

    if status == "INVOCATION_COMPLETED":
        run.compute_metrics(METRICS)
        print(f"Kicked off Metrics Computation for Run {run.run_name}")
    elif status == "INVOCATION_IN_PROGRESS":
        # The loop above exited on the deadline, not on a status change.
        print(f"Timed out waiting for Run {run.run_name} to finish invocation; metrics not computed.")
    else:
        # Previously only FAILED/UNKNOWN were reported; any other status was silently skipped.
        print(f"Skipping metrics computation for Run {run.run_name}. Run status: {status}")


In [None]:
import streamlit as st

# Build a link to this app's page in Snowsight's AI Evaluations UI.
# A single query fetches all four identifiers (one round trip instead of four);
# the Snowpark Row result unpacks like a tuple.
org_name, account_name, db_name, schema_name = session.sql(
    'SELECT CURRENT_ORGANIZATION_NAME(), CURRENT_ACCOUNT_NAME(), CURRENT_DATABASE(), CURRENT_SCHEMA()'
).collect()[0]

st.write(f'https://app.snowflake.com/{org_name}/{account_name}/#/ai-evaluations/databases/{db_name}/schemas/{schema_name}/applications/{app_name.upper()}')

# ARCHIVE BELOW

In [None]:
## Optional Cleanup
# for i in run_list:
#     i.delete()