In [4]:
###############################################################################
# 1. CONFIGURATION
###############################################################################

# Imports used by this configuration cell. `os` has to be imported here:
# os.getenv() is called below, and the only other `import os` lives in a
# later cell, so a fresh kernel would otherwise fail with NameError.
import os

from dotenv import load_dotenv
from openai import OpenAI

# Define the job name (used to build the output file name)
job_name = input("Job name: ")

# Load environment variables from config.env
load_dotenv('./config.env')

# Retrieve API keys from environment variables
ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Validate API keys: fail early instead of at the first API call
if not ASSEMBLYAI_API_KEY:
    raise ValueError("ASSEMBLYAI_API_KEY not found in config.env")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY not found in config.env")

# Initialize the OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

# Alternative client configuration (kept disabled), pointing at DeepSeek:
# client = OpenAI(api_key="<DeepSeek API Key>", base_url="https://api.deepseek.com")

# Define OpenAI model to use
OPENAI_MODEL = "gpt-4o-mini"  # Ensure this model is accessible with your API key

# Define chunking parameters
CHUNK_WORD_TARGET = 500  # Target words per chunk
MAX_SUMMARY_WORDS = 300  # Maximum words in running summary before summarization
ENABLE_SUMMARY_SUMMARIZATION = True  # Toggle for summary summarization


In [8]:
import os
import time
import yt_dlp
import assemblyai as aai

###############################################################################
# 2. ASSEMBLYAI TRANSCRIPTION
###############################################################################

def extract_audio_from_youtube(youtube_url: str) -> str:
    """
    Resolve a direct m4a audio stream URL for the given YouTube URL.

    Only the video metadata is requested; nothing is downloaded, because the
    caller needs just the stream URL.

    Args:
        youtube_url: URL of the YouTube video.

    Returns:
        The `url` field of the selected m4a format entry.

    Raises:
        RuntimeError: if no m4a format with an audio track is listed.
    """

    ydl_opts = {
        # Passed through to yt-dlp so it can read cookies from Firefox.
        "cookiesfrombrowser": ('firefox',)
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # download=False: fetching the format list is enough to get the URL.
        info = ydl.extract_info(youtube_url, download=False)

    # Walk the format list backwards (yt-dlp is expected to order formats from
    # worst to best) and take the first m4a entry that carries an audio codec.
    for fmt in reversed(info.get("formats") or []):
        if fmt.get("acodec") != "none" and fmt.get("ext") == "m4a" and fmt.get("url"):
            return fmt["url"]

    # Fail loudly instead of implicitly returning None to the transcriber.
    raise RuntimeError(f"No m4a audio format found for: {youtube_url}")

def transcribe_audio_assemblyai(audio_url_or_path: str, language_code: str = "en") -> aai.Transcript:
    """
    Run an AssemblyAI transcription for a remote URL or a local file path.

    When the source string contains "youtube.com" or "youtu.be", it is first
    replaced by the audio URL returned by `extract_audio_from_youtube`.

    Returns the transcript object once its status is 'completed'.
    Raises RuntimeError when the transcript ends in the error status.
    """
    # YouTube links are swapped for a direct audio URL before transcription.
    looks_like_youtube = any(
        marker in audio_url_or_path for marker in ("youtube.com", "youtu.be")
    )
    if looks_like_youtube:
        print("YouTube URL detected. Extracting audio URL...")
        audio_url_or_path = extract_audio_from_youtube(audio_url_or_path)
        print(f"YouTube video audio URL: {audio_url_or_path}")

    # Configure the AssemblyAI SDK and build a transcriber for the language.
    aai.settings.api_key = ASSEMBLYAI_API_KEY
    transcriber = aai.Transcriber(
        config=aai.TranscriptionConfig(language_code=language_code)
    )

    # Kick off the transcription.
    transcript = transcriber.transcribe(audio_url_or_path)

    # Keep re-fetching the transcript every 5 seconds until it is finished.
    finished_states = ('completed', 'error')
    while True:
        if transcript.status in finished_states:
            break
        print(f"Transcription status: {transcript.status}. Waiting...")
        time.sleep(5)
        transcript = transcriber.get_transcription(transcript.id)

    # Surface a failed transcription as an exception.
    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(f"Transcription failed: {transcript.error}")

    return transcript

###############################################################################
# 3. CHUNKING THE TRANSCRIPT
###############################################################################

def chunk_text_by_paragraphs(transcript: aai.Transcript, chunk_word_target: int = 600) -> list:
    """
    Group the transcript's paragraphs into chunks of roughly
    `chunk_word_target` words.

    Paragraphs are never split: they are collected in order, and a chunk is
    closed as soon as adding the next paragraph would push it past the target.
    A single paragraph longer than the target therefore forms its own chunk.

    Returns a list of strings, each made of paragraphs joined by newlines.
    """
    finished = []
    pending = []
    pending_words = 0

    for para in transcript.get_paragraphs():
        body = para.text.strip()
        if body:  # blank paragraphs contribute nothing
            body_words = len(body.split())

            # Close the chunk in progress before it would overflow the target;
            # an empty chunk always accepts the paragraph.
            if (pending_words + body_words) > chunk_word_target and pending:
                finished.append("\n".join(pending))
                pending, pending_words = [], 0

            pending.append(body)
            pending_words += body_words

    # Flush whatever is left over as the final chunk.
    if pending:
        finished.append("\n".join(pending))

    return finished

###############################################################################
# 4. OPENAI REWRITING (PAGE-BY-PAGE)
###############################################################################
def rewrite_chunk_with_openai(chunk_text: str,
                              model: str = OPENAI_MODEL,
                              prev_summary: str = "") -> str:
    """
    Sends a chunk of text to OpenAI for rewriting in a 'professorial' register.

    Args:
        chunk_text: The transcript fragment to rewrite.
        model: Name of the chat model to call.
        prev_summary: Optional short summary of all previously processed
            chunks, prepended to the request as context for continuity.

    Returns:
        The revised chunk, stripped of leading/trailing whitespace.

    Raises:
        RuntimeError: if the response carries no text content.
    """

    # Build system prompt with instructions
    system_prompt = (
        "You are an expert in rewriting transcripts with a professorial register. "
        "You will receive fragments of a university lesson transcript generated "
        "from an audio recording. Your role is to correct grammar, punctuation, "
        "and spelling, fix words that may be misrecognized, remove filler words, "
        "and elevate the text to an academic standard. Output only the revised "
        "transcript text in plain text, without titles, markdown, or other formatting. "
        "Maintain context as if it were in medias res."
    )

    # The summary is for context only; it helps the model keep track of
    # earlier topics. It is omitted entirely when no summary exists yet.
    context_prefix = (
        f"Here is a short summary of what has come before:\n{prev_summary}\n\n"
        if prev_summary
        else ""
    )
    user_prompt = (
        f"{context_prefix}"
        f"Now, rewrite the following chunk:\n\n{chunk_text}\n\n"
        "Output only the revised text. Do not add extra commentary or formatting."
    )

    # Call OpenAI ChatCompletion using the client
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.2,  # Keep temperature low for consistent rewriting
        max_tokens=1500,   # Enough tokens to handle rewriting a ~600-word chunk
    )

    revised_text = response.choices[0].message.content
    # `content` is optional in the API response; report its absence as a
    # RuntimeError (which the calling loop handles) instead of letting
    # `.strip()` fail with AttributeError on None.
    if revised_text is None:
        raise RuntimeError("OpenAI returned no text content for the rewrite request")
    return revised_text.strip()

