In [None]:
# Import required libraries  
import os  
from openai import AzureOpenAI
from dotenv import load_dotenv  
from azure.core.credentials import AzureKeyCredential  
from azure.search.documents import SearchClient 
from azure.search.documents.models import (
    VectorizedQuery,
    VectorFilterMode,    
    QueryAnswerType,
    QueryCaptionType,
    QueryType,
)
  
# Configure environment variables (load_dotenv is called first so the os.getenv
# lookups below can see whatever it loads)
load_dotenv()  

# Azure Search setup: endpoint, index name and admin key are all read from the environment.
# os.getenv returns None for a variable that is not set.
service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT") 
index_name = os.getenv("AZURE_SEARCH_INDEX_NAME") 
key = os.getenv("AZURE_SEARCH_ADMIN_KEY") 
credential = AzureKeyCredential(key)

# Azure OpenAI setup
# NOTE(review): `model` is not referenced by any later cell visible here;
# get_embedding carries its own default model name.
model: str = "text-embedding-ada-002"
openai_client = AzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
    api_version="2023-12-01-preview",
    azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
)
# NOTE(review): `openai_model` is read here but not used by the cells visible in this notebook.
openai_model = os.getenv("AZURE_OPENAI_MODEL_NAME")

In [None]:
import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter
import uuid

# Build the tokenizer that tiktoken_len uses to measure text in tokens.
# NOTE: despite its name, `tokenizer_name` holds the object returned by
# encoding_for_model (its `.name` attribute is read on the next line), not a string.
tokenizer_name = tiktoken.encoding_for_model("gpt-4-32k-0613")
# Look the encoding up again by that name; `tokenizer` is the object used for encoding.
tokenizer = tiktoken.get_encoding(tokenizer_name.name)

# length function for the text splitter: measures text in tokens rather than characters
def tiktoken_len(text):
    """Return the number of tokens `text` encodes to with the module-level `tokenizer`.

    `disallowed_special=()` is forwarded to `encode`, which turns off tiktoken's
    check for special-token text so such text is encoded like any other input.
    """
    return len(tokenizer.encode(text, disallowed_special=()))

# Splitter used to cut each lyric file into chunks before embedding.
# Lengths are measured with tiktoken_len (see length_function), so chunk_size and
# chunk_overlap are token counts, not character counts.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=8000, # upper bound per chunk; tune to the model in use (e.g. ~8k is reasonable for 16k-context GPT models, possibly higher)
    chunk_overlap=100,
    length_function=tiktoken_len,
    # presumably tried in order, from paragraph breaks down to single characters -- confirm against the langchain docs
    separators=["\n\n", "\n", " ", ""],
)

# generate a unique id for a document
def get_a_uuid():
    """Return a freshly generated random (version 4) UUID as a string.

    The result is the standard 36-character hyphenated hexadecimal form produced by
    ``str()`` on a ``uuid.UUID``; it is not Base64-encoded.
    """
    new_id = uuid.uuid4()
    return str(new_id)

# count the tokens in a string for a given model
def num_tokens_from_string(string: str, model_name: str) -> int:
    """Return how many tokens `string` encodes to.

    The encoding is the one tiktoken selects for `model_name` via
    `tiktoken.encoding_for_model`.
    """
    return len(tiktoken.encoding_for_model(model_name).encode(string))

def get_embedding(text, model="text-embedding-ada-002"):
    """Return the embedding vector for `text` from the module-level `openai_client`.

    Newlines in `text` are replaced with spaces before the request is made. The
    text is sent as a one-element input list and the embedding of that single
    item is returned.

    NOTE(review): with an AzureOpenAI client, `model` presumably has to match the
    name of a deployed embedding model -- confirm against the resource in use.
    """
    single_line = text.replace("\n", " ")
    response = openai_client.embeddings.create(input=[single_line], model=model)
    return response.data[0].embedding

def open_file(filepath):
    """Read the file at `filepath` and return its full contents as a string.

    The file is decoded as UTF-8; byte sequences that cannot be decoded are
    dropped (``errors="ignore"``) instead of raising.
    """
    with open(filepath, mode="r", encoding="utf-8", errors="ignore") as source:
        contents = source.read()
    return contents

In [None]:
# File cleanup: strip a single leading space from each crawled .txt file and save a copy
# named after the song title (the text before the first double space in the file).

import os
import pandas as pd

input_directory = "../web-crawler/txt/www.lightfoot.ca/"
output_directory = "../web-crawler/txt/www.lightfoot.ca/cleaned/"

# create the destination up front so the first write cannot fail on a missing folder
os.makedirs(output_directory, exist_ok=True)

for filename in os.listdir(input_directory):
    if not filename.endswith(".txt"):
        continue

    with open(os.path.join(input_directory, filename), "r", encoding="utf-8") as f:
        content = f.read()

    # startswith() is safe on an empty file, unlike indexing content[0]
    if content.startswith(" "):
        content = content[1:]  # remove the first character space

    index = content.find("  ")  # the first double space marks the end of the title
    if index > 0:
        text_before_double_space = content[:index]
    else:
        # No usable title in this file: fall back to the original file name so we neither
        # hit an undefined variable nor reuse (and overwrite) the previous file's title.
        text_before_double_space = os.path.splitext(filename)[0]

    # NOTE(review): a title containing a path separator would still produce an invalid
    # output path -- confirm whether any crawled titles contain one.
    with open(os.path.join(output_directory, f"{text_before_double_space}.txt"), 'w', encoding="utf-8") as file:
        file.write(content)

