<a href="https://colab.research.google.com/github/sheldonkemper/bank_of_england/blob/main/notebooks/processed/sk_process_santander.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Modules

In [13]:
"""
===================================================
Author: Sheldon Kemper
Role: Data Engineering Lead, Bank of England Employer Project (Quant Collective)
LinkedIn: https://www.linkedin.com/in/sheldon-kemper
Date: 2025-02-04
Version: 1.1

Description:
    This notebook implements a pipeline that transcribes audio recordings and processes the resulting transcripts for the Bank of England project.
    The workflow downloads an audio file from a specified URL, applies a machine learning-based speech-to-text model
    (e.g., OpenAI’s Whisper) to convert the audio into text, and segments the resulting transcript into two sections:
    the Manager Presentation and the Question & Answer (Q&A) sections. Each section is subsequently exported into its
    own CSV file using Python libraries such as requests, regex, and CSV (or pandas). This pipeline builds on our existing
    data engineering infrastructure to facilitate efficient extraction, segmentation, and analysis of key project content.

===================================================
"""




In [14]:
# Install whisper (if not already installed)
# !pip install git+https://github.com/openai/whisper.git

In [15]:
import os
import glob
import subprocess
import requests
from bs4 import BeautifulSoup
import whisper
import re
import csv
import whisper

In [16]:
import os
from google.colab import drive

# Mount Google Drive at /content/drive; force_remount=True re-mounts even if
# a mount already exists from an earlier run of this cell.
drive.mount('/content/drive', force_remount=True)

# Project data root (the 'BOE' folder is assumed to live in 'MyDrive' and to be shared already).
BOE_path = '/content/drive/MyDrive/BOE/bank_of_england/data'

# Show what is in the data root as a quick sanity check of the mount.
boe_entries = os.listdir(BOE_path)
print("BOE Directory Contents:", boe_entries)

# Raw Santander inputs (audio/video files are assumed to sit under raw/santander).
raw_data_path = os.path.join(BOE_path, 'raw', 'santander')
raw_entries = os.listdir(raw_data_path)
print("Raw Data Directory Contents:", raw_entries)


Mounted at /content/drive
BOE Directory Contents: ['model', 'processed', 'raw']
Raw Data Directory Contents: ['text']


## Download Videos


In [None]:
import os
import subprocess
import requests
from bs4 import BeautifulSoup

def extract_video_url(page_url, timeout=30):
    """
    Scrape a webpage and return the URL of its embedded MP4 video.

    Fetches ``page_url``, finds the first <video> tag, and reads the ``src``
    attribute of its <source type="video/mp4"> child. A protocol-relative URL
    (starting with "//") is prefixed with "https:".

    Args:
        page_url: URL of the page that embeds the video.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, an unresponsive server would block the notebook forever.

    Returns:
        The video URL as a string, or None when the request fails, or when the
        page has no <video> tag, no MP4 <source> tag, or an MP4 <source> tag
        without a ``src`` attribute. The reason is printed in each case.
    """
    try:
        response = requests.get(page_url, timeout=timeout)
        response.raise_for_status()  # raise exception if the request failed
    except Exception as e:
        # Best-effort: report the failure and let the caller move on to the next page.
        print(f"Error requesting page {page_url}: {e}")
        return None

    soup = BeautifulSoup(response.text, 'html.parser')
    video_tag = soup.find('video')
    if not video_tag:
        print(f"No video tag found on {page_url}")
        return None

    source_tag = video_tag.find('source', {'type': 'video/mp4'})
    if not source_tag:
        print(f"MP4 source not found on {page_url}")
        return None

    video_url = source_tag.get('src')
    # A <source> tag may lack a src attribute (or have an empty one); .get()
    # then returns None/"" and calling .startswith on None would raise.
    if not video_url:
        print(f"MP4 source has no src attribute on {page_url}")
        return None

    if video_url.startswith('//'):
        video_url = 'https:' + video_url
    return video_url

