<a href="https://colab.research.google.com/github/melhem-m/AI-Training-Colab/blob/main/ENEC_rag_vector_stores_pineconeindexdemo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Pinecone Vector Store

If you're opening this Notebook on colab, you will probably need to install LlamaIndex ðŸ¦™.

In [None]:
%pip install llama-index llama-index-vector-stores-pinecone

In [None]:
import logging
import sys
import os

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

#### Creating a Pinecone Index

In [None]:
from pinecone import Pinecone, ServerlessSpec

In [None]:
import openai
from google.colab import userdata

# Retrieve the OpenAI API key from Google Colab secrets
openai.api_key = userdata.get('openai')

if openai.api_key:
    os.environ["OPENAI_API_KEY"] = openai.api_key

In [None]:
#PUT Tyour api key

from google.colab import userdata


api_key = userdata.get('PINECONE_API_KEY')

if api_key:
    os.environ["PINECONE_API_KEY"] = api_key


pc = Pinecone(api_key=api_key)

In [None]:
api_key

In [None]:
# dimensions are for text-embedding-ada-002

pc.create_index(
    name="quickstart",
    dimension=1536,
    metric="euclidean",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)


In [None]:
pinecone_index = pc.Index("quickstart")

#### Load documents, build the PineconeVectorStore and VectorStoreIndex

In [None]:
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.vector_stores.pinecone import PineconeVectorStore
from IPython.display import Markdown, display

Download Data

In [None]:
# load documents
documents = SimpleDirectoryReader("./data").load_data()

In [None]:
documents

The previous error occurred because the variable `pinecone_index` was used before it was defined. To fix this, I have combined the code that defines `pinecone_index` and the code that uses it into a single cell.

In [None]:
# Initialize Pinecone index
pinecone_index = pc.Index("quickstart")

# initialize without metadata filter
from llama_index.core import StorageContext

vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents, storage_context=storage_context
)

#### Query Index

May take a minute or so for the index to be ready!

In [None]:
# set Logging to DEBUG for more detailed outputs
import time

query_engine = index.as_query_engine()
start_time = time.time()
response = query_engine.query("what is this document about")
display(Markdown(f"<b>{response}</b>"))

# End timer and print duration
end_time = time.time()
print(f"\nExecution Time: {end_time - start_time:.2f} seconds")

In [None]:
display(Markdown(f"<b>{response}</b>"))

In [None]:
pip install gradio

In [None]:

# --- Imports ---
import os
import logging
import sys
import gradio as gr
from IPython.display import Markdown, display

from pinecone import Pinecone, ServerlessSpec
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.pinecone import PineconeVectorStore

# --- Initialize Pinecone ---

index_name = "quickstart"
dimension = 1536

# Delete index if exists (optional: mirrors original behavior)
if index_name in [idx["name"] for idx in pc.list_indexes()]:
    pc.delete_index(index_name)

# Create Pinecone index
pc.create_index(
    name=index_name,
    dimension=dimension,
    metric="euclidean",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)

pinecone_index = pc.Index(index_name)

# --- Load Data ---
# Create folders & download a sample doc (kept same logic, fixed subfolder creation)

documents = SimpleDirectoryReader("./data").load_data()

# --- Create Index ---
vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)

# --- System Prompt (polite + answer-from-document constraint) ---
SYSTEM_PROMPT = """You are Aisha, a polite and professional SOP assistant.
Answer ONLY using the information found in the indexed SOP document(s).
If the answer is not in the document(s), say: "I couldnâ€™t find that in the document."
Keep responses concise, helpful, and courteous.
"""

# --- Query Engine ---
query_engine = index.as_query_engine()
def normalize_for_retrieval(q: str) -> str:
    q0 = q.strip()
    ql = q0.lower().strip()

    # typo help
    ql = ql.replace("safty", "safety")

    # If itâ€™s a "who is X" style question, add the name itself as a strong keyword hint
    if ql.startswith("who is "):
        name = q0[7:].strip()              # take everything after "who is "
        # Boost retrieval toward contact list patterns
        return f'{name} {q0} Contact Information contacts list role title ext extension email phone'

    # Generic contact-style boosting
    contact_triggers = ["extension", "ext", "contact", "phone", "email", "officer", "lead", "supervisor", "manager"]
    if any(t in ql for t in contact_triggers):
        return f'{q0} Contact Information contacts list role title ext extension email phone'

    return q0

def query_doc(user_question: str):
    if not user_question or not user_question.strip():
        return "Please enter a question."
    full_query = f"""{SYSTEM_PROMPT}

User question:
{user_question.strip()}
"""
    try:
        response = query_engine.query(full_query)
        text = str(response).strip()
        # Gentle post-processing to keep it brief/polite
        return text if text else "I couldnâ€™t find that in the document."
    except Exception as e:
        return f"Error: {str(e)}"

# --- Gradio UI (Professional look with logo, centered title) ---
# Use the raw GitHub URL for proper image rendering.
LOGO_URL = "https://raw.githubusercontent.com/Decoding-Data-Science/Omantel/main/Omantel_Logo%20(1).png"

