**INGESTION STEP**: Download podcast episodes audio

In [None]:
!pip install youtube-search-python yt-dlp openai-whisper

In [2]:
# URL prefix; a playlist id is appended to it to form the full playlist URL.
BASE_YOUTUBE_PLAYLIST_URL = "https://www.youtube.com/playlist?list="
# Id of the YouTube playlist whose videos are ingested by start_audio_download().
playlist_id = "PLrYeaDpClt33OD3orEdZ2GUBqG03z5a2U"

In [3]:
from youtubesearchpython import *
from typing import List
import yt_dlp
import re
from tempfile import NamedTemporaryFile
import whisper
import csv
import shutil
import os
from typing import List
import json

In [4]:
def getNewEpisodes(playlist_id: str, last_episode_number: int) -> List:
    """Fetch the playlist and return the videos that have not been processed yet.

    The episode number of a video is taken to be the first run of digits in
    its title. Videos whose title is missing or contains no digits cannot be
    compared against ``last_episode_number`` and are skipped with a message.

    Args:
        playlist_id: Id appended to ``BASE_YOUTUBE_PLAYLIST_URL``.
        last_episode_number: Number of the last episode already processed;
            only videos with a strictly greater number are returned.

    Returns:
        The matching video dicts, in the order the playlist lists them.
    """
    playlist = Playlist(f'{BASE_YOUTUBE_PLAYLIST_URL}{playlist_id}')
    # Keep fetching for as long as the playlist reports more videos.
    while playlist.hasMoreVideos:
        print('Getting more videos...')
        playlist.getNextVideos()
        print(f'Videos Retrieved: {len(playlist.videos)}')

    print("Total number of videos: ", len(playlist.videos))

    threshold = int(last_episode_number)

    # filter out videos that have already been processed
    new_videos = []
    for video in playlist.videos:
        # re.search() returns None when the title has no digits; calling
        # .group() on it would abort the whole listing.
        number_match = re.search(r'\d+', video.get('title') or '')
        if number_match is None:
            print("Skipping video without an episode number:", video.get('title'))
            continue
        if int(number_match.group()) > threshold:
            new_videos.append(video)

    return new_videos
def save_audio(ep_link: str, ep_number: int):
    """Download the audio track of one episode to ``audio/<ep_number>.m4a``.

    Args:
        ep_link: URL of the video.
        ep_number: Episode number, used as the output file name.

    Returns:
        The value returned by ``YoutubeDL.download`` (the caller treats
        anything other than 0 as a failure).
    """
    ydl_opts = {
        'format': 'm4a/bestaudio/best',
        'outtmpl': f'audio/{ep_number}.m4a',
        'noplaylist': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'm4a',
        }],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # download() takes a list of URLs, so wrap the single link.
        return ydl.download([ep_link])
def get_audio(videos: List):
    """Download the audio of each video and build its episode record.

    The episode number is the first run of digits in the video title. A video
    is skipped (with a message) when its title has no number, when
    ``save_audio`` reports a non-zero result, or when anything raises while
    handling it; the remaining videos are still processed.

    Args:
        videos: Video dicts exposing ``title``, ``link`` and ``thumbnails``.

    Returns:
        One record per successfully downloaded video, with the keys
        ``episodeNumber``, ``title``, ``url``, ``thumbnail``, ``audio``,
        ``transcription``, ``transcribed`` and ``processed``.
    """
    new_episodes = []
    for video in videos:
        video_title = video.get('title')
        try:
            number_match = re.search(r'\d+', video_title or '')
            if number_match is None:
                print("Skipping video without an episode number:", video_title)
                continue
            ep_number = number_match.group()
            video_url = video.get('link')
            result = save_audio(video_url, ep_number)
            if result != 0:
                print("Error downloading audio for episode number:", ep_number)
                continue
            # The audio is already on disk at this point, so a missing
            # thumbnail must not discard the episode. "NA" is the same
            # placeholder the record uses for a not-yet-available transcription.
            thumbnails = video.get('thumbnails') or []
            video_thumbnail = thumbnails[0].get('url') if thumbnails else "NA"
            new_episodes.append({
                "episodeNumber": ep_number,
                "title": video_title,
                "url": video_url,
                "thumbnail": video_thumbnail,
                "audio": f"audio/{ep_number}.m4a",
                "transcription": "NA",
                "transcribed": False,
                "processed": False,
            })
        except Exception as e:
            # Best effort: one failing video must not stop the others.
            print("Error downloading audio for episode:", video_title, e)
    return new_episodes
