In [None]:
from dotenv import load_dotenv
from snowflake.snowpark.session import Session
import os

load_dotenv()

connection_params = {
  "account": os.getenv("SNOWFLAKE_ACCOUNT"),
  "user": os.getenv("SNOWFLAKE_USER"),
  "password": "Passwordforsnowflake1",
  "role": os.getenv("SNOWFLAKE_ROLE"),
  "database": os.getenv("SNOWFLAKE_DATABASE"),
  "schema": os.getenv("SNOWFLAKE_SCHEMA"),
  "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE")
}

snowpark_session = Session.builder.configs(connection_params).create()

In [2]:
snowpark_session

<snowflake.snowpark.session.Session at 0x2036bad34f0>

In [4]:
from snowflake.cortex import Complete

print(Complete("mistral-large", "how do snowflakes get their unique patterns?"))

 Snowflakes get their unique patterns through a complex process that involves both physics and chemistry. It all starts with a tiny particle in the atmosphere, like a dust or pollen grain, which serves as a nucleus for the snowflake to form around.

As this particle cools, water vapor in the air begins to condense and freeze onto it, forming an ice crystal. The shape of this initial crystal is determined by the temperature and humidity conditions in the atmosphere at that time.

As the ice crystal falls through the sky, it passes through different layers of air with varying temperatures and humidity levels. Each layer causes the ice crystal to grow and change in a unique way. The six-sided structure of snowflakes is due to the molecular structure of water, which forms hexagonal shapes as it freezes.

The intricate patterns and branches of a snowflake are a result of the way water molecules arrange themselves as they freeze onto the existing crystal structure. The specific pattern depen

In [None]:
from llama_index.readers.github import GithubRepositoryReader, GithubClient
import os
import re
import nest_asyncio

nest_asyncio.apply()

github_token = os.getenv("GITHUB_TOKEN")
client = GithubClient(github_token=github_token, verbose=False)

reader = GithubRepositoryReader(
    github_client=client,
    owner="streamlit",
    repo="docs",
    use_parser=False,
    verbose=True,
    filter_directories=(
        ["content"],
        GithubRepositoryReader.FilterType.INCLUDE,
    ),
    filter_file_extensions=(
        [".md"],
        GithubRepositoryReader.FilterType.INCLUDE,
    ),
)

documents = reader.load_data(branch="main")


def clean_up_text(content: str) -> str:
    """
    Remove unwanted characters and patterns in text input.

    :param content: Text input.

    :return: Cleaned version of original text input.
    """

    # Fix hyphenated words broken by newline
    content = re.sub(r"(\w+)-\n(\w+)", r"\1\2", content)

    unwanted_patterns = ["---\nvisible: false", "---", "#", "slug:"]
    for pattern in unwanted_patterns:
        content = re.sub(pattern, "", content)

    # Remove all slugs starting with a \ and stopping at the first space
    content = re.sub(r"\\slug: [^\s]*", "", content)

    # normalize whitespace
    content = re.sub(r"\s+", " ", content)
    return content


cleaned_documents = []

for d in documents:
    cleaned_text = clean_up_text(d.text)
    # Create a new instance of the document with the cleaned text
    cleaned_document = d.copy(update={"text": cleaned_text})
    cleaned_documents.append(cleaned_document)

In [6]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SemanticSplitterNodeParser

embed_model = HuggingFaceEmbedding("Snowflake/snowflake-arctic-embed-m")

splitter = SemanticSplitterNodeParser(
    buffer_size=1, breakpoint_percentile_threshold=85, embed_model=embed_model
)

  from .autonotebook import tqdm as notebook_tqdm


In [7]:
from llama_index.core.ingestion import IngestionPipeline

cortex_search_pipeline = IngestionPipeline(
    transformations=[
        splitter,
    ],
)

results = cortex_search_pipeline.run(show_progress=True, documents=cleaned_documents)

Parsing nodes:   0%|          | 0/333 [00:00<?, ?it/s]

Generating embeddings: 100%|██████████| 7/7 [00:01<00:00,  3.80it/s]
Generating embeddings: 100%|██████████| 8/8 [00:01<00:00,  6.86it/s]
Generating embeddings: 100%|██████████| 21/21 [00:03<00:00,  6.34it/s]
Generating embeddings: 100%|██████████| 16/16 [00:03<00:00,  4.91it/s]
Generating embeddings: 100%|██████████| 42/42 [00:09<00:00,  4.54it/s]
Generating embeddings: 100%|██████████| 48/48 [00:08<00:00,  5.63it/s]
Generating embeddings: 100%|██████████| 27/27 [00:05<00:00,  4.82it/s]
Generating embeddings: 100%|██████████| 16/16 [00:03<00:00,  4.50it/s]
Generating embeddings: 100%|██████████| 18/18 [00:04<00:00,  4.02it/s]
Generating embeddings: 100%|██████████| 53/53 [00:16<00:00,  3.29it/s]
Generating embeddings: 100%|██████████| 46/46 [00:04<00:00,  9.40it/s]
Generating embeddings: 100%|██████████| 47/47 [00:05<00:00,  8.89it/s]
Generating embeddings: 100%|██████████| 37/37 [00:05<00:00,  6.83it/s]
Generating embeddings: 100%|██████████| 34/34 [00:03<00:00,  9.20it/s]
Generating

