In [None]:
import json
import os

# --- Configuration Section ---

# 1. Project and Data Paths
# Root folder of the project; the two paths below are built relative to it.
PROJECT_ROOT = "." 
# JSON manifest with audio paths and transcripts.
# NOTE(review): not referenced by the visible cells, which pass absolute paths
# to prepare_espnet_data instead.
TRAIN_JSON_PATH = os.path.join(PROJECT_ROOT, "train.json") 
# Folder where the .wav files are stored (also not referenced by the visible cells).
WAV_DIR = os.path.join(PROJECT_ROOT, "wavs") 




# Experiment label. NOTE(review): the name mentions Whisper, but FINETUNE_MODEL
# points at an OWSM-CTC checkpoint, and the visible cells never read EXP_NAME.
EXP_NAME = "whisper_finetune_experiment"


# --- End of Configuration Section ---

import os
from glob import glob

import numpy as np
import librosa

import torch
from espnet2.bin.s2t_inference_ctc import Speech2TextGreedySearch   
from espnet2.layers.create_adapter_fn import create_lora_adapter
import espnetez as ez

# Define hyper parameters
# Working directories, all relative to the current working directory.
# Plain string literals: the previous f-strings had no placeholders.
DUMP_DIR = "./dump"
CSV_DIR = "./transcription"
EXP_DIR = "./exp/finetune"
STATS_DIR = "./exp/stats_finetune"

# Pretrained checkpoint tag to fine-tune (alternative kept for reference).
FINETUNE_MODEL = "espnet/owsm_ctc_v4_1B"  # "espnet/owsm_v3.1_ebf"
# Module-name targets handed to create_lora_adapter in build_model_fn.
LORA_TARGET = [
    "w_1", "w_2", "merge_proj"
]



In [None]:
import json
import os
import random


def prepare_espnet_data(json_path, wav_dir, seed=None):
    """
    Load a JSON manifest and format it as an utterance dictionary for ESPnet-EZ.

    The top level of the JSON file must be an object; only its values are
    used. Each value must provide the keys ``'audio_path'`` and
    ``'transcription'``. The audio file is looked up as
    ``<wav_dir>/<basename(audio_path + ".wav")>``; entries whose file does not
    exist are skipped with a warning.

    No train/validation split is performed: every usable entry is returned in
    one dictionary, in shuffled order.

    Args:
        json_path (str): Path to the JSON manifest.
        wav_dir (str): Path to the directory containing the wav files.
        seed (int, optional): Seed for the shuffle. When ``None`` (default)
            the global ``random`` state is used, so utterance IDs differ
            between runs; pass an integer for a reproducible order.

    Returns:
        dict: ``{utt_id: {"wav": path, "text": transcript}}``. ``utt_id`` is
        ``utt_NNNNN`` where ``NNNNN`` is the entry's position in the shuffled
        list, so skipped entries leave gaps in the numbering.

    Raises:
        FileNotFoundError: If ``json_path`` does not exist.
        json.JSONDecodeError: If ``json_path`` is not valid JSON.
    """
    print("Preparing data for ESPnet-EZ...")

    # 1. Load the JSON file (as dict, then to list)
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            all_data = list(json.load(f).values())
    except FileNotFoundError:
        print(f"Error: The file {json_path} was not found.")
        # Re-raise rather than exit(): exiting from a helper tears down the
        # notebook kernel and hides the traceback from the caller.
        raise
    except json.JSONDecodeError:
        print(f"Error: The file {json_path} is not a valid JSON file.")
        raise

    # 2. Shuffle the entries (seeded when requested, for reproducible IDs)
    if seed is None:
        random.shuffle(all_data)
    else:
        random.Random(seed).shuffle(all_data)

    print(f"Total samples: {len(all_data)}")

    # 3. Format the data into the required dictionary structure
    data_dict = {}
    for i, item in enumerate(all_data):
        # Only the file name of 'audio_path' is kept; the directory part is
        # replaced by wav_dir and a ".wav" extension is appended.
        audio_path = os.path.join(wav_dir, os.path.basename(item['audio_path'] + ".wav"))
        text = item['transcription']

        if not os.path.exists(audio_path):
            print(f"Warning: Audio file not found, skipping: {audio_path}")
            continue

        # Create a unique utterance ID
        utt_id = f"utt_{i:05d}"
        data_dict[utt_id] = {
            "wav": audio_path,
            "text": text
        }

    return data_dict


