In [None]:
! pip install dotenv boto3 langchain_community pinecone transformers

Collecting dotenv
  Downloading dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting boto3
  Downloading boto3-1.38.0-py3-none-any.whl.metadata (6.6 kB)
Collecting langchain_community
  Downloading langchain_community-0.3.22-py3-none-any.whl.metadata (2.4 kB)
Collecting pinecone
  Downloading pinecone-6.0.2-py3-none-any.whl.metadata (9.0 kB)
Collecting python-dotenv (from dotenv)
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Collecting botocore<1.39.0,>=1.38.0 (from boto3)
  Downloading botocore-1.38.0-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.13.0,>=0.12.0 (from boto3)
  Downloading s3transfer-0.12.0-py3-none-any.whl.metadata (1.7 kB)
Collecting langchain-core<1.0.0,>=0.3.55 (from langchain_community)
  Downloading langchain_core-0.3.55-py3-none-any.whl.metadata (5.9 kB)
Collecting langchain<1.0.0,>=0.3.24 (from langchain_co

In [None]:
# Mount Google Drive into the Colab runtime so files stored there are reachable
# under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Load the project's .env file from Drive into the process environment; the
# Pinecone and AWS settings below are read from it via os.getenv.
from dotenv import load_dotenv
load_dotenv('/content/drive/My Drive/RAG_PROJECT/.env')

import os
import logging
import hashlib
import json
import re
import unicodedata
import string
import time

import boto3
from dotenv import load_dotenv
from tqdm import tqdm
# import nltk
# nltk.download('stopwords')
# from nltk.corpus import stopwords
# STOPWORDS = set(stopwords.words("english"))

import torch
from transformers import DPRContextEncoder, DPRContextEncoderTokenizer

from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    UnstructuredPDFLoader, UnstructuredWordDocumentLoader
)
from pinecone import Pinecone, ServerlessSpec

########################################
# Config & Environment Setup
########################################
# Second load_dotenv() call, this time with default .env discovery.
# NOTE(review): the Drive-hosted .env was already loaded explicitly above, so
# this is presumably redundant — confirm before removing.
load_dotenv()
# AWS region passed to boto3 when creating the DynamoDB resource.
AWS_REGION = "us-east-2"
# DynamoDB table that is scanned for scraped pages.
DYNAMO_TABLE_NAME = "ScrapedPages"

# Pinecone credentials/settings come from the environment (.env file).
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# Used as the serverless *region* when the index has to be created.
PINECONE_API_ENV = os.getenv("PINECONE_API_ENV", "us-east-1")
# Name of the Pinecone index that receives the vectors.
INDEX_NAME = "ut-rag-app-two"

EMBED_DIM = 768      # DPR context encoder outputs 768-d vectors
# Default maximum number of vectors sent in one Pinecone upsert call.
BATCH_SIZE = 100
# Cap on the summed JSON-estimated size of the vectors in one upsert batch.
# NOTE(review): Pinecone's documented per-request upsert limit may be lower
# than this (2MB at the time of review) — confirm against current limits.
MAX_VECTOR_PAYLOAD_BYTES = 4 * 1024 * 1024  # 4MB
EMBED_BATCH_SIZE = 500  # number of passages to embed in one forward pass

# INFO-level logging so per-batch progress messages are visible in cell output.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

########################################
# Pinecone Initialization
########################################
# Create the Pinecone client from the API key read in the config section.
pc = Pinecone(api_key=PINECONE_API_KEY)
# Create the index only when it does not already exist, so re-running this
# cell does not attempt a duplicate create.
if INDEX_NAME not in pc.list_indexes().names():
    pc.create_index(
        name=INDEX_NAME,
        # Index dimension is tied to EMBED_DIM, the size of the DPR embeddings.
        dimension=EMBED_DIM,
        metric="dotproduct",  # typical for DPR
        # Serverless index on AWS; the region comes from PINECONE_API_ENV.
        spec=ServerlessSpec(cloud='aws', region=PINECONE_API_ENV)
    )
# Handle used by index_documents() for all upserts.
pinecone_index = pc.Index(INDEX_NAME)

########################################
# DynamoDB Initialization
########################################
# DynamoDB resource bound to AWS_REGION. Credentials are read from the
# environment under the names AWS_ACCESS_KEY / AWS_SECRET_ACCESS_KEY.
# NOTE(review): os.getenv returns None when a variable is unset; presumably
# boto3 then falls back to its default credential lookup — confirm.
dynamo = boto3.resource(
    "dynamodb",
    region_name=AWS_REGION,
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)
# Table handle scanned by fetch_items_from_dynamodb().
table = dynamo.Table(DYNAMO_TABLE_NAME)

########################################
# GPU Setup & DPR Context Encoder
########################################
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")

