In [None]:
import os
from typing import Dict, List
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings

In [None]:
# Load variables from a .env file (if one is found) into the process environment.
load_dotenv()

# os.getenv returns None when PINECONE_API_KEY is not set.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# Shared Pinecone client used by every helper defined in this notebook.
pc = Pinecone(api_key=PINECONE_API_KEY)

In [None]:
# ---------------------------------------------
# Utility Functions
# ---------------------------------------------

In [None]:
def index_exists(index_name: str) -> bool:
    """Return True when an index called ``index_name`` is listed by the client."""
    existing_names = pc.list_indexes().names()
    return index_name in existing_names

In [None]:
def namespace_exists(index_name: str, namespace: str) -> bool:
    """Return True when ``namespace`` appears in the stats of ``index_name``.

    The lookup uses the ``"namespaces"`` entry of ``describe_index_stats()``;
    a missing entry is treated as "no namespaces".
    """
    index_stats = pc.Index(index_name).describe_index_stats()
    known_namespaces = index_stats.get("namespaces", {})
    return namespace in known_namespaces


In [None]:
def list_documents(path: str) -> List[str]:
    """Recursively collect the ``.txt``, ``.md`` and ``.pdf`` files under ``path``.

    The extension check is case-insensitive. Results keep the order in which
    ``os.walk`` yields them, each file name joined onto its directory.
    """
    wanted_suffixes = (".txt", ".md", ".pdf")
    return [
        os.path.join(directory, name)
        for directory, _subdirs, names in os.walk(path)
        for name in names
        if name.lower().endswith(wanted_suffixes)
    ]

In [None]:
# ---------------------------------------------
# Chunking & Embedding
# ---------------------------------------------