# Build the utterance dictionary from the shared dev/test manifest.
valid_data = prepare_espnet_data(
    json_path="/ocean/projects/cis250085p/shared/A_track/dev_test.json",
    wav_dir="/ocean/projects/cis250085p/shared/track_a_audio_files",
)


# NOTE(review): train_data is an alias of valid_data (same dict object), so any
# training run fed from it trains and validates on identical utterances.
train_data = valid_data

#prepare_espnet_data(
#    "/ocean/projects/cis250085p/shared/A_track/train.json",
#    "/ocean/projects/cis250085p/shared/track_a_audio_files"
#)


In [None]:
# import os
# os.environ["OMP_NUM_THREADS"] = "1"
# os.environ["OPENBLAS_NUM_THREADS"] = "1"
# os.environ["MKL_NUM_THREADS"] = "1"
# os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
# os.environ["NUMEXPR_NUM_THREADS"] = "1"
# os.environ["NUMBA_NUM_THREADS"] = "1"
# import torch
# torch.set_num_threads(1)
# torch.set_num_interop_threads(1)

In [None]:
# pretrained_model = Speech2TextGreedySearch.from_pretrained(
#     FINETUNE_MODEL,
#         device='cuda' if torch.cuda.is_available() else 'cpu',
   
# ) # Load model to extract configs.
# pretrain_config = vars(pretrained_model.s2t_train_args)
# tokenizer = pretrained_model.tokenizer
# converter = pretrained_model.converter
# del pretrained_model


In [None]:
import pickle
# Load finetune_config from pickle file
# SECURITY: pickle.load executes arbitrary code embedded in the file; only load
# a finetune_config.pkl that you produced yourself.
# NOTE(review): the cell that wrote this pickle is not part of the visible
# notebook, so a fresh checkout cannot recreate it from here.
with open('finetune_config.pkl', 'rb') as f:
    finetune_config = pickle.load(f)


In [None]:
# Display the loaded fine-tuning configuration for inspection.
finetune_config

In [None]:
# Override the batch size stored in the pickled configuration.
finetune_config['batch_size'] = 16

In [None]:

# # For the configuration, please refer to the last cell in this notebook.
# finetune_config = ez.config.update_finetune_config(
# 	's2t',
# 	pretrain_config,
# 	f"finetune_with_lora.yaml"
# )

# When you don't use yaml file, you can load finetune_config in the following way.
# task_class = ez.task.get_ez_task("s2t")
# default_config = task_class.get_default_config()
# training_config = default_config.update(your_config_in_dict)

# define model loading function
def count_parameters(model):
    """Return the total element count of the parameters with ``requires_grad`` set."""
    trainable_total = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable_total += param.numel()
    return trainable_total

def freeze_parameters(model):
    """Turn off ``requires_grad`` on every currently trainable parameter, in place."""
    still_trainable = [weight for weight in model.parameters() if weight.requires_grad]
    for weight in still_trainable:
        weight.requires_grad_(False)

def build_model_fn(args):
    """
    Build the model to fine-tune: load FINETUNE_MODEL, freeze it, add LoRA adapters.

    Args:
        args: Accepted to match the builder signature used by the trainer;
            this function does not read it.

    Returns:
        The ``s2t_model`` of the loaded checkpoint, in train mode, with all
        previously trainable parameters frozen and ``create_lora_adapter``
        applied to the LORA_TARGET modules.
    """
    target_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    loaded = Speech2TextGreedySearch.from_pretrained(
        FINETUNE_MODEL,
        device=target_device,
    )
    net = loaded.s2t_model
    net.train()

    # Count before freezing so the two printed numbers can be compared.
    n_before = count_parameters(net)
    print(f'Trainable parameters: {n_before}')
    freeze_parameters(net)

    # apply lora (modifies the model in place)
    create_lora_adapter(net, target_modules=LORA_TARGET)
    n_after = count_parameters(net)
    print(f'Trainable parameters after LORA: {n_after}')
    return net

