In [None]:
from datasets import load_dataset, load_metric, Audio
import evaluate

In [None]:
dataset=load_dataset("mozilla-foundation/common_voice_13_0", "en", split="train[:20000]", trust_remote_code=True)

In [None]:
import noisereduce as nr
import librosa
import numpy as np

def reduce_noise(audio_file):
    y, sr = librosa.load(audio_file, sr=None)
    reduced_audio = nr.reduce_noise(y=y, sr=sr)
    return reduced_audio, sr

In [None]:
import scipy.signal
import librosa.display
import matplotlib.pyplot as plt

def spectral_noise_reduction(y, sr):
    D = librosa.stft(y)
    D_magnitude, D_phase = librosa.magphase(D)
    D_denoised = np.maximum(D_magnitude - 0.02, 0)
    y_denoised = librosa.istft(D_denoised * D_phase)
    return y_denoised

In [None]:
import soundfile as sf

def clean_audio(sample):
    y, sr = librosa.load(sample["path"], sr=None)
    y_clean = nr.reduce_noise(y=y, sr=sr)
    output_path = sample["path"].replace(".wav", "_clean.wav")
    sf.write(output_path, y_clean, sr)
    sample["clean_path"] = output_path
    return sample

dataset = dataset.map(clean_audio)

In [None]:
dataset.column_names

In [None]:
dataset[0]

In [None]:
split_dataset = dataset.train_test_split(test_size=0.2, seed=42)
common_voice_train = split_dataset["train"]
common_voice_test = split_dataset["test"]

In [None]:
len(common_voice_train)

In [None]:
len(common_voice_test)

In [None]:
common_voice_train = common_voice_train.remove_columns(["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"])
common_voice_test = common_voice_test.remove_columns(["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"])

In [None]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    picks = []
    for _ in range(num_examples):
        pick = random.randint(0, len(dataset)-1)
        while pick in picks:
            pick = random.randint(0, len(dataset)-1)
        picks.append(pick)
    
    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
show_random_elements(common_voice_train.remove_columns(["path", "audio"]), num_examples=10)

In [None]:
import re
chars_to_remove_regex = '[\,\?\.\!\-\;\:\"\“\%\‘\”\�\']'

def remove_special_characters(batch):
    batch["sentence"] = re.sub(chars_to_remove_regex, '', batch["sentence"]).lower()
    return batch

In [None]:
common_voice_train = common_voice_train.map(remove_special_characters)
common_voice_test = common_voice_test.map(remove_special_characters)

In [None]:
def extract_all_chars(batch):
    all_text = " ".join(batch["sentence"])
    vocab = list(set(all_text))
    return {"vocab": [vocab], "all_text": [all_text]}

In [None]:
vocab_train = common_voice_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=False, remove_columns=common_voice_train.column_names)
vocab_test = common_voice_test.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=False, remove_columns=common_voice_test.column_names)

In [None]:
vocab_list = list(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0]))

In [None]:
vocab_dict = {v: k for k, v in enumerate(sorted(vocab_list))}
vocab_dict

In [None]:
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]

In [None]:
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)
len(vocab_dict)

In [None]:
import json
with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)

In [None]:
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer.from_pretrained("./", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

In [None]:
from transformers import Wav2Vec2FeatureExtractor

feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)

In [None]:
from transformers import Wav2Vec2Processor

processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
common_voice_train = common_voice_train.cast_column("audio", Audio(sampling_rate=16_000))
common_voice_test = common_voice_test.cast_column("audio", Audio(sampling_rate=16_000))

In [None]:
import IPython.display as ipd
import numpy as np
import random

rand_int = random.randint(0, len(common_voice_train)-1)

print(common_voice_train[rand_int]["sentence"])
ipd.Audio(data=common_voice_train[rand_int]["audio"]["array"], autoplay=True, rate=16000)

In [None]:
rand_int = random.randint(0, len(common_voice_train)-1)

