## Evaluation Pipeline

In [None]:
import os
import numpy as np
import pandas as pd
import streamlit as st

from trulens.core import TruSession, Feedback, Select
from trulens.apps.custom import TruCustomApp, instrument
from trulens.providers.cortex import Cortex

from snowflake.core import Root
from snowflake.cortex import Complete

from utility.data_models import Video

# Open the Streamlit connection named "snowflake" and get a session from it;
# the session is what gets passed to Cortex and Root further down.
snow_conn = st.connection("snowflake")
snow_session = snow_conn.session()
# Create the TruLens session and call reset_database() on it before any run.
# NOTE(review): presumably this clears previously logged records -- confirm
# before running against a database whose history matters.
tru_session = TruSession()
tru_session.reset_database()

# Course identifier; used below as the prefix of table / search-service names
# and as the sub-folder of "courses/".
course_name = "machine_learning_for_health"

In [None]:
def get_feedbacks(session):
    """
    Build the three feedback functions attached to every evaluated app.

    All feedbacks share a single Cortex provider created from ``session``
    with the "mistral-large2" model.

    Returns:
        A list in the order [Groundedness, Answer Relevance, Context Relevance].
    """
    judge = Cortex(session, model_name="mistral-large2")

    def _feedback(scorer, label):
        # Every feedback is bound to the same provider instance.
        return Feedback(scorer, name=label, provider=judge)

    # Groundedness: everything returned by `retrieve`, collected into one
    # argument, compared against the app output.
    groundedness = _feedback(
        judge.groundedness_measure_with_cot_reasons, "Groundedness"
    )
    groundedness = groundedness.on(
        Select.RecordCalls.retrieve.rets.collect()
    ).on_output()

    # Answer relevance: app input compared against app output.
    answer_relevance = _feedback(
        judge.relevance_with_cot_reasons, "Answer Relevance"
    )
    answer_relevance = answer_relevance.on_input().on_output()

    # Context relevance: app input against each item returned by `retrieve`,
    # with the per-item scores aggregated by np.mean.
    context_relevance = _feedback(
        judge.context_relevance_with_cot_reasons, "Context Relevance"
    )
    context_relevance = context_relevance.on_input()
    context_relevance = context_relevance.on(Select.RecordCalls.retrieve.rets[:])
    context_relevance = context_relevance.aggregate(np.mean)

    return [groundedness, answer_relevance, context_relevance]

# Built once from the Snowflake session; the same list is passed as
# `feedbacks=` to every TruCustomApp created in the cells below.
feedbacks = get_feedbacks(snow_session)


In [None]:
class ContentRetriever:
    """
    Answers a question by fetching text from a Cortex Search service and
    handing it, together with the question, to a Snowflake Cortex model.

    ``complete``, ``retrieve`` and ``search`` are all wrapped with
    ``@instrument``.
    """

    def __init__(self, session, service_name: str, model_name: str = "mistral-large2"):
        """
        Resolve the search service and store the model configuration.

        Args:
            session: Session used to build ``Root`` and to read the current
                database and schema.
            service_name: Name of the Cortex Search service, looked up in the
                session's current database and schema.
            model_name: Model passed to ``Complete``.
        """
        root = Root(session)
        db = session.get_current_database()
        schema = session.get_current_schema()

        schema_ref = root.databases[db].schemas[schema]
        self.service = schema_ref.cortex_search_services[service_name]

        self.model_name = model_name
        self.system_prompt = """
        You are a knowledgeable teaching assistant helping university students learn from their lecture materials. 
        Use the provided context from lecture videos and notes to answer questions. If the context doesn't contain 
        relevant information, simply state that you don't know. Keep responses friendly but concise, using no more 
        than three sentences. For general greetings or casual conversation, respond naturally without needing context.
        """

    @instrument
    def complete(self, query: str, context: list[str]) -> str:
        """
        Ask the Cortex model to answer ``query`` given ``context``.

        The context items are joined with newlines and placed, with the
        question, in a single user message that follows the system prompt.
        """
        joined_context = "\n".join(context)
        system_message = {"role": "system", "content": self.system_prompt}
        user_message = {
            "role": "user",
            "content": f"Context: {joined_context}\nQuestion: {query}\nAnswer:",
        }

        return Complete(
            model=self.model_name,
            prompt=[system_message, user_message],
        )

    @instrument
    def retrieve(self, query: str, limit: int = 3) -> list[str]:
        """
        Query the search service and return the "text" field of each result.

        Args:
            query: Search text.
            limit: Value forwarded as the search ``limit``.
        """
        response = self.service.search(query, columns=["text"], limit=limit)
        texts = []
        for hit in response.results:
            texts.append(hit["text"])
        return texts

    @instrument
    def search(self, query: str, n_context: int = 3) -> str:
        """
        Retrieve ``n_context`` results for ``query`` and return the model's
        answer built from them.
        """
        return self.complete(query, self.retrieve(query, n_context))


