<a href="https://colab.research.google.com/github/parth31533/YT-Project/blob/main/MASTER.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import concurrent.futures

# Step 1: Set up your API keys and constants
API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"  # Replace with your valid YouTube Data API v3 key
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"  # Replace with your AssemblyAI API key

youtube = build("youtube", "v3", developerKey=API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

def get_channel_video_links_and_dates(channel_id):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while True:
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token or len(video_links) >= 2:  # Stop after collecting 2 videos
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    failed_videos = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize transcript fetching
        futures = {executor.submit(YouTubeTranscriptApi.get_transcript, link.split("v=")[1]): link for link in video_links}

        for future in concurrent.futures.as_completed(futures):
            link = futures[future]
            try:
                future.result()  # Will raise an exception if transcript fetch fails
                print(f"Transcript fetched successfully for video: {link}")
            except (TranscriptsDisabled, NoTranscriptFound):
                print(f"Transcript not available for video: {link}")
                failed_videos.append(link)

    return failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize audio downloading
        futures = {executor.submit(yt_dlp.YoutubeDL(ydl_opts).download, [link]): link for link in video_links}

        for future in concurrent.futures.as_completed(futures):
            link = futures[future]
            try:
                future.result()  # Will raise an exception if download fails
                print(f"Audio downloaded successfully for video: {link}")
            except Exception as e:
                print(f"Error downloading audio for {link}: {e}")

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Main function to stop after processing two videos
def main():
    channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # Replace with the actual channel ID

    # Step 1: Get the first two video links from the channel
    video_links = get_channel_video_links_and_dates(channel_id)

    # If less than two videos, stop the process
    if len(video_links) < 2:
        print("Not enough videos found.")
        return

    print(f"Processing videos: {video_links}")

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    failed_videos = fetch_transcripts(video_links)

    # Step 3: Download audio files for failed videos
    if failed_videos:
        download_audio(failed_videos)

    # Step 4: Transcribe downloaded audio using AssemblyAI
    audio_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize audio transcription
        futures = {executor.submit(transcribe_audio, ASSEMBLYAI_API_KEY, audio_file): audio_file for audio_file in audio_files}

        for future in concurrent.futures.as_completed(futures):
            audio_file = futures[future]
            transcript = future.result()
            if transcript:
                print(f"Transcription completed for {audio_file}: {transcript[:100]}...")  # Display first 100 chars of the transcript

if __name__ == "__main__":
    main()


ModuleNotFoundError: No module named 'yt_dlp'

In [3]:
pip install yt-dlp


Collecting yt-dlp
  Downloading yt_dlp-2024.12.6-py3-none-any.whl.metadata (172 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/172.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m163.8/172.1 kB[0m [31m5.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.1/172.1 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading yt_dlp-2024.12.6-py3-none-any.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m47.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: yt-dlp
Successfully installed yt-dlp-2024.12.6


In [4]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import concurrent.futures

# Step 1: Set up your API keys and constants
API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"  # Replace with your valid YouTube Data API v3 key
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"  # Replace with your AssemblyAI API key

youtube = build("youtube", "v3", developerKey=API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

def get_channel_video_links_and_dates(channel_id):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while True:
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token or len(video_links) >= 2:  # Stop after collecting 2 videos
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    failed_videos = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize transcript fetching
        futures = {executor.submit(YouTubeTranscriptApi.get_transcript, link.split("v=")[1]): link for link in video_links}

        for future in concurrent.futures.as_completed(futures):
            link = futures[future]
            try:
                future.result()  # Will raise an exception if transcript fetch fails
                print(f"Transcript fetched successfully for video: {link}")
            except (TranscriptsDisabled, NoTranscriptFound):
                print(f"Transcript not available for video: {link}")
                failed_videos.append(link)

    return failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize audio downloading
        futures = {executor.submit(yt_dlp.YoutubeDL(ydl_opts).download, [link]): link for link in video_links}

        for future in concurrent.futures.as_completed(futures):
            link = futures[future]
            try:
                future.result()  # Will raise an exception if download fails
                print(f"Audio downloaded successfully for video: {link}")
            except Exception as e:
                print(f"Error downloading audio for {link}: {e}")

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Main function to stop after processing two videos
def main():
    channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # Replace with the actual channel ID

    # Step 1: Get the first two video links from the channel
    video_links = get_channel_video_links_and_dates(channel_id)

    # If less than two videos, stop the process
    if len(video_links) < 2:
        print("Not enough videos found.")
        return

    print(f"Processing videos: {video_links}")

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    failed_videos = fetch_transcripts(video_links)

    # Step 3: Download audio files for failed videos
    if failed_videos:
        download_audio(failed_videos)

    # Step 4: Transcribe downloaded audio using AssemblyAI
    audio_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize audio transcription
        futures = {executor.submit(transcribe_audio, ASSEMBLYAI_API_KEY, audio_file): audio_file for audio_file in audio_files}

        for future in concurrent.futures.as_completed(futures):
            audio_file = futures[future]
            transcript = future.result()
            if transcript:
                print(f"Transcription completed for {audio_file}: {transcript[:100]}...")  # Display first 100 chars of the transcript

if __name__ == "__main__":
    main()


ModuleNotFoundError: No module named 'youtube_transcript_api'

In [5]:
pip install youtube-transcript-api


Collecting youtube-transcript-api
  Downloading youtube_transcript_api-0.6.3-py3-none-any.whl.metadata (17 kB)
Downloading youtube_transcript_api-0.6.3-py3-none-any.whl (622 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m622.3/622.3 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: youtube-transcript-api
Successfully installed youtube-transcript-api-0.6.3


In [6]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import concurrent.futures

# Step 1: Set up your API keys and constants
API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"  # Replace with your valid YouTube Data API v3 key
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"  # Replace with your AssemblyAI API key

youtube = build("youtube", "v3", developerKey=API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

def get_channel_video_links_and_dates(channel_id):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while True:
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token or len(video_links) >= 2:  # Stop after collecting 2 videos
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    failed_videos = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize transcript fetching
        futures = {executor.submit(YouTubeTranscriptApi.get_transcript, link.split("v=")[1]): link for link in video_links}

        for future in concurrent.futures.as_completed(futures):
            link = futures[future]
            try:
                future.result()  # Will raise an exception if transcript fetch fails
                print(f"Transcript fetched successfully for video: {link}")
            except (TranscriptsDisabled, NoTranscriptFound):
                print(f"Transcript not available for video: {link}")
                failed_videos.append(link)

    return failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize audio downloading
        futures = {executor.submit(yt_dlp.YoutubeDL(ydl_opts).download, [link]): link for link in video_links}

        for future in concurrent.futures.as_completed(futures):
            link = futures[future]
            try:
                future.result()  # Will raise an exception if download fails
                print(f"Audio downloaded successfully for video: {link}")
            except Exception as e:
                print(f"Error downloading audio for {link}: {e}")

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Main function to stop after processing two videos
def main():
    channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # Replace with the actual channel ID

    # Step 1: Get the first two video links from the channel
    video_links = get_channel_video_links_and_dates(channel_id)

    # If less than two videos, stop the process
    if len(video_links) < 2:
        print("Not enough videos found.")
        return

    print(f"Processing videos: {video_links}")

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    failed_videos = fetch_transcripts(video_links)

    # Step 3: Download audio files for failed videos
    if failed_videos:
        download_audio(failed_videos)

    # Step 4: Transcribe downloaded audio using AssemblyAI
    audio_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Parallelize audio transcription
        futures = {executor.submit(transcribe_audio, ASSEMBLYAI_API_KEY, audio_file): audio_file for audio_file in audio_files}

        for future in concurrent.futures.as_completed(futures):
            audio_file = futures[future]
            transcript = future.result()
            if transcript:
                print(f"Transcription completed for {audio_file}: {transcript[:100]}...")  # Display first 100 chars of the transcript

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=mdGpEqki8Lk', 'https://www.youtube.com/watch?v=AU_m12Nuk4k', 'https://www.youtube.com/watch?v=izZ0ZzsJ82A', 'https://www.youtube.com/watch?v=Og142I-11hw', 'https://www.youtube.com/watch?v=u7uOUHbhqXw', 'https://www.youtube.com/watch?v=O0P3FqKdUnY', 'https://www.youtube.com/watch?v=dJ3q7EEKI_8', 'https://www.youtube.com/watch?v=R6z_dkHAHh4', 'https://www.youtube.com/watch?v=3mNStBOkZ2U', 'https://www.youtube.com/watch?v=qw2-GVk8z4w', 'https://www.youtube.com/watch?v=LvxFWod6n98', 'https://www.youtube.com/watch?v=Orhca1vV8Uw', 'https://www.youtube.com/watch?v=HlQKQGvLeqo', 'https://www.youtube.com/watch?v=bUo7AAoHHfA', 'https://www.youtube.com/watch?v=AQFFMveRavQ', 'https://www.youtube.com/watch?v=4ZOMrfVKS9g', 'https://www.youtube.com/watch?v=zh2PTLPfmn8', 'https://www.youtube.com/watch?v=DhwBunH7pGc', 'https://www.youtube.com/watch?v=RbQAz9JPWKg', 'https://www.youtube.com/watch?v=y4HFT6VQodE', 'https://www.youtube.com/watch?v=4L8e73U

ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


Error downloading audio for https://www.youtube.com/watch?v=mdGpEqki8Lk: ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.
[youtube] u7uOUHbhqXw: Downloading ios player API JSON
[youtube] u7uOUHbhqXw: Downloading mweb player API JSON
[youtube] dJ3q7EEKI_8: Downloading ios player API JSON


ERROR: [youtube] u7uOUHbhqXw: Video unavailable. This video contains content from WMG, who has blocked it in your country on copyright grounds


Error downloading audio for https://www.youtube.com/watch?v=u7uOUHbhqXw: ERROR: [youtube] u7uOUHbhqXw: Video unavailable. This video contains content from WMG, who has blocked it in your country on copyright grounds
[youtube] _Z-hZ_URBFE: Downloading ios player API JSON
[youtube] DhwBunH7pGc: Downloading ios player API JSON
[youtube] _Z-hZ_URBFE: Downloading mweb player API JSON
[youtube] dJ3q7EEKI_8: Downloading mweb player API JSON
[youtube] DhwBunH7pGc: Downloading mweb player API JSON


ERROR: [youtube] _Z-hZ_URBFE: Video unavailable. This video contains content from NBC Universal, who has blocked it in your country on copyright grounds


Error downloading audio for https://www.youtube.com/watch?v=_Z-hZ_URBFE: ERROR: [youtube] _Z-hZ_URBFE: Video unavailable. This video contains content from NBC Universal, who has blocked it in your country on copyright grounds
[youtube] dJ3q7EEKI_8: Downloading player 5b77d519


ERROR: [youtube] DhwBunH7pGc: Video unavailable. This video contains content from NBC Universal, who has blocked it in your country on copyright grounds


Error downloading audio for https://www.youtube.com/watch?v=DhwBunH7pGc: ERROR: [youtube] DhwBunH7pGc: Video unavailable. This video contains content from NBC Universal, who has blocked it in your country on copyright grounds
[youtube] dJ3q7EEKI_8: Downloading m3u8 information
[info] dJ3q7EEKI_8: Downloading 1 format(s): 251
[download] Destination: downloads/Jerome Powell LIVE @ 1_45PM EST - Stock Market LIVE, Live Trading, Stocks To Buy NOW.webm
[download] 100% of  365.82MiB in 00:00:20 at 17.93MiB/s  
[ExtractAudio] Destination: downloads/Jerome Powell LIVE @ 1_45PM EST - Stock Market LIVE, Live Trading, Stocks To Buy NOW.mp3


ERROR: Postprocessing: audio conversion failed: Exiting normally, received signal 2.


KeyboardInterrupt: 

In [8]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests

# Step 1: Set up your API keys and constants
API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"  # Replace with your valid YouTube Data API v3 key
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"  # Replace with your AssemblyAI API key

youtube = build("youtube", "v3", developerKey=API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

# Fetch 2 videos (one with transcript and one without)
def get_channel_video_links_and_dates(channel_id):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while len(video_links) < 2:  # Stop after 2 videos
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_links.append(f"https://www.youtube.com/watch?v={video_id}")
                if len(video_links) >= 2:
                    break

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            print(f"Transcript fetched successfully for video: {link}")

        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([link])
            print(f"Audio downloaded successfully for video: {link}")
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Main function
def main():
    channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # Replace with the actual channel ID

    # Step 1: Get 2 video links from the channel
    video_links = get_channel_video_links_and_dates(channel_id)
    print(f"Processing videos: {video_links}")

    if len(video_links) < 2:
        print("Not enough videos found. Exiting.")
        return

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    failed_videos = fetch_transcripts(video_links)

    # Step 3: Download audio files for failed videos
    if failed_videos:
        download_audio(failed_videos)

    # Step 4: Transcribe downloaded audio using AssemblyAI
    audio_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]
    for audio_file in audio_files:
        transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_file)
        if transcript:
            print(f"Transcription completed for {audio_file}: {transcript}")

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=mdGpEqki8Lk', 'https://www.youtube.com/watch?v=AU_m12Nuk4k']
Transcript not available for video: https://www.youtube.com/watch?v=mdGpEqki8Lk
Transcript fetched successfully for video: https://www.youtube.com/watch?v=AU_m12Nuk4k
[youtube] Extracting URL: https://www.youtube.com/watch?v=mdGpEqki8Lk
[youtube] mdGpEqki8Lk: Downloading webpage
[youtube] mdGpEqki8Lk: Downloading ios player API JSON
[youtube] mdGpEqki8Lk: Downloading mweb player API JSON


ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


Error downloading audio for https://www.youtube.com/watch?v=mdGpEqki8Lk: ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


In [9]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import zipfile

# Step 1: Set up your API keys and constants
YOUTUBE_API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"
channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # The channel ID you provided

youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

# Fetch 2 videos (one with transcript and one without)
def get_channel_video_links_and_dates(channel_id):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while len(video_links) < 2:  # Stop after 2 videos
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_status = youtube.videos().list(
                    part="snippet, liveStreamingDetails",
                    id=video_id
                ).execute()

                # Check if the video is scheduled or live (skip if scheduled or live)
                live_status = video_status["items"][0].get("liveStreamingDetails", {}).get("liveStatus", None)
                if live_status in ["upcoming", "live"]:
                    continue

                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

                if len(video_links) >= 2:
                    break

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            print(f"Transcript fetched successfully for video: {link}")

        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([link])
            print(f"Audio downloaded successfully for video: {link}")
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Step 5: Save videos in a zip file
def save_videos_to_zip(video_files, zip_filename):
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for video_file in video_files:
            zipf.write(video_file, os.path.basename(video_file))
    print(f"Videos saved to {zip_filename}")

# Main function
def main():
    # Step 1: Get 2 video links from the channel
    video_links = get_channel_video_links_and_dates(channel_id)
    print(f"Processing videos: {video_links}")

    if len(video_links) < 2:
        print("Not enough videos found. Exiting.")
        return

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    failed_videos = fetch_transcripts(video_links)

    # Step 3: Download audio files for failed videos
    if failed_videos:
        download_audio(failed_videos)

    # Step 4: Transcribe downloaded audio using AssemblyAI
    audio_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]
    for audio_file in audio_files:
        transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_file)
        if transcript:
            print(f"Transcription completed for {audio_file}: {transcript}")

    # Step 5: Save the processed videos to a zip file
    video_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]
    if video_files:
        save_videos_to_zip(video_files, "test.zip")

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=mdGpEqki8Lk', 'https://www.youtube.com/watch?v=AU_m12Nuk4k']
Transcript not available for video: https://www.youtube.com/watch?v=mdGpEqki8Lk
Transcript fetched successfully for video: https://www.youtube.com/watch?v=AU_m12Nuk4k
[youtube] Extracting URL: https://www.youtube.com/watch?v=mdGpEqki8Lk
[youtube] mdGpEqki8Lk: Downloading webpage
[youtube] mdGpEqki8Lk: Downloading ios player API JSON
[youtube] mdGpEqki8Lk: Downloading mweb player API JSON


ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


Error downloading audio for https://www.youtube.com/watch?v=mdGpEqki8Lk: ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.
Videos saved to test.zip


In [10]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import zipfile

# Step 1: Set up your API keys and constants
YOUTUBE_API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"
channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # The channel ID you provided

youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

# Fetch 2 videos (one with transcript and one without)
def get_channel_video_links_and_dates(channel_id):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while len(video_links) < 2:  # Stop after 2 videos
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_status = youtube.videos().list(
                    part="snippet, liveStreamingDetails",
                    id=video_id
                ).execute()

                # Check if the video is scheduled or live (skip if scheduled or live)
                live_status = video_status["items"][0].get("liveStreamingDetails", {}).get("liveStatus", None)
                if live_status in ["upcoming", "live"]:
                    print(f"Skipping video {video_id} as it is scheduled or live.")
                    continue

                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

                if len(video_links) >= 2:
                    break

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            print(f"Transcript fetched successfully for video: {link}")

        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([link])
            print(f"Audio downloaded successfully for video: {link}")
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Step 5: Save videos in a zip file
def save_videos_to_zip(video_files, zip_filename):
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for video_file in video_files:
            zipf.write(video_file, os.path.basename(video_file))
    print(f"Videos saved to {zip_filename}")

# Main function
def main():
    # Step 1: Get 2 video links from the channel
    video_links = get_channel_video_links_and_dates(channel_id)
    print(f"Processing videos: {video_links}")

    if len(video_links) < 2:
        print("Not enough videos found. Exiting.")
        return

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    failed_videos = fetch_transcripts(video_links)

    # Step 3: Download audio files for failed videos
    if failed_videos:
        download_audio(failed_videos)

    # Step 4: Transcribe downloaded audio using AssemblyAI
    audio_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]
    for audio_file in audio_files:
        transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_file)
        if transcript:
            print(f"Transcription completed for {audio_file}: {transcript}")

    # Step 5: Save the processed videos to a zip file
    video_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]
    if video_files:
        save_videos_to_zip(video_files, "test.zip")

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=mdGpEqki8Lk', 'https://www.youtube.com/watch?v=AU_m12Nuk4k']
Transcript not available for video: https://www.youtube.com/watch?v=mdGpEqki8Lk
Transcript fetched successfully for video: https://www.youtube.com/watch?v=AU_m12Nuk4k
[youtube] Extracting URL: https://www.youtube.com/watch?v=mdGpEqki8Lk
[youtube] mdGpEqki8Lk: Downloading webpage
[youtube] mdGpEqki8Lk: Downloading ios player API JSON
[youtube] mdGpEqki8Lk: Downloading mweb player API JSON


ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


