In [1]:
import os
import numpy as np
import webvtt
from datasets import Dataset, concatenate_datasets, load_from_disk
from scipy.io import wavfile
import torch
import numpy as np
from transformers import WhisperProcessor, WhisperFeatureExtractor, WhisperTokenizer, DataCollatorForSeq2Seq
from transformers import WhisperForConditionalGeneration, Seq2SeqTrainer, Seq2SeqTrainingArguments,DataCollatorForSeq2Seq, pipeline
from dataclasses import dataclass
from typing import Any, Dict, List, Union



  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


In [2]:
def vtt_to_text(vtt_file):
    """Read a WebVTT file and return its captions as a list of tuples.

    Each tuple is ``(start, end, text)`` where ``start`` and ``end`` are the
    caption's ``start``/``end`` attributes as produced by ``webvtt.read`` and
    ``text`` is the caption text with surrounding whitespace removed.
    """
    return [
        (cue.start, cue.end, cue.text.strip())
        for cue in webvtt.read(vtt_file)
    ]



def _timestamp_to_seconds(timestamp):
    """Convert a colon-separated ``[HH:]MM:SS[.mmm]`` timestamp string to seconds."""
    return sum(
        float(part) * 60 ** power
        for power, part in enumerate(reversed(timestamp.split(":")))
    )


def _to_mono_float32(audio_data):
    """Return the samples as a 1-D float32 array scaled to the range [-1.0, 1.0]."""
    samples = np.asarray(audio_data)
    if np.issubdtype(samples.dtype, np.integer):
        if samples.dtype == np.uint8:
            # 8-bit PCM is unsigned with its zero level at 128
            samples = (samples.astype(np.float32) - 128.0) / 128.0
        else:
            # Signed PCM: divide by the magnitude of the most negative value
            full_scale = float(-int(np.iinfo(samples.dtype).min))
            samples = samples.astype(np.float32) / full_scale
    else:
        samples = samples.astype(np.float32)
    if samples.ndim > 1:
        # wavfile.read returns (n_samples, n_channels); average the channels
        samples = samples.mean(axis=1)
    return samples


def create_dataset(audio_file, transcript_data, target_sample_rate=16000):
    """Cut a WAV file into per-caption audio chunks and pair them with their text.

    Args:
        audio_file: Path to a WAV file readable by ``scipy.io.wavfile.read``.
        transcript_data: Iterable of ``(start, end, text)`` tuples whose
            timestamps are colon-separated strings such as ``"00:01:02.500"``.
        target_sample_rate: Sample rate of the stored chunks. Audio recorded at
            a different rate is resampled to this rate before slicing. Pass a
            falsy value to keep the file's native rate.

    Returns:
        A ``Dataset`` with an ``"audio"`` column (lists of float samples in
        [-1, 1], mono) and a ``"text"`` column, or ``None`` when no caption
        yields any audio.

    Raises:
        RuntimeError: If the audio file cannot be read.
    """
    try:
        #Load the audio file with scipy
        sample_rate, audio_data = wavfile.read(audio_file)
    except Exception as e:
        raise RuntimeError(f"Failed to load audio file {audio_file}: {e}") from e
    print(f"Audio data shape: {audio_data.shape}, Sample rate: {sample_rate}")

    # Integer PCM is rescaled to floats so the stored samples are on the same
    # scale as floating-point audio fed to the model at inference time.
    samples = _to_mono_float32(audio_data)

    # Honour target_sample_rate: downstream feature extraction assumes it.
    if target_sample_rate and sample_rate != target_sample_rate:
        from math import gcd
        from scipy.signal import resample_poly

        divisor = gcd(int(sample_rate), int(target_sample_rate))
        samples = resample_poly(
            samples,
            int(target_sample_rate) // divisor,
            int(sample_rate) // divisor,
        ).astype(np.float32)
        sample_rate = target_sample_rate

    #Preparing the dataset
    audio_column = []
    text_column = []
    total_frames = len(samples)

    for start, end, text in transcript_data:
        #Converting time from HH:MM:SS to seconds
        start_sec = _timestamp_to_seconds(start)
        end_sec = _timestamp_to_seconds(end)

        #Calculating start and end frames, clamped to the audio length
        start_frame = max(0, min(int(start_sec * sample_rate), total_frames))
        end_frame = max(0, min(int(end_sec * sample_rate), total_frames))

        #Cutting the audio chunk for the corresponding transcript
        audio_chunk = samples[start_frame:end_frame]

        # A cue that lies past the end of the audio (or has end <= start) has
        # no samples; keeping it would pair text with nothing.
        if len(audio_chunk) == 0:
            continue

        audio_column.append(audio_chunk.tolist())
        text_column.append(text)

    #Check if any usable transcript data remains
    if not audio_column:
        print(f"No valid transcript data for {audio_file}.")
        return None

    #Creating the dataset
    dataset = Dataset.from_dict({"audio": audio_column, "text": text_column})

    print(f"Created dataset for {audio_file} with {len(audio_column)} entries.")
    return dataset