print("Target text:", common_voice_train[rand_int]["sentence"])
print("Input array shape:", common_voice_train[rand_int]["audio"]["array"].shape)
print("Sampling rate:", common_voice_train[rand_int]["audio"]["sampling_rate"])

In [None]:
def prepare_dataset(batch):
    audio = batch["audio"]

    # batched output is "un-batched"
    batch["input_values"] = processor(audio["array"], sampling_rate=audio["sampling_rate"]).input_values[0]
    batch["input_length"] = len(batch["input_values"])
    
    with processor.as_target_processor():
        batch["labels"] = processor(batch["sentence"]).input_ids
    return batch

In [None]:
common_voice_train=common_voice_train.select(range(16000))
common_voice_test=common_voice_test.select(range(4000))

In [None]:
common_voice_train = common_voice_train.map(prepare_dataset, remove_columns=common_voice_train.column_names)
common_voice_test = common_voice_test.map(prepare_dataset, remove_columns=common_voice_test.column_names)

In [None]:
max_input_length_in_sec = 5.0
common_voice_train = common_voice_train.filter(lambda x: x < max_input_length_in_sec * processor.feature_extractor.sampling_rate, input_columns=["input_length"])

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

In [None]:
@dataclass
class DataCollatorCTCWithPadding:
    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            return_tensors="pt",
        )
        
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                return_tensors="pt",
            )

        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)
        batch["labels"] = labels
        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
from datasets import load_metric

In [None]:
import jiwer

In [None]:
wer_metric = load_metric("wer",trust_remote_code=True)

In [None]:
def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:
#old model

from transformers import Wav2Vec2ForCTC

model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-xls-r-300m", 
    attention_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.1,
    mask_time_prob=0.15,
    layerdrop=0.1,
    ctc_loss_reduction="mean", 
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer),
)

In [None]:
#new model
from transformers import Wav2Vec2ForCTC
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-xls-r-300m", 
    attention_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.1, 
    mask_time_prob=0.05,
    layerdrop=0.05,
    ctc_loss_reduction="mean", 
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer),
)

In [None]:
model.freeze_feature_extractor()

In [None]:
#oldtrainargs

from transformers import TrainingArguments

training_args = TrainingArguments(
  output_dir="./",
  group_by_length=True,
  per_device_train_batch_size=16,
  gradient_accumulation_steps=2,
  eval_strategy="epoch",
  num_train_epochs=50,
  gradient_checkpointing=True,
  fp16=True,
  save_steps=1000,
  eval_steps=1000,
  logging_steps=500,
  learning_rate=3e-4,
  warmup_ratio=0.01,
  save_total_limit=2,
  push_to_hub=False,
  optim="adamw_bnb_8bit",
  report_to="none"
)


In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
  output_dir="./",
  group_by_length=True,
  per_device_train_batch_size=16,
  gradient_accumulation_steps=8,
  eval_strategy="epoch",
  num_train_epochs=10,
  gradient_checkpointing=True,
  fp16=True,
  save_steps=100,
  eval_steps=100,
  logging_steps=100,
  learning_rate=3e-5,
  warmup_ratio=0.1,
  save_total_limit=2,
  push_to_hub=False,
  optim="adamw_hf",
)


In [None]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=common_voice_train,
    eval_dataset=common_voice_test,
    tokenizer=processor.feature_extractor,
)

In [None]:
trainer.train()

In [None]:
save_directory = "./asr_saved_model_en_v3"
model.save_pretrained(save_directory)
processor.save_pretrained(save_directory)
print(f"Model and processor saved to {save_directory}")

In [None]:
import torch
from transformers import Wav2Vec2ForCTC
from torchviz import make_dot

model = Wav2Vec2ForCTC.from_pretrained("asr_saved_model_50000v2")
dummy_input = torch.randn(1, 16000, requires_grad=True)
output = model(dummy_input)
make_dot(output.logits, params=dict(model.named_parameters())).render("wav2vec2_model", format="png", view=True)