Error downloading audio for https://www.youtube.com/watch?v=mdGpEqki8Lk: ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.
Videos saved to test.zip


In [11]:
import os
import yt_dlp
import random
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import zipfile

# Step 1: Set up your API keys and constants
YOUTUBE_API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"
channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # The channel ID you provided

youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

# Fetch random 5 videos from the channel
def get_random_channel_video_links(channel_id, num_videos=5):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while len(video_links) < num_videos:  # Stop after 5 videos
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_status = youtube.videos().list(
                    part="snippet, liveStreamingDetails",
                    id=video_id
                ).execute()

                # Skip live or upcoming videos
                live_status = video_status["items"][0].get("liveStreamingDetails", {}).get("liveStatus", None)
                if live_status in ["upcoming", "live"]:
                    continue

                video_links.append(f"https://www.youtube.com/watch?v={video_id}")
                if len(video_links) >= num_videos:
                    break

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    available_transcripts = []
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            available_transcripts.append((link, transcript))
            print(f"Transcript fetched successfully for video: {link}")

        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return available_transcripts, failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    downloaded_files = []

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([link])
            print(f"Audio downloaded successfully for video: {link}")
            downloaded_files.append(link)
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

    return downloaded_files

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Step 5: Save videos in a zip file
def save_videos_to_zip(video_files, zip_filename):
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for video_file in video_files:
            zipf.write(video_file, os.path.basename(video_file))
    print(f"Videos saved to {zip_filename}")

