In [1]:
import os, re, math, time, textwrap
from typing import List, Tuple
import psycopg2
import psycopg2.extras
import ollama
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
import requests
from bs4 import BeautifulSoup

In [2]:
# ---------- Config ----------
PGHOST     = os.getenv("PGHOST", "localhost")
PGPORT     = int(os.getenv("PGPORT", 5432))
PGDATABASE = os.getenv("PGDATABASE", "ragdb")
PGUSER     = os.getenv("PGUSER", "rag")
PGPASSWORD = os.getenv("PGPASSWORD", "ragpw")

GEN_MODEL = os.getenv("GEN_MODEL", "llama3:8b")
EMB_MODEL = os.getenv("EMB_MODEL", "nomic-embed-text")  # dimension typically 768
TOP_K     = int(os.getenv("TOP_K", 6))
CHUNK_TARGET_CHARS = int(os.getenv("CHUNK_TARGET_CHARS", 1000))  # ~roughly 150–200 tokens
OVERLAP_CHARS      = int(os.getenv("OVERLAP_CHARS", 120))

TABLE_NAME = "youtube_docs"

In [14]:
import psycopg2

conn = psycopg2.connect(
    host="localhost",   # or "127.0.0.1"
    port=5432,
    dbname="ragdb",
    user="rag"
)

cur = conn.cursor()
cur.execute("SELECT 1;")
print(cur.fetchone())
conn.close()


OperationalError: connection to server at "localhost" (::1), port 5432 failed: fe_sendauth: no password supplied


In [47]:
def get_conn():
    return psycopg2.connect(
        host=PGHOST, port=PGPORT, dbname=PGDATABASE, user=PGUSER, password=PGPASSWORD
    )

test = get_conn()

OperationalError: connection to server at "localhost" (::1), port 5432 failed: FATAL:  password authentication failed for user "rag"


In [None]:
# ---------- DB ----------
def get_conn():
    return psycopg2.connect(
        host=PGHOST, port=PGPORT, dbname=PGDATABASE, user=PGUSER, password=PGPASSWORD
    )

def init_db(embedding_dim: int):
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
        # Create table with dynamic vector dimension
        cur.execute(f"""
            CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
              id SERIAL PRIMARY KEY,
              video_id TEXT NOT NULL,
              chunk_id INT NOT NULL,
              content TEXT NOT NULL,
              embedding vector({embedding_dim}) NOT NULL
            );
        """)
        # Fast index (IVFFlat) for cosine distance
        cur.execute(f"CREATE INDEX IF NOT EXISTS {TABLE_NAME}_ivfflat_idx ON {TABLE_NAME} USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);")
        # Helpful default probing
        cur.execute("SET ivfflat.probes = 10;")
        conn.commit()

def clear_video(video_id: str):
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(f"DELETE FROM {TABLE_NAME} WHERE video_id = %s;", (video_id,))
        conn.commit()

def upsert_chunks(video_id: str, chunks: List[str], embeddings: List[List[float]]):
    assert len(chunks) == len(embeddings)
    with get_conn() as conn, conn.cursor() as cur:
        psycopg2.extras.execute_batch(
            cur,
            f"INSERT INTO {TABLE_NAME} (video_id, chunk_id, content, embedding) VALUES (%s, %s, %s, %s);",
            [(video_id, i, chunks[i], embeddings[i]) for i in range(len(chunks))],
            page_size=500
        )
        conn.commit()

def search(query_embedding: List[float], video_id: str, k: int = TOP_K) -> List[Tuple[int, str, float]]:
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            f"""
            SELECT chunk_id, content, (embedding <=> %s::vector) AS distance
            FROM {TABLE_NAME}
            WHERE video_id = %s
            ORDER BY embedding <=> %s::vector
            LIMIT %s;
            """,
            (query_embedding, video_id, query_embedding, k)
        )
        rows = cur.fetchall()
        return [(r[0], r[1], float(r[2])) for r in rows]

In [2]:
def extract_video_id(url: str) -> str:
    m = re.search(r"(?:v=|youtu\.be/|shorts/)([A-Za-z0-9_-]{6,})", url)
    return m.group(1) if m else url