def find_subtitle_file(audio_filename, subtitle_folder):
    """Locate the ``.vtt`` subtitle file that belongs to an audio file.

    Any ``.vtt`` file whose name contains the audio file's base name is a
    candidate. When several candidates exist the best one is chosen
    deterministically, in this order of preference:

    1. the subtitle's base name equals the audio base name;
    2. the subtitle name starts with the audio base name followed by a
       non-alphanumeric character (so ``Lecture1`` prefers
       ``Lecture1_manual.vtt`` over ``Lecture12_manual.vtt``);
    3. the subtitle name starts with the audio base name;
    4. the subtitle name merely contains the audio base name.

    Ties are broken alphabetically, so the result does not depend on the
    order in which ``os.listdir`` happens to return entries.

    Args:
        audio_filename: File name of the audio file (extension is ignored).
        subtitle_folder: Directory to search.

    Returns:
        Path of the chosen subtitle file, or ``None`` if no candidate exists.
    """
    base_audio_name = os.path.splitext(audio_filename)[0]

    def match_rank(subtitle_filename):
        if os.path.splitext(subtitle_filename)[0] == base_audio_name:
            return 0
        if subtitle_filename.startswith(base_audio_name):
            # Slice (not index) so a name that ends right after the prefix is safe
            following = subtitle_filename[len(base_audio_name):len(base_audio_name) + 1]
            return 2 if following.isalnum() else 1
        return 3

    candidates = [
        subtitle_filename
        for subtitle_filename in os.listdir(subtitle_folder)
        if subtitle_filename.endswith('.vtt') and base_audio_name in subtitle_filename
    ]
    if not candidates:
        return None

    best_match = min(candidates, key=lambda name: (match_rank(name), name))
    return os.path.join(subtitle_folder, best_match)


def process_videos(audio_folder, subtitle_folder):
    """Build one dataset from every ``.wav`` file that has a matching subtitle file.

    For each ``.wav`` entry in ``audio_folder`` the matching ``.vtt`` file is
    looked up in ``subtitle_folder``, parsed, and turned into a per-file
    dataset. Files without subtitles, with empty transcripts, or for which no
    dataset could be created are reported and skipped.

    Returns:
        The concatenation of all per-file datasets, or ``None`` if none were created.
    """
    per_file_datasets = []

    for wav_name in os.listdir(audio_folder):
        if not wav_name.endswith('.wav'):
            continue

        wav_path = os.path.join(audio_folder, wav_name)

        # Locate the subtitle file that goes with this recording
        vtt_path = find_subtitle_file(wav_name, subtitle_folder)
        if not vtt_path:
            print(f"Subtitle file not found for {wav_name}. Expected naming convention: {os.path.splitext(wav_name)[0]}*.vtt")
            continue
        print(f"Found subtitle file: {vtt_path}")

        # Parse the subtitles into (start, end, text) entries
        cues = vtt_to_text(vtt_path)
        if not cues:
            print(f"No transcript data found for {vtt_path}. The file may be empty or incorrectly formatted.")
            continue

        # Slice the existing WAV file according to the cues
        file_dataset = create_dataset(wav_path, cues)
        if file_dataset:
            per_file_datasets.append(file_dataset)
        else:
            print(f"Failed to create dataset for {wav_path}.")

    # Nothing usable was found
    if not per_file_datasets:
        print("No datasets were created.")
        return None

    merged = concatenate_datasets(per_file_datasets)
    print(f"Combined dataset created with {len(per_file_datasets)} individual datasets.")
    return merged
    
    
    
    
# Input folders: WAV recordings and the VTT subtitle files that go with them
audio_path = 'audio_files_copy/'
subtitle_path = 'manual_sub_copy/'
# Build a single dataset from every audio/subtitle pair; process_videos
# returns None when no dataset could be created.
combined_dataset = process_videos(audio_path, subtitle_path)

