<a href="https://colab.research.google.com/github/leomoshe/migdalor/blob/main/youtube_manager.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pytube
!pip install youtube-transcript-api

Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━[0m [32m30.7/57.6 kB[0m [31m845.2 kB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m937.1 kB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-15.0.0
Collecting youtube-transcript-api
  Downloading youtube_transcript_api-0.6.2-py3-none-any.whl (24 kB)
Installing collected packages: youtube-transcript-api
Successfully installed youtube-transcript-api-0.6.2


In [3]:
import argparse
import csv
import getpass
import json
import logging
import os
import re
import sys
from pathlib import Path

import pytube
import youtube_transcript_api
from googleapiclient.discovery import build
from youtube_transcript_api.formatters import SRTFormatter

In [4]:
# One format shared by the root configuration and by this notebook's own handlers.
LOG_FORMAT = '%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s'

logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
# basicConfig does nothing when the root logger already has handlers, so the
# level is set on this logger explicitly to make sure DEBUG/INFO records pass.
logger.setLevel(logging.DEBUG)
# This logger owns a console handler; without this, every record would also be
# printed a second time by the root logger's handler.
logger.propagate = False

# Re-running the cell must not stack duplicate handlers on the same logger.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

log_formatter = logging.Formatter(LOG_FORMAT)
file_handler = logging.FileHandler(filename="youtube_manager.log", mode='w', encoding='utf-8')
file_handler.setFormatter(log_formatter)
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)

In [None]:
class Result:
    """Outcome of an operation: a payload in ``Value`` or a failure description in ``Error``."""

    def __init__(self, value=None, error=None):
        # Both attributes default to None; the capitalised names are read directly by callers.
        self.Value, self.Error = value, error


In [None]:
def get_youtube_service(api_key):
    """Build and return a 'youtube' v3 API client that uses ``api_key`` as its developer key."""
    return build('youtube', 'v3', developerKey=api_key)

In [None]:
def download_transcript(video_id, output_folder, language="iw"):  # "iw" is Hebrew
    """Fetch a video's transcript and save it as ``<output_folder>/<video_id>.srt``.

    Args:
        video_id: YouTube video id.
        output_folder: Directory that receives the ``.srt`` file. It is not
            created here.
        language: Transcript language code to request.

    Returns:
        Result: on success ``Value`` is the path of the written file. On any
        failure ``Error`` is a single-line, comma-free text describing it;
        no exception is propagated to the caller.
    """
    result = Result()
    failure = None
    try:
        transcript_api = youtube_transcript_api.YouTubeTranscriptApi()
        transcript = transcript_api.get_transcript(video_id, languages=[language])
        srt = SRTFormatter().format_transcript(transcript)

        full_path = os.path.normpath(os.path.join(output_folder, f"{video_id}.srt"))
        with open(full_path, "w", encoding="utf-8") as file:
            file.write(srt)

        print(f"SRT transcript saved to {full_path}")
        result.Value = full_path
    except youtube_transcript_api.NoTranscriptFound as e:
        # Must come before CouldNotRetrieveTranscript, which is its base class.
        print(f"Failed to retrieve transcript: {e}")
        failure = e.CAUSE_MESSAGE
    except youtube_transcript_api.CouldNotRetrieveTranscript as e:
        print(f"Failed to retrieve transcript: {e}")
        failure = e.cause
    except KeyError as e:
        print(f"Failed to retrieve transcript: {e}")
        failure = f"KeyError: {e.args[0]}"
    except Exception as e:
        print(f"An error occurred: {e}")
        # Keep the error as text, like the other branches, not as an exception object.
        failure = str(e) or type(e).__name__

    if failure is not None:
        # The caller writes this text into one field of a comma-separated report,
        # so commas are replaced and all line breaks are collapsed to spaces.
        result.Error = " ".join(str(failure).replace(",", ";").split())
    return result

In [None]:
def download_video(video_id, output_folder):
    """Download the first available stream of a YouTube video.

    The file is saved in ``output_folder`` and named ``video_id`` plus the
    extension of the stream's default filename.

    Args:
        video_id: YouTube video id.
        output_folder: Directory that receives the media file.

    Returns:
        Result: on success ``Value`` is the video title. On any failure
        ``Error`` is a single-line, comma-free text describing it; no
        exception is propagated to the caller.
    """
    result = Result()
    url = f'https://www.youtube.com/watch?v={video_id}'
    failure = None
    try:
        yt = pytube.YouTube(
            url,
            use_oauth=True,
            allow_oauth_cache=True)  # Create a YouTube object
        video = yt.streams.first()  # Get the first available stream
        if video is None:
            # Without this check the failure would surface as an AttributeError
            # on None, which says nothing useful in the report.
            print(f"Error downloading {url}: no stream available")
            failure = "No stream available"
        else:
            title = video.title
            print(f"Downloading video: {title}")
            extension = Path(video.default_filename).suffix
            full_path = os.path.normpath(os.path.join(output_folder, f"{video_id}{extension}"))
            video.download(filename=full_path)  # Download the video
            print(f"Download complete: {title}")
            result.Value = title
    except (pytube.exceptions.AgeRestrictedError, pytube.exceptions.LiveStreamError) as e:
        failure = e.error_string
    except Exception as e:
        print(f"Error downloading {url}: {e}")
        # Keep the error as text, like the other branches, not as an exception object.
        failure = str(e) or type(e).__name__

    if failure is not None:
        # The caller writes this text into one field of a comma-separated report,
        # so commas are replaced and all line breaks are collapsed to spaces.
        result.Error = " ".join(str(failure).replace(",", ";").split())
    return result


In [None]:
def searchVideosByKeyword(youtube, keyword):
    """Collect every video returned by a keyword search, following all result pages.

    Args:
        youtube: API client exposing ``search().list(...).execute()``.
        keyword: Search query.

    Returns:
        list[dict]: one ``{'video_id', 'video_title'}`` entry per item whose id
        kind is ``'youtube#video'``, in the order the pages returned them.
    """
    found = []
    next_token = ""
    while True:
        response = youtube.search().list(
            q=keyword,
            type='video',
            part='id,snippet',
            maxResults=50,
            pageToken=next_token
        ).execute()
        found.extend(
            {
                'video_id': entry['id']['videoId'],
                'video_title': entry['snippet']['title']
            }
            for entry in (response.get('items') or [])
            if entry['id']['kind'] == 'youtube#video'
        )
        # A missing or empty token means the last page has been consumed.
        next_token = response.get('nextPageToken')
        if not next_token:
            return found

In [None]:
def main(api_key, keyword, keyword_title) -> None:
    """Download transcripts and media for every video matching ``keyword``.

    Files used, all relative to the current working directory:
      * ``<keyword_title>.json`` - cached search result; the YouTube API is
        queried only when this file does not exist.
      * ``<keyword_title>/`` - destination folder for ``<id>.srt``, the media
        file and a ``<id>.json`` sidecar holding the video title.
      * ``<keyword_title>_report.csv`` - one row per video with the columns
        ``Videoid,Media,Srt``; a value is ``"1"`` when the file is present,
        ``"0"`` when nothing is known yet, or an error text.

    Args:
        api_key: Developer key used when the search has to be performed.
        keyword: Search query.
        keyword_title: Base name for the folder, the cache file and the report.
    """
    report_fields = ("Videoid", "Media", "Srt")
    report_filename = f"{keyword_title}_report.csv"
    videos_filename = f"{keyword_title}.json"

    # Create dest folder
    dest_path = keyword_title
    Path(dest_path).mkdir(parents=True, exist_ok=True)

    # load the videos ids
    if os.path.isfile(videos_filename):
        with open(videos_filename, "r", encoding="utf-8") as file:
            videos = json.load(file)
    else:
        youtube = get_youtube_service(api_key)
        videos = searchVideosByKeyword(youtube, keyword)
        with open(videos_filename, "w", encoding="utf-8") as file:
            json.dump(videos, file)

    # Rows of a previous run, keyed by video id (first occurrence wins).
    # newline="" is what the csv module requires to parse quoted fields correctly.
    report_by_id = {}
    if os.path.isfile(report_filename):
        with open(report_filename, "r", encoding="utf-8", newline="") as report_file:
            for row in csv.DictReader(report_file):
                report_by_id.setdefault(row.get("Videoid"), row)

    # The report is rebuilt from scratch: header now, then one row per video.
    with open(report_filename, "w", encoding="utf-8", newline="") as report_file:
        csv.writer(report_file, lineterminator="\n").writerow(report_fields)

    for video in videos:
        videoid = video["video_id"]
        report_item = report_by_id.get(videoid)
        if report_item is None:
            report_item = {"Videoid": videoid, "Media": "0", "Srt": "0"}
            report_by_id[videoid] = report_item

        # download srt
        srt_full_path = os.path.join(dest_path, f"{videoid}.srt")
        if os.path.isfile(srt_full_path):
            # Already on disk: report it as present even if no earlier row said so.
            report_item["Srt"] = "1"
        else:
            result = download_transcript(videoid, dest_path)
            report_item["Srt"] = "1" if result.Error is None else result.Error

        # download media
        gpp_full_path = os.path.join(dest_path, f"{videoid}.3gpp")
        mp4_full_path = os.path.join(dest_path, f"{videoid}.mp4")
        if os.path.isfile(gpp_full_path) or os.path.isfile(mp4_full_path):
            report_item["Media"] = "1"
        else:
            result = download_video(videoid, dest_path)
            if result.Error is None:
                full_path = os.path.join(dest_path, f"{videoid}.json")
                with open(full_path, "w", encoding="utf-8") as file:
                    # json.dump escapes quotes and backslashes in the title, so the
                    # sidecar is always valid JSON; non-ASCII text is kept readable.
                    json.dump({"video_title": result.Value}, file, ensure_ascii=False)
                report_item["Media"] = "1"
            else:
                report_item["Media"] = result.Error

        # Appended after every video so progress survives an interruption.
        # csv.writer quotes fields containing commas, quotes or line breaks.
        with open(report_filename, "a", encoding="utf-8", newline="") as report_file:
            csv.writer(report_file, lineterminator="\n").writerow(
                [report_item.get(field) for field in report_fields]
            )

In [None]:
logger.info("Program running")
# getpass does not echo the key, so the secret is not stored in the notebook's
# saved output the way an input() prompt would store it.
api_key = getpass.getpass("Api key (https://support.google.com/googleapi/answer/6158862?hl=en): ").strip()
# Surrounding whitespace is dropped: the title becomes a folder and file name.
keyword = input("Keyword, for example'כאן'").strip()
title = input("Title, for example 'kan'").strip()

In [None]:
# Run the search-and-download pipeline with the API key, keyword and title read from the prompts.
main(api_key, keyword, title)