def extract_video_title(url: str) -> str:
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    title = soup.title.string.replace(" - YouTube", "").strip()
    return re.sub(r'[^0-9a-zA-Z\u0E00-\u0E7F\.]', '', title)

def get_youtube_transcript(video_id: str) -> str:
    try:
        tlist = YouTubeTranscriptApi.get_transcript(video_id, languages=['th', 'en'])
    except TranscriptsDisabled:
        raise RuntimeError("Transcripts are disabled for this video.")
    except NoTranscriptFound:
        raise RuntimeError("No transcript found for this video.")
    # Merge with spaces; keep punctuation minimal
    text = " ".join([item["text"].strip() for item in tlist if item["text"].strip()])
    # Normalize whitespace
    text = re.sub(r"\s+", " ", text).strip()
    return text

In [9]:
# ---------- Ollama helpers ----------
def embed_texts(texts: List[str]) -> List[List[float]]:
    # Batch embed via Ollama (simple loop; Ollama also supports /embeddings endpoint per text)
    embs = []
    for t in texts:
        r = ollama.embeddings(model=EMB_MODEL, prompt=t)
        embs.append(r["embedding"])
    return embs

def generate(prompt: str, temperature: float = 0.2) -> str:
    resp = ollama.generate(
        model=GEN_MODEL,
        prompt=prompt,
        options={"temperature": temperature}
    )
    return resp["response"]

In [10]:
url = "https://youtu.be/KSbXyRZQbUY?si=gGv553ps5TI-FDQa"

video_id = extract_video_id(url=url)
video_title = extract_video_title(url=url)
text = get_youtube_transcript(video_id=video_id)

print(f"url: {url}")
print(f"video_id: {video_id}")
print(f"video_title: {video_title}")

url: https://youtu.be/KSbXyRZQbUY?si=gGv553ps5TI-FDQa
video_id: KSbXyRZQbUY
video_title: ยังมีโอกาสสร้างความมั่งคั่งได้อยู่ไหมในยุคแบบนี้


In [17]:
from pythainlp.tokenize import word_tokenize

def chunk_thai_text(text, max_tokens: int = CHUNK_TARGET_CHARS, overlap: int = OVERLAP_CHARS) -> List[str]:
    """
    Chunk Thai text using PyThaiNLP word tokenizer with overlap.
    """
    words = word_tokenize(text, keep_whitespace=False)
    chunks = []
    start = 0

    while start < len(words):
        end = min(start + max_tokens, len(words))
        chunk = ''.join(words[start:end])  # Join with no spaces for Thai
        chunks.append(chunk)
        start += max_tokens - overlap

    return chunks

# Example usage:
chunks = chunk_thai_text(text=text)

In [25]:
def ingest_video(url_or_id: str) -> Tuple[str, int]:
    video_id = extract_video_id(url_or_id)
    print(f"Fetching transcript for: {video_id}")
    transcript = get_youtube_transcript(video_id)
    chunks = chunk_thai_text(transcript)
    print(f"Transcript length: {len(transcript):,} chars; chunks: {len(chunks)}")

    print("Embedding chunks via Ollama...")
    chunk_embs = embed_texts(chunks)
    dim = len(chunk_embs[0])

    print(f"Initializing DB (dim={dim}) and storing chunks...")
    init_db(dim)
    clear_video(video_id)
    upsert_chunks(video_id, chunks, chunk_embs)

    return video_id, len(chunks)

In [26]:
ingest_video(url)

Fetching transcript for: KSbXyRZQbUY
Transcript length: 57,018 chars; chunks: 18
Embedding chunks via Ollama...
Initializing DB (dim=768) and storing chunks...


OperationalError: connection to server at "localhost" (::1), port 5432 failed: FATAL:  password authentication failed for user "rag"


In [22]:
import argparse
parser = argparse.ArgumentParser(description="RAG on YouTube subtitles using Ollama + pgvector")
parser

ArgumentParser(prog='ipykernel_launcher.py', usage=None, description='RAG on YouTube subtitles using Ollama + pgvector', formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=True)