def download_video(video_url, output_filename):
    """
    Download a video with ffmpeg, copying the streams without re-encoding.

    Args:
        video_url: Direct video URL (including any token/signature parameters).
            If empty or None, nothing is downloaded.
        output_filename: Path of the file to write. If it already exists the
            download is skipped, so re-running the notebook does not stall on
            ffmpeg's overwrite prompt or repeat finished downloads.

    Returns:
        None. Progress and errors are reported via print; failures do not raise.
    """
    if not video_url:
        print("No video URL provided for download.")
        return

    if os.path.exists(output_filename):
        print(f"Output file already exists, skipping download: {output_filename}")
        return

    # Build the ffmpeg command. -nostdin disables interactive prompts, which
    # cannot be answered when ffmpeg runs as a subprocess of the notebook.
    command = [
        "ffmpeg",
        "-nostdin",
        "-i", video_url,
        "-c", "copy",
        output_filename
    ]

    # NOTE(review): the logged command contains the full signed URL (access key
    # and signature query parameters); clear cell outputs before sharing.
    print(f"Running command: {' '.join(command)}")
    try:
        subprocess.run(command, check=True)
        print(f"Download complete: {output_filename}")
    except OSError as e:
        # Raised when the ffmpeg executable cannot be started (e.g. not installed).
        print(f"Error during download: could not run ffmpeg: {e}")
    except subprocess.CalledProcessError as e:
        print(f"Error during download: {e}")
        # The file did not exist before this call, so anything present now is a
        # partial download; remove it so a later run retries instead of skipping.
        if os.path.exists(output_filename):
            os.remove(output_filename)

def process_video_page(page_url, output_filename):
    """
    Run the two-step pipeline for a single video page.

    First the page at ``page_url`` is scraped for its video URL; if one is
    found, the video is downloaded to ``output_filename``. Each step is
    reported via print, and nothing is downloaded when no URL is extracted.
    """
    print(f"Processing page: {page_url}")
    resolved_url = extract_video_url(page_url)

    # Guard clause: without a video URL there is nothing to download.
    if not resolved_url:
        print(f"Could not extract video URL from {page_url}")
        return

    print(f"Extracted video URL: {resolved_url}")
    download_video(resolved_url, output_filename)

# Video pages to fetch, each tagged with the year and quarter it belongs to.
video_pages = [
    {"url": "https://wavedw01.santandergroup.net/content/5SRc1sBhrbQShIGv", "year": "2024", "quarter": "Q1"},
    {"url": "https://wavedw01.santandergroup.net/content/mWmPFSlzp8bbFaY6", "year": "2024", "quarter": "Q2"},
    {"url": "https://wavedw01.santandergroup.net/content/FDze0qyNaPlaLMGd", "year": "2024", "quarter": "Q3"},
    {"url": "https://wavedw01.santandergroup.net/content/CZXmizS4fhXXBe1Y", "year": "2024", "quarter": "Q4"},

    {"url": "https://wavedw01.santandergroup.net/content/yVQDX2NYLvKCfejI", "year": "2023", "quarter": "Q1"},
    {"url": "https://wavedw01.santandergroup.net/content/GSHpNVJBNGIlmjoC", "year": "2023", "quarter": "Q2"},
    {"url": "https://wavedw01.santandergroup.net/content/fmjK8h3yPneBlW6H", "year": "2023", "quarter": "Q3"},
    {"url": "https://wavedw01.santandergroup.net/content/aon6bYIQV5etHkKj", "year": "2023", "quarter": "Q4"},
]

# Downloads land in the raw/santander directory on Google Drive (created if missing).
output_dir = '/content/drive/MyDrive/BOE/bank_of_england/data/raw/santander'
os.makedirs(output_dir, exist_ok=True)

# Download every page; the file name carries the year, the quarter and the
# 1-based position of the page in the list above.
for idx, video_info in enumerate(video_pages, start=1):
    page_url, year, quarter = (video_info[key] for key in ("url", "year", "quarter"))
    output_file = os.path.join(output_dir, f"video_{year}_{quarter}_{idx}.mp4")
    process_video_page(page_url, output_file)


