In [13]:
!pip install langchain openai chromadb tiktoken jq python-dotenv pinecone-client langchain-pinecone

Collecting pinecone-client
Collecting pinecone-client
  Downloading pinecone_client-6.0.0-py3-none-any.whl.metadata (3.4 kB)
  Downloading pinecone_client-6.0.0-py3-none-any.whl.metadata (3.4 kB)
Collecting langchain-pinecone
Collecting langchain-pinecone
  Downloading langchain_pinecone-0.2.13-py3-none-any.whl.metadata (8.6 kB)
  Downloading langchain_pinecone-0.2.13-py3-none-any.whl.metadata (8.6 kB)
Collecting pinecone-plugin-interface<0.0.8,>=0.0.7 (from pinecone-client)
Collecting pinecone-plugin-interface<0.0.8,>=0.0.7 (from pinecone-client)
  Downloading pinecone_plugin_interface-0.0.7-py3-none-any.whl.metadata (1.2 kB)
  Downloading pinecone_plugin_interface-0.0.7-py3-none-any.whl.metadata (1.2 kB)
Collecting langchain-core<0.4.0,>=0.3.21 (from langchain)
Collecting langchain-core<0.4.0,>=0.3.21 (from langchain)
  Downloading langchain_core-0.3.80-py3-none-any.whl.metadata (3.2 kB)
  Downloading langchain_core-0.3.80-py3-none-any.whl.metadata (3.2 kB)
Collecting pinecone<8.0.0,

In [14]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.document_loaders import JSONLoader
from langchain_core.documents import Document
from collections import defaultdict
from typing import Dict, List, Tuple, Any
import os
from dotenv import load_dotenv
import tiktoken
import time
import chromadb
import gc
from pinecone import Pinecone, ServerlessSpec

load_dotenv()

True

In [15]:
def group_by_episode_act(docs: List[Document]) -> Dict[Tuple[str, str], List[Document]]:
    """
    Group documents by (episode, act).
    """
    grouped: Dict[Tuple[str, str], List[Document]] = defaultdict(list)
    for doc in docs:
        episode = doc.metadata.get("episode", "unknown_episode")
        act = doc.metadata.get("act", "unknown_act")
        grouped[(episode, act)].append(doc)
    return grouped


def sort_group_by_utterance_start(group: List[Document]) -> List[Document]:
    """
    Sort a list of docs by utterance_start (ascending).
    Missing values are treated as 0.
    """
    return sorted(
        group,
        key=lambda d: (d.metadata.get("utterance_start")
                       if d.metadata.get("utterance_start") is not None
                       else 0.0)
    )


class TranscriptChunker:
    """
    Chunk This American Life–style transcripts into smaller text blocks,
    grouped by episode + act and ordered by utterance_start.

    Chunks are formed by concatenating utterances until `max_words`
    is reached, with optional overlap in terms of utterances.
    """

    def __init__(self,
                 max_words: int = 350,
                 overlap_utterances: int = 2):
        """
        Args:
            max_words: Target maximum words per chunk.
            overlap_utterances: How many utterances from the end of the
                                previous chunk to repeat at the start
                                of the next chunk.
        """
        self.max_words = max_words
        self.overlap_utterances = overlap_utterances

    def _make_chunk_document(
        self,
        episode: str,
        act: str,
        chunk_index: int,
        docs_in_chunk: List[Document]
    ) -> Document:
        """
        Create a new Document representing one chunk, with aggregated metadata.
        """
        text_parts = [d.page_content.strip() for d in docs_in_chunk if d.page_content]
        chunk_text = " ".join(text_parts)

        # Aggregate metadata
        speakers = {d.metadata.get("speaker") for d in docs_in_chunk if d.metadata.get("speaker")}
        roles = {d.metadata.get("role") for d in docs_in_chunk if d.metadata.get("role")}

        starts = [d.metadata.get("utterance_start") for d in docs_in_chunk
                  if d.metadata.get("utterance_start") is not None]
        ends = [d.metadata.get("utterance_end") for d in docs_in_chunk
                if d.metadata.get("utterance_end") is not None]

        chunk_metadata: Dict[str, Any] = {
            "episode": episode,
            "act": act,
            "chunk_index": chunk_index,
            "num_utterances": len(docs_in_chunk),
            "num_words": len(chunk_text.split()),
            "speakers": ", ".join(sorted(list(speakers))),
            "roles": ", ".join(sorted(list(roles))),
            "chunk_utterance_start": min(starts) if starts else None,
            "chunk_utterance_end": max(ends) if ends else None,
        }

        return Document(page_content=chunk_text, metadata=chunk_metadata)

    def chunk_group(self,
                    episode: str,
                    act: str,
                    docs_in_group: List[Document]) -> List[Document]:
        """
        Chunk all utterances for a single (episode, act) group.
        Assumes docs_in_group are already sorted by utterance_start.
        """
        chunks: List[Document] = []
        current_docs: List[Document] = []
        current_word_count = 0
        chunk_index = 0

        for doc in docs_in_group:
            words = doc.page_content.split()
            n_words = len(words)

            # If adding this utterance would exceed the max_words, flush current chunk
            if current_docs and (current_word_count + n_words > self.max_words):
                chunk_doc = self._make_chunk_document(
                    episode, act, chunk_index, current_docs
                )
                chunks.append(chunk_doc)
                chunk_index += 1

                # prepare next chunk, with overlap
                if self.overlap_utterances > 0:
                    overlap_docs = current_docs[-self.overlap_utterances:]
                else:
                    overlap_docs = []

                current_docs = list(overlap_docs)
                current_word_count = sum(
                    len(d.page_content.split()) for d in current_docs
                )

            # add current utterance
            current_docs.append(doc)
            current_word_count += n_words

        # flush final chunk
        if current_docs:
            chunk_doc = self._make_chunk_document(
                episode, act, chunk_index, current_docs
            )
            chunks.append(chunk_doc)

        return chunks

    def chunk_documents(self, docs: List[Document]) -> List[Document]:
        """
        High-level method: groups by (episode, act), sorts within each group,
        and returns a flat list of chunk Documents.
        """
        grouped = group_by_episode_act(docs)
        all_chunks: List[Document] = []

        for (episode, act), group_docs in grouped.items():
            sorted_docs = sort_group_by_utterance_start(group_docs)
            group_chunks = self.chunk_group(episode, act, sorted_docs)
            all_chunks.extend(group_chunks)

        return all_chunks

