In [3]:
import yt_dlp
import json

def get_youtube_transcript(url):
    ydl_opts = {
        "quiet": True,
        "writesubtitles": True,
        "subtitleslangs": ["vi", "en"],
        "skip_download": True,
        "extractor_args": {
            "youtube": {"player_client": ["web"]}
        },
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info_dict = ydl.extract_info(url, download=False)
            subtitles = info_dict.get("subtitles", {})
            automatic_captions = info_dict.get("automatic_captions", {})

            subtitle_url = None
            selected_lang = None

            if "vi" in subtitles:
                subtitle_url = subtitles["vi"][0]["url"]
                selected_lang = "vi"
            elif "en" in subtitles:
                subtitle_url = subtitles["en"][0]["url"]
                selected_lang = "en"
            elif "vi" in automatic_captions:
                subtitle_url = automatic_captions["vi"][0]["url"]
                selected_lang = "vi (auto)"
            elif "en" in automatic_captions:
                subtitle_url = automatic_captions["en"][0]["url"]
                selected_lang = "en (auto)"
            elif subtitles:
                first_lang = next(iter(subtitles))
                subtitle_url = subtitles[first_lang][0]["url"]
                selected_lang = f"{first_lang} (manual)"
            elif automatic_captions:
                first_lang = next(iter(automatic_captions))
                subtitle_url = automatic_captions[first_lang][0]["url"]
                selected_lang = f"{first_lang} (auto)"
            else:
                print("No subtitles found.")
                return None, None

            transcript = ydl.urlopen(subtitle_url).read().decode("utf-8")
            transcript_dict = json.loads(transcript)
            return extract_utf_from_events(transcript_dict), selected_lang

        except Exception as e:
            print(f"Error occurred while fetching subtitles: {e}")
            return None, None


def extract_utf_from_events(data):
    utf_scripts = []

    for event in data["events"]:
        if "segs" in event:
            utf_event = []
            for seg in event["segs"]:
                if "utf8" in seg and seg["utf8"] != "\n":
                    utf_event.append(seg["utf8"])
            if utf_event:
                utf_scripts.append([event["tStartMs"], " ".join(utf_event)])

    return utf_scripts


def chunk_text(data, chunk_size=250):
    chunks = []
    current_chunk = []
    current_chunk_word_count = 0
    current_start_time = None

    for start_time, text in data:
        words = text.split()
        for word in words:
            if current_chunk_word_count == chunk_size:
                chunks.append(
                    {
                        "timestamp": time_output(current_start_time),
                        "text": " ".join(current_chunk),
                    }
                )
                current_chunk = []
                current_chunk_word_count = 0
                current_start_time = None

            if current_start_time is None:
                current_start_time = start_time

            current_chunk.append(word)
            current_chunk_word_count += 1

    if current_chunk:
        chunks.append(
            {
                "timestamp": time_output(current_start_time),
                "text": " ".join(current_chunk),
            }
        )

    return chunks


def time_output(time):
    return f"{time // 3600000:02d}:{(time // 60000) % 60:02d}:{(time // 1000) % 60:02d}"


def process_youtube(url, scope):
    transcript_data, selected_lang = get_youtube_transcript(url)

    if transcript_data:
        transcript = " ".join([x[1] for x in transcript_data])
        chunks = chunk_text(transcript_data)

        document_data = {
            "type": "youtube",
            "scope": scope,
            "original_data": url,
            "language_used": selected_lang,
            "extracted_text": transcript,
            "chunks": chunks,
        }

        return document_data
    else:
        return None


def main():
    url = "https://www.youtube.com/watch?v=_nuQ39Y4T5Q" 
    scope = "test-scope"

    result = process_youtube(url, scope)

    if result:
        print(json.dumps(result, indent=4, ensure_ascii=False))
    else:
        print("Failed to retrieve or process transcript.")

if __name__ == '__main__':
    main()


{
    "type": "youtube",
    "scope": "test-scope",
    "original_data": "https://www.youtube.com/watch?v=_nuQ39Y4T5Q",
    "language_used": "vi (auto)",
    "extracted_text": "Ừ  tiếng  Anh  của  mình  kém  lắm  bạn  nghe Mình  làm  gì  đâu  Mình  nói  có  ra  cái  gì đâu  dù  nó  thức  tỉnh  tâm  hồn  thc  tỉnh  con người  mình  và  mình  bắt  đầu  mở  máy  mở sách  mở  sổ  mở  bút  mình  học  ngay  lập  tức Đây  là  cái  cách  học  mình  áp  dụng  cho việc  học  tiếng  Hà  và  mình  cảm  thấy  rất là  hiệu  quả  conu  này  hay  có  thể  diễn  bả mình  cũng  được  English  Has  definitely Changed  My  Life  Why  English  is  important to  my  life  cách  để  tự  học  tiếng  Anh  áp dụng  cho  tất  cả  mọi  người  xin  chào  các bạn  mình  là  Khánh  Vi  và  các  bạn  đã  theo dõi  Hey  I  have  been  learning  English  for years  and  My  English  is  not  Perfect  but I  got  to  say  that  study  English  has helped  me  progress  in  life  both personally  and  professionally  mor

In [6]:
import yt_dlp
import json
import re
import os
from sentence_transformers import SentenceTransformer

# Load embedding model once
embedder = SentenceTransformer("all-MiniLM-L6-v2")


def get_youtube_transcript(url):
    ydl_opts = {
        "quiet": True,
        "writesubtitles": True,
        "subtitleslangs": ["vi", "en"],
        "skip_download": True,
        "extractor_args": {
            "youtube": {"player_client": ["web"]}
        },
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info_dict = ydl.extract_info(url, download=False)
            title = info_dict.get("title", "")
            subtitles = info_dict.get("subtitles", {})
            automatic_captions = info_dict.get("automatic_captions", {})

            subtitle_url = None
            selected_lang = None

            if "vi" in subtitles:
                subtitle_url = subtitles["vi"][0]["url"]
                selected_lang = "vi"
            elif "en" in subtitles:
                subtitle_url = subtitles["en"][0]["url"]
                selected_lang = "en"
            elif "vi" in automatic_captions:
                subtitle_url = automatic_captions["vi"][0]["url"]
                selected_lang = "vi (auto)"
            elif "en" in automatic_captions:
                subtitle_url = automatic_captions["en"][0]["url"]
                selected_lang = "en (auto)"
            elif subtitles:
                first_lang = next(iter(subtitles))
                subtitle_url = subtitles[first_lang][0]["url"]
                selected_lang = f"{first_lang} (manual)"
            elif automatic_captions:
                first_lang = next(iter(automatic_captions))
                subtitle_url = automatic_captions[first_lang][0]["url"]
                selected_lang = f"{first_lang} (auto)"
            else:
                print("No subtitles found.")
                return None, None, None

            transcript = ydl.urlopen(subtitle_url).read().decode("utf-8")
            transcript_dict = json.loads(transcript)
            return extract_utf_from_events(transcript_dict), selected_lang, title

        except Exception as e:
            print(f"Error occurred while fetching subtitles: {e}")
            return None, None, None


def extract_utf_from_events(data):
    utf_scripts = []
    for event in data["events"]:
        if "segs" in event:
            utf_event = []
            for seg in event["segs"]:
                if "utf8" in seg and seg["utf8"] != "\n":
                    utf_event.append(seg["utf8"])
            if utf_event:
                utf_scripts.append([event["tStartMs"], " ".join(utf_event)])
    return utf_scripts


def time_output(time):
    return f"{time // 3600000:02d}:{(time // 60000) % 60:02d}:{(time // 1000) % 60:02d}"


def split_into_sentence_chunks(transcript_data, chunk_size=250):
    full_text = " ".join([x[1] for x in transcript_data])
    sentences = re.split(r'(?<=[.!?])\s+', full_text)

    chunks = []
    current_chunk = []
    word_count = 0
    index = 0
    current_start_time = transcript_data[0][0] if transcript_data else 0

    for sentence in sentences:
        words = sentence.split()
        if word_count + len(words) > chunk_size:
            chunk_text = " ".join(current_chunk)
            chunks.append({
                "timestamp": time_output(current_start_time),
                "text": chunk_text
            })
            current_chunk = []
            word_count = 0
            if index < len(transcript_data):
                current_start_time = transcript_data[index][0]

        current_chunk.extend(words)
        word_count += len(words)
        index += 1

    if current_chunk:
        chunk_text = " ".join(current_chunk)
        chunks.append({
            "timestamp": time_output(current_start_time),
            "text": chunk_text
        })

    return chunks


def process_youtube(url, output_dir="./json_output"):
    transcript_data, selected_lang, title = get_youtube_transcript(url)

    if transcript_data:
        transcript = " ".join([x[1] for x in transcript_data])
        chunks = split_into_sentence_chunks(transcript_data)

        chunk_texts = [chunk["text"] for chunk in chunks]
        embeddings = embedder.encode(chunk_texts, convert_to_tensor=False).tolist()

        for chunk, embedding in zip(chunks, embeddings):
            chunk["embedding"] = embedding

        # ✅ language_used REMOVED here
        document_data = {
            "type": "youtube",
            "scope": title,
            "original_data": url,
            "extracted_text": transcript,
            "chunks": chunks,
        }

        os.makedirs(output_dir, exist_ok=True)
        safe_title = re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")
        filename = f"{safe_title}.json"
        output_path = os.path.join(output_dir, filename)

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(document_data, f, indent=4, ensure_ascii=False)

        print(f"Transcript saved to: {output_path}")
        return output_path
    else:
        return None

def main():
    url = "https://www.youtube.com/watch?v=_nuQ39Y4T5Q" 
    saved_file = process_youtube(url)
    if not saved_file:
        print("Failed to process and save transcript.")


main()


Transcript saved to: ./json_output\Cách_Tự_Học_Tiếng_Anh__Áp_dụng_cho_mọi_người_phương_pháp__tài_liệu_gợi_ý_VyVocab_Ep110.json