CUSTOM_CSS = """
.gradio-container { font-family: Inter, ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, 'Helvetica Neue', Arial; }
.header-wrap {
    display: grid;
    grid-template-columns: 120px 1fr 120px;
    align-items: center;
    gap: 12px;
    padding: 12px 0 8px;
    border-bottom: 1px solid #eaeaea;
}
.header-logo { display:flex; align-items:center; justify-content:flex-start; }
.header-logo img { height: 48px; object-fit: contain; }
.header-title { text-align:center; }
.header-title h1 {
    margin: 0; font-weight: 700; font-size: 1.5rem; line-height: 1.2;
}
.header-spacer { height: 1px; }
.section { padding-top: 8px; }
.footer-note { text-align:center; font-size: 12px; color:#667085; padding: 8px 0 0; }
label.svelte-1ipelgc, .label-wrap label { font-weight: 600; }
"""

with gr.Blocks(css=CUSTOM_CSS, title="SOP Document QA (LlamaIndex + Pinecone)") as demo:
    # Header with logo (left) and centered title
    with gr.Row(elem_classes="header-wrap"):
        with gr.Column(scale=0, elem_classes="header-logo"):
            gr.HTML(f'<img src="{LOGO_URL}" alt="Omantel Logo" />')
        with gr.Column(scale=1, elem_classes="header-title"):
            gr.HTML("<h1>SOP QA</h1>")
        with gr.Column(scale=0):
            gr.HTML("")  # right-side spacer

    gr.Markdown(
        "Ask questions based on the SOP Document "
        "**Answers come only from the document**. If not found, Iâ€™ll say so."
    )

    with gr.Group(elem_classes="section"):
        inp = gr.Textbox(
            label="Your question",
            placeholder="e.g., Ask in SOP Question?",
            lines=2,
        )
        btn = gr.Button("Submit", variant="primary")
        out = gr.Textbox(label="Answer", lines=8)

    btn.click(fn=query_doc, inputs=inp, outputs=out)
    inp.submit(fn=query_doc, inputs=inp, outputs=out)

    gr.Markdown('<div class="footer-note">LlamaIndex + Pinecone â€¢ Demo</div>')

demo.launch()


Replicate the Deplpyment hugging face
https://huggingface.co/spaces/decodingdatascience/ddsSOP1
openai key


In [None]:
documents

In [None]:
import os
import gradio as gr

from google.colab import userdata  # if you're in Colab; remove if not needed

from pinecone import Pinecone, ServerlessSpec
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext, Settings
from llama_index.vector_stores.pinecone import PineconeVectorStore

# LlamaIndex OpenAI LLM
# Install if needed:
!pip install -q llama-index-llms-openai
from llama_index.llms.openai import OpenAI


# ============================
# 0) Keys
# ============================
PINECONE_API_KEY = userdata.get("PINECONE_API_KEY")
if not PINECONE_API_KEY:
    raise ValueError("Missing PINECONE_API_KEY (set in Colab userdata or env var).")

# Make sure you set OPENAI_API_KEY in Colab secrets or env vars
OPENAI_API_KEY = userdata.get("openai")
if not OPENAI_API_KEY:
    raise ValueError("Missing OPENAI_API_KEY (set in Colab userdata or env var).")


# ============================
# 1) Initialize Pinecone
# ============================
pc = Pinecone(api_key=PINECONE_API_KEY)

index_name = "quickstart"
dimension = 1536  # keep as-is (matches common embedding dims)

# Create the index ONLY if it doesn't exist
existing = [idx["name"] for idx in pc.list_indexes()]
if index_name not in existing:
    pc.create_index(
        name=index_name,
        dimension=dimension,
        metric="euclidean",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

pinecone_index = pc.Index(index_name)


# ============================
# 2) (Optional) Load Data
# ============================
# If your document is ALREADY in Pinecone, you can keep this commented out.
# If you want to (re)index local docs from ./data, uncomment and switch indexing below.
# documents = SimpleDirectoryReader("./data").load_data()


# ============================
# 3) Connect to Existing Pinecone Vector Store
# ============================
vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# âœ… Use existing vectors already stored in Pinecone
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)


# ============================
# 4) LLM (OpenAI)
# ============================
Settings.llm = OpenAI(
    model="gpt-5.2-2025-12-11",
    temperature=0,
    api_key=OPENAI_API_KEY
)


