In [None]:
pip install yt-dlp youtube-transcript-api openai-whisper pytube requests


In [None]:
!pip install --quiet yt-dlp youtube-transcript-api requests openai-whisper
!apt-get update -y && apt-get install -y ffmpeg

from google.colab import drive
drive.mount('/content/drive', force_remount=True)


In [None]:
# Mount Google Drive at /content/drive so later cells can read/write files there.
# NOTE(review): the setup cell before this one already mounts the same path
# with force_remount=True, so this cell looks redundant — confirm before removing.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os, json, time, random, requests, xml.etree.ElementTree as ET
import yt_dlp, whisper
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound

# Channel whose uploads are scraped; uploads_playlist_url() derives the playlist id from it.
CHANNEL_ID   = "UC-PaZZpjgJ61wkK9yKfpe8w"
# JSON file mapping video id -> transcript text; scrape_all() also reads it back as a resume cache.
OUTPUT_FILE  = "/content/drive/MyDrive/transcripts.json"
# Bounds (seconds) of the random pause scrape_all() sleeps between videos.
PAUSE_MIN    = 2
PAUSE_MAX    = 5

# Load the Whisper model once at cell level; fetch_whisper() reuses this global.
print(" Loading Whisper (small.en)…")
whisper_model = whisper.load_model("small.en")

def uploads_playlist_url(cid: str) -> str:
    """Build the URL of a channel's uploads playlist.

    The playlist id is ``cid`` with its first two characters replaced by ``UU``.
    """
    playlist_id = "UU" + cid[2:]
    return "https://www.youtube.com/playlist?list=" + playlist_id

def list_video_ids(pl_url: str) -> list[str]:
    """Return the video ids listed in a playlist, in playlist order.

    The playlist is resolved with yt-dlp in flat-extraction mode, so nothing
    is downloaded. Only entries whose ``id`` is an 11-character string are kept.

    Args:
        pl_url: URL of the playlist to enumerate.

    Returns:
        The matching video ids; an empty list when the playlist yields no
        usable entries.
    """
    opts = {"extract_flat": "in_playlist", "skip_download": True, "quiet": True}
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(pl_url, download=False)
    # Be tolerant of a missing result, a missing/None "entries" value and
    # None placeholders inside the list: none of them should raise here.
    entries = (info or {}).get("entries") or []
    return [
        e["id"] for e in entries
        if isinstance(e, dict) and isinstance(e.get("id"), str) and len(e["id"]) == 11
    ]

def fetch_api(vid: str) -> str | None:
    try:
        segs = YouTubeTranscriptApi.get_transcript(vid, languages=["en"])
        return " ".join(s["text"] for s in segs)
    except (TranscriptsDisabled, NoTranscriptFound):
        return None
    except:
        return None

def fetch_xml(vid: str) -> str | None:
    url = f"https://video.google.com/timedtext?lang=en&v={vid}"
    try:
        r = requests.get(url, timeout=10)
        r.raise_for_status()
        if not r.text.strip():
            return None
        root = ET.fromstring(f"<root>{r.text}</root>")
        return " ".join(node.text or "" for node in root.findall("text"))
    except:
        return None