Found subtitle file: manual_sub_copy/Lecture12TheRatioRootandAlternatingSeriesTests_manual.en-j3PyPqV-e1s.vtt
Audio data shape: (57920598,), Sample rate: 16000
Created dataset for audio_files_copy/Lecture12TheRatioRootandAlternatingSeriesTests.wav with 661 entries.
Found subtitle file: manual_sub_copy/Lecture9LimsupLiminfandtheBolzano-WeierstrassTheorem_manual.en-j3PyPqV-e1s.vtt
Audio data shape: (70756011,), Sample rate: 16000
Created dataset for audio_files_copy/Lecture9LimsupLiminfandtheBolzano-WeierstrassTheorem.wav with 987 entries.
Found subtitle file: manual_sub_copy/Lecture8TheSqueezeTheoremandOperationsInvolvingConvergentSequences_manual.en-j3PyPqV-e1s.vtt
Audio data shape: (71873877,), Sample rate: 16000
Created dataset for audio_files_copy/Lecture8TheSqueezeTheoremandOperationsInvolvingConvergentSequences.wav with 923 entries.
Found subtitle file: manual_sub_copy/Lecture10TheCompletenessoftheRealNumbersandBasicPropertiesofInfiniteSeries_manual.en-j3PyPqV-e1s.vtt
Audio data s

In [3]:
from transformers import WhisperProcessor

# Load the Whisper processor from the "openai/whisper-base.en" checkpoint;
# preprocess() below uses its feature_extractor and tokenizer.
processor = WhisperProcessor.from_pretrained("openai/whisper-base.en")

# Function to preprocess each entry in the dataset
def preprocess(batch):
    """Convert one dataset row into model inputs.

    Reads ``batch['audio']`` (a list of samples) and ``batch['text']``, and
    adds ``"input_features"`` (from the processor's feature extractor, called
    with a 16000 Hz sampling rate) and ``"labels"`` (token ids of the text).
    The same ``batch`` object is returned.
    """
    # Samples -> float32 array -> feature extractor output (first item)
    waveform = np.array(batch['audio'], dtype=np.float32)
    extracted = processor.feature_extractor(waveform, sampling_rate=16000)
    batch["input_features"] = extracted.input_features[0]

    # Transcript text -> token ids
    encoded_text = processor.tokenizer(batch["text"])
    batch["labels"] = encoded_text.input_ids

    return batch

# Run preprocess over every row of the combined dataset and drop the raw
# "audio" and "text" columns, keeping only the columns preprocess adds.
processed_dataset = combined_dataset.map(preprocess, remove_columns=["audio", "text"])



preprocessor_config.json:   0%|          | 0.00/185k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/805 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.41M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

normalizer.json:   0%|          | 0.00/52.7k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/34.6k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/1.83k [00:00<?, ?B/s]

Map:   0%|          | 0/8930 [00:00<?, ? examples/s]

In [4]:
# Bare expression: shows the dataset summary (column names and row count) as the cell output
processed_dataset

Dataset({
    features: ['input_features', 'labels'],
    num_rows: 8930
})