# ============================
# 5) System Prompt (document-first constraint)
# ============================
SYSTEM_PROMPT = """# SYSTEM PROMPT â€” STRICT DOCUMENT QA (CONTACT-AWARE)

You answer ONLY using the DOCUMENT EXCERPTS provided below (these are retrieved from the Pinecone-indexed SOP). Never use outside knowledge. Never guess.

VERY IMPORTANT:
- Many questions in this SOP are answered in a "Contact Information" list. If the user asks about a role/title or a person, scan the excerpts for contact-list patterns like:
  "Safety Officer: <Name> â€“ Ext: <number>"
  "<Role>: <Name> â€“ Ext: <number>"
  "Email:" "Phone:" "Ext:" "Radio:"
- Treat these as equivalent intents:
  - "who is the safety officer" = find the line that begins with "Safety Officer:"
  - "what is the extension of the safety officer" = find the "Safety Officer:" line and extract "Ext"
  - "who is priya" = find "Priya" in the excerpts and report her role and extension if shown
- Be robust to small typos and variants: "safty"â†’"safety", "ext"â†’"extension".

MANDATORY OUTPUT FORMAT (always):
Reasoning: (1â€“2 sentences) Say what keyword(s) you looked for in the excerpts and QUOTE the exact matching line(s).
Conclusion: Give the direct answer in 1 sentence.

DENIAL RULE:
Ifâ€”and only ifâ€”the excerpts contain no direct line that answers the question, reply exactly:
"I'm sorry, I cannot answer that as this information is not available in the document."
"""


# ============================
# Retrieval helpers (MODIFIED)
# ============================
# âœ… Increased recall to avoid needing page numbers
retriever = index.as_retriever(similarity_top_k=15)

def expand_query(q: str) -> str:
    q0 = q.strip()
    ql = q0.lower().strip()

    # typo help
    ql = ql.replace("safty", "safety")

    # If itâ€™s a "who is X" style question, add the name itself as a strong keyword hint
    if ql.startswith("who is "):
        name = q0[7:].strip()
        return (
            f"{name} {q0} "
            "Contact Information contacts list role title ext extension email phone"
        )

    # Generic contact-style boosting
    contact_triggers = [
        "extension", "ext", "contact", "phone", "email",
        "officer", "lead", "supervisor", "manager", "maintenance", "safety"
    ]
    if any(t in ql for t in contact_triggers):
        return (
            f"{q0} "
            "Contact Information contacts list role title ext extension email phone"
        )

    return q0


def query_doc(user_question: str):
    if not user_question or not user_question.strip():
        return "Please enter a question."

    q = user_question.strip()

    # 1) Retrieve relevant excerpts FIRST (with expanded query)
    nodes = retriever.retrieve(expand_query(q))

    if not nodes:
        return "I'm sorry, I cannot answer that as this information is not available in the document."

    # Build a compact context from top retrieved chunks
    excerpts = []
    for i, n in enumerate(nodes[:5], start=1):
        txt = (n.get_content() or "").strip()
        if txt:
            excerpts.append(f"[Excerpt {i}]\n{txt}")

    context_block = "\n\n".join(excerpts).strip()

    if not context_block:
        return "I'm sorry, I cannot answer that as this information is not available in the document."

    # 2) Ask the LLM to answer ONLY from these excerpts
    prompt = f"""{SYSTEM_PROMPT}

DOCUMENT EXCERPTS:
{context_block}

USER QUESTION:
{q}

ANSWER (follow the rules exactly):
"""

    try:
        resp = Settings.llm.complete(prompt)
        text = (resp.text or "").strip()

        if not text:
            return "I'm sorry, I cannot answer that as this information is not available in the document."
        return text
    except Exception as e:
        return f"Error: {str(e)}"


# ============================
# 6) Gradio UI
# ============================
LOGO_URL = "https://raw.githubusercontent.com/melhem-m/AI-Training-Colab/main/enec%20logo.png"

CUSTOM_CSS = """
.gradio-container { font-family: Inter, ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, 'Helvetica Neue', Arial; }
.header-wrap {
    display: grid;
    grid-template-columns: 72px 1fr;
    align-items: center;
    gap: 12px;
    padding: 12px 0 10px;
    border-bottom: 1px solid #eaeaea;
}
.header-logo img { height: 44px; width: 44px; object-fit: contain; border-radius: 10px; }
.header-title h1 { margin: 0; font-weight: 800; font-size: 1.35rem; line-height: 1.2; }
.subtle { color:#667085; font-size: 0.95rem; margin-top: 4px; }
.footer-note { text-align:center; font-size: 12px; color:#667085; padding: 10px 0 0; }
label.svelte-1ipelgc, .label-wrap label { font-weight: 650; }
"""

with gr.Blocks(css=CUSTOM_CSS, title="SOP Document QA (LlamaIndex + Pinecone + OpenAI)") as demo:
    with gr.Row(elem_classes="header-wrap"):
        gr.HTML(f'<div class="header-logo"><img src="{LOGO_URL}" alt="SOP Logo" /></div>')
        gr.HTML('<div class="header-title"><h1>SOP QA</h1><div class="subtle">Answers come only from the document. If not found, Iâ€™ll say so.</div></div>')

    with gr.Group():
        inp = gr.Textbox(
            label="Your question",
            placeholder="e.g., Who is the safety officer? What is their extension?",
            lines=2,
        )
        btn = gr.Button("Submit", variant="primary")
        out = gr.Textbox(label="Answer", lines=10)

    btn.click(fn=query_doc, inputs=inp, outputs=out)
    inp.submit(fn=query_doc, inputs=inp, outputs=out)

    gr.Markdown('<div class="footer-note">LlamaIndex + Pinecone â€¢ OpenAI LLM â€¢ Demo</div>')

demo.launch()