In [None]:
# Shell escape: print the version of the nvcc compiler found on PATH.
!nvcc --version

In [None]:
# Report the PyTorch build and, when a GPU is usable, its CUDA details.
cuda_ready = torch.cuda.is_available()
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {cuda_ready}")
if cuda_ready:
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Current CUDA device: {torch.cuda.current_device()}")
    print(f"Device name: {torch.cuda.get_device_name()}")


In [None]:
# Configure the ESPnet-EZ trainer for the 's2t' task and run it.
# NOTE(review): train_data and valid_data are the same dict object (train_data
# is assigned from valid_data earlier), so validation loss is measured on the
# training utterances.
# NOTE(review): the datasets are plain dicts and data_info is commented out —
# confirm ez.Trainer accepts this form without a field mapping.
# NOTE(review): ngpu=1 / device="cuda" are hard-coded, while build_model_fn
# falls back to CPU when CUDA is unavailable — confirm intended behaviour on
# CPU-only machines.
trainer = ez.Trainer(
    task='s2t',
    train_config=finetune_config,
    train_dataset=train_data,
    valid_dataset=valid_data,
    build_model_fn=build_model_fn, # provide the pre-trained model
    # data_info=data_info,
    output_dir=EXP_DIR,
    stats_dir=STATS_DIR,
    ngpu=1,
    device="cuda"
)
# Statistics are collected first, then training starts.
trainer.collect_stats()
trainer.train()

In [None]:
# NOTE(review): DEVICE is not read by the visible cells; the device is chosen
# inline in the from_pretrained call below.
DEVICE = "cuda"

# Load the pretrained checkpoint for inference.
model = Speech2TextGreedySearch.from_pretrained(
       FINETUNE_MODEL,
        device='cuda' if torch.cuda.is_available() else 'cpu',
)
# NOTE(review): adapter creation and checkpoint loading are commented out, so
# the evaluation cells score the original pretrained weights, not the result of
# trainer.train().
# create_lora_adapter(model.s2t_model, target_modules=LORA_TARGET)
model.s2t_model.eval()
# d = torch.load("./exp/finetune/1epoch.pth")
# model.s2t_model.load_state_dict(d)

In [None]:
import pandas as pd
from jiwer import wer

import soundfile as sf

# Create empty lists to store results
# (original_texts is displayed again by a later cell, so the names must stay.)
utterance_ids = []
original_texts = []
predicted_texts = []

# Decode each audio file in valid_data (the dev/test set, not training data)
for utt_id, data in valid_data.items():
    audio_path = data['wav']
    # `rate` is read but never checked or used.
    # NOTE(review): confirm the files already have the sample rate the model expects.
    speech, rate = sf.read(audio_path)
    
    text = model.decode_long_batched_buffered(
        speech,
        batch_size=16,
        context_len_in_secs=4,
    )
    # Progress indicator: one line per decoded utterance.
    print(utt_id)
    
    # Store results
    utterance_ids.append(utt_id)
    original_texts.append(data['text'])
    predicted_texts.append(text)

# Create DataFrame
results_df = pd.DataFrame({
    'utterance_id': utterance_ids,
    'original_text': original_texts,
    'predicted_text': predicted_texts
})

# Save results to CSV file
# The file is written before the 'wer' column is added below, so it contains
# only the id, reference and prediction columns.
csv_filename = "transcription_results.csv"
results_df.to_csv(csv_filename, index=False)
print(f"\nResults saved to {csv_filename}")


# Calculate WER for each utterance (jiwer), reference first, hypothesis second
results_df['wer'] = [wer(orig, pred) for orig, pred in zip(results_df['original_text'], results_df['predicted_text'])]

# Calculate average WER (unweighted mean of the per-utterance values)
avg_wer = results_df['wer'].mean()

print(f"Average WER: {avg_wer:.4f}")
print("\nDetailed Results:")
# Prints the entire frame as plain text.
print(results_df)

