**INGESTION**: Download videos from Belcorp Youtube Channel

In [None]:
# Optional setup: uncomment the next line to install the dependencies when running in Google Colab (as we did for testing)
# !pip install youtube-search-python yt-dlp openai-whisper

In [2]:
# Prefix of a YouTube playlist URL; a playlist ID is appended to it.
BASE_YOUTUBE_PLAYLIST_URL = "https://www.youtube.com/playlist?list="
# Playlist to ingest. To ingest a different one, uncomment one of the
# alternatives below (the last assignment wins).
playlist_id = "PLxF7HdNkCOLTELPKp_nSyEduDJoIbXNG-"  # "Minuto académico": product information
# playlist_id = "PLxF7HdNkCOLQOAOmQsR3I0I76qklNZucW"  # "Herramientas digitales": sales techniques
# playlist_id = "PLxF7HdNkCOLRCoOL4jTdLWV7nJPHXJyCI"  # "Lanzamientos": product information

In [3]:
from youtubesearchpython import *
from typing import List, Dict
import yt_dlp
import whisper
import os
import json

In [4]:
def get_new_videos(playlist_id: str) -> List:
    """
    Fetch every video of a YouTube playlist and keep only the fields we need.

    - playlist_id: str > ID of a YouTube playlist; it is appended to
      BASE_YOUTUBE_PLAYLIST_URL to build the playlist URL.

    Returns a list of dictionaries, one per video, with the keys
    "id", "title", "url" and "thumbnail". "thumbnail" is the URL of the first
    thumbnail listed for the video, or None when the video lists none.
    """
    playlist = Playlist(f'{BASE_YOUTUBE_PLAYLIST_URL}{playlist_id}')
    # Keep fetching while the playlist object reports that more videos exist.
    while playlist.hasMoreVideos:
        print('Getting more videos...')
        playlist.getNextVideos()
        print(f'Videos Retrieved: {len(playlist.videos)}')

    print("Total number of videos: ", len(playlist.videos))

    new_videos = []
    for video in playlist.videos:
        # Guard against a missing or empty thumbnail list so a single odd
        # entry does not abort the whole ingestion.
        thumbnails = video.get('thumbnails') or []
        video_thumbnail = thumbnails[0].get('url') if thumbnails else None
        new_videos.append({
            "id": video.get('id'),
            "title": video.get('title'),
            "url": video.get('link'),
            "thumbnail": video_thumbnail,
        })
    return new_videos

def save_audio(video: Dict):
    """
    Download the audio track of a single video with yt_dlp.

    The output template is audio/<video id>.m4a and the FFmpegExtractAudio
    post-processor is asked for the m4a codec.

    - video: Dict > Video in dictionary format; its 'id' and 'url' keys are used.

    Returns whatever YoutubeDL.download returns (get_audio treats 0 as success).
    Raises ValueError when the video has no 'id' or no 'url', instead of
    attempting a download that would be written to audio/None.m4a.
    """
    video_id = video.get('id')
    video_url = video.get('url')
    if not video_id or not video_url:
        raise ValueError(f"Video is missing its id or url: {video!r}")

    ydl_opts = {
        'format': 'm4a/bestaudio/best',
        'outtmpl': f'audio/{video_id}.m4a',
        'noplaylist': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'm4a',
        }]}
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # download() takes a list of URLs, so wrap the single URL explicitly.
        result = ydl.download([video_url])
    return result

def get_audio(videos: List) -> List:
    """
    Download the audio of every video in the list and build its registry entry.

    - videos: List > List of videos, each one a dictionary with at least the
      'id', 'title' and 'url' keys.

    Returns a list with one copy of each successfully downloaded video,
    extended with the bookkeeping keys used by the later steps:
    "audio" (path of the m4a file), "transcription" ("NA"),
    "transcribed" (False) and "processed" (False).
    Videos whose download reports a non-zero result or raises are skipped
    after printing a message; the input dictionaries are never modified.
    """
    new_videos = []
    for video in videos:
        video_id = video.get('id')
        try:
            result = save_audio(video)
        except Exception as e:
            # Best effort: one failing download must not stop the batch.
            video_title = video.get('title')
            print("Error downloading audio for episode:", video_title, e)
            continue

        if result != 0:
            print("Error downloading audio for video:", video_id)
            continue

        new_video = video.copy()
        new_video["audio"] = f'audio/{video_id}.m4a'
        new_video["transcription"] = "NA"
        new_video["transcribed"] = False
        new_video["processed"] = False
        new_videos.append(new_video)
    return new_videos

def save_new_videos(new_videos: List, existing_videos: List):
    """
    Write the video registry to videos.json, recording the status of each audio download.
    Keeping this file lets later runs skip work that has already been done.
    - new_videos: List > Videos downloaded in this run.
    - existing_videos: List > Videos already registered; they are written first.
    """
    with open('videos.json', 'w') as registry_file:
        registry = existing_videos + new_videos
        serialized = json.dumps(registry)
        registry_file.write(serialized)

Main function for ingestion