# Main function
def main():
    # Step 1: Get random 5 video links from the channel
    video_links = get_random_channel_video_links(channel_id)
    print(f"Processing videos: {video_links}")

    if len(video_links) < 5:
        print("Not enough videos found. Exiting.")
        return

    # Variables to track whether AssemblyAI has been used
    assembly_ai_used = False

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    available_transcripts = []
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            available_transcripts.append((link, transcript))
            print(f"Transcript fetched successfully for video: {link}")
        except (TranscriptsDisabled, NoTranscriptFound):
            failed_videos.append(link)

    # Step 3: Ensure at least one video is processed with AssemblyAI
    if failed_videos:
        # Pick the first failed video and transcribe using AssemblyAI
        assembly_ai_video = failed_videos.pop(0)
        audio_file = download_audio([assembly_ai_video])[0]
        transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_file)
        if transcript:
            available_transcripts.append((assembly_ai_video, transcript))
            assembly_ai_used = True
            print(f"AssemblyAI transcription completed for {assembly_ai_video}")

    # Step 4: Save the transcripts
    for video_link, transcript in available_transcripts:
        transcript_filename = f"transcripts/{video_link.split('=')[1]}.txt"
        os.makedirs(os.path.dirname(transcript_filename), exist_ok=True)
        with open(transcript_filename, 'w') as f:
            for entry in transcript:
                f.write(f"{entry['start']}: {entry['text']}\n")
        print(f"Transcript saved for video: {video_link}")

    # Step 5: Save the processed videos to a zip file
    video_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]
    if video_files:
        save_videos_to_zip(video_files, "test.zip")

    print("Finished processing 5 videos. Exiting...")

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=mdGpEqki8Lk', 'https://www.youtube.com/watch?v=AU_m12Nuk4k', 'https://www.youtube.com/watch?v=izZ0ZzsJ82A', 'https://www.youtube.com/watch?v=Og142I-11hw', 'https://www.youtube.com/watch?v=u7uOUHbhqXw']
Transcript fetched successfully for video: https://www.youtube.com/watch?v=AU_m12Nuk4k
Transcript fetched successfully for video: https://www.youtube.com/watch?v=izZ0ZzsJ82A
Transcript fetched successfully for video: https://www.youtube.com/watch?v=Og142I-11hw
[youtube] Extracting URL: https://www.youtube.com/watch?v=mdGpEqki8Lk
[youtube] mdGpEqki8Lk: Downloading webpage
[youtube] mdGpEqki8Lk: Downloading ios player API JSON
[youtube] mdGpEqki8Lk: Downloading mweb player API JSON


ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