In [4]:
def get_questions_for_lecture(qna_dir: str = "qna_for_eval") -> dict[str, list]:
    """
    Load the evaluation questions for every lecture.

    Each ``.csv`` file in ``qna_dir`` is read with pandas. The file name
    without its extension is used as the lecture name, and the values of the
    file's ``Question`` column become that lecture's questions.

    Args:
        qna_dir: Directory holding one CSV file per lecture. Defaults to
            "qna_for_eval".

    Returns:
        Mapping of lecture name to its list of questions, inserted in sorted
        file-name order.

    Raises:
        FileNotFoundError: If ``qna_dir`` does not exist.
        KeyError: If a CSV file has no ``Question`` column.
    """
    questions = {}
    # os.listdir order is arbitrary; sorting keeps the evaluation order
    # reproducible between runs and machines.
    for file in sorted(os.listdir(qna_dir)):
        lecture_name, extension = os.path.splitext(file)
        # Skip anything that is not a CSV (checkpoint folders, OS metadata
        # files, notes) instead of letting pd.read_csv fail on it.
        if extension.lower() != ".csv":
            continue
        df = pd.read_csv(os.path.join(qna_dir, file))
        questions[lecture_name] = df["Question"].tolist()
    return questions

# Per-lecture mapping; kept because the metadata-filter run iterates it
# lecture by lecture.
questions_for_lecture = get_questions_for_lecture()

# Flat list of every question, in the mapping's iteration order.
eval_questions = []
for questions in questions_for_lecture.values():
    eval_questions += questions


In [5]:
# Baseline run: evaluate the unfiltered ContentRetriever once per content type.
for content_type in ["Video", "PDF"]:
    print(f"\nEvaluating {content_type} content")
    # Passed to ContentRetriever as the search-service name, i.e.
    # "<course_name>_video" and "<course_name>_pdf".
    table_name = f"{course_name}_{content_type.lower()}"
    retriever = ContentRetriever(snow_session, table_name)
    # Wrap the retriever as a TruLens app, version "base", with the shared
    # feedback functions.
    tru_app = TruCustomApp(
        retriever,
        app_name=f"{content_type} Retriever",
        app_version="base",
        feedbacks=feedbacks,
    )

    # Ask every evaluation question while the tru_app context is active.
    with tru_app as recording:
        for question in eval_questions:
            response = retriever.search(question)


