# Data Acquisition

This notebook contains the code used to download 1 year's worth of transcripts from the [top 50 most listened to podcasts](https://www.edisonresearch.com/the-top-50-podcasts-in-the-u-s-for-q1-2025-from-edison-podcast-metrics/) in the US for the first quarter of 2025.

Transcripts are hosted on [podscribe.app](https://podscribe.app/) and have all been transcribed using [Google Cloud's Speech-to-Text API](https://cloud.google.com/speech-to-text).

## Imports
Add necessary imports.

In [None]:
import asyncio
import nest_asyncio
from datetime import datetime, timedelta
from playwright.async_api import async_playwright
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
import random
import os
import re
from pathlib import Path
import pandas as pd

In [None]:
# automated downloading of transcripts from Podscribe
# does this by navigating through relevant episode pages, toggling UI controls, and saving transcripts

# strip characters that are unsafe in filenames
def sanitize_filename(s):
    """Return *s* with every character outside a small whitelist removed.

    Kept characters are ASCII letters, digits, underscore, hyphen, space,
    parentheses, square brackets, period and comma. Leading and trailing
    whitespace is trimmed after the filtering.
    """
    disallowed = r"[^a-zA-Z0-9_\- \(\)\[\]\.,]"
    filtered = re.sub(disallowed, "", s)
    return filtered.strip()

# Read the episode title from the page heading, falling back to <title>
async def extract_episode_title(page):
    """Return the episode title shown on *page*.

    The heading selectors are tried in order; the first matching element whose
    trimmed text is longer than three characters wins. If none qualifies, the
    document's <title> element is used (without the length check). Whatever
    text is chosen is cut at the first whitespace-preceded word "episode"
    (case-insensitive) and trimmed. Returns "Untitled" when the page has no
    usable heading and no <title> element.
    """

    def before_episode(raw):
        # keep only what precedes the first " episode" marker
        return re.split(r"\s+episode\b", raw, maxsplit=1, flags=re.I)[0].strip()

    for css in ("h1", "h2", ".MuiTypography-h1", ".MuiTypography-h2"):
        heading = await page.query_selector(css)
        if not heading:
            continue
        heading_text = (await heading.inner_text()).strip()
        if len(heading_text) > 3:
            return before_episode(heading_text)

    # fallback: use the <title> tag
    title_tag = await page.query_selector("""title""")
    if not title_tag:
        return "Untitled"
    return before_episode((await title_tag.inner_text()).strip())

# idle for a random interval while jiggling the mouse / wheel like a person would
async def random_human_sleep(page, min_ms=800, max_ms=3500, move_mouse=True, scroll=True):
    """Wait a random time between *min_ms* and *max_ms* milliseconds on *page*.

    Before waiting, when *move_mouse* is true the pointer is moved to between
    one and three random points (x in 0..1200, y in 0..800) with a short pause
    after each move. When *scroll* is true the wheel is turned by a random
    amount in -400..400 pixels roughly half of the time.
    """
    pause_ms = random.uniform(min_ms, max_ms)
    print(f"Sleeping for {pause_ms:.2f} ms...")

    if move_mouse:
        hops = random.randint(1, 3)
        for _ in range(hops):
            target_x = random.randint(0, 1200)
            target_y = random.randint(0, 800)
            glide_steps = random.randint(5, 15)
            await page.mouse.move(target_x, target_y, steps=glide_steps)
            await asyncio.sleep(random.uniform(0.08, 0.25))

    # the coin flip is only drawn when scrolling is enabled
    if scroll and random.random() < 0.5:
        await page.mouse.wheel(0, random.randint(-400, 400))

    await page.wait_for_timeout(pause_ms)

# switch off a labelled toggle (used to drop timestamps and speaker tags)
async def toggle_switch(page, label_text):
    """Turn off the switch whose label reads *label_text*, if it is on.

    Every ``label.MuiFormControlLabel-root`` on *page* is inspected; the text
    of its last ``<span>`` is compared case-insensitively with *label_text*.
    Only the first matching label is handled. If its checkbox is checked, the
    mouse is moved to the centre of the label (when it has a bounding box) and
    the checkbox is clicked.

    Returns True when a matching label was found (even if it had no checkbox
    or was already off), False otherwise.
    """
    for label in await page.query_selector_all("""label.MuiFormControlLabel-root"""):
        spans = await label.query_selector_all("""span""")
        if not spans:
            continue
        caption = await spans[-1].inner_text()
        if caption.strip().lower() != label_text.lower():
            continue

        # From here on the label matched, so every exit reports "found".
        checkbox = await label.query_selector("""input[type="checkbox"]""")
        if not checkbox:
            print(f"Checkbox for '{label_text}' not found")
            return True

        if not await checkbox.is_checked():
            print(f"'{label_text}' already off")
            return True

        rect = await label.bounding_box()
        if rect:
            cx = rect["x"] + rect["width"] / 2
            cy = rect["y"] + rect["height"] / 2
            print(f"Moving mouse to switch '{label_text}' at ({cx:.1f}, {cy:.1f})")
            await page.mouse.move(cx, cy, steps=random.randint(5, 10))
            await asyncio.sleep(random.uniform(0.08, 0.15))
        print(f"Toggling off '{label_text}'")
        await checkbox.click(force=True)
        return True

    print(f"Label for '{label_text}' not found")
    return False

# toggle_switch wrapped in a small retry loop for Playwright timeouts
async def safe_toggle_switch(page, label_text, max_retries=2):
    """Call ``toggle_switch`` up to *max_retries* times, retrying on timeout.

    Returns whatever ``toggle_switch`` returns on the first attempt that does
    not raise a Playwright timeout. After each timeout the function waits
    1.5-3 s via ``random_human_sleep``. If every attempt times out, a message
    is printed and False is returned. Other exceptions propagate.
    """
    attempt_no = 0
    while attempt_no < max_retries:
        attempt_no += 1
        try:
            return await toggle_switch(page, label_text)
        except PlaywrightTimeoutError as e:
            print(f"Timeout clicking '{label_text}' switch, attempt {attempt_no}/{max_retries}: {e}")
            await random_human_sleep(page, 1500, 3000)

    print(f"Giving up on toggling '{label_text}'. Continuing...")
    return False

# helpers + entry point that download the Podscribe transcript for one episode
def _center_of(box):
    """Return the (x, y) midpoint of a bounding-box dict with x/y/width/height."""
    return box["x"] + box["width"] / 2, box["y"] + box["height"] / 2


async def _save_txt_transcript(page, save_dir, base_filename):
    """Open the download menu on *page*, pick TXT and save the file.

    Prints a message and returns early when the download button or the TXT
    entry cannot be located. Exceptions (including Playwright timeouts) are
    left for the caller to handle.
    """
    icon = await page.query_selector("""button svg[data-testid="GetAppIcon"]""")
    if not icon:
        print("Download button not found.")
        return

    # the icon lives inside the real button; click the button itself
    button = await icon.evaluate_handle("""el => el.closest("button")""")
    button_box = await button.bounding_box()
    if button_box:
        bx, by = _center_of(button_box)
        print(f"Moving mouse to Download button at ({bx:.1f}, {by:.1f})")
        await page.mouse.move(bx, by, steps=random.randint(5, 10))
        await asyncio.sleep(random.uniform(0.08, 0.18))
    print("Clicking download button...")
    await button.click()
    await random_human_sleep(page, 800, 1600)

    txt_item = await page.wait_for_selector("""li span.jss97:text("TXT")""", timeout=5000)
    if not txt_item:
        print("TXT option not found!")
        return

    item_box = await txt_item.bounding_box()
    if item_box:
        tx, ty = _center_of(item_box)
        print(f"Moving mouse to TXT dropdown at ({tx:.1f}, {ty:.1f})")
        await page.mouse.move(tx, ty, steps=random.randint(5, 10))
        await asyncio.sleep(random.uniform(0.08, 0.15))

    async with page.expect_download() as pending:
        await txt_item.click()
    transcript_file = await pending.value
    destination = os.path.join(save_dir, base_filename)
    await transcript_file.save_as(destination)
    print(f"Transcript downloaded to {destination}")


async def download_podscribe_transcript(page, transcript_url: str, save_dir: str, published_date_str: str):
    """Download one episode transcript as ``<title> (<date>).txt`` in *save_dir*.

    Args:
        page: Playwright page used for the whole interaction.
        transcript_url: URL of the episode page to open.
        save_dir: Target directory; created (with parents) if missing.
        published_date_str: Date text embedded in the filename after being
            passed through ``sanitize_filename``.

    The function navigates to the episode (60 s timeout; returns silently
    after printing if navigation times out), switches off the "Speakers" and
    "Times" toggles, then uses the download menu to save the TXT export.
    Errors raised during the download step are printed, not re-raised.
    Random human-like pauses are inserted between steps.
    """
    Path(save_dir).mkdir(parents=True, exist_ok=True)
    print(f"Navigating to {transcript_url}")
    try:
        await page.goto(transcript_url, timeout=60000)
    except PlaywrightTimeoutError as e:
        print(f"Timeout navigating to {transcript_url}: {e}")
        return

    await random_human_sleep(page)

    title_part = sanitize_filename(await extract_episode_title(page))
    date_part = sanitize_filename(published_date_str)
    base_filename = f"{title_part} ({date_part}).txt"

    await random_human_sleep(page)
    # each toggle is followed by a pause, same cadence for both switches
    for switch_label in ("Speakers", "Times"):
        await safe_toggle_switch(page, switch_label)
        await random_human_sleep(page)

    try:
        await _save_txt_transcript(page, save_dir, base_filename)
    except PlaywrightTimeoutError as e:
        print(f"Timeout during download for {transcript_url}: {e}")
    except Exception as e:
        print(f"Error downloading transcript for {transcript_url}: {e}")

    await random_human_sleep(page)

# walk a podcast's episode list page by page and download every recent transcript
def _parse_published_date(published_str):
    """Parse an episode-list date into a naive ``datetime``.

    "M/D" (one slash) is completed with the current year; "M/D/YYYY" (two
    slashes) is parsed as is. Any other slash count yields None. Surrounding
    whitespace is ignored. Raises ValueError when the text does not match the
    expected format.
    """
    text = published_str.strip()
    slashes = text.count("/")
    if slashes == 1:
        return datetime.strptime(f"{text}/{datetime.now().year}", "%m/%d/%Y")
    if slashes == 2:
        return datetime.strptime(text, "%m/%d/%Y")
    return None


async def _collect_recent_episodes(rows, cutoff):
    """Scan table *rows* and return ``(episodes, old_count)``.

    ``episodes`` lists a dict (url, published_date_str, published_date_dt) for
    every row whose parsed date is on or after *cutoff*; ``old_count`` is the
    number of rows dated before *cutoff*. Rows without an episode link, or
    whose date text cannot be parsed, are ignored.
    """
    episodes = []
    old_count = 0
    for row in rows:
        anchor = await row.query_selector("""a[href^="/episode/"]""")
        if not anchor:
            continue
        href = await anchor.get_attribute("href")
        if not href:
            continue

        date_elem = await row.query_selector("""p[aria-label*="/"]""")
        published_str = await date_elem.inner_text() if date_elem else None

        published_dt = None
        if published_str:
            try:
                published_dt = _parse_published_date(published_str)
            except ValueError as e:
                print(f"Could not parse date '{published_str}' for {href}: {e}")
                continue

        if published_dt and published_dt >= cutoff:
            episodes.append({
                "url": "https://app.podscribe.com" + href,
                "published_date_str": published_str,
                "published_date_dt": published_dt,
            })
            continue

        if published_dt:
            old_count += 1
        print(f"Skipping episode with date {published_str} ({published_dt}) for {href}")
    return episodes, old_count


async def scrape_and_download(series_url, save_dir, months=12, old_limit=10):
    """Download transcripts for all episodes of a series newer than a cutoff.

    Args:
        series_url: URL of the series page that lists episodes in a table.
        save_dir: Directory handed to ``download_podscribe_transcript``.
        months: Look-back window; the cutoff is ``30 * months`` days ago.
        old_limit: Pagination stops once a single page contains at least this
            many episodes older than the cutoff.

    A visible (non-headless) Chromium window is launched. Each listing page is
    scrolled, its rows are scanned, and every recent episode is opened in its
    own tab for download. Failures for individual episodes are printed and do
    not stop the run. The browser is closed even if an error escapes.
    """
    cutoff = datetime.now() - timedelta(days=30 * months)
    page_num = 1

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)
        try:
            context = await browser.new_context(accept_downloads=True)
            page = await context.new_page()
            await page.set_extra_http_headers({
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                              "AppleWebKit/537.36 (KHTML, like Gecko) "
                              "Chrome/124.0.0.0 Safari/537.36"
            })
            await page.goto(series_url)
            await page.wait_for_timeout(2000)

            while True:
                # scroll to bottom to make sure all episodes load
                for _ in range(10):
                    await page.mouse.wheel(0, 1000)
                    await page.wait_for_timeout(200)
                await page.wait_for_timeout(1000)

                rows = await page.query_selector_all("""tr""")
                print(f"Page {page_num}: found {len(rows)} table rows.")
                page_episodes, old_count = await _collect_recent_episodes(rows, cutoff)

                # download transcripts one by one in new tabs
                for ep in page_episodes:
                    try:
                        transcript_page = await context.new_page()
                        try:
                            await download_podscribe_transcript(
                                transcript_page, ep["url"], save_dir, ep["published_date_str"]
                            )
                        finally:
                            # always release the tab, even when the download raised
                            await transcript_page.close()
                    except Exception as e:
                        print(f"Failed to process {ep['url']}: {e}")

                print(f"Page {page_num}: {old_count} episodes older than cutoff.")
                if old_count >= old_limit:
                    print(f"Found {old_count} old episodes on page {page_num}. Stopping pagination.")
                    break

                # go to next page (if exists)
                next_btn = await page.query_selector("""button[aria-label="Go to next page"]""")
                if not next_btn:
                    print("No next button found. Done with pagination.")
                    break

                # `disabled` is a boolean attribute: when present its value is
                # usually the empty string, so test for presence, not truthiness.
                disabled_attr = await next_btn.get_attribute("disabled")
                aria_disabled = await next_btn.get_attribute("aria-disabled")
                if disabled_attr is not None or aria_disabled == "true":
                    print("Next button is disabled. Stopping.")
                    break

                sleep_time = random.uniform(1.5, 20)
                print(f"Clicking next page... (sleeping {sleep_time:.2f}s)")
                await next_btn.click()
                await page.wait_for_timeout(int(sleep_time * 1000))
                page_num += 1
        finally:
            await browser.close()