Error downloading audio for https://www.youtube.com/watch?v=mdGpEqki8Lk: ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


IndexError: list index out of range

In [12]:
import os
import yt_dlp
import random
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import zipfile

# Step 1: Set up your API keys and constants
YOUTUBE_API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"
channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # The channel ID you provided

youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
FAILED_VIDEOS_FILE = "failed_videos.txt"

# Fetch random 5 videos from the channel
def get_random_channel_video_links(channel_id, num_videos=5):
    try:
        # Fetch the channel's uploads playlist ID
        response = youtube.channels().list(
            part="contentDetails",
            id=channel_id
        ).execute()

        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch videos in the uploads playlist
        video_links = []
        next_page_token = None

        while len(video_links) < num_videos:  # Stop after 5 videos
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_status = youtube.videos().list(
                    part="snippet, liveStreamingDetails",
                    id=video_id
                ).execute()

                # Skip live or upcoming videos
                live_status = video_status["items"][0].get("liveStreamingDetails", {}).get("liveStatus", None)
                if live_status in ["upcoming", "live"]:
                    continue

                video_links.append(f"https://www.youtube.com/watch?v={video_id}")
                if len(video_links) >= num_videos:
                    break

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

# Step 2: Fetch transcripts using YouTubeTranscriptApi
def fetch_transcripts(video_links):
    available_transcripts = []
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            available_transcripts.append((link, transcript))
            print(f"Transcript fetched successfully for video: {link}")

        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return available_transcripts, failed_videos

