In [1]:
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
import hashlib
from datetime import datetime
from tqdm import tqdm
import os
import logging
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)

In [20]:
from dotenv import load_dotenv
load_dotenv()  

# Instantiate the api keys
# Get your pinecone api here: https://www.pinecone.io
pinecone_key = os.environ.get("PINECONE_API_KEY")

# Craete you openai api key here: https://platform.openai.com/settings/organization/api-keys
client = OpenAI(api_key = os.environ.get("OPENAI_API_KEY"))

INDEX_NAME = 'semantic-search-rag'  # name of our embedded database
NAMESPACE = 'default'                # Dont need this if we just have one database
ENGINE = 'text-embedding-3-small'  # this openai largets most recent embedding model has vector size 1,536

# Initialize the pinecone client
pc = Pinecone(api_key = pinecone_key)

In [21]:
# Function to get embeddings for a list of texts using OpenAI API
def get_embeddings(texts, embedding_model = ENGINE):
    response = client.embeddings.create(
        input = texts,
        model = embedding_model
    )
    return [d.embedding for d in list(response.data)]

# Function to get embedding for a single text using OpenAI API
def get_embedding(text, embedding_model = ENGINE):
    return get_embeddings([text], embedding_model)[0]

# Lets create index for our data
if INDEX_NAME not in pc.list_indexes().names():
    print(f"Creating index: {INDEX_NAME}")
    pc.create_index(
        name = INDEX_NAME, 
        dimension = 1536,   # The dimension of the OpenAI vector embedder
        metric = 'cosine',   # The similarity metric to use when searching index
        spec = ServerlessSpec(
            cloud = 'aws',
            region = 'us-east-1'  # make sure this is the region you were provided in pinecone.io
        )
    )

# Store the index as a variable
index = pc.Index(name = INDEX_NAME)
index

Creating index: semantic-search-rag


<pinecone.data.index.Index at 0x1fe7f9a0820>

In [22]:
index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {},
 'total_vector_count': 0,
 'vector_type': 'dense'}

In [23]:
# Lets use hashlib to create a hash for input string, so pinecone does not allow duplicate strings
def my_hash(s):
    # Returns the MD5 hash of the string as hexadecimal string
    return hashlib.md5(s.encode()).hexdigest()

# Example
my_hash("Please hash me")

'7029d9f7fcff52d665b9729784023f21'

In [24]:
def prepare_for_pinecone(texts, embeddings_model=ENGINE, urls=None):
    # Get current EST time
    now = datetime.now().isoformat()

    # Generate vector embedding for each string in the input list
    embeddings = get_embeddings(texts, embedding_model=embeddings_model)

    # Create tuples of (hash, embedding, metadata) for each input string and its corresponding
    # vector embedding. The my_hash() function is used to generate a unique hash for each string,
    # and the datetime.now() function to generate current EST time
    responses = [
        (
            my_hash(text),  # A unique ID for each string, generated using the my_hash() function
            embedding,      # The vector embedding of the string
            dict(text=text, date_uploaded=now)  # A dictionary of metadata
        )
        for text, embedding in zip(texts, embeddings)  # Iterate over each input string and its corresponding embedding
    ]

    # Add url in metadata if exists 
    if urls:
        # Use as many URLs as possible
        for i, response in enumerate(responses):
            if i < len(urls) and urls[i]:  # Check if we have a URL for this index
                response[-1]['url'] = urls[i]
    
    return responses

In [25]:
def upload_texts_to_pinecone(texts, batch_size=4, show_progress_bar=True, urls=None):
    # Prepare data for Pinecone
    pinecone_data = prepare_for_pinecone(texts, urls=urls)
    
    # Track number of items uploaded
    count = 0
    
    # Create batches for upload
    batches = [pinecone_data[i:i+batch_size] for i in range(0, len(pinecone_data), batch_size)]
    
    # Set up progress bar if requested
    if show_progress_bar:
        batches = tqdm(batches)
    
    # Upload each batch
    for batch in batches:
        # Upload directly without reconstructing the tuples
        index.upsert(vectors=batch)
        
        # Update count
        count += len(batch)
    
    return count

