In [1]:
import pandas as pd
import os
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
import torchaudio
import torch


  from .autonotebook import tqdm as notebook_tqdm


# Test ASR implementation

In [2]:
## Load data
path = "/Users/reekaestacio/Documents/GitHub/Whisper_LLM_Bias/data"
df = pd.read_csv(path+"/stimuli.csv")
df["file_name"] = path + "/auditory_stimuli/" + df["id"].astype(str) + df["condition_id"] + ".mp3"

# Load pretrained model and processor
model_name = "facebook/wav2vec2-base-960h"  # or your fine-tuned model path
processor = Wav2Vec2Processor.from_pretrained(model_name)
model = Wav2Vec2ForCTC.from_pretrained(model_name)

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [3]:
def transcribe_mp3(file_path, target_sr=16000):
    # Load and resample audio
    waveform, sample_rate = torchaudio.load(file_path)
    if sample_rate != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sr)
        waveform = resampler(waveform)
    
    # Mono-channel and flatten
    input_values = processor(waveform.squeeze(), sampling_rate=target_sr, return_tensors="pt").input_values

    # Predict
    with torch.no_grad():
        logits = model(input_values).logits
        predicted_ids = torch.argmax(logits, dim=-1)

    # Decode to text
    transcription = processor.decode(predicted_ids[0])
    return transcription


In [4]:
# Run model
df["prediction"] = df["file_name"].apply(lambda x: transcribe_mp3(x))

In [5]:
# standardize labels
# w2v2 labels with uppercase
df['sentence'] = df['sentence'].str.upper()
df['sentence'] = df['sentence'].str.replace(".", "", regex=False)



num_incorrect = 0
accurate_list = []

# labels accurate/inaccurate
for _, row in df.iterrows():
    if row["sentence"] == row["prediction"]:
        accurate_list.append(1)
    else:
        accurate_list.append(0)
        num_incorrect += 1

df["accurate"] = accurate_list
print(f"{num_incorrect} incorrect predictions.")

# see inaccuracies
incorrect_df = df[df['accurate']==0]
incorrect_df

## spelling differences in wav2vec, not a problem for real dataset because of simple, single word utterances
## only 2 real mislabels, better to convert to phones


12 incorrect predictions.


Unnamed: 0,id,condition_id,condition_name,sentence,file_name,prediction,accurate
20,5,A,expected,THE CHEF SHARPENED THE KNIFE,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE CHEFF SHARPENED THE KNIFE,0
21,5,B,phonologically related,THE CHEF SHARPENED THE NIGHT,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE CHEFF SHARPENED THE NIGHT,0
22,5,C,semantically related,THE CHEF SHARPENED THE BLADE,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE CHEFF SHARPENED THE BLADE,0
23,5,D,both,THE CHEF SHARPENED THE KNIVES,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE CHEFF SHARPENED THE KNIVES,0
24,5,E,neither,THE CHEF SHARPENED THE SUN,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE CHEFF SHARPENED THE SUN,0
25,6,A,expected,THE LUMBERJACK CHOPPED THE TREE,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE LUMBER JACK CHOPPED THE TREE,0
26,6,B,phonologically related,THE LUMBERJACK CHOPPED THE TREAT,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE LUMBER JACK CHOPPED THE TREAT,0
27,6,C,semantically related,THE LUMBERJACK CHOPPED THE LOG,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE LUMBER JACK CHOPPED THE LOG,0
28,6,D,both,THE LUMBERJACK CHOPPED THE TRUNK,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE LUMBER JACK CHOPPED THE TRUNK,0
29,6,E,neither,THE LUMBERJACK CHOPPED THE COFFEE,/Users/reekaestacio/Documents/GitHub/Whisper_L...,THE LUMBER JACK CHOPPED THE COFFEE,0


# Try LoRA

In [6]:
import os
import sys
# import soundfile
import random
import pandas as pd
import torch
import torchaudio
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC, TrainingArguments, Trainer
from datasets import Dataset
from sklearn.model_selection import KFold
from peft import LoraConfig, get_peft_model

In [7]:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [95]:
lora_df = df[['file_name', 'sentence']]
dataset = Dataset.from_pandas(lora_df)
# Train-test split
dataset = dataset.train_test_split(test_size=0.2)