calling <function ContentRetriever.search at 0x000001E868627BA0> with (<__main__.ContentRetriever object at 0x000001E868739890>, 'What challenge does cardiac motion pose to high-quality imaging scans?')
calling <function ContentRetriever.retrieve at 0x000001E8686277E0> with (<__main__.ContentRetriever object at 0x000001E868739890>, 'What challenge does cardiac motion pose to high-quality imaging scans?', 3)
calling <function ContentRetriever.complete at 0x000001E8686276A0> with (<__main__.ContentRetriever object at 0x000001E868739890>, 'What challenge does cardiac motion pose to high-quality imaging scans?', ["And so one of the real headaches is that the heart moves. So the chest wall moves because we breathe, and the heart moves too. So you have to image something that it has enough temporal frequency that you're you're sort of not overwhelmed by the basic movement of the heart itself. And so some of these things aren't great. So so SPECT or PET acquire their images, which are, you kn



calling <function ContentRetriever.search at 0x000001E868627BA0> with (<__main__.ContentRetriever object at 0x000001E868739890>, 'What are the two primary challenges in mammographic analysis for early breast cancer detection?')
calling <function ContentRetriever.retrieve at 0x000001E8686277E0> with (<__main__.ContentRetriever object at 0x000001E868739890>, 'What are the two primary challenges in mammographic analysis for early breast cancer detection?', 3)
calling <function ContentRetriever.complete at 0x000001E8686276A0> with (<__main__.ContentRetriever object at 0x000001E868739890>, 'What are the two primary challenges in mammographic analysis for early breast cancer detection?', ["Next, the actual methodology and kind of going to the general challenges when you're modeling mammograms for any computer vision tasks, specifically in cancer and also, obviously, risk. And lastly, how we thought about the analysis and some kind of objectives there. So to kind of dive right into it, we too

## Does metadata filtering help?

In [6]:
class ContentRetrieverWithFilter(ContentRetriever):
    """
    ContentRetriever variant that restricts search results to one lecture.

    ``retrieve`` adds an ``@eq`` filter on the ``lecture_name`` attribute to
    the search request. The number of results is configurable through
    ``limit`` / ``n_context`` (default 3), mirroring the base class.
    """

    @instrument
    def search(self, query: str, lecture_name: str, n_context: int = 3) -> str:
        """
        Answer ``query`` using only context from ``lecture_name``.

        Args:
            query: The question to answer.
            lecture_name: Value the ``lecture_name`` attribute must equal.
            n_context: Number of results to retrieve; defaults to 3.

        Returns:
            The model's answer as returned by ``complete``.
        """
        context = self.retrieve(query, lecture_name, n_context)
        return self.complete(query, context)

    @instrument
    def retrieve(self, query: str, lecture_name: str, limit: int = 3) -> list[str]:
        """
        Retrieve documents from cortex search service, filtered by lecture.

        Args:
            query: Search text.
            lecture_name: Value the ``lecture_name`` attribute must equal.
            limit: Value forwarded as the search ``limit``; defaults to 3.

        Returns:
            The "text" field of each search result.
        """
        filter_query = {"@eq": {"lecture_name": lecture_name}}
        documents = self.service.search(
            query, columns=["text"], limit=limit, filter=filter_query
        )
        return [doc["text"] for doc in documents.results]


decorating <function ContentRetrieverWithFilter.search at 0x000001E86AE60860>
decorating <function ContentRetrieverWithFilter.retrieve at 0x000001E86AE631A0>
adding method <class '__main__.ContentRetrieverWithFilter'> search __main__
adding method <class '__main__.ContentRetrieverWithFilter'> retrieve __main__


In [7]:
# Metadata-filter run: same services and app names as the baseline, but
# recorded under app_version "metadata filter" and using the filtering
# retriever.
for content_type in ["Video", "PDF"]:
    print(f"\nEvaluating {content_type} content")
    # Passed to the retriever as the search-service name.
    table_name = f"{course_name}_{content_type.lower()}"
    retriever = ContentRetrieverWithFilter(snow_session, table_name)
    tru_app = TruCustomApp(
        retriever,
        app_name=f"{content_type} Retriever",
        app_version="metadata filter",
        feedbacks=feedbacks,
    )

    # Each question is asked together with the lecture it belongs to, so the
    # search is filtered to that lecture's content. The lecture name is the
    # key of questions_for_lecture.
    with tru_app as recording:
        for lecture_name, questions in questions_for_lecture.items():
            for question in questions:
                response = retriever.search(question, lecture_name)



Evaluating Video content
instrumenting <class '__main__.ContentRetrieverWithFilter'> for base <class '__main__.ContentRetrieverWithFilter'>
	instrumenting complete
	instrumenting retrieve
	instrumenting search
instrumenting <class '__main__.ContentRetrieverWithFilter'> for base <class '__main__.ContentRetriever'>
	instrumenting complete
	instrumenting retrieve
	instrumenting search
skipping base <class 'object'> because of class
calling <function ContentRetrieverWithFilter.search at 0x000001E86AE60860> with (<__main__.ContentRetrieverWithFilter object at 0x000001E8692C2750>, 'What is the primary function of the heart in the circulatory system?', 'cardiac_imaging')
calling <function ContentRetrieverWithFilter.retrieve at 0x000001E86AE631A0> with (<__main__.ContentRetrieverWithFilter object at 0x000001E8692C2750>, 'What is the primary function of the heart in the circulatory system?', 'cardiac_imaging')
calling <function ContentRetriever.complete at 0x000001E8686276A0> with (<__main__.C

## Modify Chunk Duration

In [None]:
def run_query(query: str, data=None):
    """
    Execute a SQL statement through a cursor on ``snow_conn``.

    Args:
        query: SQL text to run.
        data: Optional sequence of parameter rows. When ``None`` the statement
            is run with ``cursor.execute``; otherwise it is run with
            ``cursor.executemany(query, data)``.

    Returns:
        True when the statement ran, or when ``data`` was an empty batch and
        there was nothing to run.

    Raises:
        Exception: If execution fails. The message is
            "Error executing query: <original message>" and the original
            exception is chained as ``__cause__``.
    """
    if data is not None and not data:
        # An empty batch has no rows to bind. The old `if data:` check sent
        # this case to execute(), submitting the statement with its
        # placeholders unbound.
        return True

    cursor = snow_conn.cursor()
    try:
        if data is None:
            cursor.execute(query)
        else:
            cursor.executemany(query, data)
        return True
    except Exception as e:
        # Keep the original exception type/traceback reachable for debugging.
        raise Exception(f"Error executing query: {e}") from e
    finally:
        cursor.close()

In [9]:
def create_table(table_name):
    """
    Create the transcript-chunk table if it does not exist yet.

    The table has the columns text, start_time, end_time, file_name and
    lecture_name. ``table_name`` is interpolated directly into the SQL.
    """
    print(f"Creating table: {table_name}")
    ddl = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        text STRING,
        start_time INTEGER,
        end_time INTEGER,
        file_name STRING,
        lecture_name STRING
    )
    """
    run_query(ddl)

def insert_data(table_name, data):
    """
    Insert chunk rows into ``table_name``.

    Args:
        table_name: Target table; interpolated directly into the SQL.
        data: Rows of (text, start_time, end_time, file_name, lecture_name),
            forwarded to ``run_query`` as the parameter batch.
    """
    # NOTE(review): the `?` placeholders presumably require the connection to
    # use the qmark paramstyle -- confirm against the connection settings.
    insert_statement = f"""
    INSERT INTO {table_name} (text, start_time, end_time, file_name, lecture_name)
    VALUES (?, ?, ?, ?, ?)
    """
    run_query(insert_statement, data)

def create_search_service(table_name, warehouse="COMPUTE_WH", target_lag="1 minute"):
    """
    Create a Cortex Search service over ``table_name`` if it does not exist.

    The service gets the same name as the table, searches the ``text``
    column, and exposes ``lecture_name`` as an attribute (the column the
    filtering retriever's ``@eq`` filter refers to).

    Args:
        table_name: Source table and service name; interpolated into the SQL.
        warehouse: Warehouse named in the statement. Defaults to
            "COMPUTE_WH", the previously hard-coded value.
        target_lag: Value of TARGET_LAG. Defaults to "1 minute", the
            previously hard-coded value.
    """
    service_query = f"""
    CREATE CORTEX SEARCH SERVICE IF NOT EXISTS {table_name}
    ON text
    ATTRIBUTES lecture_name
    warehouse = {warehouse}
    TARGET_LAG = '{target_lag}'
        as (
            SELECT *
            FROM {table_name}
        );
        """
    run_query(service_query)


In [8]:
# Cache of transcripts keyed by lecture folder name, so each video is
# transcribed once and reused for every chunk configuration.
video_transcripts = {}
course_path = f"courses/{course_name}"
# (chunk_size, overlap) pairs; the progress messages below print both
# values with an "s" suffix.
chunk_configs = [(30, 5), (60, 10), (120, 20)]

In [None]:
# Build one table and one search service per (chunk_size, overlap) pair.
# NOTE(review): the table is created with IF NOT EXISTS and rows are appended
# on every run, so re-running this cell presumably duplicates chunks -- confirm.
for chunk_size, overlap in chunk_configs:
    print(f"\nProcessing chunk size: {chunk_size}s with overlap: {overlap}s")
    table_name = f"{course_name}_video_{chunk_size}_{overlap}"
    create_table(table_name)

    for lecture in os.listdir(course_path):
        lecture_path = os.path.join(course_path, lecture)
        # Stray files in the course folder are not lectures; listing them as
        # directories below would raise.
        if not os.path.isdir(lecture_path):
            print(f"\nSkipping non-directory entry: {lecture}")
            continue

        print(f"\nProcessing lecture: {lecture}")
        video_file = [
            file_name
            for file_name in os.listdir(lecture_path)
            if file_name.endswith(".mp4")
        ]
        # A lecture folder without a video used to fail with IndexError on
        # video_file[0]; skip it and keep processing the other lectures.
        if not video_file:
            print(f"No .mp4 file found for lecture: {lecture}, skipping")
            continue
        # Only the first .mp4 found in the folder is used.
        video_path = os.path.join(lecture_path, video_file[0])

        video = Video(file_path=video_path)

        # Transcribe once per lecture; later chunk configurations reuse the
        # cached transcript.
        if lecture not in video_transcripts:
            print(f"Transcribing video for lecture: {lecture}")
            # _transcribe() returns three values; only the last one is used.
            _, _, transcript = video._transcribe()
            video_transcripts[lecture] = transcript
        else:
            print(f"Using cached transcript for lecture: {lecture}")
            transcript = video_transcripts[lecture]

        print("Chunking transcript...")
        chunks = video._chunk_text(transcript, chunk_size, overlap)
        print(f"Generated {len(chunks)} chunks")

        # Row layout matches the column order used by insert_data:
        # (text, start_time, end_time, file_name, lecture_name).
        data = [
            (chunk.text, chunk.start, chunk.end, video_file[0], lecture)
            for chunk in chunks
        ]
        print(f"Inserting chunks into table: {table_name}")
        insert_data(table_name, data)

    print(f"\nCreating search service for table: {table_name}")
    create_search_service(table_name)
    print("Search service created successfully")

## Evaluate Chunk Config

In [9]:
# Chunk-config run: evaluate the unfiltered retriever against the search
# service built for each (chunk_size, overlap) pair.
for chunk_size, overlap in chunk_configs:
    print(f"\nEvaluating chunk size: {chunk_size}s with overlap: {overlap}s")
    # Same naming scheme as the ingestion loop, so this resolves to the
    # service created for this configuration.
    table_name = f"{course_name}_video_{chunk_size}_{overlap}"
    retriever = ContentRetriever(snow_session, table_name)
    # All configurations share app_name "Video Retriever"; the version string
    # "s_<chunk_size>_o_<overlap>" tells them apart.
    tru_app = TruCustomApp(
        retriever,
        app_name="Video Retriever",
        app_version=f's_{chunk_size}_o_{overlap}',
        feedbacks=feedbacks,
    )

    # Ask every evaluation question while the tru_app context is active.
    with tru_app as recording:
        for question in eval_questions:
            response = retriever.search(question)



Evaluating chunk size: 30s with overlap: 5s
instrumenting <class '__main__.ContentRetriever'> for base <class '__main__.ContentRetriever'>
	instrumenting complete
	instrumenting retrieve
	instrumenting search
skipping base <class 'object'> because of class
calling <function ContentRetriever.search at 0x000001E868627BA0> with (<__main__.ContentRetriever object at 0x000001E8693EF790>, 'What is the primary function of the heart in the circulatory system?')
calling <function ContentRetriever.retrieve at 0x000001E8686277E0> with (<__main__.ContentRetriever object at 0x000001E8693EF790>, 'What is the primary function of the heart in the circulatory system?', 3)
calling <function ContentRetriever.complete at 0x000001E8686276A0> with (<__main__.ContentRetriever object at 0x000001E8693EF790>, 'What is the primary function of the heart in the circulatory system?', ["So the main thing the heart does is it's a pump, and it and it delivers oxygenated blood throughout the circulatory system to all 

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


calling <function ContentRetriever.search at 0x000001E868627BA0> with (<__main__.ContentRetriever object at 0x000001E8676253D0>, 'What are the four categories of breast tissue density used in medical practice?')
calling <function ContentRetriever.retrieve at 0x000001E8686277E0> with (<__main__.ContentRetriever object at 0x000001E8676253D0>, 'What are the four categories of breast tissue density used in medical practice?', 3)
calling <function ContentRetriever.complete at 0x000001E8686276A0> with (<__main__.ContentRetriever object at 0x000001E8676253D0>, 'What are the four categories of breast tissue density used in medical practice?', ["So the darker the box, the higher the incidence, and on the right hand side, there's random images from cases that fit within those boxes. Does that make sense for everyone? Great. So a clear trend that you see is that, for example, if, TCVA calls you, high risk but we call it low, that is a lower incidence than if we call it, medium and they call it lo