In [2]:
import json
import os
from typing import List, Dict
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re

def load_json_file(file_path: str) -> Dict:
    """Read the UTF-8 encoded JSON document at ``file_path`` and return its parsed contents.

    Args:
        file_path: Location of the JSON file on disk.

    Returns:
        Whatever the JSON document decodes to (callers here expect a dict).
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        raw_json = handle.read()
    return json.loads(raw_json)

def clean_text(text: str) -> str:
    """Normalise whitespace in ``text``.

    Leading and trailing whitespace is trimmed, then every run of whitespace
    (spaces, tabs, newlines) is squeezed into a single space, so the result
    contains no line breaks.
    """
    trimmed = text.strip()
    collapsed = re.sub(r'\s+', ' ', trimmed)
    # Add more cleaning steps as needed
    return collapsed

def split_transcript(transcript: str, max_tokens: int = 512, chunk_overlap: int = 100) -> List[str]:
    """Break ``transcript`` into overlapping pieces with a recursive character splitter.

    Args:
        transcript: Text to split.
        max_tokens: Target chunk size in tokens; converted to characters with
            the rough heuristic of four characters per token.
        chunk_overlap: Overlap handed to the splitter unchanged. Lengths are
            measured with ``len``, so this value is in characters, not tokens.

    Returns:
        The list of chunk strings produced by the splitter.
    """
    # Assuming 1 token is approximately 4 characters
    chars_per_token = 4

    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", ". ", "!", "?", ",", " ", ""],
        keep_separator=False,
        length_function=len,
        chunk_size=max_tokens * chars_per_token,
        chunk_overlap=chunk_overlap,
    )

    pieces = splitter.split_text(transcript)
    return pieces

def create_enriched_chunks(video_data: Dict, max_tokens: int = 512, chunk_overlap: int = 100) -> List[Dict]:
    """Turn one video record into a list of chunk dicts carrying the video's metadata.

    The transcript is whitespace-normalised with ``clean_text`` and split with
    ``split_transcript``. Each resulting piece becomes a dict of the form
    ``{"content": <chunk text>, "metadata": {...}}`` where the metadata holds
    the video's id, title, upload date, thumbnail URL, and a human-readable
    position such as ``"2 of 7"``.

    Args:
        video_data: Record providing the keys ``transcript``, ``id``,
            ``title``, ``upload_date`` and ``thumbnail_url``.
        max_tokens: Forwarded to ``split_transcript``.
        chunk_overlap: Forwarded to ``split_transcript``.

    Returns:
        One enriched dict per chunk, in transcript order.
    """
    pieces = split_transcript(clean_text(video_data['transcript']), max_tokens, chunk_overlap)
    total = len(pieces)

    return [
        {
            "content": piece,
            "metadata": {
                "video_id": video_data["id"],
                "title": video_data["title"],
                "upload_date": video_data["upload_date"],
                "chunk_number": f"{position} of {total}",
                "thumbnail_url": video_data["thumbnail_url"]
            }
        }
        for position, piece in enumerate(pieces, start=1)
    ]

def process_all_videos(input_directory: str, output_directory: str, max_tokens: int = 512, chunk_overlap: int = 100):
    """Chunk every video JSON file in ``input_directory`` and write the results.

    For each regular file whose name ends in ``.json`` the record is loaded,
    converted with ``create_enriched_chunks``, and written to
    ``output_directory`` as ``enriched_<original name>`` (UTF-8, indented,
    non-ASCII characters preserved). Only the top level of
    ``input_directory`` is scanned.

    Args:
        input_directory: Folder containing one JSON file per video.
        output_directory: Folder for the enriched files; created if missing.
        max_tokens: Forwarded to ``create_enriched_chunks``.
        chunk_overlap: Forwarded to ``create_enriched_chunks``.

    Raises:
        Any error from reading, parsing or chunking a file (for example a
        ``KeyError`` when a record lacks an expected key) propagates and
        stops the run.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(output_directory, exist_ok=True)

    # Sorted so the processing order does not depend on the file system.
    for filename in sorted(os.listdir(input_directory)):
        if not filename.endswith('.json'):
            continue

        input_path = os.path.join(input_directory, filename)
        # A directory whose name happens to end in ".json" cannot be opened as a file.
        if not os.path.isfile(input_path):
            continue

        output_path = os.path.join(output_directory, f"enriched_{filename}")

        video_data = load_json_file(input_path)
        enriched_chunks = create_enriched_chunks(video_data, max_tokens, chunk_overlap)

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(enriched_chunks, f, ensure_ascii=False, indent=2)

    print(f"Processed files saved in {output_directory}")