In [8]:
import snowflake.connector
from tqdm.auto import tqdm

snowflake_connector = snowflake.connector.connect(**connection_params)

cursor = snowflake_connector.cursor()

cursor.execute("CREATE OR REPLACE TABLE streamlit_docs(doc_text VARCHAR)")
for curr in tqdm(results):
    cursor.execute("INSERT INTO streamlit_docs VALUES (%s)", curr.text)

100%|██████████| 1698/1698 [16:56<00:00,  1.67it/s]


In [12]:
from snowflake.core import Root
from typing import List


class CortexSearchRetriever:

    def __init__(self, snowpark_session: Session, limit_to_retrieve: int = 4):
        self._snowpark_session = snowpark_session
        self._limit_to_retrieve = limit_to_retrieve

    def retrieve(self, query: str) -> List[str]:
        root = Root(self._snowpark_session)
        cortex_search_service = (
            root.databases[os.getenv("SNOWFLAKE_DATABASE")]
            .schemas[os.getenv("SNOWFLAKE_SCHEMA")]
            .cortex_search_services[os.getenv("SNOWFLAKE_CORTEX_SEARCH_SERVICE")]
        )
        resp = cortex_search_service.search(
            query=query,
            columns=["doc_text"],
            limit=self._limit_to_retrieve,
        )

        if resp.results:
            return [curr["doc_text"] for curr in resp.results]
        else:
            return []

In [13]:
retriever = CortexSearchRetriever(snowpark_session=snowpark_session, limit_to_retrieve=4)

retrieved_context = retriever.retrieve(query="How do I launch a streamlit app?")

In [14]:
retrieved_context

['<Note>\n\nWhen passing your script some custom arguments, they must be passed after two dashes. Otherwise the\narguments get interpreted as arguments to Streamlit itself.\n\n</Note>\n\nAnother way of running Streamlit is to run it as a Python module. This can be\nuseful when configuring an IDE like PyCharm to work with Streamlit:\n\n```bash\n# Running\npython -m streamlit run your_script.py\n\n# is equivalent to:\nstreamlit run your_script.py\n```\n\n<Tip>\n\nYou can also pass a URL to `streamlit run`! This is great when combined with\nGitHub Gists. For example:\n\n```bash\nstreamlit run https://raw.githubusercontent.com/streamlit/demo-uber-nyc-pickups/master/streamlit_app.py\n```\n\n</Tip>\n\n## Development flow\n\nEvery time you want to update your app, save the source file. When you do\nthat, Streamlit detects if there is a change and asks you whether you want to\nrerun your app. Choose "Always rerun" at the top-right of your screen to\nautomatically update your app every time you

In [15]:
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector

tru_snowflake_connector = SnowflakeConnector(snowpark_session=snowpark_session)

tru_session = TruSession(connector=tru_snowflake_connector)

Running the TruLens dashboard requires providing a `password` to the `SnowflakeConnector`.


🦑 Initialized with db url snowflake://JUNWEI:***@uxccqxi-hijw/LLMOPS_DB/LLMOPS_SCHEMA?role=ACCOUNTADMIN&warehouse=LLMOPS_WH_M .
🛑 Secret keys may be written to the database. See the `database_redact_keys` option of `TruSession` to prevent this.


Updating app_name and app_version in apps table: 0it [00:00, ?it/s]
Updating app_id in records table: 0it [00:00, ?it/s]
Updating app_json in apps table: 0it [00:00, ?it/s]


Error setting TruLens workspace version tag: 000002 (0A000): Unsupported feature 'TAG'., check if you have enterprise version of Snowflake.


In [16]:
from trulens.apps.custom import instrument
from snowflake.cortex import Complete


class RAG_from_scratch:

    def __init__(self):
        self.retriever = CortexSearchRetriever(snowpark_session=snowpark_session, limit_to_retrieve=4)

    @instrument
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)

    @instrument
    def generate_completion(self, query: str, context_str: list) -> str:
        """
        Generate answer from context.
        """
        prompt = f"""
          You are an expert assistant extracting information from context provided.
          Answer the question based on the context. Be concise and do not hallucinate.
          If you don´t have the information just say so.
          Context: {context_str}
          Question:
          {query}
          Answer:
        """
        return Complete("mistral-large", prompt)

    @instrument
    def query(self, query: str) -> str:
        context_str = self.retrieve_context(query)
        return self.generate_completion(query, context_str)