In [36]:
def query_from_pinecone(query, top_k=3, namespace=''):
    # Get the embedding for the query string
    query_embedding = get_embedding(query)

    # Use the query() method of the index object to retrieve the closest values to the query
    return index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True,
        namespace=namespace
    ).get('matches')

In [37]:
import requests
from bs4 import BeautifulSoup
import re

def extract_ssa_faq_urls(url="https://www.ssa.gov/faqs/en/"):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Failed to access page: {response.status_code}")
        return []
    
    soup = BeautifulSoup(response.text, 'html.parser')
    
    # Look for article patterns in the content
    ka_pattern = re.compile(r'KA-\d+')
    
    articles = []
    # Look for article containers or common patterns
    article_elements = soup.select('.faq-item, article, .question-item, .article-container')
    
    if not article_elements:
        # If we can't find specific containers, check all links
        links = soup.find_all('a', href=True)
        for link in links:
            href = link['href']
            text = link.get_text(strip=True)
            
            # Check for KA pattern in href or specific patterns
            if ka_pattern.search(href) or '/questions/' in href:
                full_url = href if href.startswith('http') else f"https://www.ssa.gov{href if href.startswith('/') else '/' + href}"
                articles.append({
                    'title': text,
                    'url': full_url,
                    'text': text  # You might want to fetch the full text separately
                })
    else:
        # Process found article containers
        for article in article_elements:
            title_elem = article.select_one('h2, h3, h4, .title')
            title = title_elem.get_text(strip=True) if title_elem else "No title found"
            
            # Try to find a link or article ID
            link = article.select_one('a[href]')
            if link:
                href = link['href']
                full_url = href if href.startswith('http') else f"https://www.ssa.gov{href if href.startswith('/') else '/' + href}"
            else:
                # Try to extract KA ID from text
                article_text = article.get_text()
                ka_match = ka_pattern.search(article_text)
                if ka_match:
                    article_id = ka_match.group(0)
                    full_url = f"https://www.ssa.gov/faqs/en/questions/{article_id}.html"
                else:
                    full_url = None
            
            content = article.get_text(strip=True)
            
            articles.append({
                'title': title,
                'url': full_url,
                'text': content
            })
    
    return articles

# Extract FAQ articles
articles = extract_ssa_faq_urls()

# Print first few for debugging
for i, article in enumerate(articles[:5]):
    print(f"Article {i+1}:")
    print(f"  Title: {article['title']}")
    print(f"  URL: {article.get('url', 'No URL found')}")
    print(f"  Text length: {len(article['text'])}")
    print()

# Use these articles for Pinecone upload
if articles:
    texts = [article['text'] for article in articles]
    urls = [article.get('url') for article in articles]
    
    # Now upload to Pinecone with the extracted texts and URLs
    upload_texts_to_pinecone(texts, show_progress_bar=True, urls=urls)
else:
    print("No articles found to upload")

Article 1:
  Title: How can I get help from Social Security?
  URL: https://www.ssa.gov/faqs/en/questions/KA-10037.html
  Text length: 40

Article 2:
  Title: What should I do if I receive a call from someone claiming to be a Social Security employee?
  URL: https://www.ssa.gov/faqs/en/questions/KA-10018.html
  Text length: 92

Article 3:
  Title: How do I schedule, reschedule, or cancel an appointment?
  URL: https://www.ssa.gov/faqs/en/questions/KA-02771.html
  Text length: 56

Article 4:
  Title: What happens if I work and get Social Security retirement benefits?
  URL: https://www.ssa.gov/faqs/en/questions/KA-01921.html
  Text length: 67

Article 5:
  Title: Who is eligible to receive Social Security survivors benefits and how do I apply?
  URL: https://www.ssa.gov/faqs/en/questions/KA-02083.html
  Text length: 81



100%|██████████| 2/2 [00:00<00:00,  3.48it/s]


In [38]:
index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 8}},
 'total_vector_count': 8,
 'vector_type': 'dense'}

In [41]:
# Call query_from_pinecone with a search query
results = query_from_pinecone("Can I get social security service by phone?")

# Iterate over the results
for result in results:
    print(f"ID: {result['id']}")
    print(f"Score: {result['score']}")
    print(f"Text: {result['metadata']['text']}")
    
    # Safely access the URL - now we know it exists in the metadata
    if 'url' in result['metadata']:
        print(f"URL: {result['metadata']['url']}")
    else:
        print("URL: Not available")
        
    print()