# Example usage
# Inside a notebook kernel __name__ is "__main__", so this block runs every
# time the cell is executed.
if __name__ == "__main__":
    # NOTE(review): absolute, machine-specific Windows paths; consider a
    # configurable data directory so the notebook runs elsewhere.
    input_dir = r"E:\Udemy\fitness-podcastai\src\data_collection\data\raw"
    # The output folder sits inside the input folder. process_all_videos lists
    # only the top level of input_dir, so the enriched files are not re-read.
    output_dir = r"E:\Udemy\fitness-podcastai\src\data_collection\data\raw\chunks"
    max_tokens = 512  # Adjust as needed
    chunk_overlap = 100  # Adjust as needed
    
    process_all_videos(input_dir, output_dir, max_tokens, chunk_overlap)

Processed files saved in E:\Udemy\fitness-podcastai\src\data_collection\data\raw\chunks


In [8]:
import os
from dotenv import load_dotenv
# Load variables from a .env file into the process environment so that later
# cells can read PINECONE_API_KEY through os.environ.
load_dotenv()

True

In [12]:
from pinecone import Pinecone, ServerlessSpec

# Create the Pinecone client from the PINECONE_API_KEY environment variable.
# os.environ.get returns None when the variable is unset.
# NOTE(review): a later cell constructs the same client again; one of the two is redundant.
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

In [19]:
import os
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
import json
import time

# Initialize Pinecone
# The API key is read from the environment (populated earlier via load_dotenv).
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

index_name = "fitness-chatbot"