In [16]:
def metadata_func(record: dict, metadata: dict) -> dict:
    metadata["episode"] = record.get("episode")
    metadata["role"] = record.get("role")
    metadata["speaker"] = record.get("speaker")
    metadata["act"] = record.get("act")
    metadata["utterance_start"] = record.get("utterance_start")
    metadata["utterance_end"] = record.get("utterance_end")
    
    return metadata

In [17]:
loader = JSONLoader(
    file_path="../data/transcripts_full.json",
    jq_schema=".[].[]",     
    content_key="utterance",  
    metadata_func=metadata_func
)

docs = loader.load()
print(f"Loaded {len(docs)} utterances.")

Loaded 163808 utterances.


In [18]:
# Combine docs into larger chunks
chunker = TranscriptChunker(
    max_words=350,          # you can tune this
    overlap_utterances=2    # you can tune this too
)

chunked_docs = chunker.chunk_documents(docs)

print(f"Original utterances: {len(docs)}")
print(f"Chunked docs: {len(chunked_docs)}")
if chunked_docs:
    print(chunked_docs[0].metadata)
    print(chunked_docs[0].page_content[:400], "...")

Original utterances: 163808
Chunked docs: 28642
{'episode': 'ep-1', 'act': 'prologue', 'chunk_index': 0, 'num_utterances': 15, 'num_words': 342, 'speakers': 'ira glass, joe franklin', 'roles': 'host, interviewer, subject', 'chunk_utterance_start': 0.17, 'chunk_utterance_end': 132.75}
Joe Franklin? I'm ready. It's Ira Glass here. Oh, you're the emcee on the show, Ira. I am the emcee on the show. Yes. Oh great. Ira? I-R-A, Ira? Ira, I-R-A. Oh, great. Now hold on one second, Ira. Don't go away. Hello? [UNINTELLIGIBLE]. Call me after 3 o'clock. I have great news for you. Ira. Yes. So listen, Tony. If the phone rings, take it in the back. And then come out and tell me who it is. Ju ...


In [19]:

tokenizer = tiktoken.encoding_for_model("text-embedding-3-small")

total_tokens = 0
for doc in chunked_docs:
    total_tokens += len(tokenizer.encode(doc.page_content))

print(f"Total tokens to embed: {total_tokens}")



Total tokens to embed: 10993831


In [20]:
# 1. Initialize embeddings
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    openai_api_key=os.getenv("OPENAI_API_KEY")
)

# 2. Extract texts + metadata
texts = [doc.page_content for doc in chunked_docs]
metadatas = [doc.metadata for doc in chunked_docs]
ids = [str(i) for i in range(len(texts))]

# 3. Manual embedding with rate-limit protection
batch_size = 20     # safe
sleep_time = 0.15   # ~150 ms between batches

vectors = []

print(f"Starting embedding generation for {len(texts)} chunks...")
for i in range(0, len(texts), batch_size):
    batch = texts[i:i+batch_size]
    vec = embeddings.embed_documents(batch)
    vectors.extend(vec)

    print(f"Embedded {i + len(batch)} / {len(texts)}", end="\r")
    time.sleep(sleep_time)

print("\nFinished embedding all chunks.")

Starting embedding generation for 28642 chunks...
Embedded 28642 / 28642
Finished embedding all chunks.
Embedded 28642 / 28642
Finished embedding all chunks.


In [22]:
# 4. Upload to Pinecone

# Initialize Pinecone
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index_name = "podcast-rag"

# Check if index exists, if not create it
if index_name not in pc.list_indexes().names():
    print(f"Creating index {index_name}...")
    pc.create_index(
        name=index_name,
        dimension=1536, # Matches text-embedding-3-small
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    # Wait for index to be ready
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)

index = pc.Index(index_name)
print(f"Connected to Pinecone index: {index_name}")

# Upsert in batches
batch_size = 100  # Pinecone recommends smaller batches than Chroma

print("Upserting vectors to Pinecone...")
for i in range(0, len(ids), batch_size):
    # Prepare batch
    i_end = min(i + batch_size, len(ids))
    batch_ids = ids[i:i_end]
    batch_vectors = vectors[i:i_end]
    batch_metas = metadatas[i:i_end]
    batch_texts = texts[i:i_end] # Get the text content
    
    # CRITICAL: Add the text content to the metadata
    # LangChain needs the 'text' field to reconstruct the Document object
    final_metas = []
    for meta, text in zip(batch_metas, batch_texts):
        new_meta = meta.copy()
        new_meta["text"] = text
        final_metas.append(new_meta)
    
    # Pinecone expects list of (id, vector, metadata) tuples
    to_upsert = list(zip(batch_ids, batch_vectors, final_metas))
    
    index.upsert(vectors=to_upsert)
    print(f"Upserted {i_end} / {len(ids)}", end="\r")

print("\nDone uploading to Pinecone.")

Connected to Pinecone index: podcast-rag
Upserting vectors to Pinecone...
Upserted 28642 / 28642
Done uploading to Pinecone.
Upserted 28642 / 28642
Done uploading to Pinecone.