# Tokenizer and context encoder are loaded from the same pretrained checkpoint
# so that tokenization matches what the encoder expects.
ctx_tokenizer = DPRContextEncoderTokenizer.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
ctx_encoder = DPRContextEncoder.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
# Move the model to the selected device and put it in evaluation mode; it is
# only used for inference (see encode_passages_batch).
ctx_encoder.to(device)
ctx_encoder.eval()

def encode_passages_batch(texts: list) -> list:
    """
    Encode a batch of passages with the DPR context encoder.

    The passages are tokenized together (truncated to at most 512 tokens and
    padded to the longest sequence in the batch), moved to the active device,
    and run through the encoder with gradient tracking disabled.

    Args:
        texts: Passage strings to embed. May be empty.

    Returns:
        One embedding per input passage, in input order. Each embedding is a
        plain Python list of floats taken from the encoder's ``pooler_output``.
        An empty input yields an empty list.
    """
    # Nothing to embed: return early instead of handing the tokenizer and the
    # model an empty batch.
    if not texts:
        return []
    inputs = ctx_tokenizer(
        texts,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        padding=True
    )
    # Input tensors must live on the same device as the model.
    inputs = {k: v.to(device) for k, v in inputs.items()}
    # Inference only, so skip building the autograd graph.
    with torch.no_grad():
        batch_embeddings = ctx_encoder(**inputs).pooler_output  # shape [batch_size, 768]
    # Bring results back to host memory as nested Python lists.
    return batch_embeddings.cpu().tolist()

########################################
# Preprocessing
########################################
def preprocess_text(text: str) -> str:
    """
    Clean raw scraped text before it is chunked and embedded.

    Steps, in order:
      1) Unicode compatibility decomposition (NFKD)
      2) Replace anything that looks like an HTML tag with a space
      3) Collapse every whitespace run to a single space and trim the ends

    Punctuation and stopwords are left untouched.
    """
    normalized = unicodedata.normalize("NFKD", text)
    # Tags become a space (not an empty string) so adjacent words stay separated.
    without_tags = re.sub(r"<[^>]+>", " ", normalized)
    single_spaced = re.sub(r"\s+", " ", without_tags)
    return single_spaced.strip()

########################################
# DynamoDB Fetch and Document Building
########################################
def fetch_items_from_dynamodb() -> list:
    """
    Scan the whole DynamoDB table and return every item.

    A scan response that contains 'LastEvaluatedKey' signals that more pages
    remain, so scanning continues from that key until a response arrives
    without it.
    """
    logger.info("Fetching items from DynamoDB table: %s", DYNAMO_TABLE_NAME)
    collected = []
    scan_kwargs = {}
    while True:
        page = table.scan(**scan_kwargs)
        collected += page.get("Items", [])
        if 'LastEvaluatedKey' not in page:
            break
        # Resume the next scan where this page stopped.
        scan_kwargs = {"ExclusiveStartKey": page['LastEvaluatedKey']}
    logger.info(f"Fetched {len(collected)} items from DynamoDB.")
    return collected

def build_documents_from_items(items: list) -> list:
    """
    Turn DynamoDB items into Documents.

    Each item's 'scraped_text' is run through preprocess_text and becomes the
    page content; the item's 'url' (or "N/A" when missing/empty) is stored in
    the metadata. Items whose 'scraped_text' is missing, empty, or not a
    string are skipped with a warning.
    """
    documents = []
    for record in items:
        scraped = record.get("scraped_text")
        if not (scraped and isinstance(scraped, str)):
            logger.warning("Skipping item with no or non-string 'scraped_text': %s", record)
            continue
        source_url = record.get("url") or "N/A"
        documents.append(
            Document(page_content=preprocess_text(scraped), metadata={"url": source_url})
        )
    logger.info(f"Built {len(documents)} Documents from Dynamo items.")
    return documents

########################################
# Document Splitting
########################################
def split_documents(docs: list, chunk_size=512, chunk_overlap=100) -> list:
    """
    Split documents into overlapping chunks with RecursiveCharacterTextSplitter.

    Args:
        docs: Documents to split.
        chunk_size: Maximum chunk length handed to the splitter.
            NOTE(review): presumably measured in characters (the splitter's
            default length function) — confirm.
        chunk_overlap: Overlap between consecutive chunks, in the same unit.

    Returns:
        Whatever the splitter's split_documents() returns for `docs`.
    """
    splitter_options = {"chunk_size": chunk_size, "chunk_overlap": chunk_overlap}
    return RecursiveCharacterTextSplitter(**splitter_options).split_documents(docs)

def estimate_payload_size(vector: dict) -> int:
    """Return the number of bytes `vector` occupies when dumped to JSON and UTF-8 encoded."""
    serialized = json.dumps(vector)
    encoded = serialized.encode("utf-8")
    return len(encoded)