# Step 3: Download audio files for failed videos
def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    downloaded_files = []

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([link])
            print(f"Audio downloaded successfully for video: {link}")
            downloaded_files.append(link)
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

    return downloaded_files

# Step 4: Transcribe audio using AssemblyAI
def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    # Upload audio file
    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    # Request transcription
    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

# Step 5: Save videos in a zip file
def save_videos_to_zip(video_files, zip_filename):
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for video_file in video_files:
            zipf.write(video_file, os.path.basename(video_file))
    print(f"Videos saved to {zip_filename}")

# Main function
def main():
    # Step 1: Get random 5 video links from the channel
    video_links = get_random_channel_video_links(channel_id)
    print(f"Processing videos: {video_links}")

    if len(video_links) < 5:
        print("Not enough videos found. Exiting.")
        return

    # Variables to track whether AssemblyAI has been used
    assembly_ai_used = False

    # Step 2: Fetch transcripts using YouTubeTranscriptApi
    available_transcripts = []
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            available_transcripts.append((link, transcript))
            print(f"Transcript fetched successfully for video: {link}")
        except (TranscriptsDisabled, NoTranscriptFound):
            failed_videos.append(link)

    # Step 3: Ensure at least one video is processed with AssemblyAI
    if failed_videos:
        # Pick the first failed video and transcribe using AssemblyAI
        assembly_ai_video = failed_videos.pop(0)
        # Check if the video is live or upcoming before attempting to download
        video_id = assembly_ai_video.split("v=")[1]
        video_status = youtube.videos().list(
            part="snippet, liveStreamingDetails",
            id=video_id
        ).execute()

        live_status = video_status["items"][0].get("liveStreamingDetails", {}).get("liveStatus", None)

        if live_status not in ["upcoming", "live"]:  # Only process if it's not live
            audio_file = download_audio([assembly_ai_video])
            if audio_file:
                transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_file[0])
                if transcript:
                    available_transcripts.append((assembly_ai_video, transcript))
                    assembly_ai_used = True
                    print(f"AssemblyAI transcription completed for {assembly_ai_video}")
        else:
            print(f"Skipping live or scheduled video: {assembly_ai_video}")

    # Step 4: Save the transcripts
    for video_link, transcript in available_transcripts:
        transcript_filename = f"transcripts/{video_link.split('=')[1]}.txt"
        os.makedirs(os.path.dirname(transcript_filename), exist_ok=True)
        with open(transcript_filename, 'w') as f:
            for entry in transcript:
                f.write(f"{entry['start']}: {entry['text']}\n")
        print(f"Transcript saved for video: {video_link}")

    # Step 5: Save the processed videos to a zip file
    video_files = [f"downloads/{f}" for f in os.listdir("downloads") if f.endswith(".mp3")]
    if video_files:
        save_videos_to_zip(video_files, "test.zip")

    print("Finished processing 5 videos. Exiting...")

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=mdGpEqki8Lk', 'https://www.youtube.com/watch?v=AU_m12Nuk4k', 'https://www.youtube.com/watch?v=izZ0ZzsJ82A', 'https://www.youtube.com/watch?v=Og142I-11hw', 'https://www.youtube.com/watch?v=u7uOUHbhqXw']
Transcript fetched successfully for video: https://www.youtube.com/watch?v=AU_m12Nuk4k
Transcript fetched successfully for video: https://www.youtube.com/watch?v=izZ0ZzsJ82A
Transcript fetched successfully for video: https://www.youtube.com/watch?v=Og142I-11hw
[youtube] Extracting URL: https://www.youtube.com/watch?v=mdGpEqki8Lk
[youtube] mdGpEqki8Lk: Downloading webpage
[youtube] mdGpEqki8Lk: Downloading ios player API JSON
[youtube] mdGpEqki8Lk: Downloading mweb player API JSON


ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.