In [None]:
# Give every prediction a sentence-final period.
# Idempotent: predictions that already end with "." are left untouched, so
# re-running this cell no longer stacks extra periods onto the column.
_predicted_text = results_df["predicted_text"]
results_df["predicted_text"] = _predicted_text.where(
    _predicted_text.str.endswith("."), _predicted_text + "."
)

In [None]:
# Display the results table (rich notebook rendering of the whole frame).
results_df

In [None]:
# NOTE(review): avg_wer is not recomputed here, so this prints the value from
# whichever cell last assigned it; it does not reflect edits made to
# predicted_text since then.
print(f"Average WER: {avg_wer:.4f}")
# Header only; nothing is printed after it in this cell.
print("\nDetailed Results:")

In [None]:
import difflib

def _edit_distance(ref_seq, hyp_seq) -> int:
    """Return the Levenshtein distance (substitutions, insertions, deletions) between two sequences."""
    # Two-row dynamic programme: previous[j] is the distance between the
    # reference prefix processed so far and the first j hypothesis items.
    previous = list(range(len(hyp_seq) + 1))
    for i, ref_item in enumerate(ref_seq, start=1):
        current = [i]
        for j, hyp_item in enumerate(hyp_seq, start=1):
            substitution = previous[j - 1] + (0 if ref_item == hyp_item else 1)
            current.append(min(previous[j] + 1, current[j - 1] + 1, substitution))
        previous = current
    return previous[-1]


def calculate_wer(ref: str, hyp: str) -> float:
    """
    Calculate Word Error Rate (WER).

    Both strings are stripped and split on whitespace; the result is the
    word-level edit distance divided by the number of reference words.
    Returns 0.0 when the reference has no words.
    """
    ref_words = ref.strip().split()
    hyp_words = hyp.strip().split()
    if not ref_words:
        return 0.0
    # True edit distance: difflib.SequenceMatcher is a block-matching heuristic
    # whose "autojunk" rule ignores frequent items in long inputs, which can
    # inflate the error count far beyond the minimal number of edits.
    return _edit_distance(ref_words, hyp_words) / len(ref_words)


def calculate_cer(ref: str, hyp: str) -> float:
    """
    Calculate Character Error Rate (CER).

    Both strings are stripped; the result is the character-level edit distance
    divided by the number of reference characters (inner whitespace counts).
    Returns 0.0 when the stripped reference is empty.
    """
    ref_chars = list(ref.strip())
    hyp_chars = list(hyp.strip())
    if not ref_chars:
        return 0.0
    return _edit_distance(ref_chars, hyp_chars) / len(ref_chars)


def calculate_asr_score(ref: str, hyp: str) -> dict:
    """
    Combine WER and CER into a single score.

    The combined error is ``0.4 * WER + 0.6 * CER`` and the score is
    ``(1 - combined_error) * 100``. The score is not clamped, so it is negative
    when the combined error exceeds 1.

    Returns:
        dict: ``{'WER': float, 'CER': float, 'Score': float}``.
    """
    # Local names avoid shadowing the `wer` function imported from jiwer.
    wer_value = calculate_wer(ref, hyp)
    cer_value = calculate_cer(ref, hyp)
    combined_error = 0.4 * wer_value + 0.6 * cer_value
    score = (1 - combined_error) * 100
    return {'WER': wer_value, 'CER': cer_value, 'Score': score}
# Smoke test of the metric helpers on a hand-written sentence pair
# (the hypothesis differs in two words: "jumps" -> "jump", "the" -> "a").
ref = "the quick brown fox jumps over the lazy dog"
hyp = "the quick brown fox jump over a lazy dog"
result = calculate_asr_score(ref, hyp)

# WER/CER are fractions, shown as percentages; Score is already on a 0-100 scale.
print(f"WER: {result['WER']:.2%}")
print(f"CER: {result['CER']:.2%}")
print(f"Score: {result['Score']:.2f}")

In [None]:
# Calculate metrics for each row (reference first, prediction second)
metrics = [calculate_asr_score(orig, pred) for orig, pred in zip(results_df['original_text'], results_df['predicted_text'])]