########################################
# Embedding in Batches (GPU Optimized)
########################################
def embed_passages_in_batches(texts: list, batch_size=EMBED_BATCH_SIZE) -> list:
    """
    Embed `texts` in consecutive slices of at most `batch_size` passages.

    Each slice is passed to encode_passages_batch; the slice range and the
    time it took are logged. Returns all embeddings concatenated in input
    order.
    """
    embeddings = []
    total = len(texts)
    for start in range(0, total, batch_size):
        window = texts[start:start + batch_size]
        logger.info(f"Encoding passage batch {start} to {start + len(window)} / {total} ...")
        started_at = time.time()
        window_embeddings = encode_passages_batch(window)
        duration = time.time() - started_at
        logger.info(f"Batch embed took {duration:.2f} seconds")
        embeddings.extend(window_embeddings)
    return embeddings

########################################
# Upsert to Pinecone
########################################
def index_documents(chunks: list, upsert_batch_size: int = BATCH_SIZE):
    """
    Embed document chunks and upsert them to Pinecone in bounded batches.

    Args:
        chunks: Chunk objects exposing `page_content` (str) and `metadata`
            (dict). A missing "url" metadata key falls back to "N/A".
        upsert_batch_size: Maximum number of vectors per upsert call.

    Each vector's ID is the SHA-256 hex digest of its text, so re-indexing the
    same text overwrites the existing record instead of adding a duplicate.
    Chunks with identical text share an ID, so only one of them ends up stored.

    A batch is flushed when adding the next vector would push its estimated
    JSON size past MAX_VECTOR_PAYLOAD_BYTES or when it already holds
    `upsert_batch_size` vectors.

    Returns:
        None.
    """
    if not chunks:
        logger.warning("No chunks to index.")
        return
    texts = [c.page_content for c in chunks]
    # NOTE(review): every embedding is held in memory before the first upsert;
    # for very large corpora consider embedding and upserting incrementally.
    embedded_vectors = embed_passages_in_batches(texts, EMBED_BATCH_SIZE)
    vectors = [
        {
            "id": hashlib.sha256(chunk.page_content.encode("utf-8")).hexdigest(),
            "values": vec,
            "metadata": {
                "text": chunk.page_content,
                # Same "N/A" fallback build_documents_from_items uses, so chunks
                # from other sources without a URL do not raise KeyError.
                "url": chunk.metadata.get("url", "N/A"),
            },
        }
        for chunk, vec in zip(chunks, embedded_vectors)
    ]
    if not vectors:
        logger.warning("No vectors to upsert.")
        return
    logger.info(f"Upserting {len(vectors)} vectors to Pinecone in batches of {upsert_batch_size}...")
    batch = []
    current_size = 0
    for vec in tqdm(vectors, desc="Indexing chunks"):
        est_size = estimate_payload_size(vec)
        would_overflow = current_size + est_size > MAX_VECTOR_PAYLOAD_BYTES
        is_full = len(batch) >= upsert_batch_size
        # Flush only a non-empty batch: a single oversized vector or a
        # non-positive upsert_batch_size would otherwise trigger an upsert
        # with no vectors in it.
        if batch and (would_overflow or is_full):
            pinecone_index.upsert(vectors=batch)
            batch = []
            current_size = 0
        batch.append(vec)
        current_size += est_size
    # Send whatever is left after the loop.
    if batch:
        pinecone_index.upsert(vectors=batch)
    logger.info("Indexing complete.")

########################################
# Main Workflow
########################################
def rag_workflow_dynamodb():
    """
    Run the DPR ingestion pipeline end to end.

    Stages: fetch items from DynamoDB -> build Documents -> split into
    chunks -> embed with DPR and upsert to Pinecone. Stops early with a
    warning when a stage produces nothing to pass on.
    """
    records = fetch_items_from_dynamodb()
    if not records:
        logger.warning("No items found in DynamoDB.")
        return

    documents = build_documents_from_items(records)
    if not documents:
        logger.warning("No valid docs to index.")
        return

    pieces = split_documents(documents)
    logger.info(f"Created {len(pieces)} chunks from documents.")
    index_documents(pieces)

# Run the full ingestion pipeline when this code is executed directly.
# In a notebook kernel __name__ is "__main__", so running the cell starts it.
if __name__ == "__main__":
    rag_workflow_dynamodb()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/492 [00:00<?, ?B/s]

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'DPRQuestionEncoderTokenizer'. 
The class this function is called from is 'DPRContextEncoderTokenizer'.


pytorch_model.bin:   0%|          | 0.00/438M [00:00<?, ?B/s]

Some weights of the model checkpoint at facebook/dpr-ctx_encoder-single-nq-base were not used when initializing DPRContextEncoder: ['ctx_encoder.bert_model.pooler.dense.bias', 'ctx_encoder.bert_model.pooler.dense.weight']
- This IS expected if you are initializing DPRContextEncoder from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DPRContextEncoder from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

Indexing chunks: 100%|██████████| 259522/259522 [21:39<00:00, 199.69it/s]


In [None]:
  # Report how many CPU cores the runtime exposes.
  import multiprocessing
  cores = multiprocessing.cpu_count()
  print(cores)

12