Error downloading audio for https://www.youtube.com/watch?v=mdGpEqki8Lk: ERROR: [youtube] mdGpEqki8Lk: This live event will begin in 3 hours.
Transcript saved for video: https://www.youtube.com/watch?v=AU_m12Nuk4k
Transcript saved for video: https://www.youtube.com/watch?v=izZ0ZzsJ82A
Transcript saved for video: https://www.youtube.com/watch?v=Og142I-11hw
Videos saved to test.zip
Finished processing 5 videos. Exiting...


In [13]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import random
import zipfile

# Step 1: Set up your API keys and constants
API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"  # Replace with your valid YouTube Data API v3 key
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"  # Replace with your AssemblyAI API key
channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # Replace with your actual channel ID
youtube = build("youtube", "v3", developerKey=API_KEY)

# Output file
OUTPUT_ZIP = "test.zip"

def get_channel_video_links_and_dates(channel_id):
    try:
        response = youtube.channels().list(part="contentDetails", id=channel_id).execute()
        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        video_links = []
        next_page_token = None

        while True:
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

def fetch_transcripts(video_links):
    available_transcripts = []
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            print(f"Transcript fetched successfully for video: {link}")
            available_transcripts.append((link, transcript))
        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return available_transcripts, failed_videos

def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    downloaded_files = []

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info_dict = ydl.extract_info(link, download=True)
                audio_file = ydl.prepare_filename(info_dict)
                downloaded_files.append(audio_file)
                print(f"Audio downloaded successfully for video: {link}")
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

    return downloaded_files

def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

def save_to_zip(files):
    with zipfile.ZipFile(OUTPUT_ZIP, 'w') as zipf:
        for file in files:
            zipf.write(file, os.path.basename(file))
    print(f"Files saved to {OUTPUT_ZIP}")