# Add metrics to dataframe 
# These upper-case columns are separate from the lower-case 'wer' column that
# holds the jiwer value.
results_df['WER'] = [m['WER'] for m in metrics]
results_df['CER'] = [m['CER'] for m in metrics]
results_df['Score'] = [m['Score'] for m in metrics]

# Calculate averages (unweighted means over utterances)
# NOTE: this rebinds avg_wer, which other cells assign from the jiwer column.
avg_wer = results_df['WER'].mean()
avg_cer = results_df['CER'].mean() 
avg_score = results_df['Score'].mean()

print(f"Average WER: {avg_wer:.2%}")
print(f"Average CER: {avg_cer:.2%}") 
print(f"Average Score: {avg_score:.2f}")


In [None]:
# Calculate WER for each utterance
# Recomputes the jiwer column from the current predicted_text values,
# overwriting the 'wer' column written by the decoding cell.
results_df['wer'] = [wer(orig, pred) for orig, pred in zip(results_df['original_text'], results_df['predicted_text'])]

# Calculate average WER (rebinds avg_wer to the jiwer-based mean)
avg_wer = results_df['wer'].mean()
print(f"Average WER: {avg_wer:.4f}")
# Header only; nothing is printed after it in this cell.
print("\nDetailed Results:")

In [None]:
# Display the full list of reference transcripts collected by the decoding loop.
original_texts

In [None]:
import torch
import os



# Configuration
method = "lora"  # or "full"
model_tag = "espnet/owsm_ctc_v4_1B"  # Using same model as inference
epochs = 3
# NOTE: batch_size is defined but the training loop in this cell processes one
# utterance per optimizer step and never reads it.
batch_size = 4
learning_rate = 1e-5

# --- 1. Load the Pre-trained Model ---
print(f"Loading pre-trained model: {model_tag}")
# Loads a fresh copy of the checkpoint; `s2t` is rebound again by later cells.
s2t = Speech2TextGreedySearch.from_pretrained(
    model_tag,
    device='cuda' if torch.cuda.is_available() else 'cpu',
    generate_interctc_outputs=False,
    task_sym='<asr>',
)

# --- 2. Apply Fine-Tuning Method ---
if method == "lora":
    print("Applying LoRA adapters to the model for parameter-efficient fine-tuning...")
    # `apply_lora_to_model` is not defined anywhere in this notebook. Use the
    # same sequence as build_model_fn instead: freeze the base weights, then
    # let create_lora_adapter modify the model in place.
    freeze_parameters(s2t.s2t_model)
    create_lora_adapter(
        s2t.s2t_model,
        rank=8,
        alpha=16,
        target_modules=LORA_TARGET,
    )
    print("LoRA applied. Only adapter weights will be trained.")
else:
    print("Proceeding with FULL fine-tuning. All model weights will be updated.")

# Summarise how many weights are trainable compared with the model's full size.
model_params = list(s2t.s2t_model.parameters())
total_params = sum(weight.numel() for weight in model_params)
trainable_params = sum(weight.numel() for weight in model_params if weight.requires_grad)
print(f"Fine-tuning Method: {method.upper()}")
print(f"Trainable parameters: {trainable_params:,} (~{trainable_params/total_params:.4%})")
print(f"Total parameters:     {total_params:,}")

# --- 3. Prepare Training Data ---
# Using valid_data from previous cell
# NOTE(review): this trains on the dev/test utterances; valid_data is the only
# dataset the visible cells build.
train_data = valid_data  # Assuming valid_data contains the training examples

# --- 4. Training Loop ---
# Output folder name embeds the method and the model tag with "/" replaced by "_".
output_dir = f"./exp/{method}_finetune_{model_tag.replace('/', '_')}"
os.makedirs(output_dir, exist_ok=True)

# Only parameters that currently require gradients are handed to the optimizer.
optimizer = torch.optim.AdamW(
    [p for p in s2t.s2t_model.parameters() if p.requires_grad],
    lr=learning_rate
)

s2t.s2t_model.train()
print("\nStarting training...")