# Check if the index already exists
if index_name not in pc.list_indexes().names():
    print(f"Creating new index: {index_name}")
    pc.create_index(
        name=index_name,
        dimension=768,  # dimension for 'multi-qa-mpnet-base-dot-v1'
        # NOTE(review): the embedding model's name says it is tuned for
        # dot-product scoring; confirm that "cosine" is the intended metric
        # before creating a fresh index (the metric cannot be changed later).
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    print("Waiting for index to be ready...")
    # Poll the index status instead of sleeping a fixed 60 seconds: this
    # returns as soon as the index is usable and fails loudly if it never
    # becomes ready, rather than continuing against an unready index.
    ready_deadline = time.monotonic() + 300
    while not pc.describe_index(index_name).status["ready"]:
        if time.monotonic() >= ready_deadline:
            raise TimeoutError(f"Index {index_name} was not ready after 300 seconds")
        time.sleep(2)
else:
    print(f"Index {index_name} already exists")

# Get the index
index = pc.Index(index_name)

# Initialize the embedding model
model = SentenceTransformer('multi-qa-mpnet-base-dot-v1')

def generate_embedding(text):
    """Encode ``text`` with the module-level ``model`` and return the resulting embedding."""
    embedding = model.encode(text)
    return embedding

def process_and_store_embeddings(input_dir):
    """Embed every chunk file in ``input_dir`` and upsert the vectors into the Pinecone index.

    Each ``.json`` file is expected to hold a list of chunk dicts with a
    ``content`` string and a ``metadata`` dict. One vector is built per chunk:
    its ID is ``"<video_id>_<chunk_number>"``, its values are the chunk's
    embedding, and the chunk metadata is attached as-is. Vectors are sent in
    batches of 100.

    Failures are reported with ``print`` and do not stop the run: a failed
    batch is skipped, and an error anywhere else in a file moves on to the
    next file.

    Args:
        input_dir: Folder containing the enriched chunk JSON files.

    Returns:
        The number of vectors in batches whose upsert call did not raise.
    """
    batch_size = 100
    total_vectors = 0

    def _as_vector(chunk):
        # Build the (id, values, metadata) tuple that is passed to upsert.
        embedding = generate_embedding(chunk['content'])
        vector_id = f"{chunk['metadata']['video_id']}_{chunk['metadata']['chunk_number']}"
        return (vector_id, embedding.tolist(), chunk['metadata'])

    for filename in os.listdir(input_dir):
        if not filename.endswith('.json'):
            continue

        try:
            with open(os.path.join(input_dir, filename), 'r', encoding='utf-8') as f:
                chunks = json.load(f)

            print(f"Processing {filename} with {len(chunks)} chunks")

            vectors_to_upsert = [_as_vector(chunk) for chunk in chunks]

            # Upsert to Pinecone in batches
            for i in range(0, len(vectors_to_upsert), batch_size):
                batch = vectors_to_upsert[i:i+batch_size]
                try:
                    upsert_response = index.upsert(vectors=batch)
                except Exception as e:
                    print(f"Error upserting batch: {str(e)}")
                else:
                    print(f"Upserted batch {i//batch_size + 1}. Response: {upsert_response}")
                    total_vectors += len(batch)

            print(f"Processed and stored embeddings for {filename}")
        except Exception as e:
            print(f"Error processing {filename}: {str(e)}")

    print(f"Finished processing all chunks in {input_dir}")
    print(f"Total vectors upserted: {total_vectors}")
    return total_vectors

def count_total_vectors():
    """Return the ``total_vector_count`` field from the index's current statistics."""
    return index.describe_index_stats()['total_vector_count']

# Usage
# Embed and upload every chunk file written by the chunking step.
# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
input_dir = r"E:\Udemy\fitness-podcastai\src\data_collection\data\raw\chunks"
total_upserted = process_and_store_embeddings(input_dir)

print("Waiting for 60 seconds before checking total vectors...")
time.sleep(60)  # Wait for 60 seconds to ensure all updates are reflected

total_vectors = count_total_vectors()
print(f"Total vectors in index: {total_vectors}")

# NOTE(review): this compares the size of the whole index with the number of
# vectors sent in this run. The two can differ without anything being wrong,
# e.g. when the index already held vectors, or when a re-run overwrites
# existing IDs instead of adding new ones.
if total_vectors != total_upserted:
    print(f"Warning: Upserted {total_upserted} vectors, but index shows {total_vectors} vectors")

Index fitness-chatbot already exists
Processing enriched_-2FesPR4WEQ.json with 3 chunks
Upserted batch 1. Response: {'upserted_count': 3}
Processed and stored embeddings for enriched_-2FesPR4WEQ.json
Processing enriched_-5T82eXSYvY.json with 6 chunks
Upserted batch 1. Response: {'upserted_count': 6}
Processed and stored embeddings for enriched_-5T82eXSYvY.json
Processing enriched_-9OWV6kio94.json with 3 chunks
Upserted batch 1. Response: {'upserted_count': 3}
Processed and stored embeddings for enriched_-9OWV6kio94.json
Processing enriched_-bP24FFkQ54.json with 5 chunks
Upserted batch 1. Response: {'upserted_count': 5}
Processed and stored embeddings for enriched_-bP24FFkQ54.json
Processing enriched_-BzNffL_6YE.json with 1 chunks
Upserted batch 1. Response: {'upserted_count': 1}
Processed and stored embeddings for enriched_-BzNffL_6YE.json
Processing enriched_-CHmorGmjmg.json with 6 chunks
Upserted batch 1. Response: {'upserted_count': 6}
Processed and stored embeddings for enriched_-C

In [None]:
# Querying example
# The previous version called `collection.query(...)`, but no `collection`
# object is defined anywhere in this notebook, so the cell failed with a
# NameError. The vectors live in the Pinecone `index`, so query that instead.
query = "How to do a proper push-up"

# Embed the question with the same model that produced the stored vectors.
query_embedding = generate_embedding(query).tolist()

results = index.query(
    vector=query_embedding,
    top_k=5,
    include_metadata=True,
)

# Only the chunk metadata was upserted (not the chunk text), so show the
# vector ID, similarity score and metadata of each match.
for match in results["matches"]:
    print(f"ID: {match['id']} (score: {match['score']:.4f})")
    print(f"Metadata: {match['metadata']}")
    print("---")