def save_new_episodes(new_episodes: List, existing_episodes: List, filename: str = 'episodes.json'):
    """Write the existing records followed by the new ones to ``filename`` as JSON.

    Args:
        new_episodes: Records to append.
        existing_episodes: Records that are already stored; written first.
        filename: Destination file; defaults to ``episodes.json``.

    Raises:
        TypeError: If a record cannot be serialized to JSON. The destination
            file is left untouched in that case.
    """
    # Serialize before opening: mode 'w' truncates the file immediately, so a
    # serialization error after that point would wipe the stored records.
    payload = json.dumps(existing_episodes + new_episodes)
    with open(filename, 'w') as f:
        f.write(payload)

Download all episodes. For testing, use

```
get_audio(episodes[:1])
```
to limit the number of episodes that are downloaded.


In [5]:
"""
This function is the entry point to start downloading youtube episodes and saving the audio
"""
def start_audio_download():
    """Entry point: download the audio of the playlist episodes not yet recorded.

    Reads ``episodes.json`` (when it can be read) to find the number of the
    last recorded episode, fetches the newer videos of ``playlist_id``,
    downloads their audio and rewrites ``episodes.json`` with the previously
    recorded episodes followed by the new ones.
    """
    existing_episodes = []
    last_episode_number = 0
    try:
        with open('episodes.json', 'r') as f:
            existing_episodes = json.load(f)
        # An empty list simply means nothing has been recorded yet.
        if existing_episodes:
            last_episode_number = int(existing_episodes[-1].get("episodeNumber"))
    except Exception as e:
        # Best effort: if the file is missing or unreadable, start from episode 0.
        print("Exception when opening file", e)
        last_episode_number = 0
    episodes: List = getNewEpisodes(playlist_id, last_episode_number)
    # NOTE(review): presumably the playlist lists the newest video first, so
    # reversing stores episodes oldest-first and keeps the last record usable
    # as the resume point on the next run - confirm.
    episodes.reverse()
    new_episodes = get_audio(episodes)
    save_new_episodes(new_episodes, existing_episodes)

INGESTION STEP: Audio transcription

Run Whisper on the downloaded audio files to transcribe them.

In [6]:
"""
This function runs whisper model for the audio file sent in the path argument
"""
# Loaded Whisper models keyed by model name, so that a model is loaded once per
# kernel session instead of once per audio file.
_WHISPER_MODELS = {}


def transcribe_audio(path: str, model_name: str = "small"):
    """Transcribe one audio file with Whisper.

    Args:
        path: Location of the audio file.
        model_name: Name passed to ``whisper.load_model``; defaults to
            ``"small"``.

    Returns:
        Whatever ``model.transcribe(path)`` returns.
    """
    model = _WHISPER_MODELS.get(model_name)
    if model is None:
        model = whisper.load_model(model_name)
        _WHISPER_MODELS[model_name] = model
    return model.transcribe(path)

In [7]:
def format_transcription(transcription: dict) -> List[dict]:
    """Reduce a transcription result to the start, end and text of each segment.

    Args:
        transcription: Mapping with a ``'segments'`` entry; every segment must
            provide ``'start'``, ``'end'`` and ``'text'``.

    Returns:
        One ``{'start', 'end', 'text'}`` dict per segment, in the original
        order. Any other segment field is dropped.
    """
    return [
        {'start': segment['start'], 'end': segment['end'], 'text': segment['text']}
        for segment in transcription['segments']
    ]
def save_transcription(transcription: List, filename: str, directory: str = "transcriptions"):
    """Write ``transcription`` as JSON to ``<directory>/<filename>``.

    Args:
        transcription: JSON-serializable transcription data.
        filename: Name of the file to create inside ``directory``.
        directory: Target folder; created if it does not exist yet.
    """
    # exist_ok=True replaces the separate existence check and cannot fail if
    # the folder appears between the check and the creation.
    os.makedirs(directory, exist_ok=True)
    with open(f'{directory}/{filename}', 'w') as f:
        f.write(json.dumps(transcription))
    print("Transcription saved")
def save_updated_episodes(episodes: List, filename: str = "episodes.json"):
    """Overwrite ``filename`` with ``episodes`` serialized as JSON.

    Args:
        episodes: Episode records to persist.
        filename: Destination file; defaults to ``episodes.json``.
    """
    with open(filename, 'w') as episodes_file:
        serialized = json.dumps(episodes)
        episodes_file.write(serialized)
    print("Updated episodes saved")