ID: f3b799be58db456c2474f017f9abed22
Score: 0.664267659
Text: How can I get help from Social Security?
URL: https://www.ssa.gov/faqs/en/questions/KA-10037.html

ID: 8d0476637539cdf884f8e450cb03adaa
Score: 0.574349344
Text: What should I do if I receive a call from someone claiming to be a Social Security employee?
URL: https://www.ssa.gov/faqs/en/questions/KA-10018.html

ID: e6e7c3cff7c636a14c08b3fdb8799213
Score: 0.570792496
Text: How do I apply for Social Security retirement benefits?
URL: https://www.ssa.gov/faqs/en/questions/KA-01891.html



## Introducing the G(enerate) in the RAG

In [73]:
#pip install supabase
from supabase import create_client, Client
from typing import Dict, Optional, Any, List, Dict, Tuple
import pandas as pd
from pydantic import BaseModel, Field

In [None]:
# with open('.env', 'a') as env_file:
#     env_file.write(f'SUPABASE_URL={YOUR_SUPABASE_URL}\n')
#     env_file.write(f'SUPABASE_KEY={YOUR_SUPABASE_KEY}\n')

In [57]:
"""
We are introducing cost projection for our Retrieval-Augmented Generation (RAG) system using Large Language Models (LLMs). 
To achieve this, we will utilize Supabase, a database-as-a-service built on PostgreSQL, to store and track the number of tokens 
processed by our GPT-4 model. This is crucial because GPT-4 charges are based on the number of tokens used, both for input and output. 
By storing this data, we can analyze and project costs effectively.
"""
supabase_url: str = os.environ.get("SUPABASE_URL")
supabase_key: str = os.environ.get("SUPABASE_KEY")
supabase: Client = create_client(supabase_url, supabase_key)

# Create client - make sure we're passing string values, not None
if supabase_url and supabase_key:
    supabase = create_client(supabase_url, supabase_key)
    print("Supabase client created successfully!")
else:
    print("Missing Supabase credentials. Check your environment variables.")

Supabase client created successfully!


In [71]:
# Define a class for the Chat LLM
class ChatLLM(BaseModel):
    model: str = Field(default="gpt-4o", description="The model to use for the LLM.")
    temperature: float = Field(default = 0.0, description="The temperature for the LLM.")
    # max_tokens: int = Field(default=150, description="The maximum number of tokens to generate.")
    # top_p: float = Field(default=1.0, description="Top-p sampling parameter.")
    # frequency_penalty: float = Field(default=0.0, description="Frequency penalty for the LLM.")
    # presence_penalty: float = Field(default=0.0, description="Presence penalty for the LLM.")

    # Method to generate a response from the model given an input
    def generate(self, prompt: str, stop: List[str] = None) -> Dict[str, Any]:
        # Here we would call the actual LLM API to get a response
        response = client.chat.completions.create(
            model = self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature = self.temperature,
            stop = stop
        )
        
        # Insert the details of the prompt and response into the 'cost_projecting' table in Supabase
        supabase.table('cost_projecting').insert({
            "prompt": prompt,
            "response": response.choices[0].message.content,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens,
            "model": self.model,
            'inference_params': {
                'temperature': self.temperature,
                'stop': stop
            },
            'is_openai': True,
            'app': 'RAG'
        }).execute()

        # Return the response from the model
        return response.choices[0].message.content

In [72]:
c = ChatLLM()
c.generate('hi')

'Hello! How can I assist you today?'

In [82]:
FINAL_ANSWER_TOKEN = "Assistant Response:"
STOP = '[END]'
PROMPT_TEMPLATE = """ Today is {today} and you can retrieve information from the database.
Response the user's input as best as you can. Here is an example of the format:

[START]
User Input: The input question you must answer
Context: Retrieved context from the database
Context URL: context URL
Context Score: A score from 0 - 1 of how strong information is a match
Assistant Thought: This context has sufficient information to answer the question
Assistant Response: Your final answer to the original input question which could be I dont have 
sufficient information to answer the question.
[END]

[START]
User Input: Another input question you must answer
Context: More retrieved context from the database
Context URL: context URL
Context Score: A score from 0 - 1 of how strong information is a match
Assistant Thought: This context has sufficient information to answer the question   
Assistant Response: Your final answer to the second input question which could be 
I dont have sufficient information to answer the question.
[END]

[START]
User Input: Another input question you must answer
Context: NO CONTEXT FOUND
Context URL: NONE
Context Score: 0.0
Assistant Thought: We either could not find something or we dont need to look something up
[END]

Begin:

{running_convo}
"""