In [None]:
# File cleanup: remove the extra text that precedes the first occurrence of a double
# space (the title) and keep only what follows it.

import os
import pandas as pd

input_directory = "../web-crawler/txt/www.lightfoot.ca/"
output_directory = "../web-crawler/txt/www.lightfoot.ca/cleaned/"

# create the destination up front so the first write cannot fail on a missing folder
os.makedirs(output_directory, exist_ok=True)

for filename in os.listdir(input_directory):
    if not filename.endswith(".txt"):
        continue

    with open(os.path.join(input_directory, filename), "r", encoding="utf-8") as f:
        content = f.read()

    index = content.find("  ")  # the first double space marks the end of the title

    if index != -1:
        # Remove everything before the first occurrence (including the occurrence itself)
        result = content[index + 2:]
    else:
        # find() returned -1: slicing with index + 2 would silently drop the first
        # character, so leave the content untouched instead.
        result = content

    # write the modified content to the output directory under the same file name
    with open(os.path.join(output_directory, filename), 'w', encoding="utf-8") as file:
        file.write(result)

In [None]:
# File cleanup: normalise whitespace in every crawled .txt file.
# NOTE: this writes to the same output path (output_directory + original file name) as the
# preceding "remove text before the double space" cell, so whichever runs last wins.

import os
import pandas as pd

input_directory = "../web-crawler/txt/www.lightfoot.ca/"
output_directory = "../web-crawler/txt/www.lightfoot.ca/cleaned/"

for filename in os.listdir(input_directory):
    if not filename.endswith(".txt"):
        continue

    with open(os.path.join(input_directory, filename), "r", encoding="utf-8") as f:
        content = f.read()

    # split() with no arguments breaks on any run of whitespace and ignores leading and
    # trailing whitespace, so this collapses every run (spaces, tabs AND newlines) to one
    # space and trims both ends. That already covers the leading-space case, and unlike
    # indexing content[0] it does not fail on an empty file.
    content = ' '.join(content.split())

    # write the modified content to the output directory under the same file name
    with open(os.path.join(output_directory, filename), 'w', encoding="utf-8") as file:
        file.write(content)

In [None]:
import os
import pandas as pd

# Read every cleaned .txt file, split it into chunks with text_splitter, and build one
# record per chunk (id, title, content, source file, token count, category, embedding).
directory = "../web-crawler/txt/www.lightfoot.ca/cleaned"
chunk = {}
txt = []

for filename in os.listdir(directory):
    if not filename.endswith(".txt"):
        continue

    with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
        text = f.read()

    texts = text_splitter.create_documents([text])
    doc_count = 0
    for i in texts:
        body = i.page_content
        chunk = {
            "id": get_a_uuid(),  # random uuid identifying this chunk
            "title": f"{filename[:-4]}",  # file name without its .txt extension
            "content": body,
            "sourcefile": filename,
            "content_tokens": num_tokens_from_string(body, "gpt-4-32k-0613"),
            "category": "Gordon Lightfoot",
            "contentVector": get_embedding(body),  # one embedding request per chunk
        }
        txt.append(chunk)
        doc_count += 1

# one row per chunk; shown as the cell output
df = pd.DataFrame(txt)
df

In [None]:
# turn the DataFrame rows into the documents that will be stored in the index
def create_sections(df):
    """Yield one upload document per row of `df`.

    Each document copies the row's id, title, content, sourcefile, category and
    contentVector values and adds an "@search.action" entry set to "upload".
    Any other column of `df` is not carried over.

    This is a generator: documents are produced lazily and can be iterated once.
    """
    copied_fields = ("id", "title", "content", "sourcefile", "category", "contentVector")
    for _, row in df.iterrows():
        document = {field: row[field] for field in copied_fields}
        document["@search.action"] = "upload"
        yield document
        
# NOTE: create_sections is a generator function, so `sections` is a one-shot iterator.
# Once it has been consumed, re-run this cell to get a fresh one before iterating again.
sections = create_sections(df)