In [5]:
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate speech-to-text examples into a padded batch of tensors.

    Audio features and label token ids are padded separately (they have
    different lengths and padding rules). Padded label positions are set to
    -100 so the loss ignores them.

    If every label sequence in the batch begins with the decoder start token,
    that token is removed, because the model prepends it again when it builds
    the decoder inputs from the labels. For Whisper the tokenizer's prefix
    starts with ``<|startoftranscript|>``, which is not the tokenizer's
    ``bos_token_id``, so comparing against ``bos_token_id`` alone never
    strips it.

    Attributes:
        processor: Object exposing ``feature_extractor.pad`` and ``tokenizer.pad``.
        decoder_start_token_id: Optional explicit id of the token to strip.
            When ``None`` the id of ``<|startoftranscript|>`` is looked up in
            the tokenizer; the tokenizer's ``bos_token_id`` is also accepted.
    """

    processor: Any
    decoder_start_token_id: Union[int, None] = None

    def _leading_token_ids(self) -> List[int]:
        """Return the token ids that count as a redundant leading start token."""
        tokenizer = self.processor.tokenizer
        candidates = [self.decoder_start_token_id]
        if hasattr(tokenizer, "convert_tokens_to_ids"):
            start_id = tokenizer.convert_tokens_to_ids("<|startoftranscript|>")
            # An unknown token maps to the unk id; that is not a real start token
            if start_id != getattr(tokenizer, "unk_token_id", None):
                candidates.append(start_id)
        candidates.append(getattr(tokenizer, "bos_token_id", None))
        return [token_id for token_id in candidates if isinstance(token_id, int)]

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if the start token was added by the tokenization step, cut it here
        # because it is prepended again when the labels are shifted
        if labels.shape[1] > 0:
            first_column = labels[:, 0]
            for token_id in self._leading_token_ids():
                if (first_column == token_id).all().cpu().item():
                    labels = labels[:, 1:]
                    break

        batch["labels"] = labels

        return batch

In [6]:
# Collator instance built around the processor loaded earlier; it is passed to the trainer as data_collator
data_collator_n = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)


In [7]:
# Hold out 20% of the examples for evaluation. The fixed seed makes the split
# identical after a kernel restart, so training runs stay comparable.
dataset_dict = processed_dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = dataset_dict['train']
eval_dataset = dataset_dict['test']

In [8]:
# Print size, shape and feature schema for the full dataset and for each split
for count_label, shape_label, features_label, summarized in (
    ("Number of entries in the dataset:", "Dataset shape:", "Features of the dataset:", processed_dataset),
    ("Number of entries in the train dataset:", "Train dataset shape:", "Features of the train dataset:", train_dataset),
    ("Number of entries in the eval dataset:", "Eval dataset shape:", "Features of the eval dataset:", eval_dataset),
):
    print(count_label, len(summarized))
    print(shape_label, summarized.shape)
    print(features_label, summarized.features)

Number of entries in the dataset: 8930
Dataset shape: (8930, 2)
Features of the dataset: {'input_features': Sequence(feature=Sequence(feature=Value(dtype='float32', id=None), length=-1, id=None), length=-1, id=None), 'labels': Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None)}
Number of entries in the train dataset: 7144
Train dataset shape: (7144, 2)
Features of the train dataset: {'input_features': Sequence(feature=Sequence(feature=Value(dtype='float32', id=None), length=-1, id=None), length=-1, id=None), 'labels': Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None)}
Number of entries in the eval dataset: 1786
Eval dataset shape: (1786, 2)
Features of the eval dataset: {'input_features': Sequence(feature=Sequence(feature=Value(dtype='float32', id=None), length=-1, id=None), length=-1, id=None), 'labels': Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None)}


In [9]:
from transformers import WhisperForConditionalGeneration, Seq2SeqTrainer, Seq2SeqTrainingArguments
# Free cached MPS memory before loading the model. Guarded so the cell also
# runs on machines without an MPS backend, where empty_cache() would fail.
if torch.backends.mps.is_available():
    torch.mps.empty_cache()

# Load the Whisper model
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base.en")

# Define training arguments
training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-finetuned-en",
    per_device_train_batch_size=8,
    # Train on CPU because GPU memory was exceeded
    no_cuda=True,
    num_train_epochs=1,
    save_steps=500,
    # NOTE(review): no eval_steps is given, so the evaluation interval falls
    # back to the library default — confirm it is the cadence you want.
    evaluation_strategy="steps",
    logging_steps=50,
    predict_with_generate=False,
    # 4 accumulated batches of 8 per optimizer step
    gradient_accumulation_steps=4,
)

# Create the trainer
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator_n,
    tokenizer=processor.tokenizer,
)

# Start training
trainer.train()


config.json:   0%|          | 0.00/1.94k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/290M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/1.53k [00:00<?, ?B/s]



Step,Training Loss,Validation Loss


KeyboardInterrupt: 

In [None]:
# Save the final model and processor into the same directory; the
# transcription cell reloads both from "./whisper-finetuned-en/final-model".
model.save_pretrained("./whisper-finetuned-en/final-model")
processor.save_pretrained("./whisper-finetuned-en/final-model")  

In [None]:
import os
import torch
import librosa
from transformers import WhisperForConditionalGeneration, WhisperProcessor

def split_audio(audio, chunk_duration=30, sample_rate=16000):
    """Split audio into consecutive chunks of a given duration.

    Args:
        audio: Sliceable sequence of samples (list, NumPy array, ...).
        chunk_duration: Length of each chunk in seconds; may be fractional.
        sample_rate: Samples per second of ``audio``.

    Returns:
        List of slices of ``audio``. Every chunk has
        ``int(chunk_duration * sample_rate)`` samples except possibly the
        last, which holds the remainder. Empty input gives an empty list.

    Raises:
        ValueError: If the chunk size works out to less than one sample.
    """
    # range() needs an integer step, so fractional durations are truncated
    chunk_size = int(chunk_duration * sample_rate)
    if chunk_size <= 0:
        raise ValueError(
            f"chunk_duration * sample_rate must be at least 1 sample, got {chunk_duration} * {sample_rate}"
        )
    return [audio[i:i + chunk_size] for i in range(0, len(audio), chunk_size)]

# (model, processor) pairs keyed by checkpoint directory, so transcribing a
# folder loads the checkpoint once instead of once per audio file. The cache
# is re-created whenever this cell is re-run, so a freshly saved checkpoint
# is picked up by re-running the cell.
_WHISPER_CACHE = {}


def _load_whisper(model_dir):
    """Return the (model, processor) pair for ``model_dir``, loading it on first use."""
    if model_dir not in _WHISPER_CACHE:
        _WHISPER_CACHE[model_dir] = (
            WhisperForConditionalGeneration.from_pretrained(model_dir),
            WhisperProcessor.from_pretrained(model_dir),
        )
    return _WHISPER_CACHE[model_dir]


def full_audio_transcription(audio_file, model_dir="./whisper-finetuned-en/final-model"):
    """Transcribe a whole audio file with the fine-tuned Whisper model.

    The audio is loaded at 16 kHz, cut into 30-second chunks, and each chunk
    is transcribed independently.

    Args:
        audio_file: Path to the audio file to transcribe.
        model_dir: Directory holding the saved model and processor.

    Returns:
        The chunk transcripts joined with single spaces (an empty string if
        the audio has no samples).
    """
    # Load the Whisper model and processor (cached across calls)
    model, processor = _load_whisper(model_dir)

    # Load and preprocess the audio file
    audio, _ = librosa.load(audio_file, sr=16000)  # Load audio file at 16 kHz

    # Split the audio into chunks of 30 seconds each
    audio_chunks = split_audio(audio, chunk_duration=30)

    full_transcript = []

    for chunk in audio_chunks:
        # Process the audio chunk to get inputs for the model
        audio_input = processor(chunk, return_tensors="pt", sampling_rate=16000)

        # Generate subtitles (full transcript for this chunk)
        with torch.no_grad():
            generated_ids = model.generate(**audio_input)

        # Decode the generated ids to text
        transcript_chunk = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        full_transcript.append(transcript_chunk)

    # Join all chunks to form the full transcript
    return " ".join(full_transcript)

def save_transcript_to_file(transcript, filename, folder_path):
    """Write ``transcript`` to ``<folder_path>/<filename>.txt`` as UTF-8 text.

    The folder (including missing parents) is created when it does not exist;
    an existing file with the same name is overwritten.
    """
    # Make sure the destination folder is there before opening the file
    os.makedirs(folder_path, exist_ok=True)

    destination = os.path.join(folder_path, f"{filename}.txt")
    with open(destination, mode="w", encoding="utf-8") as handle:
        handle.write(transcript)

def process_all_audio_files(folder_path, output_folder):
    """Transcribe every audio file in ``folder_path`` into ``output_folder``.

    Entries ending in ``.wav``, ``.mp3`` or ``.m4a`` are transcribed; each
    transcript is saved as a text file named after the audio file without
    its extension. Other entries are ignored.
    """
    audio_suffixes = ('.wav', '.mp3', '.m4a')  # Add other audio formats as needed

    for entry in os.listdir(folder_path):
        if not entry.endswith(audio_suffixes):
            continue

        print(f"Processing {entry}...")
        text = full_audio_transcription(os.path.join(folder_path, entry))

        # Name the transcript after the audio file, minus its extension
        stem, _extension = os.path.splitext(entry)
        save_transcript_to_file(text, stem, output_folder)

# Example usage: transcribe every supported audio file found in
# audio_folder_path and write one .txt transcript per file into
# output_folder_path (created on first save if missing).
audio_folder_path = "audio_files"
output_folder_path = "finetuned_en_whisper_sub"
process_all_audio_files(audio_folder_path, output_folder_path)

print(f"Transcripts saved to the folder '{output_folder_path}'.")