for epoch in range(epochs):
    total_loss = 0
    # One utterance per optimizer step; batch_size is not used.
    for utt_id, data in train_data.items():
        optimizer.zero_grad()
        
        # Load audio (`sf` is imported in the evaluation cell; `rate` is unused)
        speech, rate = sf.read(data['wav'])
        speech = torch.tensor(speech).to(s2t.device)
        
        # Get target text
        # NOTE(review): target_text is never passed to the model below.
        target_text = data['text']
        
        # Forward pass
        # NOTE(review): the model is called with the raw waveform only, and the
        # result is indexed as a dict with a 'loss' key. Confirm both against
        # the model's forward() signature and return type; a supervised loss
        # presumably needs the transcript and length tensors as well.
        output = s2t.s2t_model(speech)
        loss = output['loss']
        
        # Backward pass
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
    # Mean loss per utterance; raises ZeroDivisionError if train_data is empty.
    avg_loss = total_loss / len(train_data)
    print(f"Epoch {epoch+1}/{epochs}, Average Loss: {avg_loss:.4f}")

# Save the fine-tuned model (full state dict plus optimizer state)
torch.save({
    'model_state_dict': s2t.s2t_model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
}, f"{output_dir}/model.pt")

print("\nFine-tuning complete!")
print(f"Model saved to: {output_dir}/model.pt")


In [None]:
import torch
import os
import soundfile as sf
from espnet2.bin.asr_inference import Speech2Text

# Configuration
# (Repeats the values set in the earlier manual fine-tuning cell.)
method = "lora"  # or "full"
model_tag = "espnet/owsm_ctc_v4_1B"
epochs = 3
batch_size = 4
learning_rate = 1e-5

# 1. Load the Pre-trained Model
print(f"Loading pre-trained model: {model_tag}")
# NOTE(review): this uses Speech2Text from espnet2.bin.asr_inference, whereas
# every other cell loads the same tag with Speech2TextGreedySearch — confirm
# this loader supports the checkpoint. It also rebinds `s2t`.
s2t = Speech2Text.from_pretrained(
    model_tag,
    device='cuda' if torch.cuda.is_available() else 'cpu',
    # Add other necessary arguments as needed
)


In [None]:
import torch
from espnet2.bin.s2t_inference_ctc import Speech2TextGreedySearch

# Checkpoint tag (same value as in the earlier cells).
model_tag = "espnet/owsm_ctc_v4_1B"

# Rebind `s2t` to a freshly loaded Speech2TextGreedySearch wrapper; the cells
# below that access `s2t.s2t_model` rely on this object.
s2t = Speech2TextGreedySearch.from_pretrained(
    model_tag,
    device='cuda' if torch.cuda.is_available() else 'cpu',
    generate_interctc_outputs=False,
    task_sym='<asr>',
)


In [None]:
# Show the concrete class of the wrapped model.
type(s2t.s2t_model)

In [None]:

def find_lora_target_modules(model: torch.nn.Module):
    """
    List the qualified names of every ``torch.nn.Linear`` submodule of ``model``.

    The names are printed between a header and a footer (or a "none found"
    message is printed instead) and are also returned.

    Args:
        model: The PyTorch model to inspect.

    Returns:
        list[str]: Names as yielded by ``model.named_modules()``, in traversal
        order; empty when the model has no linear layers.
    """
    header_lines = (
        "\n--- Finding Potential LoRA Target Modules ---",
        "These are the names of the `torch.nn.Linear` layers in the model.",
        "The `lora_state_dict` utility will adapt these by default.\n",
    )
    for header_line in header_lines:
        print(header_line)

    linear_names = [
        layer_name
        for layer_name, layer in model.named_modules()
        if isinstance(layer, torch.nn.Linear)
    ]

    if not linear_names:
        print("No `torch.nn.Linear` layers found in the model.")
    else:
        # One name per line, same as printing them individually.
        print("\n".join(linear_names))

    print("\n---------------------------------------------\n")
    return linear_names

In [None]:
# Print (and display the returned list of) linear-layer names in the loaded model.
find_lora_target_modules(s2t.s2t_model)

In [None]:
# Candidate LoRA targets written as wildcard patterns.
# NOTE(review): this list is not referenced by the visible cells and differs
# from LORA_TARGET, which is what build_model_fn passes to create_lora_adapter.
# Confirm whether wildcard patterns are supported before using it.
lora_target_modules = [
            "*.attn.linear_q",
            "*.attn.linear_v",
        ]