In [None]:
def index_sections(sections, batch_size=1000):
    """Upload `sections` to the search index named by `index_name`, in batches.

    Args:
        sections: iterable of documents to upload; it is iterated exactly once, so a
            generator is fine.
        batch_size: maximum number of documents sent per upload call. Defaults to 1000,
            the size that was previously hard-coded. Lower it if individual requests
            become too large for the service.

    Raises:
        ValueError: if `batch_size` is smaller than 1.

    Prints one progress line per uploaded batch with the number of documents the service
    reported on and how many of those succeeded.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    print(
        f"Indexing sections into search index '{index_name}'"
    )

    search_client = SearchClient(endpoint=service_endpoint, index_name=index_name, credential=credential)

    def _upload(batch):
        # send one batch and report how many of its documents were accepted
        results = search_client.upload_documents(documents=batch)
        succeeded = sum(1 for r in results if r.succeeded)
        print(f"\tIndexed {len(results)} sections, {succeeded} succeeded")

    batch = []
    for s in sections:
        batch.append(s)
        if len(batch) >= batch_size:
            _upload(batch)
            batch = []

    # flush whatever is left over after the last full batch
    if batch:
        _upload(batch)
        
# Upload every section to the search index. This iterates `sections`, which is a
# generator, so it is exhausted once this call has run.
index_sections(sections)

In [None]:
# Pure Vector Search: no keyword text is sent (search_text=None); only the query
# embedding is matched against the contentVector field.
query = "we have a panic situation"

search_client = SearchClient(endpoint=service_endpoint, index_name=index_name, credential=credential)
vector_query = VectorizedQuery(
    vector=get_embedding(query),
    k_nearest_neighbors=3,
    fields="contentVector",
)

results = search_client.search(
    search_text=None,
    vector_queries=[vector_query],
    select=["title", "sourcefile", "category", "content"],
)

# print the score and the selected fields of each hit
for result in results:
    print(f"Score: {result['@search.score']}")
    print(f"Title: {result['title']}")
    print(f"Sourcefile: {result['sourcefile']}")
    print(f"Category: {result['category']}\n")
    print(f"Content: {result['content']}\n")

In [None]:
# Pure Vector Search, exhaustive variant: same query as the previous cell, but the
# vector query is built with exhaustive=True.
query = "we have a panic situation"

search_client = SearchClient(endpoint=service_endpoint, index_name=index_name, credential=credential)
vector_query = VectorizedQuery(
    vector=get_embedding(query),
    k_nearest_neighbors=3,
    fields="contentVector",
    exhaustive=True,
)

results = search_client.search(
    search_text=None,
    vector_queries=[vector_query],
    select=["title", "sourcefile", "content", "category"],
)

# print the score and the selected fields of each hit
for result in results:
    print(f"Title: {result['title']}")
    print(f"Score: {result['@search.score']}")
    print(f"Sourcefile: {result['sourcefile']}")
    print(f"Content: {result['content']}")
    print(f"Category: {result['category']}\n")

In [None]:
# Hybrid search with a category filter: the query is sent both as keyword text and as a
# vector, and the filter restricts hits to a single category.
query = "dont confess"  
  
search_client = SearchClient(service_endpoint, index_name, credential=credential)
vector_query = VectorizedQuery(vector=get_embedding(query), k_nearest_neighbors=3, fields="contentVector")
  
category = "Gordon Lightfoot"
# OData string literals are single-quoted; a single quote inside the value has to be
# doubled, otherwise a category containing an apostrophe produces an invalid filter.
escaped_category = category.replace("'", "''")

results = search_client.search(  
    search_text=query,  
    vector_queries= [vector_query],
    vector_filter_mode=VectorFilterMode.PRE_FILTER,
    filter=f"category eq '{escaped_category}'",
    select=["title", "sourcefile", "content", "category"],
)
  
# print the score and the selected fields of each hit
for result in results:  
    print(f"Title: {result['title']}")  
    print(f"Score: {result['@search.score']}")  
    print(f"Content: {result['content']}")  
    print(f"Category: {result['category']}\n")  
    print(f"Sourcefile: {result['sourcefile']}\n")  

In [None]:
# Semantic Hybrid Search: keyword text plus vector query, with query_type set to SEMANTIC
# and extractive answers and captions requested.
query = "loss of loved ones in a tragic accident on water"

# reuse the credential built in the setup cell, as the other search cells do
search_client = SearchClient(service_endpoint, index_name, credential=credential)
vector_query = VectorizedQuery(vector=get_embedding(query), k_nearest_neighbors=3, fields="contentVector")

results = search_client.search(  
    search_text=query,  
    vector_queries=[vector_query],
    select=["title", "content", "category"],
    query_type=QueryType.SEMANTIC,
    semantic_configuration_name='lyrics-semantic-config',
    query_caption=QueryCaptionType.EXTRACTIVE,
    query_answer=QueryAnswerType.EXTRACTIVE,
    top=3
)

# get_answers() can return None when the service sends back no answers; fall back to an
# empty list so the loop below is simply skipped instead of raising TypeError.
semantic_answers = results.get_answers() or []
for answer in semantic_answers:
    # prefer the highlighted form of the answer when one is available
    if answer.highlights:
        print(f"Semantic Answer: {answer.highlights}")
    else:
        print(f"Semantic Answer: {answer.text}")
    print(f"Semantic Answer Score: {answer.score}\n")

for result in results:
    print(f"Title: {result['title']}")
    print(f"Reranker Score: {result['@search.reranker_score']}")
    print(f"Content: {result['content']}")
    print(f"Category: {result['category']}")

    # only the first caption is shown, again preferring its highlighted form
    captions = result["@search.captions"]
    if captions:
        caption = captions[0]
        if caption.highlights:
            print(f"Caption: {caption.highlights}\n")
        else:
            print(f"Caption: {caption.text}\n")