def summarize_text_with_openai(text: str,
                               model: str = OPENAI_MODEL) -> str:
    """
    Summarizes the given text in a single sentence, used to maintain context
    for future rewriting chunks.

    Args:
        text: The text to summarize.
        model: Name of the chat model to call. Defaults to the notebook-wide
            OPENAI_MODEL, matching `rewrite_chunk_with_openai`.

    Returns:
        The summary, stripped of leading/trailing whitespace.

    Raises:
        RuntimeError: if the response carries no text content.
    """

    system_prompt = (
        "You are a concise and precise summarizer. Summarize the following text "
        "in one sentence, focusing on the key ideas. Keep it short. Do not refer "
        "to the text itself, just provide a single sentence that captures the key ideas."
    )

    user_prompt = f"Text to summarize:\n{text}"

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.2,  # Low temperature for stable, repeatable summaries
        max_tokens=200,   # Caps the length of the returned summary
    )

    summary = response.choices[0].message.content
    # `content` is optional in the API response; report its absence as a
    # RuntimeError (which the calling loop handles) instead of letting
    # `.strip()` fail with AttributeError on None.
    if summary is None:
        raise RuntimeError("OpenAI returned no text content for the summary request")
    return summary.strip()

def get_word_count(text: str) -> int:
    """
    Count the whitespace-separated words in `text`.
    """
    # str.split() with no argument collapses runs of whitespace and ignores
    # leading/trailing whitespace, so empty or blank input counts as 0.
    return sum(1 for _ in text.split())