In [None]:
# turn a podcast title into a lowercase, hyphenated directory name
def safe_string(title):
    """Return a slug of *title*.

    The title is lower-cased, every run of whitespace becomes a single hyphen,
    and any character other than a-z, 0-9 or hyphen is then dropped.
    """
    hyphenated = re.sub(r"\s+", "-", title.lower())
    return re.sub(r"[^a-z0-9\-]", "", hyphenated)

In [None]:
# initiate scraping process
df = pd.read_csv("data\\top_50_pods_USA_2025_Q1.csv", index_col="Rank")
pod_count = len(df)

print("Starting podcast transcript scraping for the top 50 podcasts in the US for Q1 of 2025...")
for index, row in df.iterrows():
    title = safe_string(row["Title"])
    if not os.path.exists(title):
        print(f"Scraping starting for podcast {index} of {pod_count} ({row["Title"]}).")
        await scrape_and_download(
            row["Transcript Link"],
            f"./{title}",
            months=12,
            old_limit=10
        )
        sleep_time = random.uniform(30, 600)
        print(f"Scraping completed for podcast {index} of {pod_count} ({row["Title"]}).")
        print(f"Sleeping for {sleep_time:.2f} seconds...")
        await asyncio.sleep(sleep_time)
    else:
        print(f"Podcast {row["Title"]} already exists.")
        print("Skipping...")
        
print("Transcript scraping COMPLETE!")