rag = RAG_from_scratch()

decorating <function RAG_from_scratch.retrieve_context at 0x0000020389184B80>
decorating <function RAG_from_scratch.generate_completion at 0x0000020389184C10>
decorating <function RAG_from_scratch.query at 0x0000020389184790>
adding method <class '__main__.RAG_from_scratch'> retrieve_context __main__
adding method <class '__main__.RAG_from_scratch'> generate_completion __main__
adding method <class '__main__.RAG_from_scratch'> query __main__


In [17]:
rag.query("How do I launch a streamlit app?")

' You can launch a Streamlit app by running it as a Python module with the command `python -m streamlit run your_script.py` or directly with `streamlit run your_script.py`. You can also pass a URL to `streamlit run` to launch an app from a remote location, such as a GitHub Gist.'

# Feedback functions

In [18]:
from trulens.providers.cortex.provider import Cortex
from trulens.core import Feedback
from trulens.core import Select
import numpy as np

provider = Cortex(snowpark_session, "llama3.1-8b")

f_groundedness = (
    Feedback(provider.groundedness_measure_with_cot_reasons, name="Groundedness")
    .on(Select.RecordCalls.retrieve_context.rets[:].collect())
    .on_output()
)

f_context_relevance = (
    Feedback(provider.context_relevance, name="Context Relevance")
    .on_input()
    .on(Select.RecordCalls.retrieve_context.rets[:])
    .aggregate(np.mean)
)

f_answer_relevance = (
    Feedback(provider.relevance, name="Answer Relevance")
    .on_input()
    .on_output()
    .aggregate(np.mean)
)

✅ In Groundedness, input source will be set to __record__.app.retrieve_context.rets[:].collect() .
✅ In Groundedness, input statement will be set to __record__.main_output or `Select.RecordOutput` .
✅ In Context Relevance, input question will be set to __record__.main_input or `Select.RecordInput` .
✅ In Context Relevance, input context will be set to __record__.app.retrieve_context.rets[:] .
✅ In Answer Relevance, input prompt will be set to __record__.main_input or `Select.RecordInput` .
✅ In Answer Relevance, input response will be set to __record__.main_output or `Select.RecordOutput` .


In [23]:
from trulens.apps.custom import TruCustomApp

tru_rag = TruCustomApp(
    rag,
    app_name="RAG",
    app_version="simple",
    feedbacks=[f_groundedness, f_answer_relevance, f_context_relevance],
)

instrumenting <class '__main__.RAG_from_scratch'> for base <class '__main__.RAG_from_scratch'>
	instrumenting retrieve_context
	instrumenting generate_completion
	instrumenting query
skipping base <class 'object'> because of class
skipping base <class '__main__.CortexSearchRetriever'> because of class
skipping base <class 'object'> because of class


In [24]:
prompts = [
    "How do I launch a streamlit app?",
    "How can I capture the state of my session in streamlit?",
    # "How do I install streamlit?",
    # "How do I change the background color of a streamlit app?",
    # "What's the advantage of using a streamlit form?",
    # "What are some ways I should use checkboxes?",
    # "How can I conserve space and hide away content?",
    # "Can you recommend some resources for learning Streamlit?",
    # "What are some common use cases for Streamlit?",
    # "How can I deploy a streamlit app on the cloud?",
    # "How do I add a logo to streamlit?",
    # "What is the best way to deploy a Streamlit app?",
    # "How should I use a streamlit toggle?",
    # "How do I add new pages to my streamlit app?",
    # "How do I write a dataframe to display in my dashboard?",
    # "Can I plot a map in streamlit? If so, how?",
    # "How do vector stores enable efficient similarity search?",
]

In [25]:
with tru_rag as recording:
    for prompt in prompts:
        rag.query(prompt)

tru_session.get_leaderboard()