In [9]:
import os
import yt_dlp

def download_youtube_audio(youtube_url: str) -> str:
    """
    Download a YouTube video's audio into ./audio/ and return the MP3 path.

    yt-dlp is asked for the best audio format and configured with the
    FFmpegExtractAudio post-processor (codec "mp3", quality "192"). The file
    name is built from the video title.

    Returns the expected path of the resulting file: the name yt-dlp prepared
    for the download, with its extension replaced by ".mp3".
    """

    # Create the target folder on first use.
    target_dir = './audio'
    os.makedirs(target_dir, exist_ok=True)

    # Post-processing step that turns the downloaded audio into MP3.
    mp3_step = {
        "key": "FFmpegExtractAudio",
        "preferredcodec": "mp3",
        "preferredquality": "192",
    }

    options = dict(
        format="bestaudio/best",                                # best-quality audio
        outtmpl=os.path.join(target_dir, "%(title)s.%(ext)s"),  # ./audio/<title>.<ext>
        postprocessors=[mp3_step],
        cookiesfrombrowser=("firefox",),                        # reuse Firefox cookies
        quiet=True,                                             # suppress non-error messages
        no_warnings=True,
    )

    with yt_dlp.YoutubeDL(options) as ydl:
        video_info = ydl.extract_info(youtube_url, download=True)
        # Name of the file as downloaded, i.e. before the MP3 conversion.
        downloaded_name = ydl.prepare_filename(video_info)

    # The post-processor output carries the .mp3 extension, so swap it in.
    stem = os.path.splitext(downloaded_name)[0]
    return stem + ".mp3"

def transcribe_audio_assemblyai(audio_url_or_path: str, language_code: str = "en") -> aai.Transcript:
    """
    Transcribe audio with AssemblyAI, downloading YouTube sources first.

    When the source string contains "youtube.com" or "youtu.be", the audio is
    fetched with `download_youtube_audio` and the resulting local path is
    transcribed instead. Any other value is passed to AssemblyAI unchanged.

    Returns the transcript object once its status is 'completed'.
    Raises RuntimeError when the transcript ends in the error status.
    """
    # YouTube links are replaced by a locally downloaded audio file.
    youtube_markers = ("youtube.com", "youtu.be")
    if any(marker in audio_url_or_path for marker in youtube_markers):
        print("Detected YouTube URL. Downloading audio locally...")
        audio_url_or_path = download_youtube_audio(audio_url_or_path)
        print(f"Local file path: {audio_url_or_path}")

    # Configure the AssemblyAI SDK and build a transcriber for the language.
    aai.settings.api_key = ASSEMBLYAI_API_KEY
    transcriber = aai.Transcriber(
        config=aai.TranscriptionConfig(language_code=language_code)
    )

    print("Uploading file to AssemblyAI for transcription...")
    # A local path is accepted here as well as a URL.
    transcript = transcriber.transcribe(audio_url_or_path)

    # Keep re-fetching the transcript every 5 seconds until it is finished.
    finished_states = ('completed', 'error')
    while True:
        if transcript.status in finished_states:
            break
        print(f"Transcription status: {transcript.status}. Waiting...")
        time.sleep(5)
        transcript = transcriber.get_transcription(transcript.id)

    # Surface a failed transcription as an exception.
    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(f"Transcription failed: {transcript.error}")

    return transcript



In [10]:
# 1) Transcribe audio
# You can point to a local file, remote URL or a YouTube video. E.g.:
# audio_source = "https://assembly.ai/path_to_your_audio_file.mp3"
# or
# audio_source = "./local_file.mp3"
# or
# audio_source = "https://www.youtube.com/watch?v=YOUR_VIDEO"
audio_source = input('Path to audio file: ')  # Replace with your audio file path or URL

print("Transcribing audio... please wait.")
try:
    # Despite its name, this variable holds the transcript object returned by
    # transcribe_audio_assemblyai(), not plain text; the chunking step reads
    # its paragraphs.
    full_transcript_text = transcribe_audio_assemblyai(audio_source, language_code='en')
except RuntimeError as e:
    print(str(e))
    # Stop here: continuing would leave `full_transcript_text` undefined and
    # make the following cells fail with an unrelated NameError.
    raise
else:
    # Only report success when the transcription actually succeeded.
    print("Transcription complete.")

Transcribing audio... please wait.
Detected YouTube URL. Downloading audio locally...
Local file path: ./audio/Stanford CS229 I Machine Learning I Building Large Language Models (LLMs).mp3
Uploading file to AssemblyAI for transcription...
Transcription complete.


