In [12]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import pandas as pd
import requests
import time
from youtube_transcript_api import YouTubeTranscriptApi
import re
import yt_dlp
import json
import glob
import os
import csv
from dotenv import load_dotenv

In [13]:
AUDIO_DIR = "audio"

load_dotenv()
API_KEY = os.getenv("API_KEY")

AIzaSyAzcMyJXolJyMBW1k7zfrgid2rGOVitFr8
Hello, World!


In [14]:
def get_transcript(id: str) -> None:
    transcript = YouTubeTranscriptApi.get_transcript(
        id, languages=('en', 'en-US', 'en-GB'))
    return transcript

In [15]:
def download_audio(url):
    print("in download_audio")
    try:
        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            },
            {
                'key': 'FFmpegSplitChapters',  # Adding split chapters processor
                'force_keyframes': True,  # Optional: force keyframes at the start of each chapter
                }],
            'noplaylist': True,
            'writeinfojson': True,  # Optional: write metadata into a JSON file
            'writeannotations': True,  # Optional: write annotations into a file
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
    except Exception as e:
        print("Error downloading audio: ", e)

In [16]:
def extract_text(transcript, start_time, end_time):
    # Initialize an empty string to hold the extracted text
    extracted_text = ""
    
    # Loop through each entry in the transcript
    for entry in transcript:
        # Calculate the end time of the current entry
        entry_end_time = entry['start'] + entry['duration']
        
        # Check if the entry overlaps with the given time range
        if entry['start'] < end_time and entry_end_time > start_time:
            # Add the text to the extracted text string
            extracted_text += entry['text'] + " "
    
    return extracted_text.strip()


In [17]:
def load_podcast_urls(filename):
    urls = []  # List to hold the URLs
    try:
        with open(filename, mode='r', newline='', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for i, row in enumerate(reader):
                if i >= 14:  # Break the loop after the first 14 entries
                    break
                urls.append(row['URL'])
    except FileNotFoundError:
        print(f"Error: The file '{filename}' does not exist.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return urls

In [18]:
def get_title_and_channel(url):
    video_id = url.split('v=')[1].split('&')[0]

    print("video_id: ", video_id)

    # YouTube Data API endpoint
    youtube_api_url = f"https://www.googleapis.com/youtube/v3/videos?id={video_id}&key={API_KEY}&part=snippet"

    # Make the API request
    response = requests.get(youtube_api_url)
    data = response.json()

    print("data: ", data)

    # Check if the API call returns items
    if not data.get('items'):
        return "No video found with the given ID."

    # Extract title and channel title
    video_title = data['items'][0]['snippet']['title']
    channel_title = data['items'][0]['snippet']['channelTitle']

    return video_title, channel_title

In [19]:
video_urls_raw = load_podcast_urls("podcast_list.csv")

video_urls = []
for url in video_urls_raw:
    res = get_title_and_channel(url)
    print(str(res))
    video_title = res[0]
    channel_title = res[1]
    video_urls.append({"link": url, "channel": channel_title, "episode" : video_title})

for vid in video_urls:
        print(str(vid))

video_id:  dX7d6bRJI9k
data:  {'kind': 'youtube#videoListResponse', 'etag': 'tByUidWgh05ODITM9I655sJ03NA', 'items': [{'kind': 'youtube#video', 'etag': 'NAJ3urhBqPm8JMSlG5ocWFDfZUk', 'id': 'dX7d6bRJI9k', 'snippet': {'publishedAt': '2024-04-02T22:53:38Z', 'channelId': 'UC9cn0TuPq4dnbTY-CBsm8XA', 'title': 'Politics & the Future of Tech', 'description': "“If America is going to be America in the next hundred years, we have to get this right.” - Ben Horowitz\n\nWelcome to “The Ben & Marc Show”, featuring a16z co-founders Ben Horowitz and Marc Andreessen.  In this latest episode, Marc and Ben take on one of the most hot button issues facing technology today: regulation and policy.\n\nIn this one-on-one conversation, Ben and Marc delve into why the political interests of “Big Tech” often conflict with a positive technological future, the necessity of decentralized AI, and how the future of American innovation is at its most critical point. They also answer YOUR questions from X (formerly Twit

In [20]:
def main():

    all_results = []  # Start with an empty list to store dictionaries

    episodes_to_transcripts = {}

    curr_base_idx = 0

    for video in video_urls:
        episode = video['episode']
        channel = video['channel']
        id = video['link'].split("v=")[1]
        print("id is: " + id)

        if episode not in episodes_to_transcripts:
            transcript_data = get_transcript(id)
            episodes_to_transcripts[episode] = transcript_data
        
        download_audio(video['link'])

        files = glob.glob(f'{episode[:10]}*.json')
        if not files:
            raise FileNotFoundError("No JSON file found matching the pattern.")
        json_file = files[0]
        with open(json_file, 'r') as file:
            data = json.load(file)
            chapters = data['chapters']
        
        for index, chapter in enumerate(chapters):
            extracted_text = extract_text(episodes_to_transcripts[episode], chapter['start_time'], chapter['end_time'])

            chapter_dict = {
                "channel": channel,
                "episode": episode,
                "chapter": chapter['title'],
                "start_time": chapter['start_time'],
                "end_time": chapter['end_time'],
                "text": extracted_text,
                "file_name_prefix": f"{episode} - {str(index + 1).zfill(3)}",
                "prev_index": curr_base_idx + index - 1 if index > 0 else None,
                "next_index": curr_base_idx + index + 1 if index < len(chapters) - 1 else None
            }
            
            all_results.append(chapter_dict)  # Add dictionaries to the list
        curr_base_idx += len(chapters)

    columns = ['channel', 'episode', 'chapter', 'start_time', 'end_time', 'text', 'file_name_prefix', 'prev_index', 'next_index']
    return pd.DataFrame(all_results, columns=columns)  # Create DataFrame from list of dictionaries

# Run the script
df = main()
print(df)

id is: dX7d6bRJI9k&ab_channel=a16z
in download_audio
[youtube] Extracting URL: https://www.youtube.com/watch?v=dX7d6bRJI9k&ab_channel=a16z
[youtube] dX7d6bRJI9k: Downloading webpage
[youtube] dX7d6bRJI9k: Downloading ios player API JSON
[youtube] dX7d6bRJI9k: Downloading android player API JSON
[youtube] dX7d6bRJI9k: Downloading m3u8 information
[info] dX7d6bRJI9k: Downloading 1 format(s): 251
[info] Writing video metadata as JSON to: Politics & the Future of Tech [dX7d6bRJI9k].info.json




[download] Destination: Politics & the Future of Tech [dX7d6bRJI9k].webm
[download] 100% of  105.43MiB in 00:00:03 at 29.08MiB/s    
[ExtractAudio] Destination: Politics & the Future of Tech [dX7d6bRJI9k].mp3


KeyboardInterrupt: 

In [None]:
df.to_csv('ballsack.csv', index=False)