def save_transcription_gdrive(transcription, filename, directory, root: str = "/content/gdrive/MyDrive"):
    """Write ``transcription`` as JSON to ``<root>/<directory>/<filename>``.

    Args:
        transcription: JSON-serializable transcription data.
        filename: Name of the file to create.
        directory: Folder below ``root``; created if it does not exist yet.
        root: Folder that must already exist; defaults to
            ``/content/gdrive/MyDrive``.

    Raises:
        FileNotFoundError: If ``root`` is not an existing directory.
    """
    if not os.path.isdir(root):
        # Fail loudly instead of letting makedirs() build a look-alike folder
        # tree when the root folder is not there.
        raise FileNotFoundError(f"Google Drive folder not found (is Drive mounted?): {root}")
    target_dir = f"{root}/{directory}"
    os.makedirs(target_dir, exist_ok=True)
    with open(f"{target_dir}/{filename}", "w") as f:
        f.write(json.dumps(transcription))
def save_json_gdrive(directory: str, episodes: list, root: str = "/content/gdrive/MyDrive"):
    """Write ``episodes`` as JSON to ``<root>/<directory>/episodes.json``.

    Args:
        directory: Folder below ``root``; created if it does not exist yet.
        episodes: JSON-serializable episode records.
        root: Folder that must already exist; defaults to
            ``/content/gdrive/MyDrive``.

    Raises:
        FileNotFoundError: If ``root`` is not an existing directory.
    """
    if not os.path.isdir(root):
        # Fail loudly instead of letting makedirs() build a look-alike folder
        # tree when the root folder is not there.
        raise FileNotFoundError(f"Google Drive folder not found (is Drive mounted?): {root}")
    target_dir = f"{root}/{directory}"
    os.makedirs(target_dir, exist_ok=True)
    with open(f"{target_dir}/episodes.json", "w") as f:
        f.write(json.dumps(episodes))

In [8]:
"""
Mount google drive in "/content/gdrive" directory to save the transcribed episodes
"""
# NOTE(review): google.colab is presumably only importable inside a Google
# Colab runtime, so this cell is expected to fail elsewhere - confirm.
from google.colab import drive
# save_transcription_gdrive() and save_json_gdrive() write below
# /content/gdrive/MyDrive, so the mount has to happen before transcription starts.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [9]:
"""
This function is the entry point to transcribe the audio downloaded in the previous step
It will get the location of the audio files from episodes.json file
"""
def start_audio_transcription():
    """Entry point: transcribe every episode in ``episodes.json`` that still needs it.

    Episodes flagged as ``transcribed`` or ``processed`` are skipped. For each
    remaining episode the audio at ``episode['audio']`` is transcribed, the
    formatted segments are saved locally and to Google Drive, and the updated
    episode list is written after every episode, so an interrupted run keeps
    the work done so far.
    """
    print("Starting")
    directory = "transcriptions"
    gdrive_directory = "transcriptions"
    # Read the file completely and close it before the loop:
    # save_updated_episodes() rewrites this same file for every episode.
    with open('episodes.json', 'r') as f:
        episodes = json.load(f)
    if len(episodes) == 0:
        print("No episodes to transcribe")
        return
    for episode in episodes:
        if episode['processed'] or episode['transcribed']:
            print("Episode already transcribed or processed")
            continue
        audio_path = episode['audio']
        # A "/" in the title would be read as a folder separator and make the
        # save fail, so it is replaced just like the spaces.
        filename = episode['title'].replace(" ", "_").replace("/", "_") + ".json"
        transcription = transcribe_audio(audio_path)
        formatted_transcription = format_transcription(transcription)
        save_transcription(formatted_transcription, filename, directory)
        episode['transcription'] = f'{directory}/{filename}'
        episode['transcribed'] = True
        # Persist the local state first, so that a failing Drive write does
        # not lose the record of a transcription that is already on disk.
        save_updated_episodes(episodes)
        save_json_gdrive(gdrive_directory, episodes)
        save_transcription_gdrive(formatted_transcription, filename, gdrive_directory)

In [None]:
"""
Start process
Remember to upload the updated episodes.json file from the repo if there are already processed episodes
"""
# Step 1: download the audio of the new episodes and record them in episodes.json.
start_audio_download()
# Step 2: transcribe the episodes listed in episodes.json that are not
# yet flagged as transcribed or processed.
start_audio_transcription()