class RagBot(BaseModel):
    llm: ChatLLM
    prompt_template: str = PROMPT_TEMPLATE
    stop_pattern: List[str] = [STOP]
    user_inputs: List[str] = []
    ai_responses: List[str] = []
    contexts: List[Tuple[str, str, float]] = []
    verbose: bool = False
    threshold: float = 0.6

    def query_from_pinecone(self, query: str, top_k: int = 1, namespace = None) -> List[Dict[str, Any]]:
        """
        Query the Pinecone index for the most relevant documents to the input query.
        """
        # Call the external function with the correct parameters
        results = query_from_pinecone(query=query, top_k=top_k, namespace=namespace)
        return results

    @property
    def running_convo(self) -> str:
        """
        Construct the running conversation string from user inputs, AI responses, and contexts.
        """
        convo = ""
        for index in range(len(self.user_inputs)):
            convo += f"[START]\nUser Input: {self.user_inputs[index]}\n"
            convo += f"Context: {self.contexts[index][0]}\nContext URL: {self.contexts[index][1]}\nContext Score: {self.contexts[index][2]}\n"
            if len(self.ai_responses) > index:
                convo += self.ai_responses[index]
                convo += '\n[END]\n'
        return convo.strip()

    def run(self, question: str):
        self.user_inputs.append(question)
        top_response = self.query_from_pinecone(question, top_k=1)[0]
        print(top_response['score'])
        if top_response['score'] >= self.threshold:
            self.contexts.append((
                top_response['metadata']['text'], 
                top_response['metadata']['url'], 
                top_response['score']
            ))
        else:
            self.contexts.append(("NO CONTEXT FOUND", "NONE", 0.0))
        
        prompt = self.prompt_template.format(
            today = datetime.now().isoformat(),
            running_convo = self.running_convo
        )

        if self.verbose:
            print("-" * 50)
            print("PROMPT")
            print("-" * 50)
            print(prompt)
            print("END PROMPT")
            print("-" * 50)

        generated = self.llm.generate(prompt, stop = self.stop_pattern)
        if self.verbose:
            print("-" * 50)
            print("GENERATED")
            print("-" * 50)
            print(generated)
            print("END GENERATED")
            print("-" * 50)
        self.ai_responses.append(generated)

        if FINAL_ANSWER_TOKEN in generated:
            generated = generated.split(FINAL_ANSWER_TOKEN)[-1]
        return generated

In [83]:
r = RagBot(llm = ChatLLM(temperature = 0.0), stop_pattern = ['[END]'])
print(r.run("Can I get social security service by phone?"))

0.664206624
 Yes, you can get Social Security services by phone. You can contact the Social Security Administration at their toll-free number, 1-800-772-1213, for assistance.


In [84]:
print(r.running_convo)

[START]
User Input: Can I get social security service by phone?
Context: How can I get help from Social Security?
Context URL: https://www.ssa.gov/faqs/en/questions/KA-10037.html
Context Score: 0.664206624
Assistant Thought: This context has sufficient information to answer the question.
Assistant Response: Yes, you can get Social Security services by phone. You can contact the Social Security Administration at their toll-free number, 1-800-772-1213, for assistance.
[END]


In [86]:
response = supabase.table('cost_projecting').select('*').eq('app', 'RAG').execute()
completions_df = pd.DataFrame(response.data)

completions_df.head()

Unnamed: 0,prompt,response,input_tokens,output_tokens,model,inference_params,is_openai,app
0,hi,Hello! How can I assist you today?,8,10,gpt-4o,"{'stop': None, 'temperature': 0.0}",True,RAG
1,Today is 2025-04-15T14:49:10.884844 and you c...,Assistant Thought: This context has sufficient...,340,54,gpt-4o,"{'stop': ['[END]'], 'temperature': 0.0}",True,RAG
2,Today is 2025-04-15T14:51:41.203644 and you c...,Assistant Thought: This context has sufficient...,340,54,gpt-4o,"{'stop': ['[END]'], 'temperature': 0.0}",True,RAG