def fetch_whisper(vid: str) -> str | None:
    out_fn = f"{vid}.m4a"
    ydl_opts = {
        "format": "bestaudio[ext=m4a]/bestaudio",
        "outtmpl": out_fn,
        "quiet": True
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([f"https://youtu.be/{vid}"])
        result = whisper_model.transcribe(out_fn)
        os.remove(out_fn)
        return result["text"]
    except Exception as e:
        print(f"     Whisper error for {vid}: {e}")
        if os.path.exists(out_fn):
            os.remove(out_fn)
        return None

def scrape_all():
    """Fetch a transcript for every upload of CHANNEL_ID and cache them in OUTPUT_FILE.

    For each video the sources are tried in order: fetch_api, fetch_xml,
    fetch_whisper. Results (an empty string when every source fails) are
    stored in a dict keyed by video id, which is rewritten to OUTPUT_FILE
    after every video so an interrupted run can resume. Videos already in
    the cache are skipped. A random pause between PAUSE_MIN and PAUSE_MAX
    seconds follows each freshly fetched video.
    """
    pl_url = uploads_playlist_url(CHANNEL_ID)
    print("  Uploads playlist:", pl_url)
    vids = list_video_ids(pl_url)
    print(f"  Found {len(vids)} videos. Starting fetch…\n")

    cache = {}
    if os.path.exists(OUTPUT_FILE):
        # Context manager so the handle is closed deterministically.
        with open(OUTPUT_FILE) as f:
            cache = json.load(f)

    tmp_file = OUTPUT_FILE + ".tmp"
    for idx, vid in enumerate(vids, 1):
        if vid in cache:
            mark = "yes" if cache[vid] else "no"
            print(f"{idx}/{len(vids)} • {vid} … cached {mark}")
            continue

        print(f"{idx}/{len(vids)} • {vid} …", end="", flush=True)
        text = fetch_api(vid) or fetch_xml(vid) or fetch_whisper(vid)
        cache[vid] = text or ""
        print("yes" if text else "no")

        # Write to a sibling temp file and swap it in: an interruption in the
        # middle of the dump can no longer truncate the existing cache.
        with open(tmp_file, "w") as f:
            json.dump(cache, f, indent=2)
        os.replace(tmp_file, OUTPUT_FILE)

        time.sleep(random.uniform(PAUSE_MIN, PAUSE_MAX))

    # Count only videos of this playlist; the cache may hold older entries.
    done = sum(1 for v in vids if cache.get(v))
    print(f"\n Completed: {done}/{len(vids)} transcripts saved to {OUTPUT_FILE}")

# Start the scrape only when this code runs as the main module (true for a
# notebook cell), not when the definitions are imported from elsewhere.
if __name__ == "__main__":
    scrape_all()


In [None]:
!pip install -q yt-dlp pandas
!apt-get update -y && apt-get install -y ffmpeg  # for yt-dlp’s internal checks


In [None]:
import yt_dlp
import pandas as pd
# Channel to inspect; its uploads playlist id is the channel id with the
# first two characters replaced by "UU".
CHANNEL_ID    = "UC-PaZZpjgJ61wkK9yKfpe8w"
PLAYLIST_URL  = f"https://www.youtube.com/playlist?list=UU{CHANNEL_ID[2:]}"
# Full (non-flat) extraction so each entry carries per-video metadata;
# nothing is downloaded.
ydl_opts = {
    "skip_download":     True,
    "dump_single_json":  True,
    "extract_flat":      False,
    "quiet":             True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    # Fall back to an empty dict so the lookups below never hit None.
    playlist_info = ydl.extract_info(PLAYLIST_URL, download=False) or {}
playlist_id    = playlist_info.get("id")
playlist_title = playlist_info.get("title")
# Drop missing/None entries: the flattening cell calls entry.get(...) on
# every item and would raise AttributeError on a None placeholder.
videos = [entry for entry in (playlist_info.get("entries") or []) if entry]
print(f"Fetched metadata for {len(videos)} videos from playlist '{playlist_title}'")


In [None]:
# Flatten each yt-dlp entry into one row of plain columns.
clean = []
for entry in videos:
    clean.append({
        "video_id":      entry.get("id"),
        "title":         entry.get("title"),
        "publish_date":  entry.get("upload_date"),
        "view_count":    entry.get("view_count"),
        "like_count":    entry.get("like_count"),
        "comment_count": entry.get("comment_count"),
        "description":   entry.get("description"),
        "tags":          entry.get("tags"),
        # Last item of the thumbnails list; None when the list is missing or empty.
        "thumbnail_url": (entry.get("thumbnails") or [{}])[-1].get("url"),
        "duration":      entry.get("duration"),
        "video_url":     entry.get("webpage_url"),
        "channel_name":  entry.get("uploader"),
        # yt-dlp exposes the channel id under "channel_id"; "uploader_id" is a
        # different field, kept only as a fallback for entries lacking the former.
        "channel_id":    entry.get("channel_id") or entry.get("uploader_id"),
        "playlist_id":   playlist_id,
        "playlist_title":playlist_title
    })

df = pd.DataFrame(clean)
df.head()


In [None]:
# Persist the metadata table as CSV and as a JSON list of records. The paths
# are relative, so the files land in the current working directory.
# NOTE(review): the merge cell reads /content/videos_clean_metadata.json, which
# only matches when the working directory is /content — confirm.
df.to_csv("videos_clean_metadata.csv", index=False)
df.to_json("videos_clean_metadata.json", orient="records", indent=2)

print("Saved videos_clean_metadata.csv & .json")


In [None]:
import json
import pandas as pd
# Inputs: per-video metadata records and the {video_id: transcript} cache.
metadata_json_path    = "/content/videos_clean_metadata.json"
transcripts_json_path = "/content/drive/MyDrive/transcripts.json"
# Outputs: merged records as JSON and as a flattened CSV.
merged_json_path      = "/content/drive/MyDrive/videos_merged.json"
merged_csv_path       = "/content/drive/MyDrive/videos_merged.csv"


def _video_id_from_url(url):
    """Best-effort video id from a watch URL (?v=ID) or a short URL (/ID)."""
    from urllib.parse import parse_qs, urlparse

    parsed = urlparse(url or "")
    from_query = parse_qs(parsed.query).get("v")
    if from_query:
        return from_query[0]
    return parsed.path.rstrip("/").split("/")[-1]


# The path variables defined above are the ones opened here (the previous
# METADATA_JSON / TRANSCRIPTS_JSON names were never defined -> NameError).
with open(metadata_json_path, 'r', encoding="utf-8") as f:
    metadata_list = json.load(f)

with open(transcripts_json_path, 'r', encoding="utf-8") as f:
    transcripts_dict = json.load(f)

for entry in metadata_list:
    # Prefer an explicit id field; fall back to parsing the video URL.
    vid = entry.get('id') or entry.get('video_id') \
          or _video_id_from_url(entry.get('video_url'))
    # Videos without a cached transcript get an empty string.
    entry['transcript'] = transcripts_dict.get(vid, "")
with open(merged_json_path, 'w') as f:
    json.dump(metadata_list, f, indent=2)
df = pd.json_normalize(metadata_list)
df.to_csv(merged_csv_path, index=False)

print(f" Merged data saved to:\n  • {merged_json_path}\n  • {merged_csv_path}")

In [None]:
import json
import re
# Source records for the cleaning step.
# NOTE(review): no cell visible in this notebook writes
# videos_merged_normalized.json (the merge cell writes videos_merged.json) —
# confirm where this file comes from.
INPUT_JSON  = "/content/drive/MyDrive/videos_merged_normalized.json"
# Destination of the cleaned records; the chunking cell reads this path.
OUTPUT_JSON = "/content/videos_cleaned.json"
def clean_text(text):
    """Normalise a transcript/description string.

    Non-string input yields ``""``. Otherwise timestamps (``m:ss``,
    ``mm:ss`` or ``h:mm:ss``, optionally in square brackets) and http(s)
    URLs are replaced by a space, every whitespace run is collapsed to a
    single space, and the result is stripped.
    """
    if not isinstance(text, str):
        return ""
    # Order matters: timestamps are removed first, then URLs.
    for pattern in (r"\[?\d{1,2}:\d{2}(?::\d{2})?\]?", r"https?://\S+"):
        text = re.sub(pattern, " ", text)
    # \s+ already covers newlines, carriage returns and tabs.
    return re.sub(r'\s+', ' ', text).strip()
# Read the merged records.
with open(INPUT_JSON, "r", encoding="utf-8") as f:
    data = json.load(f)

# Each output record is a shallow copy of its input plus two cleaned fields.
cleaned = []
for entry in data:
    entry = {
        **entry,
        "transcript_clean": clean_text(entry.get("transcript", "")),
        "description_clean": clean_text(entry.get("description", "")),
    }
    cleaned.append(entry)

# Write the records out without escaping non-ASCII characters.
with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
    json.dump(cleaned, f, indent=2, ensure_ascii=False)
print(f"Saved cleaned transcripts and metadata to: {OUTPUT_JSON}")


In [None]:
import json
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Paths and splitter settings for the chunking step (sizes are in characters).
INPUT_JSON  = "/content/videos_cleaned.json"
OUTPUT_JSON = "/content/videos_chunked.json"
CHUNK_SIZE  = 1000
OVERLAP     = 200

with open(INPUT_JSON, "r", encoding="utf-8") as f:
    data = json.load(f)

splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
    chunk_size=CHUNK_SIZE,
    chunk_overlap=OVERLAP,
)

# One output record per chunk: every field of the source entry except
# "transcript_clean", plus the chunk's index and text.
chunked_records = []
for entry in data:
    transcript = entry.get("transcript_clean", "")
    if transcript.strip():
        shared_fields = {k: v for k, v in entry.items() if k != "transcript_clean"}
        chunks = splitter.split_text(transcript)
        chunked_records.extend(
            {**shared_fields, "chunk_index": position, "chunk_text": piece}
            for position, piece in enumerate(chunks)
        )

with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
    json.dump(chunked_records, f, indent=2, ensure_ascii=False)
print(f"Saved {len(chunked_records)} chunks to {OUTPUT_JSON}")

In [None]:
pip install sentence-transformers

In [None]:
import json
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
# Input chunks, output file, embedding model name and encode batch size.
CHUNKS_PATH  = "/content/videos_chunked.json"
OUTPUT_PATH  = "/content/chunks_with_embeddings.json"
EMBED_MODEL  = "all-MiniLM-L6-v2"
BATCH_SIZE   = 64

with open(CHUNKS_PATH, "r", encoding="utf-8") as f:
    chunks = json.load(f)

texts = [c["chunk_text"] for c in chunks]
model = SentenceTransformer(EMBED_MODEL)

# Encode in fixed-size batches; tqdm shows one tick per batch.
embeddings = []
for start in tqdm(range(0, len(texts), BATCH_SIZE)):
    embeddings.extend(
        model.encode(
            texts[start:start + BATCH_SIZE],
            show_progress_bar=False,
            convert_to_numpy=True,
        )
    )
assert len(embeddings) == len(chunks)

# Attach each vector to its chunk record as a plain list so it is JSON-serialisable.
for position, rec in enumerate(chunks):
    rec["embedding"] = embeddings[position].tolist()

with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(chunks, f, indent=2, ensure_ascii=False)
print(f"Saved {len(chunks)} embedded chunks to {OUTPUT_PATH}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

100%|██████████| 20/20 [02:46<00:00,  8.34s/it]


Saved 1274 embedded chunks to /content/chunks_with_embeddings.json


In [None]:
!pip install faiss-cpu



In [None]:
import json
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
# NOTE(review): the embedding cell writes /content/chunks_with_embeddings.json,
# while this cell reads the file from Drive — confirm the file is copied there.
EMBEDDED_CHUNKS_JSON = "/content/drive/MyDrive/chunks_with_embeddings.json"
FAISS_INDEX_PATH     = "/content/faiss_index.index"
METADATA_JSON_PATH   = "/content/faiss_metadata.json"

with open(EMBEDDED_CHUNKS_JSON, "r", encoding="utf-8") as f:
    records = json.load(f)

# Vectors go into a float32 matrix; everything else becomes row-aligned metadata.
embeddings = np.asarray([r["embedding"] for r in records], dtype="float32")
metadata   = [
    {key: value for key, value in r.items() if key != "embedding"}
    for r in records
]
dim        = embeddings.shape[1]
print(f"Loaded {len(embeddings)} embeddings, dimension={dim}")

# Exact L2 index over all vectors.
index = faiss.IndexFlatL2(dim)
index.add(embeddings)
print(f"FAISS index built with {index.ntotal} vectors.")

# Persist index and metadata side by side.
faiss.write_index(index, FAISS_INDEX_PATH)
with open(METADATA_JSON_PATH, "w", encoding="utf-8") as f:
    json.dump(metadata, f, indent=2, ensure_ascii=False)
print(f"Saved FAISS index to: {FAISS_INDEX_PATH}")
print(f"Saved metadata to:   {METADATA_JSON_PATH}")

# Reload both from disk so the search below runs against the saved artefacts.
index = faiss.read_index(FAISS_INDEX_PATH)
with open(METADATA_JSON_PATH, "r", encoding="utf-8") as f:
    metadata = json.load(f)
model = SentenceTransformer("all-MiniLM-L6-v2")
def search_faiss(query, top_k=5):
    """Return the metadata of the chunks closest to ``query``.

    The query is embedded with the global ``model`` and searched in the
    global ``index``; hits are looked up in the global ``metadata`` list.

    Args:
        query: Free-text search string.
        top_k: Maximum number of hits to return.

    Returns:
        A list of at most ``top_k`` dicts, each a copy of the hit's metadata
        with an added ``"score"`` key holding the distance reported by the
        index as a float. Empty when ``top_k`` is not positive.
    """
    if top_k <= 0:
        return []
    query_emb = model.encode([query]).astype("float32")
    D, I = index.search(query_emb, top_k)
    results = []
    for idx, dist in zip(I[0], D[0]):
        # When the index holds fewer than top_k vectors the remaining slots
        # carry label -1; metadata[-1] would silently return the last record.
        if idx < 0:
            continue
        result = metadata[idx].copy()
        result["score"] = float(dist)
        results.append(result)
    return results
# Smoke-test the index with one sample question and print the top hits.
query = "How do I grow my YouTube channel quickly?"
results = search_faiss(query, top_k=3)
print("\n--- Top Results ---")
for r in results:
    print(f"\nTitle: {r.get('title')}")
    print(f"Video URL: {r.get('video_url')}")
    # Only the first 350 characters of the chunk are shown.
    print(f"Chunk: {r.get('chunk_text')[:350]}...")
    # "score" is the distance set by search_faiss.
    print(f"Distance: {r.get('score'):.4f}")


In [None]:
!pip install langchain-google-genai google-generativeai faiss-cpu sentence-transformers


In [None]:
import os
import json
import faiss
from sentence_transformers import SentenceTransformer
from langchain_google_genai import ChatGoogleGenerativeAI
# Artefacts written by the index-building cell, and the embedding model name.
FAISS_INDEX_PATH   = "/content/faiss_index.index"
METADATA_JSON_PATH = "/content/faiss_metadata.json"
EMBED_MODEL        = "all-MiniLM-L6-v2"
# Never store the key in the notebook: use GOOGLE_API_KEY from the
# environment when it is already set, otherwise ask for it without echo.
if not os.environ.get("GOOGLE_API_KEY"):
    from getpass import getpass
    os.environ["GOOGLE_API_KEY"] = getpass("Enter your Google API key: ")
# Load the persisted index and its row-aligned metadata.
index = faiss.read_index(FAISS_INDEX_PATH)
with open(METADATA_JSON_PATH, "r", encoding="utf-8") as f:
    metadata = json.load(f)
# Query embedder and the chat model used to answer questions.
embedder = SentenceTransformer(EMBED_MODEL)
llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash')
def search_faiss(query, top_k=4):
    """Return the metadata of the chunks closest to ``query``.

    The query is embedded with the global ``embedder`` and searched in the
    global ``index``; hits are looked up in the global ``metadata`` list.

    Args:
        query: Free-text search string.
        top_k: Maximum number of hits to return.

    Returns:
        A list of at most ``top_k`` dicts, each a copy of the hit's metadata
        with an added ``"score"`` key holding the distance reported by the
        index as a float. Empty when ``top_k`` is not positive.
    """
    if top_k <= 0:
        return []
    query_emb = embedder.encode([query]).astype("float32")
    D, I = index.search(query_emb, top_k)
    results = []
    for idx, dist in zip(I[0], D[0]):
        # When the index holds fewer than top_k vectors the remaining slots
        # carry label -1; metadata[-1] would silently return the last record.
        if idx < 0:
            continue
        rec = metadata[idx].copy()
        rec["score"] = float(dist)
        results.append(rec)
    return results
def build_rag_prompt(context_chunks, user_question):
    """Assemble the LLM prompt from retrieved chunks and the user's question.

    Each chunk contributes a block with its title, URL and the first 800
    characters of its text (always followed by ``...``); missing keys render
    as empty strings. The blocks are placed between the coach instructions
    and the question.

    Args:
        context_chunks: Iterable of dicts with optional ``title``,
            ``video_url`` and ``chunk_text`` keys.
        user_question: The question to answer.

    Returns:
        The complete prompt string.
    """
    context_text = "".join(
        f"\n---\nTitle: {chunk.get('title','')}\n"
        f"URL: {chunk.get('video_url','')}\n"
        f"Excerpt: {chunk.get('chunk_text','')[:800]}...\n"
        for chunk in context_chunks
    )
    return f"""You are an advanced YouTube Channel Growth Coach.
Your job is to give creative, actionable, and specific advice based ONLY on the following video excerpts (with titles and URLs).

{context_text}

User's question:
{user_question}

INSTRUCTIONS:
- Cite video titles/URLs as sources whenever possible.
- If the user asks for a script or hook, write a full YouTube script/hook in the creator's style.
- Be clear, step-by-step, and as practical as possible.
"""
# Interactive loop: retrieve context for each question, ask the LLM, print
# the answer followed by the source videos. "exit"/"quit" ends the session.
print("YouTube Growth RAG Chatbot (type 'exit' to stop)\n")
while True:
    try:
        user_question = input("Your question: ").strip()
    except (EOFError, KeyboardInterrupt):
        # Closed input or an interrupted kernel ends the chat without a traceback.
        print("\nGoodbye!")
        break
    if not user_question:
        # Nothing to search for; ask again instead of spending an LLM call.
        continue
    if user_question.lower() in {"exit", "quit"}:
        print("Goodbye!")
        break
    chunks = search_faiss(user_question, top_k=4)
    prompt = build_rag_prompt(chunks, user_question)
    print("\n[Retrieving, reasoning...]\n")
    try:
        response = llm.invoke(prompt)
    except Exception as e:
        # A failed request should not end the whole session.
        print(f"LLM request failed: {e}\n")
        continue
    print("\n==== AI's Answer ====\n")
    print(response.content)
    print("\n==== Source Videos ====\n")
    for c in chunks:
        print(f"- {c.get('title')} ({c.get('video_url')}) [Score: {c.get('score'):.4f}]")
    print("\n---\n")


YouTube Growth RAG Chatbot (type 'exit' to stop)

Your question: how to create hooks for a new channel tech based

[Retrieving, reasoning...]


==== AI's Answer ====

Creating compelling hooks for a new tech-based channel is crucial for capturing attention and attracting new viewers from the start. Here's a step-by-step approach based on the insights from these YouTube creators:

### How to Create Hooks for a New Tech Channel

**1. Research Niche-Specific Curiosity & Adjacent Channels (The "Model 10" for Tech)**

*   **Identify Your Tech Niche's Core Questions:** Start by thinking about what problems your target audience in the tech space is trying to solve, what new technologies they're curious about, or what common misconceptions they have. The goal is to "evoke curiosity" immediately.
*   **Analyze "Adjacent Channels":** As mentioned in *How to START & GROW a YouTube Channel in 2025* (https://www.youtube.com/watch?v=SXN9SMLWLzw), look beyond just direct competitors. If you're doing 