Processing page: https://wavedw01.santandergroup.net/content/5SRc1sBhrbQShIGv
Extracted video URL: https://waves3.santandergroup.net/waves3/EXTERNOS/sWEB/media/5SRc1sBhrbQShIGv/st3_270_EN01_900p.mp4?AWSAccessKeyId=Z3NuZXR3YXZl&Expires=1739196850&Signature=%2F12zEH3XYyHzhdFuALcSNABYuH4%3D
Running command: ffmpeg -i https://waves3.santandergroup.net/waves3/EXTERNOS/sWEB/media/5SRc1sBhrbQShIGv/st3_270_EN01_900p.mp4?AWSAccessKeyId=Z3NuZXR3YXZl&Expires=1739196850&Signature=%2F12zEH3XYyHzhdFuALcSNABYuH4%3D -c copy /content/drive/MyDrive/BOE/bank_of_england/data/raw/santander/video_2024_Q1_1.mp4


## Process All Downloaded MP4 Files

In [None]:
# Define directories – adjust these paths as needed.
raw_dir = '/content/drive/MyDrive/BOE/bank_of_england/data/raw/santander'
processed_dir = '/content/drive/MyDrive/BOE/bank_of_england/data/processed'
os.makedirs(processed_dir, exist_ok=True)

# Load the Whisper transcription model (choose an appropriate size)
model = whisper.load_model("base")

# Define the CSV file where all transcripts will be appended.
all_transcripts_csv = os.path.join(processed_dir, "santander_all_transcripts.csv")
csv_columns = ["file_name", "year", "quarter", "transcript", "duplicate"]

# Each row stores a whole transcript in a single field, which can exceed the
# csv parser's default field size limit when the file is read back on a later
# run. Raise the limit so long transcripts do not break the duplicate check.
csv.field_size_limit(2**31 - 1)

# An existing but empty file (e.g. left behind by an interrupted run) has no
# header row; treat it like a missing file so the header is always written.
csv_has_content = (
    os.path.exists(all_transcripts_csv)
    and os.path.getsize(all_transcripts_csv) > 0
)

# Collect the file names already recorded in the CSV to flag duplicates,
# or create the CSV with its header row if there is nothing to read yet.
existing_files = set()
if csv_has_content:
    with open(all_transcripts_csv, "r", newline="", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            existing_files.add(row["file_name"])
else:
    with open(all_transcripts_csv, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(csv_columns)

# Process each MP4 file in the raw directory. glob returns paths in arbitrary
# order, so sort them to make the row order reproducible between runs.
mp4_files = sorted(glob.glob(os.path.join(raw_dir, "*.mp4")))

for mp4_file in mp4_files:
    print(f"Processing MP4 file: {mp4_file}")
    # Transcribe the video file using Whisper.
    result = model.transcribe(mp4_file)
    transcript_text = result["text"]

    # Use the file's base name (no directory, no extension) as an identifier.
    base_name = os.path.splitext(os.path.basename(mp4_file))[0]

    # Attempt to extract the year and quarter from the base_name.
    # Expecting a pattern like "video_2023_Q3_1" in the file name.
    match = re.search(r'(\d{4})_(Q[1-4])', base_name)
    if match:
        year, quarter = match.group(1), match.group(2)
    else:
        year = quarter = "Unknown"

    # Flag files that are already recorded in the CSV (or were seen earlier in
    # this run). Flagged files are still transcribed and appended; only the
    # "duplicate" column distinguishes them.
    if base_name in existing_files:
        duplicate_flag = "Yes"
    else:
        duplicate_flag = "No"
        existing_files.add(base_name)

    # Append the record immediately so finished transcripts survive a later failure.
    with open(all_transcripts_csv, "a", newline="", encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow([base_name, year, quarter, transcript_text, duplicate_flag])

    print(f"Transcript for '{base_name}' appended to {all_transcripts_csv} (year: {year}, quarter: {quarter}, duplicate: {duplicate_flag})")