In [97]:
def preprocess(example):
    # Load audio
    waveform, sr = torchaudio.load(example["file_name"])
    
    # Resample
    if sr != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=16000)
        waveform = resampler(waveform)

    # Flatten list of waveforms for correct dimensionality
    waveform = waveform.squeeze(0)  # shape becomes [samples]

    # Use the feature extractor — output is a dict with lists
    inputs = processor.feature_extractor(
        waveform,
        sampling_rate=16000,
        padding=True,
        return_attention_mask=True,
        return_tensors="pt"  
    )

    # Remove the batch dimension but keep as tensors
    input_values = inputs["input_values"].squeeze(0)
    attention_mask = inputs["attention_mask"].squeeze(0)

    # Tokenize word (label_ids)
    with processor.as_target_processor():
        labels = processor.tokenizer(
            example["sentence"].upper(),
            return_tensors="pt",
            padding=True,
            truncation=True
        ).input_ids.squeeze(0) 

    return {
        "input_values": input_values,
        "attention_mask": attention_mask,
        "labels": labels
    }

In [None]:
# def preprocess(example):
#     # Load audio
#     waveform, sr = torchaudio.load(example["file_name"])
    
#     # Resample
#     if sr != 16000:
#         resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=16000)
#         waveform = resampler(waveform)
        
#     waveform = waveform.squeeze(0)
    
#     # Extract audio features
#     inputs = processor(waveform, 
#                        sampling_rate=16000,
#                        return_tensors="pt",
#                        padding=True)
    
#     # Tokenize text.upper()
#     labels = processor.tokenizer(
#         example['sentence'].upper(),
#         return_tensors="pt",
#         padding=True
#     ).input_ids
    
#     return {
#         # keep all inputs lists
#         "input_values": inputs,
#         "labels": labels
#     }

In [98]:
processed_dataset_train= dataset['train'].map(preprocess, remove_columns=["file_name", "sentence"])
processed_dataset_test= dataset['test'].map(preprocess, remove_columns=["file_name", "sentence"])

Map: 100%|██████████| 32/32 [00:00<00:00, 442.33 examples/s]
Map: 100%|██████████| 8/8 [00:00<00:00, 556.61 examples/s]


In [99]:
# # Load Wav2Vec model and processor
# # Instantiate Wav2Vec2 model
# base_model = Wav2Vec2ForCTC.from_pretrained(
#     "facebook/wav2vec2-base-960h", 
#     ctc_loss_reduction="mean", 
#     pad_token_id=processor.tokenizer.pad_token_id
# )

# processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")

# # Configure LoRA arguments
# lora_config = LoraConfig(
#     r=8, # can change rank, r=8 is most common
#     lora_alpha=32,
#     lora_dropout=0.1,
#     bias="none",
#     target_modules=["q_proj", "v_proj"] # attention layers
# )

# # Add LoRA layers to Wav2Vec2 model
# model = get_peft_model(base_model, lora_config)
# model.freeze_feature_extractor()
# model.print_trainable_parameters()

In [100]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
# Initialize training arguments
# Same args as finetuming
training_args = TrainingArguments(
    output_dir="./lora_w2v2_test",
    group_by_length=True,
    per_device_train_batch_size=5,
    eval_strategy="no", 
    num_train_epochs=3,
    fp16=False,
    gradient_checkpointing=True, 
    save_steps=500,
    eval_steps=500,
    logging_steps=10,
    learning_rate=1e-4,
    weight_decay=0.0,
    warmup_steps=1000,
    save_total_limit=2,
    label_names=['labels'],
    remove_unused_columns=False
)

# Instantiate Trainer
trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    train_dataset=processed_dataset_train,
    eval_dataset=processed_dataset_test,
    tokenizer=processor.feature_extractor
)

  trainer = Trainer(


In [109]:
# not currently working on mac, try on lab computer
trainer.train()

NotImplementedError: The operator 'aten::_ctc_loss' is not currently implemented for the MPS device. If you want this op to be considered for addition please comment on https://github.com/pytorch/pytorch/issues/141287 and mention use-case, that resulted in missing op as well as commit hash 2236df1770800ffea5697b11b0bb0d910b2e59e1. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.