In [87]:
prices = { # per 1M tokens
    'gpt-3.5-turbo': {
        'prompt': 0.5,
        'completion': 1.5,
    },
    'gpt-4o': {
        'prompt': 5,
        'completion': 15,
    },
}

def calculate_cost(input_tokens, output_tokens, model):
    if model not in prices:
        return None
    
    prompt_cost = input_tokens/1e6
    completion_cost = output_tokens/1e6

    return prompt_cost + completion_cost

calculate_cost(354, 400, 'gpt-3.5-turbo'), calculate_cost(354, 400, 'gpt-4o')

(0.000754, 0.000754)

In [89]:
# calculate cost over every row
completions_df['cost'] = completions_df.apply(
    lambda row: calculate_cost(row['input_tokens'], row['output_tokens'], row['model']), axis=1
)

## Use Llama-3 as Our Generator

In [103]:
"""
To access the gated Llama-3 model, you need a HuggingFace access token. Follow these steps:

1. Generate your HuggingFace access token at: https://huggingface.co/settings/tokens
2. Request access to the Llama-3.3-70B-Instruct model here: https://huggingface.co/meta-llama/Llama-3.3-70B-Instruct
3. Log in using the HuggingFace CLI:
    - Run `huggingface-cli login` in your terminal.
    - Paste your token when prompted (input will not be visible).
    - If you already have a token saved, you can check it with `huggingface-cli whoami` or log out using `huggingface-cli logout`.

Once logged in, your token will be securely stored on your machine and used for authentication.
"""

from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.3-70B-Instruct")

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


In [104]:
terminators = [
    tokenizer.eos_token_id,
    tokenizer.convert_tokens_to_ids("<|eot_id|>"),
    tokenizer.convert_tokens_to_ids("assistant"), 
]

In [None]:
def test_prompt_llama_3_70b(prompt, suppress= False, **kwargs):
    API_URL = 'create your huggingface endpoint url for llama3.3-70b-instruct here'
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {os.environ.get('HUGGINGFACE_TOKEN')}",
        "Content-Type": "application/json"
    }
    llama_prompt = f"<|begin_of_text|><|start_header_id|>user<end_header_id|>\n\n{prompt}<|eot_id|><|start_header_id|>assistant<end_header_id|>\n\n"

    def query(payload):
        response = requests.post(API_URL, headers = headers, json = payload)
        return response.json()
    
    kwargs["return_text"] = False
    kwargs["return_full_text"] = False
    kwargs["max_new_tokens"] = 512
    kwargs["stop"] = ["<|end_of_text|>", "<|eot_id|>"]

    output = query(
        {
            "inputs": llama_prompt,
            "parameters": kwargs,
            "options": {
                "use_cache": False,
                "wait_for_model": True,
            },
        }
    )

    answer = output[0]['generated_text']
    if not suppress:
        print(f"PROMPT:\n---------\n{llama_prompt}\n---------\nRESPONSE\n---------\n{answer}\n")
    else:
        return answer
    
test_prompt_llama_3_70b('1+1 = ?')

In [105]:
class LlamaChatLLM(ChatLLM):
    temperature: float = Field(default = 0.3, description="The temperature for the LLM.")
    max_new_tokens: int = Field(default = 256, description="The maximum number of tokens to generate.")
    do_sample: bool = Field(default = True, description="Whether to sample or not.")

    def generate(self, prompt:str, stop: List[str] = None) -> Dict[str, Any]:
        # Here we would call the actual LLM API to get a response
        response = test_prompt_llama_3_70b(prompt, suppress = True)

        # Return the response from the model
        return response

In [None]:
llama_llm = LlamaChatLLM(temperature = 0.05)
llama_llm.generate("Who is the president of the United States?")

In [None]:
llama_rag = RagBot(llm = llama_llm, verbose = False, stop_pattern = ['[END]'])
print(llama_rag.run("Can I get social security service by phone?"))

In [None]:
llama_rag.user_inputs

In [None]:
llama_rag.ai_responses

In [None]:
llama_rag.contexts