calling <function RAG_from_scratch.query at 0x0000020389184790> with (<__main__.RAG_from_scratch object at 0x000002037EB92560>, 'How do I launch a streamlit app?')
calling <function RAG_from_scratch.retrieve_context at 0x0000020389184B80> with (<__main__.RAG_from_scratch object at 0x000002037EB92560>, 'How do I launch a streamlit app?')
calling <function RAG_from_scratch.generate_completion at 0x0000020389184C10> with (<__main__.RAG_from_scratch object at 0x000002037EB92560>, 'How do I launch a streamlit app?', ['<Note>\n\nWhen passing your script some custom arguments, they must be passed after two dashes. Otherwise the\narguments get interpreted as arguments to Streamlit itself.\n\n</Note>\n\nAnother way of running Streamlit is to run it as a Python module. This can be\nuseful when configuring an IDE like PyCharm to work with Streamlit:\n\n```bash\n# Running\npython -m streamlit run your_script.py\n\n# is equivalent to:\nstreamlit run your_script.py\n```\n\n<Tip>\n\nYou can also pass

Unnamed: 0_level_0,Unnamed: 1_level_0,Answer Relevance,Context Relevance,Groundedness,latency,total_cost
app_name,app_version,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
RAG,simple,1.0,0.666667,1.0,4.651037,0.684355


# Guardrails

In [26]:
from trulens.core.guardrails.base import context_filter

# note: feedback function used for guardrail must only return a score, not also reasons
f_context_relevance_score = Feedback(
    provider.context_relevance, name="Context Relevance"
)


class filtered_RAG_from_scratch(RAG_from_scratch):

    @instrument
    @context_filter(f_context_relevance_score, 0.75, keyword_for_prompt="query")
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)


filtered_rag = filtered_RAG_from_scratch()

decorating <function context_filter.__call__.<locals>.wrapper at 0x000002038CF6DF30>
adding method <class '__main__.filtered_RAG_from_scratch'> retrieve_context __main__


In [27]:
from trulens.apps.custom import TruCustomApp

tru_filtered_rag = TruCustomApp(
    filtered_rag,
    app_name="RAG",
    app_version="filtered",
    feedbacks=[f_groundedness, f_answer_relevance, f_context_relevance],
)

instrumenting <class '__main__.filtered_RAG_from_scratch'> for base <class '__main__.filtered_RAG_from_scratch'>
	instrumenting retrieve_context
	instrumenting generate_completion
	instrumenting query
instrumenting <class '__main__.filtered_RAG_from_scratch'> for base <class '__main__.RAG_from_scratch'>
	instrumenting retrieve_context
	instrumenting generate_completion
	instrumenting query
skipping base <class 'object'> because of class
skipping base <class '__main__.CortexSearchRetriever'> because of class
skipping base <class 'object'> because of class


In [29]:
with tru_filtered_rag as recording:
    for prompt in prompts:
        filtered_rag.query(prompt)

tru_session.get_leaderboard()

calling <function RAG_from_scratch.query at 0x0000020389184790> with (<__main__.filtered_RAG_from_scratch object at 0x000002038CEA1300>, 'How do I launch a streamlit app?')
calling <function context_filter.__call__.<locals>.wrapper at 0x000002038CF6DF30> with (<__main__.filtered_RAG_from_scratch object at 0x000002038CEA1300>, 'How do I launch a streamlit app?')
calling <function RAG_from_scratch.generate_completion at 0x0000020389184C10> with (<__main__.filtered_RAG_from_scratch object at 0x000002038CEA1300>, 'How do I launch a streamlit app?', [])
calling <function RAG_from_scratch.query at 0x0000020389184790> with (<__main__.filtered_RAG_from_scratch object at 0x000002038CEA1300>, 'How can I capture the state of my session in streamlit?')
calling <function context_filter.__call__.<locals>.wrapper at 0x000002038CF6DF30> with (<__main__.filtered_RAG_from_scratch object at 0x000002038CEA1300>, 'How can I capture the state of my session in streamlit?')




calling <function RAG_from_scratch.generate_completion at 0x0000020389184C10> with (<__main__.filtered_RAG_from_scratch object at 0x000002038CEA1300>, 'How can I capture the state of my session in streamlit?', ['For each browser tab that connects to the Streamlit server, a new session is created. Streamlit reruns your script from top to bottom every time you interact with your app. Each reruns takes place in a blank slate: no variables are shared between runs.\n\nSession State is a way to share variables between reruns, for each user session. In addition to the ability to store and persist state, Streamlit also exposes the ability to manipulate state using Callbacks. Session state also persists across pages inside a [multipage app](/develop/concepts/multipage-apps).\n\n', 'Streamlit makes this easy through the use of [Session State](/develop/concepts/architecture/session-state). If a `key` parameter is set, Streamlit will store any changes made to the dataframe in Session State.\n\nThi

Unnamed: 0_level_0,Unnamed: 1_level_0,Answer Relevance,Context Relevance,Groundedness,latency,total_cost
app_name,app_version,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
RAG,filtered,1.0,1.0,,5.295568,0.585047
RAG,simple,1.0,0.75,1.0,4.651037,0.684355