In [11]:
# 2) Split the transcript into paragraph-based chunks
# `full_transcript_text` is the transcript object produced by the
# transcription step; CHUNK_WORD_TARGET comes from the configuration cell.
print("Splitting transcript into chunks based on paragraphs...")
chunks = chunk_text_by_paragraphs(
    full_transcript_text,
    chunk_word_target=CHUNK_WORD_TARGET,
)
print(
    f"Created {len(chunks)} chunk(s) of ~{CHUNK_WORD_TARGET} words each."
)

Splitting transcript into chunks based on paragraphs...
Created 41 chunk(s) of ~500 words each.


In [None]:
# 3) For each chunk, rewrite with OpenAI
from openai import OpenAIError

# Errors that should skip/soften a single step instead of aborting the run.
# The OpenAI client raises OpenAIError subclasses (not RuntimeError) for API
# failures, so both have to be listed for the handlers below to take effect.
RECOVERABLE_ERRORS = (RuntimeError, OpenAIError)

final_rewritten_text = []
running_summary = ""  # Will accumulate short summaries of prior chunks
print("Revriting the transcript ...")

for i, chunk_text in enumerate(chunks, start=1):
    # Rewrite the chunk, passing the running summary as context
    try:
        revised_text = rewrite_chunk_with_openai(
            chunk_text=chunk_text,
            model=OPENAI_MODEL,
            prev_summary=running_summary
        )
    except RECOVERABLE_ERRORS as e:
        print(f"Error rewriting chunk {i}: {str(e)}")
        continue  # Skip to the next chunk; this chunk is left out of the output

    # Append the revised text to our final output
    final_rewritten_text.append(revised_text)

    # Summarize this revised chunk to update context
    try:
        chunk_summary = summarize_text_with_openai(revised_text, model=OPENAI_MODEL)
    except RECOVERABLE_ERRORS as e:
        print(f"Error summarizing chunk {i}: {str(e)}")
        chunk_summary = ""  # Keep going without adding context for this chunk

    # Append the new summary to the running summary (done in both modes)
    running_summary += f" {chunk_summary}"

    # Optionally compress the running summary once it grows past the limit
    if ENABLE_SUMMARY_SUMMARIZATION and get_word_count(running_summary) > MAX_SUMMARY_WORDS:
        try:
            running_summary = summarize_text_with_openai(running_summary, model=OPENAI_MODEL)
        except RECOVERABLE_ERRORS as e:
            # On failure the uncompressed running summary is kept as is
            print(f"Error summarizing running summary: {str(e)}")

Revriting the transcript ...


In [13]:
# Display the list of rewritten chunks (one string per chunk that was
# rewritten without error) to inspect the result before saving it.
final_rewritten_text

['Let us begin. Today, I will be discussing the construction of large language models (LLMs). Many of you are likely familiar with LLMs, which are essentially the chatbots that have garnered significant attention recently, such as ChatGPT from OpenAI, Claude from Anthropic, Gemini, Llama, and other similar models. \n\nOur focus today will be on how these models function. This will be an overview, as we have only one lecture and it is challenging to condense all the information. However, I aim to address the various components necessary for training these LLMs. If you have questions, please feel free to interrupt me. If you have a question, it is likely that others in the room or on Zoom share the same inquiry, so do not hesitate to ask.\n\nWhat is crucial when training LLMs? There are several key components to consider. First is the architecture; as you may know, LLMs are a type of neural network. When contemplating neural networks, one must consider the architecture employed. Another 

In [14]:
import json

# 4) Output the final revised text and running summary as a JSON file
# The rewritten chunks are joined with newlines into one document.
final_text = "\n".join(final_rewritten_text)

data = {
    "final_text": final_text,
    "running_summary": running_summary
}

# The file name is derived from the job name entered in the configuration cell.
output_filename = f"transcript_{job_name}.json"
try:
    with open(output_filename, "w", encoding="utf-8") as f:
        # ensure_ascii=False writes non-ASCII characters as real UTF-8 text
        # (the file is opened as UTF-8) instead of \uXXXX escape sequences,
        # keeping the saved transcript human-readable.
        json.dump(data, f, indent=4, ensure_ascii=False)
    print(f"Final transcript and running summary saved to {output_filename}")
except Exception as e:
    # Best effort: report the failure without raising, so the results stay
    # available in the kernel for another attempt.
    print(f"Error saving JSON file: {str(e)}")

print("\nDone.")


Final transcript and running summary saved to transcript_Standford_University_Building_LLMs.json

Done.
