# Payer Call Center Assistant Unstructured Data Setup

In this **Container Runtime** Notebook, we will **prepare all the unstructured data** that the Payer Call Center Assistant Streamlit App needs before it can run. The processed data is stored in [Cortex Search](https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-search/cortex-search-overview) services (Cortex Search is a fully managed indexing and retrieval service), which give the chatbot a rich knowledge base to draw on for retrieval augmented generation (RAG).

There are two types of data we're dealing with in this solution:
- **Audio files**: previously recorded calls between a call center agent and a member
- **PDF files**: FAQ docs for call center agents to help answer member inquiries

**Why is Container Runtime needed?**\
Since we have audio files, we will need to install OpenAI Whisper in order to transcribe those files into text. OpenAI Whisper requires `ffmpeg` to be installed, which cannot be installed in Warehouse compute. We will also use GPU compute here, which makes it much faster to transcribe these files.

### Cortex Search 

Cortex Search gets you up and running with a hybrid (vector and keyword) search engine on your text data in minutes, without having to worry about embedding, infrastructure maintenance, search quality parameter tuning, or ongoing index refreshes.

It powers a broad array of search experiences for Snowflake users, including Retrieval Augmented Generation (RAG) applications that leverage Large Language Models (LLMs).
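
Cortex Search handles hybrid ranking for you, and its internal ranking method is not described in this notebook. Purely as a generic illustration of what "hybrid (vector and keyword)" retrieval means, the sketch below merges a keyword ranking and a vector ranking with reciprocal rank fusion (RRF). The function name, the `k=60` constant, and the example rankings are assumptions for illustration, not Cortex Search's API or algorithm.

```python
# Generic illustration of hybrid retrieval using reciprocal rank fusion (RRF).
# Assumption: this is NOT how Cortex Search ranks internally; it only shows how
# a keyword ranking and a vector ranking can be merged into one result list.
def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document ids into a single ranking.

    Each document earns 1 / (k + rank) from every list it appears in,
    so documents ranked highly by both retrievers rise to the top.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)


# Hypothetical rankings for the same query from two retrievers
keyword_ranking = ["faq_2", "faq_1", "faq_3"]  # e.g. from term matching
vector_ranking = ["faq_1", "faq_3", "faq_2"]   # e.g. from embedding similarity

print(reciprocal_rank_fusion([keyword_ranking, vector_ranking]))
# prints ['faq_1', 'faq_2', 'faq_3']
```

`faq_1` wins because it is near the top of both lists, even though the keyword retriever alone ranked `faq_2` first.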

In [None]:
import streamlit as st
st.image("cortex_search.png")

### Cortex Search for RAG

Retrieval augmented generation (RAG) is a technique for retrieving data from a knowledge base to enhance the generated response of a large language model. The following architecture diagram shows how you can combine Cortex Search with Cortex LLM Functions to create enterprise chatbots with RAG, using your Snowflake data as the knowledge base.

#### Using Cortex Search for RAG in Snowflake
Cortex Search is the retrieval engine that provides the Large Language Model with the context it needs to return answers that are grounded in your most up-to-date proprietary data.

In [None]:
import streamlit as st
st.image("cortex_search_rag.png")
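
To make the "augment" step concrete, here is a minimal sketch of turning retrieved chunks into a grounded prompt for an LLM. `build_rag_prompt`, its prompt wording, and the example rows are hypothetical; the sketch assumes results shaped like the Cortex Search `response.results` returned by the test cells in this notebook, i.e. a list of dicts with `CHUNK` and `RELATIVE_PATH` keys.

```python
# Minimal sketch of the "augment" step in RAG: turn search results into a
# grounded prompt. build_rag_prompt is a hypothetical helper, and the prompt
# wording is an assumption, not the Streamlit app's actual prompt.
def build_rag_prompt(question, results, max_chunks=3):
    """Build an LLM prompt from search results (list of dicts with CHUNK and RELATIVE_PATH)."""
    context_blocks = []
    for i, row in enumerate(results[:max_chunks], start=1):
        # Keep the source path next to each chunk so answers can cite it
        context_blocks.append(f"[{i}] (source: {row['RELATIVE_PATH']})\n{row['CHUNK']}")
    context = "\n\n".join(context_blocks)
    return (
        "Answer the question using only the context below. "
        "If the context does not contain the answer, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )


# Hypothetical rows standing in for real search results
example_results = [
    {"CHUNK": "<chunk text from an FAQ PDF>", "RELATIVE_PATH": "FAQ_example.pdf"},
    {"CHUNK": "<chunk text from a call transcript>", "RELATIVE_PATH": "call_recordings/example.mp3"},
]
print(build_rag_prompt("Is this service covered?", example_results))
```

The resulting string is what would be sent to an LLM (for example, one of the Cortex LLM Functions) in the generation step.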

## Let's get started!

In [None]:
import warnings
warnings.filterwarnings("ignore")

from snowflake.core import Root
from snowflake.snowpark.context import get_active_session
import glob

session = get_active_session()
root = Root(session)

# Add a query tag to the session. This helps with debugging and performance monitoring.
session.query_tag = {"origin":"sf_sit", 
                     "name":"payer_call_center_assistant", 
                     "version":{"major":1, "minor":0},
                     "attributes":{"is_quickstart":0, "source":"notebook"}}

# Set session context 
session.use_role("CORTEX_CUSTOMER_HCLSPAYERCALLCENTERASSISTANT_DATA_SCIENTIST")

# Print the current role, warehouse, and database/schema
print(f"role: {session.get_current_role()} | WH: {session.get_current_warehouse()} | DB.SCHEMA: {session.get_fully_qualified_current_schema()}")

## 1. Transcribe Audio Files

For this portion, we will download OpenAI's pretrained [whisper](https://github.com/openai/whisper) speech recognition model and use it only for inference: we pass each audio file to the model and get back a transcription.

Whisper needs the `ffmpeg` command-line tool installed on the system to decode audio. Because `ffmpeg` is not a Python library, this project's Notebook files include a shell script that installs it.

First, install `ffmpeg` by running that setup script.

In [None]:
# Run this script to install ffmpeg
!sh ffmpeg_install.sh > out.log 2> err.log

In [None]:
# Uncomment if you want to see the installation logs
#!cat out.log
#!cat err.log

In [None]:
# Make sure it got installed
!which ffmpeg
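
`!which ffmpeg` simply prints nothing when `ffmpeg` is missing, which is easy to overlook. As an optional alternative, a small Python check can fail fast before Whisper is loaded (Whisper's `load_audio` runs the `ffmpeg` CLI under the hood). `require_binary` is a hypothetical helper, not part of this project's files.

```python
# Optional Python-side check: fail fast with a clear message if a required
# command-line tool is missing from PATH. require_binary is hypothetical.
import shutil


def require_binary(name):
    """Return the full path to `name`, or raise RuntimeError if it is not on PATH."""
    path = shutil.which(name)
    if path is None:
        raise RuntimeError(f"'{name}' was not found on PATH; install it before continuing")
    return path
```

Calling `require_binary("ffmpeg")` before `whisper.load_model(...)` returns the resolved path when the install succeeded and stops the notebook with a readable error otherwise.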

Next, install the `openai-whisper` package, which we'll use to transcribe the audio files.

In [None]:
# Install whisper

# Note: --quiet suppresses the output. 
#       You can remove it if you'd like to 
#       see all the installation messages.

!pip install openai-whisper --quiet

Now, we can load the model.

In [None]:
# Load whisper model
import whisper
model = whisper.load_model("base")

Our audio files live in a stage, so we'll download them into this environment.

In [None]:
# Download all files from stage
f = session.file.get('@RAW_DATA/CALL_RECORDINGS/', 'call_recordings/')

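Next, we'll create a helper function that prepares each audio file for the model (loading it, padding or trimming it, and computing a log-Mel spectrogram), detects the spoken language, and decodes the audio into text. Note that `whisper.pad_or_trim` pads or truncates the audio to 30 seconds, so only the first 30 seconds of a longer recording are transcribed; if your calls run longer, Whisper's `model.transcribe(audio_file_name)` processes the full recording.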

In [None]:
# Create function to transcribe all audio
def transcribe_audio(audio_file_name):
    '''
        Transcribe audio files
    '''
    # load audio and pad/trim it to fit 30 seconds
    print(f"Transcribing: {audio_file_name}")
    audio = whisper.load_audio(audio_file_name)
    audio = whisper.pad_or_trim(audio)
    
    # make log-Mel spectrogram and move to the same device as the model
    mel = whisper.log_mel_spectrogram(audio).to(model.device)
    
    # detect the spoken language
    _, probs = model.detect_language(mel)
    print(f"Detected language: {max(probs, key=probs.get)}")
    
    # decode the audio
    options = whisper.DecodingOptions()
    result = whisper.decode(model, mel, options)
    
    # return the recognized text
    return result.text
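
`whisper.load_audio` returns 16 kHz mono samples, and `whisper.pad_or_trim` then keeps exactly 480,000 of them (30 seconds). The sketch below shows what covering a whole recording in 30-second windows would look like instead; `split_into_windows` is a hypothetical helper, and the 95-second "call" is synthetic data. Hard 30-second cuts can split words at the boundaries, which is one reason Whisper's own `model.transcribe` uses a smarter, timestamp-driven windowing.

```python
# Sketch of fixed 30-second windowing, assuming 16 kHz mono audio (what
# whisper.load_audio returns). split_into_windows is a hypothetical helper;
# each window could then be passed through the same decode steps as above.
SAMPLE_RATE = 16_000                           # samples per second
WINDOW_SECONDS = 30                            # Whisper's input window
WINDOW_SAMPLES = SAMPLE_RATE * WINDOW_SECONDS  # 480,000 samples


def split_into_windows(samples, window_samples=WINDOW_SAMPLES):
    """Split a sample sequence into fixed-size windows, zero-padding the last one."""
    windows = []
    for start in range(0, len(samples), window_samples):
        window = list(samples[start:start + window_samples])
        # Zero-pad a short final window, as pad_or_trim does for short clips
        window.extend([0.0] * (window_samples - len(window)))
        windows.append(window)
    return windows


# A synthetic 95-second call: pad_or_trim would keep only the first 30 s,
# while windowing yields 30 + 30 + 30 + 5 seconds (the last window padded).
call = [0.1] * (95 * SAMPLE_RATE)
windows = split_into_windows(call)
print(len(windows))  # 4
```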

We'll apply this function to all our audio files.

In [None]:
# Process all audio files and store in a list
audio_files = glob.glob('call_recordings/*.mp3')

all_transcribed = []

for f in audio_files:
    all_transcribed.append((f, transcribe_audio(f)))
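
The loop above stops at the first recording that raises an error, and it processes files in whatever order `glob.glob` returns them, which is arbitrary. A slightly more defensive sketch, using a hypothetical `transcribe_all` helper, sorts the paths for reproducible output and records failures instead of aborting.

```python
# Defensive batch loop (hypothetical helper): stable ordering plus per-file
# error capture, so one corrupt recording does not stop the whole run.
def transcribe_all(paths, transcribe_fn):
    """Return (results, failures) for every path, processed in sorted order."""
    results, failures = [], []
    for path in sorted(paths):
        try:
            results.append((path, transcribe_fn(path)))
        except Exception as exc:  # e.g. an unreadable or corrupt audio file
            failures.append((path, str(exc)))
    return results, failures


# Usage in this notebook:
# all_transcribed, failed = transcribe_all(glob.glob('call_recordings/*.mp3'), transcribe_audio)
```

`all_transcribed` keeps the same list-of-tuples shape the original loop produces, so the DataFrame cells below work unchanged.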

Let's take a look at a few of the transcriptions.

In [None]:
# Look at a few of the transcriptions
all_transcribed[0:3]

Now we'll store all the results in a Snowpark DataFrame and write it to a Snowflake table.

In [None]:
# Create a Snowpark DataFrame from the transcriptions
df = session.create_dataframe(all_transcribed, schema=["AUDIO_FILE_NAME", "TRANSCRIPT"])
df

In [None]:
# Save results as a Snowflake Table
df.write.mode("overwrite").save_as_table("CALL_RECORDINGS_TRANSCRIPT_TABLE")

Finally, we create a Cortex Search service on top of this data.

In [None]:
-- Create Cortex Search Service
CREATE OR REPLACE CORTEX SEARCH SERVICE CALL_CENTER_RECORDING_SEARCH
ON CHUNK
WAREHOUSE = CORTEX_CUSTOMER_HCLSPAYERCALLCENTERASSISTANT_DS_WH
TARGET_LAG = '1 Day'
AS
(
    SELECT
        TRANSCRIPT AS CHUNK,
        AUDIO_FILE_NAME AS RELATIVE_PATH
    FROM
        CALL_RECORDINGS_TRANSCRIPT_TABLE
        
)
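
Both Cortex Search services in this notebook (call recordings and FAQ docs) share the same DDL shape. If you want to generate that statement from Python, a hypothetical `cortex_search_ddl` helper could render it from parameters; it only assembles the SQL text used in the cell above and does not validate identifiers.

```python
# Hypothetical helper that renders the CREATE CORTEX SEARCH SERVICE statement
# used in this notebook. It builds SQL text only; it does not validate names.
def cortex_search_ddl(service, source_table, text_column, path_column,
                      warehouse, target_lag="1 Day"):
    """Render a CREATE OR REPLACE CORTEX SEARCH SERVICE statement."""
    return (
        f"CREATE OR REPLACE CORTEX SEARCH SERVICE {service}\n"
        "ON CHUNK\n"
        f"WAREHOUSE = {warehouse}\n"
        f"TARGET_LAG = '{target_lag}'\n"
        "AS (\n"
        f"    SELECT {text_column} AS CHUNK, {path_column} AS RELATIVE_PATH\n"
        f"    FROM {source_table}\n"
        ")"
    )


ddl = cortex_search_ddl(
    service="CALL_CENTER_RECORDING_SEARCH",
    source_table="CALL_RECORDINGS_TRANSCRIPT_TABLE",
    text_column="TRANSCRIPT",
    path_column="AUDIO_FILE_NAME",
    warehouse="CORTEX_CUSTOMER_HCLSPAYERCALLCENTERASSISTANT_DS_WH",
)
print(ddl)
# In a Snowpark session, the statement could be run with: session.sql(ddl).collect()
```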

We can quickly test the service to make sure it was created correctly.

In [None]:
# Test out the service

response = (
    root.databases[session.get_current_database()]
    .schemas[session.get_current_schema()]
    .cortex_search_services["CALL_CENTER_RECORDING_SEARCH"]
    .search(
        'Can you give me a summary from the previous call made by Jim Pacheco',
        ['CHUNK', 'RELATIVE_PATH'],
        limit=3,
    )
)

results = response.results
results
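
Raw search results can be long transcripts that are hard to scan. A small hypothetical helper can print one line per hit; it assumes `response.results` is a list of dicts keyed by the requested columns (`CHUNK` and `RELATIVE_PATH` here).

```python
# Hypothetical helper for eyeballing search results: one line per hit with
# its rank, source path, and the first few characters of the chunk.
def preview_results(results, max_chars=200):
    """Return a compact, human-readable summary of search results."""
    lines = []
    for rank, row in enumerate(results, start=1):
        # Collapse newlines and repeated spaces before truncating
        snippet = " ".join(row["CHUNK"].split())[:max_chars]
        lines.append(f"{rank}. {row['RELATIVE_PATH']}: {snippet}")
    return "\n".join(lines)


# Usage in this notebook: print(preview_results(response.results))
```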

## 2. Process PDF Files

For this portion, we'll use SQL to create a Python user-defined table function (UDTF) that reads each PDF with the open source PyPDF2 library and splits its text into overlapping chunks with LangChain's `RecursiveCharacterTextSplitter`, returning one row per chunk.

In [None]:
CREATE OR REPLACE FUNCTION PDF_TEXT_CHUNKER("FILE_URL" VARCHAR(16777216))
RETURNS TABLE ("CHUNK" VARCHAR(16777216))
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python','PyPDF2','langchain')
HANDLER = 'pdf_text_chunker'
AS '
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging

class pdf_text_chunker:

    def read_pdf(self, file_url: str) -> str:

        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening the file path {file_url}")

        with SnowflakeFile.open(file_url, mode="rb") as f:
            buffer = io.BytesIO(f.read())

        reader = PyPDF2.PdfReader(buffer)
        text = ""
        for page_number, page in enumerate(reader.pages):
            try:
                # extract_text() can return None for pages without a text layer
                page_text = page.extract_text() or ""
                text += page_text.replace("\\n", " ").replace("\\0", " ") + " "
            except Exception:
                # Skip the unreadable page instead of discarding the text extracted so far
                logger.warning(f"Unable to extract text from the pdf file {file_url}, page {page_number}")
        return text

    def process(self, file_url: str):
        text = self.read_pdf(file_url)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=4000,
            chunk_overlap=400,
            length_function=len
        )

        # Emit one output row per chunk
        for chunk in text_splitter.split_text(text):
            yield (chunk,)
'
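
To see how `chunk_size=4000` and `chunk_overlap=400` interact, here is a simplified fixed-window chunker. It is not LangChain's algorithm: `RecursiveCharacterTextSplitter` prefers to split on separators such as paragraph breaks, newlines, and spaces, while this hypothetical `sliding_window_chunks` helper ignores separators and just slides a window forward by `chunk_size - chunk_overlap` characters.

```python
# Simplified fixed-window chunker, for illustration only. Each chunk holds up
# to chunk_size characters, and consecutive chunks share chunk_overlap of them.
def sliding_window_chunks(text, chunk_size=4000, chunk_overlap=400):
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this chunk already reaches the end of the text
    return chunks


# A 10,000-character document yields chunks starting at 0, 3600, and 7200
doc = "".join(chr(97 + i % 26) for i in range(10_000))
print([len(c) for c in sliding_window_chunks(doc)])  # [4000, 4000, 2800]
```

The overlap means text near a chunk boundary appears in both neighbouring chunks, so a search hit is less likely to miss context that was split across a boundary.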

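Then, apply the function to every file in the `@RAW_DATA` stage whose relative path contains `FAQ`, writing one row per chunk into `FAQ_DOCS_CHUNKS_TABLE`.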

In [None]:
CREATE OR REPLACE TABLE FAQ_DOCS_CHUNKS_TABLE ( 
    RELATIVE_PATH VARCHAR(16777216), -- Relative path to the PDF file
    CHUNK VARCHAR(16777216) -- Piece of text
);

INSERT INTO FAQ_DOCS_CHUNKS_TABLE (RELATIVE_PATH, CHUNK)
SELECT RELATIVE_PATH, func.CHUNK AS CHUNK
FROM (
    SELECT *
    FROM DIRECTORY(@RAW_DATA)
    WHERE RELATIVE_PATH LIKE '%FAQ%'
),
TABLE(PDF_TEXT_CHUNKER(build_scoped_file_url(
    @RAW_DATA,
    relative_path))) AS func;


Let's confirm that the files were read and chunked properly.

In [None]:
-- Make sure files were properly read and chunked
SELECT * FROM FAQ_DOCS_CHUNKS_TABLE

Finally, we create a Cortex Search service on top of this data.

In [None]:
CREATE OR REPLACE CORTEX SEARCH SERVICE CALL_CENTER_FAQ_SEARCH
ON CHUNK
WAREHOUSE = CORTEX_CUSTOMER_HCLSPAYERCALLCENTERASSISTANT_DS_WH
TARGET_LAG = '1 Day'
AS
(
    SELECT
        CHUNK,
        RELATIVE_PATH
    FROM
        FAQ_DOCS_CHUNKS_TABLE
        
)

We can quickly test the service to make sure it was created correctly.

In [None]:
# Test out the service

response = (
    root.databases[session.get_current_database()]
    .schemas[session.get_current_schema()]
    .cortex_search_services["CALL_CENTER_FAQ_SEARCH"]
    .search(
        'Were there any revisions to COVID related coverages?',
        ['CHUNK', 'RELATIVE_PATH'],
        limit=3,
    )
)

results = response.results
results

### :tada: All the unstructured data is now processed and ready to be used by the Payer Call Center Assistant Streamlit App!