# Main Data Setup

In this Notebook on **Container Runtime**, we first **prepare all the unstructured data** needed to run the Streamlit App. Once this data is processed, the chatbot has a rich knowledge base to draw on, stored in the [Cortex Search](https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-search/cortex-search-overview) service, a fully managed indexing and retrieval service. Cortex Search is then used for retrieval augmented generation (RAG).

There are two types of data we're dealing with in this solution:
- **Audio files**: previously recorded calls between a call center agent and a member
- **PDF files**: FAQ docs for call center agents to help answer member inquiries

**Why is Container Runtime needed?**\
Since we have audio files, we will need to install OpenAI Whisper in order to transcribe those files into text. OpenAI Whisper requires `ffmpeg` to be installed, which cannot be installed in Warehouse compute. We will also use GPU compute here, which makes it much faster to transcribe these files.


### Cortex Search 

Cortex Search gets you up and running with a hybrid (vector and keyword) search engine on your text data in minutes, without having to worry about embedding, infrastructure maintenance, search quality parameter tuning, or ongoing index refreshes.

It powers a broad array of search experiences for Snowflake users, including Retrieval Augmented Generation (RAG) applications that leverage Large Language Models (LLMs).
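
To build intuition for what "hybrid" means, the sketch below fuses a keyword ranking and a vector ranking with reciprocal rank fusion (RRF). This is a generic technique used here purely for illustration: Cortex Search does this work internally and this notebook does not show its actual ranking method. The document IDs, the function name, and the constant `k=60` are all assumptions.

```python
def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document IDs into a single ranking.

    Each document scores 1 / (k + rank) in every list it appears in,
    so documents ranked well by several retrievers rise to the top.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties broken alphabetically for stability
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


# Hypothetical rankings from a keyword retriever and a vector retriever
keyword_ranking = ["faq_billing", "call_017", "faq_covid"]
vector_ranking = ["faq_covid", "faq_billing", "call_042"]

fused = reciprocal_rank_fusion([keyword_ranking, vector_ranking])
print(fused)
```

Documents that appear in both lists (`faq_billing`, `faq_covid`) end up ahead of those found by only one retriever, which is the behavior a hybrid engine aims for.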

### Cortex Search for RAG

Retrieval augmented generation (RAG) is a technique for retrieving data from a knowledge base to enhance the generated response of a large language model.

#### Using Cortex Search for RAG in Snowflake
Cortex Search is the retrieval engine that provides the Large Language Model with the context it needs to return answers that are grounded in your most up-to-date proprietary data.
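
As a rough illustration of the "augmented" step, the sketch below shows one way retrieved chunks could be placed into a prompt before it is sent to an LLM. The function name, the prompt wording, and the sample chunks are assumptions for illustration; the Streamlit App may assemble its prompt differently.

```python
def build_rag_prompt(question, retrieved_chunks, max_chunks=3):
    """Combine the top retrieved chunks and the user question into one prompt."""
    numbered = [
        f"[{i}] {chunk}"
        for i, chunk in enumerate(retrieved_chunks[:max_chunks], start=1)
    ]
    context = "\n\n".join(numbered)
    return (
        "Answer the question using only the context below. "
        "If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )


# Hypothetical chunks, standing in for what a search service would return
chunks = [
    "Telehealth visits are covered at the in-network rate.",
    "Prior authorization is required for elective imaging.",
]
print(build_rag_prompt("Are telehealth visits covered?", chunks))
```

Capping the number of chunks keeps the prompt within the model's context window and limits how much loosely related text the model has to sift through.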

## Let's get started!

In [None]:
import warnings
warnings.filterwarnings("ignore")

from snowflake.core import Root
from snowflake.snowpark.context import get_active_session
import glob
import numpy as np

session = get_active_session()
root = Root(session)

# Add a query tag to the session. This helps with debugging and performance monitoring.
session.query_tag = {"origin":"sf_sit", 
                     "name":"payer_call_center_assistant_v2", 
                     "version":{"major":1, "minor":0},
                     "attributes":{"is_quickstart":1, "source":"notebook"}}

# Set session context 
session.use_role("SYSADMIN")

# Print the current role, warehouse, and database/schema
print(f"role: {session.get_current_role()} | WH: {session.get_current_warehouse()} | DB.SCHEMA: {session.get_fully_qualified_current_schema()}")

## 1. Transcribe Audio Files

For this portion, we will download OpenAI's [whisper](https://github.com/openai/whisper) model (a pretrained model), and use it for inference. In this case, we're just passing audio files to the model to output transcriptions. 

Whisper depends on `ffmpeg`, which is not a Python library and so cannot be installed with `pip`. This project includes a shell script, `ffmpeg_install.sh`, that installs it.

First, install `ffmpeg` by running that script.

In [None]:
# Run this script to install ffmpeg
!sh ffmpeg_install.sh > out.log 2> err.log

In [None]:
# Uncomment if you want to see the installation logs
#!cat out.log
#!cat err.log

In [None]:
# Make sure it got installed
!which ffmpeg
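
If you prefer to run the same check from Python, for example to stop early when the install failed, a minimal sketch using the standard library could look like this. The helper name `tool_available` is an assumption, not part of the project.

```python
import shutil


def tool_available(name):
    """Return True if an executable called `name` is found on PATH."""
    return shutil.which(name) is not None


if tool_available("ffmpeg"):
    print("ffmpeg found at:", shutil.which("ffmpeg"))
else:
    print("ffmpeg not found on PATH; check err.log from the install step.")
```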

Now, we install OpenAI's Whisper model to transcribe the audio files.

In [None]:
# Install whisper

# Note: --quiet suppresses the output. 
#       You can remove it if you'd like to 
#       see all the installation messages.

!pip install openai-whisper --quiet

Now, we can load the model.

In [None]:
# Load whisper model
import whisper
model = whisper.load_model("base")

Our audio files live in a stage, so we'll download them into this environment.

In [None]:
# Download all files from stage
f = session.file.get('@RAW_DATA_TX/CALL_RECORDINGS/', 'call_recordings/')

We'll create a helper function that transcribes a recording. It loads the audio, converts it to a log-Mel spectrogram, detects the spoken language, and then decodes the audio into text.

In [None]:
# Create function to transcribe all audio
def transcribe_audio_bkup(audio_file_name):
    '''
        Transcribe audio files
    '''
    print(f"Transcribing: {audio_file_name}")
    # load audio and pad/trim it to fit 30 seconds (the window whisper.decode expects)
    audio = whisper.load_audio(audio_file_name)
    audio = whisper.pad_or_trim(audio)
    
    # make log-Mel spectrogram and move to the same device as the model
    mel = whisper.log_mel_spectrogram(audio).to(model.device)
    
    # detect the spoken language
    _, probs = model.detect_language(mel)
    language = max(probs, key=probs.get)
    #print(f"Detected language: {max(probs, key=probs.get)}")
    print(f"Detected language: {language}")
   
    
    # decode the audio
    options = whisper.DecodingOptions()
    result = whisper.decode(model, mel, options)
    print("Text", result.text)
    
    # return the recognized text
    return result.text, language

The function above only covers the first 30 seconds of each recording, because Whisper decodes fixed 30-second windows. To transcribe the entire conversation, the version below splits the audio into 30-second chunks, zero-pads the final chunk if it is shorter, transcribes each chunk, and joins the results into one transcript. The constants involved are:

- SAMPLE_RATE = 16000
- CHUNK_LENGTH = 30
- N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE  # 480000 samples in a 30-second chunk
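
The arithmetic behind these constants can be sketched as follows: given the number of samples in a recording, work out how many 30-second chunks it needs and how much zero padding the last chunk gets. The helper `chunk_plan` and the 75-second example are hypothetical.

```python
import math

SAMPLE_RATE = 16000                       # samples per second
CHUNK_LENGTH = 30                         # seconds per chunk
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE    # 480000 samples per chunk


def chunk_plan(n_audio_samples):
    """Return (number of chunks, zero-padding samples added to the last chunk)."""
    n_chunks = max(1, math.ceil(n_audio_samples / N_SAMPLES))
    padding = n_chunks * N_SAMPLES - n_audio_samples
    return n_chunks, padding


# A hypothetical 75-second call has 75 * 16000 = 1,200,000 samples
print(chunk_plan(75 * SAMPLE_RATE))  # prints (3, 240000)
```

In this example the call needs three chunks, and the last one holds 15 seconds of audio followed by 15 seconds (240,000 samples) of silence.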

In [None]:
# Create function to transcribe all audio
def transcribe_audio(audio_file_name):
    '''
        Transcribe an audio file of any length in 30-second chunks
    '''
    CHUNK_LIM = 480000  # N_SAMPLES: 30 seconds at 16 kHz

    print(f"Transcribing: {audio_file_name}")
    audio = whisper.load_audio(audio_file_name)

    # split into 30-second chunks; pad_or_trim zero-pads a short final chunk
    audios = [whisper.pad_or_trim(audio[i:i + CHUNK_LIM], CHUNK_LIM)
              for i in range(0, len(audio), CHUNK_LIM)]

    texts = []
    language = ""

    for chunk in audios:
        # make log-Mel spectrogram and move to the same device as the model
        mel = whisper.log_mel_spectrogram(chunk).to(model.device)

        # detect the spoken language once, from the first chunk
        # (the last chunk may be mostly padding, which makes detection unreliable)
        if not language:
            _, probs = model.detect_language(mel)
            language = max(probs, key=probs.get)
            print(f"Detected language: {language}")

        # decode the audio
        options = whisper.DecodingOptions()
        result = whisper.decode(model, mel, options)
        texts.append(result.text.strip())

    # join with spaces so words at chunk boundaries do not run together
    return " ".join(texts), language

We'll apply this function to all our audio files.

In [None]:
# Process all audio files and store in a list
audio_files = glob.glob('call_recordings/*.mp3')

all_transcribed = []

for f in audio_files:
    text, language = transcribe_audio(f)
    #all_transcribed.append((f, transcribe_audio(f)))
    all_transcribed.append((f, text, language))

Let's take a look at a few of the transcriptions.

In [None]:
# Look at a few of the transcriptions
all_transcribed[0:10]
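
Because the translation step further down only touches non-English rows, it can help to see how many recordings were detected in each language. The sketch below counts languages over a list shaped like `all_transcribed`, that is, `(file name, transcript, language)` tuples. The sample rows and the helper name are hypothetical.

```python
from collections import Counter


def language_counts(transcribed):
    """Count transcripts per detected language code."""
    return Counter(language for _, _, language in transcribed)


# Hypothetical rows with the same shape as all_transcribed
sample = [
    ("call_recordings/call_001.mp3", "Hello, thank you for calling.", "en"),
    ("call_recordings/call_002.mp3", "Hola, gracias por llamar.", "es"),
    ("call_recordings/call_003.mp3", "I have a question about my claim.", "en"),
]

print(language_counts(sample).most_common())
```

In the notebook you would pass `all_transcribed` instead of `sample`.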

Now we'll store all the results in a Snowpark DataFrame and write it to a Snowflake table.

In [None]:
# Create a Snowpark DataFrame from the transcriptions
df = session.create_dataframe(all_transcribed, schema=["AUDIO_FILE_NAME", "TRANSCRIPT", "LANGUAGE"])
df

In [None]:
# Save results as a Snowflake Table
#df.write.mode("overwrite").save_as_table("CALL_RECORDINGS_TRANSCRIPT_TABLE")
df.write.mode("overwrite").save_as_table("CALL_RECORDINGS_TRANSCRIPT_TABLE_ORIG")

In [None]:
SELECT * FROM CALL_RECORDINGS_TRANSCRIPT_TABLE_ORIG

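Next, we translate any non-English transcripts into English with `SNOWFLAKE.CORTEX.TRANSLATE`, keeping the original text in `ORIG_TRANSCRIPT`.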

In [None]:
CREATE OR REPLACE TABLE CALL_RECORDINGS_TRANSCRIPT_TABLE_TX AS
SELECT 
    AUDIO_FILE_NAME,
    TRANSCRIPT AS ORIG_TRANSCRIPT,
    LANGUAGE,
    build_scoped_file_url(@RAW_DATA_TX, AUDIO_FILE_NAME) as scoped_file_url,
    CASE 
        WHEN LANGUAGE != 'en' THEN 
            SNOWFLAKE.CORTEX.TRANSLATE(TRANSCRIPT, '', 'en')  -- Empty source language lets Cortex auto-detect it
        ELSE 
            TRANSCRIPT  -- Return the original value if it's in English
    END AS TRANSCRIPT
FROM CALL_RECORDINGS_TRANSCRIPT_TABLE_ORIG;

In [None]:
SELECT * FROM CALL_RECORDINGS_TRANSCRIPT_TABLE_TX

Finally, we create a Cortex Search service on top of this data.

In [None]:
-- Create Cortex Search Service
-- Use the v2.0 multilingual embedding model: EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
CREATE OR REPLACE CORTEX SEARCH SERVICE CALL_CENTER_RECORDING_SEARCH_TX2
ON CHUNK
WAREHOUSE = PAYERS_CC_WH
EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
TARGET_LAG = '1 Day'
AS
(
    SELECT
        TRANSCRIPT AS CHUNK,
        AUDIO_FILE_NAME AS RELATIVE_PATH,
        scoped_file_url
    FROM
        CALL_RECORDINGS_TRANSCRIPT_TABLE_TX
        
)

We can quickly test the service to make sure it was created correctly.

In [None]:
# Test out the service

response = (root.databases[session.get_current_database()]
                 .schemas[session.get_current_schema()]
                 .cortex_search_services["CALL_CENTER_RECORDING_SEARCH_TX2"]
                 .search(
                            'Santa',
                              ['CHUNK',
                               'RELATIVE_PATH'],
                         limit=8
                         )
    )

results = response.results
results
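
The raw output can be hard to scan. Assuming each entry in `results` behaves like a dictionary keyed by the requested column names (`CHUNK`, `RELATIVE_PATH`), a small formatting helper along these lines gives a compact preview. The helper name and the sample rows are hypothetical.

```python
def preview_results(results, text_key="CHUNK", path_key="RELATIVE_PATH", width=80):
    """Return one short line per search hit: rank, source file, text snippet."""
    lines = []
    for rank, row in enumerate(results, start=1):
        # Collapse newlines and repeated spaces so each hit fits on one line
        text = " ".join(str(row.get(text_key, "")).split())
        snippet = text if len(text) <= width else text[: width - 3] + "..."
        lines.append(f"{rank}. {row.get(path_key, '?')}: {snippet}")
    return lines


# Hypothetical rows shaped like the search response
sample_results = [
    {"CHUNK": "Thank you for calling.\nHow can I help?", "RELATIVE_PATH": "call_001.mp3"},
    {"CHUNK": "x" * 200, "RELATIVE_PATH": "call_002.mp3"},
]

for line in preview_results(sample_results):
    print(line)
```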

## 2. Process PDF Files

For this portion, we'll use Snowflake's native [PARSE_DOCUMENT](https://docs.snowflake.com/en/sql-reference/functions/parse_document-snowflake-cortex) and [SPLIT_TEXT_RECURSIVE_CHARACTER](https://docs.snowflake.com/en/sql-reference/functions/split_text_recursive_character-snowflake-cortex) to read and chunk a PDF.

In [None]:
CREATE OR REPLACE TEMPORARY TABLE FAQ_DOCS_TEMP AS
    SELECT
        RELATIVE_PATH, 
        SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
            to_variant(SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
                @RAW_DATA_2,
                RELATIVE_PATH,
                {'mode': 'layout'}
            )):content, 'markdown', 384000, 300) as chunks
from DIRECTORY(@RAW_DATA_2)
where RELATIVE_PATH ilike '%FAQ%';

CREATE OR REPLACE TABLE FAQ_DOCS_CHUNKS_TABLE AS
SELECT RELATIVE_PATH, c.value::string as CHUNK 
FROM 
    FAQ_DOCS_TEMP f, 
    LATERAL FLATTEN(INPUT => f.chunks) c;
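
The last two arguments passed to `SPLIT_TEXT_RECURSIVE_CHARACTER` above are the chunk size and the overlap between consecutive chunks, both in characters. The sketch below illustrates what overlap means using a plain fixed-size sliding window. It is a deliberate simplification: the Snowflake function splits recursively on separators suited to the format (here `'markdown'`), which this toy version does not attempt. The function name and sample text are assumptions.

```python
def chunk_with_overlap(text, chunk_size, overlap):
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")

    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks


print(chunk_with_overlap("abcdefghij", chunk_size=4, overlap=1))
# prints ['abcd', 'defg', 'ghij']
```

Each chunk repeats the tail of the previous one, so a sentence that straddles a boundary still appears intact in at least one chunk.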

Let's make sure the files were properly read and chunked now.

In [None]:
-- Make sure files were properly read and chunked
SELECT * FROM FAQ_DOCS_CHUNKS_TABLE

Finally, we create a Cortex Search service on top of this data.

In [None]:
CREATE OR REPLACE CORTEX SEARCH SERVICE CALL_CENTER_FAQ_SEARCH
ON CHUNK
WAREHOUSE = PAYERS_CC_WH
TARGET_LAG = '1 Day'
AS
(
    SELECT
        CHUNK,
        RELATIVE_PATH
    FROM
        FAQ_DOCS_CHUNKS_TABLE
        
)

We can quickly test the service to make sure it was created correctly.

In [None]:
# Test out the service

response = (root.databases[session.get_current_database()]
                 .schemas[session.get_current_schema()]
                 .cortex_search_services["CALL_CENTER_FAQ_SEARCH"]
                 .search(
                     'Were there any revisions to COVID related coverages?',
                     ['CHUNK','RELATIVE_PATH'], limit=3
                        )
           )

results = response.results
results

### :tada: All the unstructured data is now processed and ready to be used by the Streamlit App!

## Caller Intent Prediction

Now, we will predict the intent of a caller using historical data.

We will be using the [Classification function from the suite of Snowflake's ML Functions](https://docs.snowflake.com/en/user-guide/ml-functions/classification) to build our model.
> - _Classification involves creating a classification model object, passing in a reference to the training data. The model is fitted to the provided training data. You then use the resulting schema-level classification model object to classify new data points and to understand the model’s accuracy through its evaluation APIs._
> - _The classification function is powered by a gradient boosting machine. For binary classification, the model is trained using an area-under-the-curve loss function. For multi-class classification, the model is trained using a logistic loss function._

The `Caller Intent Prediction Model` will be trained using the relationship between call reasons and current member properties.

We will first create a view including Member attributes that we want to train the model on.

In [None]:
CREATE OR REPLACE VIEW CALLER_INTENT_CLASSIFICATION_VIEW AS
    SELECT 
        RECENT_ENROLLMENT_EVENT_IND,
        PCP_CHANGE_IND, 
        ACTIVE_CM_PROGRAM_IND,
        CHRONIC_CONDITION_IND,
        ACTIVE_GRIEVANCE_IND,
        ACTIVE_CLAIM_IND,
        POTENTIAL_CALLER_INTENT_CATEGORY
FROM CALLER_INTENT_TRAIN_DATASET;

Now, we will create an [ML Classification](https://docs.snowflake.com/en/user-guide/ml-functions/classification) model and train it on this view.

In [None]:
CREATE OR REPLACE SNOWFLAKE.ML.CLASSIFICATION CALLER_INTENT(
    INPUT_DATA => SYSTEM$REFERENCE('view', 'CALLER_INTENT_CLASSIFICATION_VIEW'),
    TARGET_COLNAME => 'POTENTIAL_CALLER_INTENT_CATEGORY'
);

We will apply our ML model's prediction function on our prediction data.

In [None]:
CREATE OR REPLACE TABLE CALLER_INTENT_PREDICTIONS AS
SELECT *, CALLER_INTENT!PREDICT(
    INPUT_DATA => {*})
    as predictions from CALLER_INTENT_PREDICT_DATASET;
SELECT * FROM CALLER_INTENT_PREDICTIONS;

We now create a view to join our member data with our predictions.

In [None]:
CREATE OR REPLACE VIEW CALL_CENTER_MEMBER_DENORMALIZED_WITH_INTENT
AS
SELECT
    A.*,
    B.predictions:class::STRING AS POTENTIAL_CALLER_INTENT
FROM 
CALL_CENTER_MEMBER_DENORMALIZED A
LEFT OUTER JOIN 
CALLER_INTENT_PREDICTIONS B
ON A.MEMBER_ID = B.MEMBER_ID;
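
The expression `B.predictions:class::STRING` in the view pulls the predicted label out of the semi-structured prediction object. As a sketch of the same idea in Python, assuming each prediction serializes to JSON with a `class` field and a `probability` map from label to score, the label and its confidence could be read like this. The intent category names below are hypothetical.

```python
import json


def summarize_prediction(prediction_json):
    """Return (predicted label, probability of that label or None)."""
    prediction = json.loads(prediction_json)
    label = prediction["class"]
    probability = prediction.get("probability", {}).get(label)
    return label, probability


# Hypothetical prediction object; real labels come from the training data
raw = '{"class": "Claim Status", "probability": {"Claim Status": 0.8, "Billing Inquiry": 0.2}}'
print(summarize_prediction(raw))
```

Keeping the probability alongside the label makes it possible to flag low-confidence predictions instead of treating every predicted intent as equally reliable.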

We will now cleanse some data for our sample members who will be part of the Streamlit demo.

In [None]:
UPDATE CALL_RECORDINGS_TRANSCRIPT_TABLE 
SET TRANSCRIPT = replace(TRANSCRIPT, '159-568-6880','159568380')
WHERE TRANSCRIPT ILIKE '%Tracy%Smith%';

In [None]:
UPDATE CALL_CENTER_MEMBER_DENORMALIZED
SET CLAIM_PROVIDER = 'Howe Group'
WHERE GRIEVANCE_STATUS = 'Pending'
AND GRIEVANCE_TYPE = 'Inadequate Care'
AND CLAIM_PROVIDER in ('Thornton Group','Kent Group','Perez-Martinez');

### :tada: We now have `Caller Intent` predictions, which are ready to be used by the Streamlit App!