In [None]:
def recursive_chunking(path: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split every plain-text document found under ``path`` into chunks.

    Args:
        path: Directory searched with ``list_documents``.
        chunk_size: ``chunk_size`` passed to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: ``chunk_overlap`` passed to the same splitter.

    Returns:
        The chunks of all text documents, concatenated in discovery order.
        ``.pdf`` files are skipped (see the comment in the loop).
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )

    chunks = []
    for file in list_documents(path):
        # A PDF is a binary container: decoding its bytes as text produces
        # noise (object tables, compressed streams) that would be embedded and
        # indexed as if it were content. Skip it until a real PDF text
        # extractor is wired in.
        if file.lower().endswith(".pdf"):
            continue

        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default encoding; undecodable bytes are still dropped.
        with open(file, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read()
        chunks.extend(splitter.split_text(text))

    return chunks

In [None]:
def embed_chunks(chunks: List[str], embedding_model: str) -> List[List[float]]:
    """Embed ``chunks`` with the OpenAI model named ``embedding_model``.

    Args:
        chunks: Texts to embed.
        embedding_model: Model name passed to ``OpenAIEmbeddings``.

    Returns:
        One embedding per chunk, in the same order; ``[]`` for empty input.
    """
    # Nothing to embed: avoid building a client (and needing credentials)
    # just to get an empty result back.
    if not chunks:
        return []

    embeddings = OpenAIEmbeddings(model=embedding_model)
    return embeddings.embed_documents(chunks)

In [None]:
# ---------------------------------------------
# Index Creation
# ---------------------------------------------

In [None]:
def create_index_if_needed(config: Dict):
    """Create the serverless index described by ``config`` unless it exists.

    Required keys: ``index_name``, ``dimensions``, ``metric``.
    Optional keys: ``cloud`` (default ``"aws"``) and ``region``
    (default ``"us-east-1"``), used to build the ``ServerlessSpec``.
    """
    index_name = config["index_name"]

    if index_exists(index_name):
        return  # already exists

    pc.create_index(
        name=index_name,
        dimension=config["dimensions"],
        metric=config["metric"],
        # Defaults keep the previous hard-coded placement for existing configs.
        spec=ServerlessSpec(
            cloud=config.get("cloud", "aws"),
            region=config.get("region", "us-east-1"),
        ),
    )

In [None]:
def create_namespace_if_needed(config: Dict):
    """Make sure the namespace named in ``config`` has been initialised.

    Reads ``index_name``, ``namespace_name`` and ``dimensions``. When the
    namespace is not listed yet, a placeholder vector is upserted into it
    (Pinecone auto-creates the namespace on first upsert) and then removed.
    """
    index_name = config["index_name"]
    namespace = config["namespace_name"]

    if namespace_exists(index_name, namespace):
        return

    # Pinecone rejects dense vectors whose values are all zero, so the
    # placeholder must carry non-zero components.
    placeholder = [1.0] * config["dimensions"]

    idx = pc.Index(index_name)
    idx.upsert(
        vectors=[("init-vector", placeholder)],
        namespace=namespace
    )
    idx.delete(ids=["init-vector"], namespace=namespace)


In [None]:
# ---------------------------------------------
# Indexing Pipeline
# ---------------------------------------------

In [None]:
def index_documents(config: Dict):
    """Chunk, embed and upsert the documents described by ``config``.

    Keys read: ``index_name``, ``namespace_name``, ``skip_if_namespace_exists``
    (only consulted when the namespace already exists), ``documents_path``,
    ``chunk_size``, ``chunk_overlap``, ``embedding_model`` and the optional
    ``upsert_batch_size`` (default 100).

    Behaviour:
        * namespace exists and skip flag is true  -> nothing is done;
        * namespace exists and skip flag is false -> namespace is wiped and
          rebuilt;
        * namespace missing -> it is initialised and filled.
    """
    index_name = config["index_name"]
    namespace = config["namespace_name"]

    # One stats lookup drives both the skip and the reindex decision.
    if namespace_exists(index_name, namespace):
        if config["skip_if_namespace_exists"]:
            return
        delete_namespace(config)

    # Guarantee namespace exists before indexing
    create_namespace_if_needed(config)

    # Get content
    chunks = recursive_chunking(
        config["documents_path"],
        config["chunk_size"],
        config["chunk_overlap"]
    )
    if not chunks:
        # No documents / no text: nothing to embed or upsert.
        return

    vectors = embed_chunks(chunks, config["embedding_model"])

    # Format for Pinecone: (id, values, metadata)
    payload = [
        (f"id-{i}", vector, {"text": chunk})
        for i, (chunk, vector) in enumerate(zip(chunks, vectors))
    ]

    # Upsert in bounded batches; a single request carrying every vector
    # exceeds Pinecone's per-request limits on realistic corpora.
    batch_size = max(1, config.get("upsert_batch_size", 100))
    idx = pc.Index(index_name)
    for start in range(0, len(payload), batch_size):
        idx.upsert(vectors=payload[start:start + batch_size], namespace=namespace)


In [None]:
# ---------------------------------------------
# Delete Functions
# ---------------------------------------------


In [None]:
def delete_namespace(config: Dict):
    """Delete every vector in ``config["namespace_name"]`` of ``config["index_name"]``.

    Does nothing when the namespace is not listed in the index stats, matching
    how ``delete_index`` treats a missing index.
    """
    index_name = config["index_name"]
    namespace = config["namespace_name"]

    # A delete_all against a namespace that does not exist is answered with
    # a "not found" error on serverless indexes; treat it as already deleted.
    if not namespace_exists(index_name, namespace):
        return

    idx = pc.Index(index_name)
    idx.delete(delete_all=True, namespace=namespace)


In [None]:
def delete_index(config: Dict):
    """Delete the index named by ``config["index_name"]``; no-op if it is absent."""
    target = config["index_name"]
    if not index_exists(target):
        return
    pc.delete_index(target)

In [None]:
# ---------------------------------------------
# Update (Reindex)
# ---------------------------------------------

In [None]:
"""
Pinecone Index Manager
Production-ready, no logging, config-driven
"""

import os
from typing import Dict, List
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings

# Load variables from a .env file (if one is found) into the process environment.
load_dotenv()

# os.getenv returns None when PINECONE_API_KEY is not set.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# Shared Pinecone client used by every helper defined below.
pc = Pinecone(api_key=PINECONE_API_KEY)


# ---------------------------------------------
# Utility Functions
# ---------------------------------------------

def index_exists(index_name: str) -> bool:
    """Tell whether the client lists an index named ``index_name``."""
    listed = pc.list_indexes().names()
    return index_name in listed


def namespace_exists(index_name: str, namespace: str) -> bool:
    """Tell whether ``namespace`` is present in the stats of ``index_name``.

    Reads the ``"namespaces"`` entry of ``describe_index_stats()`` and falls
    back to an empty mapping when that entry is absent.
    """
    stats = pc.Index(index_name).describe_index_stats()
    namespaces = stats.get("namespaces", {})
    return namespace in namespaces


def list_documents(path: str) -> List[str]:
    """Walk ``path`` and return every ``.txt``, ``.md`` or ``.pdf`` file below it.

    Extensions are compared case-insensitively; paths come back in
    ``os.walk`` order, joined onto the directory that contains them.
    """
    extensions = (".txt", ".md", ".pdf")
    matches: List[str] = []
    for folder, _dirs, entries in os.walk(path):
        matches.extend(
            os.path.join(folder, entry)
            for entry in entries
            if entry.lower().endswith(extensions)
        )
    return matches


# ---------------------------------------------
# Chunking & Embedding
# ---------------------------------------------

def get_chunks(path: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split every plain-text document found under ``path`` into chunks.

    Args:
        path: Directory searched with ``list_documents``.
        chunk_size: ``chunk_size`` passed to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: ``chunk_overlap`` passed to the same splitter.

    Returns:
        The chunks of all text documents, concatenated in discovery order.
        ``.pdf`` files are skipped (see the comment in the loop).
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )

    chunks = []
    for file in list_documents(path):
        # A PDF is a binary container: decoding its bytes as text produces
        # noise (object tables, compressed streams) that would be embedded and
        # indexed as if it were content. Skip it until a real PDF text
        # extractor is wired in.
        if file.lower().endswith(".pdf"):
            continue

        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default encoding; undecodable bytes are still dropped.
        with open(file, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read()
        chunks.extend(splitter.split_text(text))

    return chunks


def embed_chunks(chunks: List[str], embedding_model: str) -> List[List[float]]:
    """Embed ``chunks`` with the OpenAI model named ``embedding_model``.

    Args:
        chunks: Texts to embed.
        embedding_model: Model name passed to ``OpenAIEmbeddings``.

    Returns:
        One embedding per chunk, in the same order; ``[]`` for empty input.
    """
    # Nothing to embed: avoid building a client (and needing credentials)
    # just to get an empty result back.
    if not chunks:
        return []

    embeddings = OpenAIEmbeddings(model=embedding_model)
    return embeddings.embed_documents(chunks)


# ---------------------------------------------
# Index Creation
# ---------------------------------------------

def create_index_if_needed(config: Dict):
    """Create the serverless index described by ``config`` unless it exists.

    Required keys: ``index_name``, ``dimensions``, ``metric``.
    Optional keys: ``cloud`` (default ``"aws"``) and ``region``
    (default ``"us-east-1"``), used to build the ``ServerlessSpec``.
    """
    index_name = config["index_name"]

    if index_exists(index_name):
        return  # already exists

    pc.create_index(
        name=index_name,
        dimension=config["dimensions"],
        metric=config["metric"],
        # Defaults keep the previous hard-coded placement for existing configs.
        spec=ServerlessSpec(
            cloud=config.get("cloud", "aws"),
            region=config.get("region", "us-east-1"),
        ),
    )


def create_namespace_if_needed(config: Dict):
    """Make sure the namespace named in ``config`` has been initialised.

    Reads ``index_name``, ``namespace_name`` and ``dimensions``. When the
    namespace is not listed yet, a placeholder vector is upserted into it
    (Pinecone auto-creates the namespace on first upsert) and then removed.
    """
    index_name = config["index_name"]
    namespace = config["namespace_name"]

    if namespace_exists(index_name, namespace):
        return

    # Pinecone rejects dense vectors whose values are all zero, so the
    # placeholder must carry non-zero components.
    placeholder = [1.0] * config["dimensions"]

    idx = pc.Index(index_name)
    idx.upsert(
        vectors=[("init-vector", placeholder)],
        namespace=namespace
    )
    idx.delete(ids=["init-vector"], namespace=namespace)


# ---------------------------------------------
# Indexing Pipeline
# ---------------------------------------------

def index_documents(config: Dict):
    """Chunk, embed and upsert the documents described by ``config``.

    Keys read: ``index_name``, ``namespace_name``, ``skip_if_namespace_exists``
    (only consulted when the namespace already exists), ``documents_path``,
    ``chunk_size``, ``chunk_overlap``, ``embedding_model`` and the optional
    ``upsert_batch_size`` (default 100).

    Behaviour:
        * namespace exists and skip flag is true  -> nothing is done;
        * namespace exists and skip flag is false -> namespace is wiped and
          rebuilt;
        * namespace missing -> it is initialised and filled.
    """
    index_name = config["index_name"]
    namespace = config["namespace_name"]

    # One stats lookup drives both the skip and the reindex decision.
    if namespace_exists(index_name, namespace):
        if config["skip_if_namespace_exists"]:
            return
        delete_namespace(config)

    # Guarantee namespace exists before indexing
    create_namespace_if_needed(config)

    # Get content
    chunks = get_chunks(
        config["documents_path"],
        config["chunk_size"],
        config["chunk_overlap"]
    )
    if not chunks:
        # No documents / no text: nothing to embed or upsert.
        return

    vectors = embed_chunks(chunks, config["embedding_model"])

    # Format for Pinecone: (id, values, metadata)
    payload = [
        (f"id-{i}", vector, {"text": chunk})
        for i, (chunk, vector) in enumerate(zip(chunks, vectors))
    ]

    # Upsert in bounded batches; a single request carrying every vector
    # exceeds Pinecone's per-request limits on realistic corpora.
    batch_size = max(1, config.get("upsert_batch_size", 100))
    idx = pc.Index(index_name)
    for start in range(0, len(payload), batch_size):
        idx.upsert(vectors=payload[start:start + batch_size], namespace=namespace)


# ---------------------------------------------
# Delete Functions
# ---------------------------------------------

def delete_namespace(config: Dict):
    """Delete every vector in ``config["namespace_name"]`` of ``config["index_name"]``.

    Does nothing when the namespace is not listed in the index stats, matching
    how ``delete_index`` treats a missing index.
    """
    index_name = config["index_name"]
    namespace = config["namespace_name"]

    # A delete_all against a namespace that does not exist is answered with
    # a "not found" error on serverless indexes; treat it as already deleted.
    if not namespace_exists(index_name, namespace):
        return

    idx = pc.Index(index_name)
    idx.delete(delete_all=True, namespace=namespace)


def delete_index(config: Dict):
    """Remove the index named by ``config["index_name"]`` when it exists."""
    name = config["index_name"]
    if not index_exists(name):
        return
    pc.delete_index(name)


# ---------------------------------------------
# Update (Reindex)
# ---------------------------------------------

def update_index(config: Dict, new_config: Dict):
    """
    Rebuild the namespace described by ``new_config``.

    new_config contains new doc path, chunking, embedding model, etc.
    ``config`` (the previous configuration) is accepted for interface
    compatibility but is not read.
    """

    # Ensure index exists
    create_index_if_needed(new_config)

    # An update must always reindex. Force the skip flag off on a copy (the
    # caller's dict is left untouched) and let index_documents wipe the
    # namespace itself: it only deletes when the namespace actually exists,
    # and it can no longer return early after the old content has been removed.
    forced_config = {**new_config, "skip_if_namespace_exists": False}
    index_documents(forced_config)