In [None]:
import torch
from transformers import Wav2Vec2ForCTC
from torchviz import make_dot

model = Wav2Vec2ForCTC.from_pretrained("asr_saved_model_50000v2")
print(model)

In [None]:
! pip install sounddevice

In [1]:
#load model
from transformers import Wav2Vec2ForCTC
from transformers import Wav2Vec2Processor
import torch

model = Wav2Vec2ForCTC.from_pretrained("asr_saved_model_en_v3")
processor = Wav2Vec2Processor.from_pretrained("asr_saved_model_en_v3")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()


Wav2Vec2ForCTC(
  (wav2vec2): Wav2Vec2Model(
    (feature_extractor): Wav2Vec2FeatureEncoder(
      (conv_layers): ModuleList(
        (0): Wav2Vec2LayerNormConvLayer(
          (conv): Conv1d(1, 512, kernel_size=(10,), stride=(5,))
          (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (activation): GELUActivation()
        )
        (1-4): 4 x Wav2Vec2LayerNormConvLayer(
          (conv): Conv1d(512, 512, kernel_size=(3,), stride=(2,))
          (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (activation): GELUActivation()
        )
        (5-6): 2 x Wav2Vec2LayerNormConvLayer(
          (conv): Conv1d(512, 512, kernel_size=(2,), stride=(2,))
          (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (activation): GELUActivation()
        )
      )
    )
    (feature_projection): Wav2Vec2FeatureProjection(
      (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (projec

In [2]:
import sounddevice as sd
import queue
import numpy as np

# Queue to hold audio chunks
q = queue.Queue()

# Callback to continuously fetch audio
def callback(indata, frames, time, status):
    if status:
        print(status)
    q.put(indata.copy())

# Stream setup
samplerate = 16000  # Make sure it matches your ASR model's expected input
blocksize = 8000    # You can tune this for latency/performance tradeoff

# Start streaming
stream = sd.InputStream(callback=callback, channels=1, samplerate=samplerate, blocksize=blocksize)
stream.start()

In [3]:
audio_buffer = []

In [None]:
print("Listening... (Press Ctrl+C to stop)")
try:
    while True:
        chunk = q.get()
        audio_buffer.append(chunk)

        # Combine and flatten audio chunks
        if len(audio_buffer) < 10:  # You can adjust this threshold
            audio_data = np.concatenate(audio_buffer, axis=0).flatten().astype(np.float32)
            audio_buffer = []  # Reset buffer

            # Normalize and convert to torch tensor
            input_values = processor(audio_data, sampling_rate=samplerate, return_tensors="pt").input_values.to(device)

            # Predict
            with torch.no_grad():
                logits = model(input_values).logits
            predicted_ids = torch.argmax(logits, dim=-1)

            # Decode
            transcription = processor.decode(predicted_ids[0])
            print("Recognized:", transcription)

except KeyboardInterrupt:
    print("Stopped.")
    stream.stop()
    stream.close()

Listening... (Press Ctrl+C to stop)
Recognized: 


In [None]:
def transcribe_audio(file_path):
    audio_input, sr = librosa.load(file_path, sr=16000)
    min_length = 16000
    if len(audio_input) < min_length:
        audio_input = np.pad(audio_input, (0, min_length - len(audio_input)), mode='constant')
    inputs = processor(audio_input, sampling_rate=16000, return_tensors="pt")
    input_values = inputs.input_values.to(device)
    with torch.no_grad():
        logits = model(input_values).logits
    predicted_ids = torch.argmax(logits, dim=-1)
    transcription = processor.decode(predicted_ids[0])
    return transcription

In [None]:
import librosa
sample_audio_path = "C:\\Users\\ruebe\\OneDrive\\Desktop\\Project-2\\Audio\\common_voice_en_10891.mp3"
try:
    transcription = transcribe_audio(sample_audio_path)
    print("Transcription:", transcription)
except Exception as e:
    print("Error during inference (check your audio file path):", e)