In [5]:
def start_audio_download():
    """
    Entry point to start downloading youtube videos and saving the audio.

    Reads the registry in videos.json (if it can be read), lists the videos of
    the playlist identified by `playlist_id`, downloads the audio of the
    videos that are not registered yet and writes the merged registry back.
    A video that appears more than once in the playlist is downloaded once.
    """
    existing_videos = []
    try:
        with open('videos.json', 'r') as f:
            existing_videos = json.load(f)
    except (OSError, ValueError) as e:
        # Covers a missing/unreadable file (OSError) and invalid JSON or a
        # decoding error (ValueError); start from an empty registry then.
        print("Exception when opening file", e)

    videos = get_new_videos(playlist_id)

    # A set gives constant-time membership checks; it also tracks the IDs
    # picked in this run so playlist duplicates are skipped.
    seen_ids = {video.get('id') for video in existing_videos}
    filtered_videos = []
    for video in videos:
        video_id = video.get('id')
        if video_id in seen_ids:
            continue
        seen_ids.add(video_id)
        filtered_videos.append(video)

    new_videos = get_audio(filtered_videos)
    save_new_videos(new_videos, existing_videos)

**INGESTION**: Audio transcription

In [6]:
# Whisper models already loaded in this session, keyed by model name.
_WHISPER_MODEL_CACHE = {}


def _get_whisper_model(model_name: str):
    """Load the named whisper model on first use and reuse it afterwards."""
    if model_name not in _WHISPER_MODEL_CACHE:
        _WHISPER_MODEL_CACHE[model_name] = whisper.load_model(model_name)
    return _WHISPER_MODEL_CACHE[model_name]


def transcribe_audio(path: str, model_name: str = "base"):
    """
    Runs a whisper model on the audio file sent in the path argument.

    The model is loaded only once per model name and kept for later calls,
    so transcribing many files does not reload it for every file.

    - path: str > Location of the specific m4a audio file.
    - model_name: str > Name passed to whisper.load_model (default "base").

    Returns the value returned by the model's transcribe() call.
    """
    model = _get_whisper_model(model_name)
    return model.transcribe(path)

In [7]:
def format_transcription(transcription: Dict) -> List:
    """
    Reduce a transcription result to a list of plain segments.

    - transcription: Dict > Transcription result; it must have a 'segments'
      key holding an iterable of mappings with 'start', 'end' and 'text'.

    Returns a list with one dictionary per segment containing only the
    'start', 'end' and 'text' keys, in the original order.
    Raises KeyError when 'segments' or one of the three segment keys is missing.
    """
    return [
        {
            'start': segment['start'],
            'end': segment['end'],
            'text': segment['text'],
        }
        for segment in transcription['segments']
    ]

def save_transcription(transcription: List, filename: str, directory: str = "transcriptions"):
    """
    Write a formatted transcription as JSON inside the given directory.
    The directory is created first when it does not exist yet.
    - transcription: List > Segments of the transcription.
    - filename: str > Name of the output file.
    - directory: str > Folder that will contain the output file.
    """
    directory_missing = not os.path.exists(directory)
    if directory_missing:
        os.makedirs(directory)
    output_path = f'{directory}/{filename}'
    with open(output_path, 'w') as output_file:
        payload = json.dumps(transcription)
        output_file.write(payload)
    print("Transcription saved")

def save_updated_videos(videos: List, filename: str = "videos.json"):
    """
    Overwrite the video registry file with the current state of the videos,
    so the transcription flags and paths set on them are persisted.
    - videos: List > Full list of registered videos to write.
    - filename: str > Path of the JSON file to write.
    """
    with open(filename, 'w') as registry_file:
        serialized = json.dumps(videos)
        registry_file.write(serialized)
    print("Updated videos saved")

In [9]:
def start_audio_transcription():
    """
    Entry point to transcribe the audio downloaded in the previous step.

    Reads the registry from videos.json and, for every video not yet flagged
    as transcribed or processed, transcribes its audio file, saves the
    formatted segments under transcriptions/<sanitized title>.json and
    records the result in the registry. The registry is saved after each
    video so an interrupted run keeps the work already done.

    Raises whatever open()/json.load raise when videos.json cannot be read.
    """
    print("Starting")
    directory = "transcriptions"

    # Load the registry and close the file before the loop, because
    # save_updated_videos rewrites videos.json after every transcription.
    with open('videos.json', 'r') as f:
        videos = json.load(f)

    if not videos:
        print("No videos to transcribe")
        return

    for video in videos:
        # A missing flag is treated as "not done yet" rather than raising.
        if video.get('processed') or video.get('transcribed'):
            print("Video already transcribed or processed")
            continue
        audio_path = video['audio']
        # Spaces and slashes in the title are replaced to build the file name.
        filename = video['title'].replace(" ", "_").replace("/", "_") + ".json"
        transcription = transcribe_audio(audio_path)
        formatted_transcription = format_transcription(transcription)
        save_transcription(formatted_transcription, filename, directory)
        video['transcription'] = f'{directory}/{filename}'
        video['transcribed'] = True
        save_updated_videos(videos)

Run full download and transcriptions

In [None]:
# Order matters: the transcription step reads the videos.json file that the
# download step writes.
start_audio_download()
start_audio_transcription()