def main():
    video_links = get_channel_video_links_and_dates(channel_id)

    # Select 5 random videos
    random_videos = random.sample(video_links, 5)
    print(f"Processing videos: {random_videos}")

    # Fetch transcripts and find videos needing AssemblyAI
    available_transcripts, failed_videos = fetch_transcripts(random_videos)

    # Process AssemblyAI if any failed video exists
    if failed_videos:
        # Ensure one video is processed via AssemblyAI
        assembly_ai_video = failed_videos.pop(0)
        audio_file = download_audio([assembly_ai_video])[0]
        transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_file)
        if transcript:
            print(f"Transcription completed via AssemblyAI for {assembly_ai_video}")
            available_transcripts.append((assembly_ai_video, transcript))

    # Save all transcripts and videos into a zip
    transcript_files = []
    for video_link, transcript in available_transcripts:
        transcript_file = f"transcript_{video_link.split('v=')[1]}.txt"
        with open(transcript_file, 'w') as f:
            f.write("\n".join([entry['text'] for entry in transcript]))
        transcript_files.append(transcript_file)

    # Save audio files if downloaded
    audio_files = []
    if failed_videos:
        audio_files = download_audio(failed_videos)

    save_to_zip(transcript_files + audio_files)

    print(f"Finished processing {len(transcript_files)} transcripts and {len(audio_files)} audio files.")

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=9B3xJPk5_Q4', 'https://www.youtube.com/watch?v=VvB4c42_guw', 'https://www.youtube.com/watch?v=gnPB8FOmA-0', 'https://www.youtube.com/watch?v=p-8AXE35zuY', 'https://www.youtube.com/watch?v=gczxEUKTSYo']
Transcript fetched successfully for video: https://www.youtube.com/watch?v=9B3xJPk5_Q4
Transcript not available for video: https://www.youtube.com/watch?v=VvB4c42_guw
Transcript not available for video: https://www.youtube.com/watch?v=gnPB8FOmA-0
Transcript not available for video: https://www.youtube.com/watch?v=p-8AXE35zuY
Transcript fetched successfully for video: https://www.youtube.com/watch?v=gczxEUKTSYo
[youtube] Extracting URL: https://www.youtube.com/watch?v=VvB4c42_guw
[youtube] VvB4c42_guw: Downloading webpage
[youtube] VvB4c42_guw: Downloading ios player API JSON
[youtube] VvB4c42_guw: Downloading mweb player API JSON
[youtube] VvB4c42_guw: Downloading m3u8 information
[info] VvB4c42_guw: Downloading 1 format(s): 251
[downl

KeyboardInterrupt: 

In [14]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import random
import zipfile

# Step 1: Set up your API keys and constants
API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"  # Replace with your valid YouTube Data API v3 key
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"  # Replace with your AssemblyAI API key
channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # Replace with your actual channel ID
youtube = build("youtube", "v3", developerKey=API_KEY)

# Output file
OUTPUT_ZIP = "test.zip"

def get_channel_video_links_and_dates(channel_id):
    try:
        response = youtube.channels().list(part="contentDetails", id=channel_id).execute()
        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        video_links = []
        next_page_token = None

        while True:
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []

def fetch_transcripts(video_links):
    available_transcripts = []
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            print(f"Transcript fetched successfully for video: {link}")
            available_transcripts.append((link, transcript))
        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return available_transcripts, failed_videos

def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    downloaded_files = []

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info_dict = ydl.extract_info(link, download=True)
                audio_file = ydl.prepare_filename(info_dict)
                downloaded_files.append(audio_file)
                print(f"Audio downloaded successfully for video: {link}")
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

    return downloaded_files

def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    with open(audio_file, "rb") as f:
        response = requests.post(upload_url, headers=headers, data=f)
        audio_url = response.json()["upload_url"]

    transcript_url = "https://api.assemblyai.com/v2/transcript"
    data = {"audio_url": audio_url}
    transcript_response = requests.post(transcript_url, headers=headers, json=data)

    transcript_id = transcript_response.json()["id"]
    status = "processing"

    while status == "processing":
        result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
        status = result.json()["status"]

    if status == "completed":
        return result.json()["text"]
    else:
        return None

def save_to_zip(files):
    with zipfile.ZipFile(OUTPUT_ZIP, 'w') as zipf:
        for file in files:
            zipf.write(file, os.path.basename(file))
    print(f"Files saved to {OUTPUT_ZIP}")

def main():
    video_links = get_channel_video_links_and_dates(channel_id)

    # Select 2 random videos
    random_videos = random.sample(video_links, 2)
    print(f"Processing videos: {random_videos}")

    # Fetch transcripts and find videos needing AssemblyAI
    available_transcripts, failed_videos = fetch_transcripts(random_videos)

    # Process AssemblyAI if any failed video exists
    if failed_videos:
        # Ensure one video is processed via AssemblyAI
        assembly_ai_video = failed_videos.pop(0)
        audio_file = download_audio([assembly_ai_video])[0]
        transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_file)
        if transcript:
            print(f"Transcription completed via AssemblyAI for {assembly_ai_video}")
            available_transcripts.append((assembly_ai_video, transcript))

    # Save all transcripts and videos into a zip
    transcript_files = []
    for video_link, transcript in available_transcripts:
        transcript_file = f"transcript_{video_link.split('v=')[1]}.txt"
        with open(transcript_file, 'w') as f:
            f.write("\n".join([entry['text'] for entry in transcript]))
        transcript_files.append(transcript_file)

    # Save audio files if downloaded
    audio_files = []
    if failed_videos:
        audio_files = download_audio(failed_videos)

    save_to_zip(transcript_files + audio_files)

    print(f"Finished processing {len(transcript_files)} transcripts and {len(audio_files)} audio files.")

if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=VzgrpWPzK6U', 'https://www.youtube.com/watch?v=nBF7mLbw_V4']
Transcript not available for video: https://www.youtube.com/watch?v=VzgrpWPzK6U
Transcript fetched successfully for video: https://www.youtube.com/watch?v=nBF7mLbw_V4
[youtube] Extracting URL: https://www.youtube.com/watch?v=VzgrpWPzK6U
[youtube] VzgrpWPzK6U: Downloading webpage
[youtube] VzgrpWPzK6U: Downloading ios player API JSON
[youtube] VzgrpWPzK6U: Downloading mweb player API JSON
[youtube] VzgrpWPzK6U: Downloading m3u8 information
[info] VzgrpWPzK6U: Downloading 1 format(s): 251
[download] Destination: downloads/STOCKS & CRYPTO OPEN STRONG! - Live Trading, DOW & S&P, Stock Picks, Day Trading & STOCK NEWS.webm
[download] 100% of  335.10MiB in 00:00:29 at 11.51MiB/s  
[ExtractAudio] Destination: downloads/STOCKS & CRYPTO OPEN STRONG! - Live Trading, DOW & S&P, Stock Picks, Day Trading & STOCK NEWS.mp3
Deleting original file downloads/STOCKS & CRYPTO OPEN STRONG! - Liv

FileNotFoundError: [Errno 2] No such file or directory: 'downloads/STOCKS & CRYPTO OPEN STRONG! - Live Trading, DOW & S&P, Stock Picks, Day Trading & STOCK NEWS.webm'

In [15]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import random
import zipfile

# Step 1: Set up your API keys and constants
API_KEY = "AIzaSyD3yF_r1J0DkcbKNtTBwzQlmMN_LWSWRlk"  # Replace with your valid YouTube Data API v3 key
ASSEMBLYAI_API_KEY = "d773b67f986746528b961cd5772004b1"  # Replace with your AssemblyAI API key
channel_id = "UCsfp0zw1hNxpy_wDig8oExA"  # Replace with your actual channel ID
youtube = build("youtube", "v3", developerKey=API_KEY)

# Output file
OUTPUT_ZIP = "test.zip"