In [None]:
# NOTE(review): another cell guards this call with hasattr(..., "enable_lora"),
# so the method may not exist on this model class — confirm before relying on it.
s2t.s2t_model.enable_lora(rank=8, alpha=16)


# Reference only: signature of create_lora_adapter, kept as a comment because a
# parameter list with type annotations is a SyntaxError when written as a call.
# create_lora_adapter(
#     model: torch.nn.Module,
#     rank: int = 8,
#     alpha: int = 8,
#     dropout_rate: float = 0.0,
#     target_modules: List[str] = ["query"],
#     bias_type: Optional[str] = "none",
# )

In [None]:

# 2. Apply Fine-Tuning Method
if method == "lora":
    print("Enabling LoRA adapters for parameter-efficient fine-tuning...")
    # LoRA is now enabled via config or CLI, not by manual injection.
    # If using CLI: add --use_lora true --lora_rank 8 --lora_alpha 16
    # If using code, check if your ESPnet2 exposes a method like enable_lora()
    # The wrapped network is reached through `s2t.s2t_model`, the attribute the
    # other cells use on this Speech2TextGreedySearch object (not `s2t.model`).
    if hasattr(s2t.s2t_model, "enable_lora"):
        s2t.s2t_model.enable_lora(rank=8, alpha=16)
    else:
        raise NotImplementedError("LoRA must be enabled via config or CLI in this ESPnet2 version.")
    print("LoRA enabled. Only adapter weights will be trained.")
else:
    print("Proceeding with FULL fine-tuning. All model weights will be updated.")


In [None]:

# Report the trainable share of the model's weights.
all_weights = list(s2t.s2t_model.parameters())
total_params = sum(w.numel() for w in all_weights)
trainable_params = sum(w.numel() for w in all_weights if w.requires_grad)
print(f"Fine-tuning Method: {method.upper()}")
print(f"Trainable parameters: {trainable_params:,} (~{trainable_params/total_params:.4%})")
print(f"Total parameters:     {total_params:,}")


In [None]:

# 3. Prepare Training Data
# You must provide 'train_data' as a dict: {utt_id: {'wav': path, 'text': str}}
# Example: train_data = {'utt1': {'wav': 'audio1.wav', 'text': 'hello world'}, ...}

# 4. Training Loop
# Output folder name embeds the method and the model tag with "/" replaced by "_".
output_dir = f"./exp/{method}_finetune_{model_tag.replace('/', '_')}"
os.makedirs(output_dir, exist_ok=True)

# NOTE(review): this cell reaches the network through `s2t.model`, while the
# neighbouring cells use `s2t.s2t_model` on the same object — confirm which
# attribute exists on the loaded wrapper.
optimizer = torch.optim.AdamW(
    [p for p in s2t.model.parameters() if p.requires_grad],
    lr=learning_rate
)

s2t.model.train()
print("\nStarting training...")

for epoch in range(epochs):
    total_loss = 0
    # One utterance per optimizer step; batch_size is not used.
    for utt_id, data in train_data.items():
        optimizer.zero_grad()
        # `rate` is read but unused.
        speech, rate = sf.read(data['wav'])
        speech = torch.tensor(speech).to(s2t.device)
        # NOTE(review): target_text is never passed to the model below.
        target_text = data['text']
        # Forward pass (update as per your model's API)
        # NOTE(review): called with the waveform only and indexed as a dict with
        # a 'loss' key — confirm against the model's forward() signature and
        # return type.
        output = s2t.model(speech)
        loss = output['loss']
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Mean loss per utterance; raises ZeroDivisionError if train_data is empty.
    avg_loss = total_loss / len(train_data)
    print(f"Epoch {epoch+1}/{epochs}, Average Loss: {avg_loss:.4f}")

# Save the fine-tuned model (full state dict plus optimizer state)
torch.save({
    'model_state_dict': s2t.model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
}, f"{output_dir}/model.pt")

print("\nFine-tuning complete!")
print(f"Model saved to: {output_dir}/model.pt")