def get_channel_video_links_and_dates(channel_id):
    try:
        response = youtube.channels().list(part="contentDetails", id=channel_id).execute()
        uploads_playlist_id = response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

        video_links = []
        next_page_token = None

        while True:
            playlist_response = youtube.playlistItems().list(
                part="snippet",
                playlistId=uploads_playlist_id,
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            for item in playlist_response["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_links.append(f"https://www.youtube.com/watch?v={video_id}")

            next_page_token = playlist_response.get("nextPageToken")
            if not next_page_token:
                break

        return video_links

    except HttpError as e:
        print(f"Error fetching channel videos: {e}")
        return []


def fetch_transcripts(video_links):
    available_transcripts = []
    failed_videos = []

    for link in video_links:
        video_id = link.split("v=")[1]
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            print(f"Transcript fetched successfully for video: {link}")
            available_transcripts.append((link, transcript))
        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"Transcript not available for video: {link}")
            failed_videos.append(link)

    return available_transcripts, failed_videos


def download_audio(video_links):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'downloads/%(title)s.%(ext)s',
        'noplaylist': True,
    }

    downloaded_files = []

    for link in video_links:
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info_dict = ydl.extract_info(link, download=True)
                base_filename = ydl.prepare_filename(info_dict)
                mp3_file = base_filename.rsplit('.', 1)[0] + '.mp3'

                if os.path.exists(mp3_file):
                    downloaded_files.append(mp3_file)
                    print(f"Audio downloaded successfully for video: {link}")
                else:
                    print(f"Error: Converted MP3 file not found for {link}")
        except Exception as e:
            print(f"Error downloading audio for {link}: {e}")

    return downloaded_files


def transcribe_audio(assemblyai_api_key, audio_file):
    headers = {"authorization": assemblyai_api_key}
    upload_url = "https://api.assemblyai.com/v2/upload"

    try:
        with open(audio_file, "rb") as f:
            response = requests.post(upload_url, headers=headers, data=f)
            audio_url = response.json()["upload_url"]

        transcript_url = "https://api.assemblyai.com/v2/transcript"
        data = {"audio_url": audio_url}
        transcript_response = requests.post(transcript_url, headers=headers, json=data)

        transcript_id = transcript_response.json()["id"]
        status = "processing"

        while status == "processing":
            result = requests.get(f"{transcript_url}/{transcript_id}", headers=headers)
            status = result.json()["status"]

        if status == "completed":
            return result.json()["text"]
        else:
            print(f"Transcription failed for {audio_file}: {status}")
            return None
    except Exception as e:
        print(f"Error transcribing audio: {e}")
        return None


def save_to_zip(files):
    with zipfile.ZipFile(OUTPUT_ZIP, 'w') as zipf:
        for file in files:
            zipf.write(file, os.path.basename(file))
    print(f"Files saved to {OUTPUT_ZIP}")


def main():
    video_links = get_channel_video_links_and_dates(channel_id)

    # Select 2 random videos
    random_videos = random.sample(video_links, 2)
    print(f"Processing videos: {random_videos}")

    # Fetch transcripts and find videos needing AssemblyAI
    available_transcripts, failed_videos = fetch_transcripts(random_videos)

    # Process AssemblyAI if any failed video exists
    if failed_videos:
        # Ensure one video is processed via AssemblyAI
        assembly_ai_video = failed_videos.pop(0)
        audio_files = download_audio([assembly_ai_video])

        if audio_files:
            transcript = transcribe_audio(ASSEMBLYAI_API_KEY, audio_files[0])
            if transcript:
                print(f"Transcription completed via AssemblyAI for {assembly_ai_video}")
                available_transcripts.append((assembly_ai_video, transcript))

    # Save all transcripts and videos into a zip
    transcript_files = []
    for video_link, transcript in available_transcripts:
        video_id = video_link.split('v=')[1]
        transcript_file = f"transcript_{video_id}.txt"
        with open(transcript_file, 'w') as f:
            if isinstance(transcript, list):
                f.write("\n".join([entry['text'] for entry in transcript]))
            else:
                f.write(transcript)
        transcript_files.append(transcript_file)

    # Save audio files if downloaded
    audio_files = []
    if failed_videos:
        audio_files = download_audio(failed_videos)

    save_to_zip(transcript_files + audio_files)

    print(f"Finished processing {len(transcript_files)} transcripts and {len(audio_files)} audio files.")


if __name__ == "__main__":
    main()


Processing videos: ['https://www.youtube.com/watch?v=S2aNk4po3AE', 'https://www.youtube.com/watch?v=lY0mUvycAWg']
Transcript not available for video: https://www.youtube.com/watch?v=S2aNk4po3AE
Transcript not available for video: https://www.youtube.com/watch?v=lY0mUvycAWg
[youtube] Extracting URL: https://www.youtube.com/watch?v=S2aNk4po3AE
[youtube] S2aNk4po3AE: Downloading webpage
[youtube] S2aNk4po3AE: Downloading ios player API JSON
[youtube] S2aNk4po3AE: Downloading mweb player API JSON
[youtube] S2aNk4po3AE: Downloading m3u8 information
[info] S2aNk4po3AE: Downloading 1 format(s): 251
[download] Destination: downloads/STOCKS GAP UP!!! – Live Trading, Robinhood Options, Day Trading & STOCK MARKET NEWS TODAY.webm
[download] 100% of  325.11MiB in 00:00:24 at 13.41MiB/s  
[ExtractAudio] Destination: downloads/STOCKS GAP UP!!! – Live Trading, Robinhood Options, Day Trading & STOCK MARKET NEWS TODAY.mp3
Deleting original file downloads/STOCKS GAP UP!!! – Live Trading, Robinhood Option

In [None]:
from google.colab import drive
drive.mount('/content/drive')