<a href="https://colab.research.google.com/github/johanskaremo/audio_recognition/blob/main/List.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Live Audio Speech-To-Text
**By Gustaf Larsson and Johan Skäremo**

In [49]:
# Installing Transformer

!pip install transformers
!pip install pydub
!pip install scipy
!pip install datasets --upgrade
!pip install librosa
!pip install soundfile
!pip install datasets[audio]
!pip install jiwer
!apt install git-lfs

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git-lfs is already the newest version (3.0.2-1ubuntu0.2).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [50]:
import torch
import torchaudio
from torch import nn
from torch.utils.data import DataLoader
from torchaudio import datasets, transforms
from torchaudio.utils import download_asset
from matplotlib import pyplot as plt
import random
from transformers import Wav2Vec2ForCTC, Wav2Vec2Tokenizer, TrainingArguments, Trainer
from transformers import Wav2Vec2CTCTokenizer, Wav2Vec2FeatureExtractor, Wav2Vec2Processor
import ipywidgets as widgets
from IPython import display as disp
from IPython.display import display, Audio, clear_output
from google.colab import output
import base64
from pydub import AudioSegment
import io
import tempfile
import librosa
from scipy.io.wavfile import write
from transformers import pipeline
from datasets import load_dataset, Audio, Dataset, DatasetDict, Features, Value, ClassLabel, Array2D, Sequence, load_metric
import pandas as pd
import re
import json
import os
import soundfile as sf
import numpy as np
import operator
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from huggingface_hub import notebook_login

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [55]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#@title CODE FOR RESAMPLING AND CONVERTING THE MP3 FILES, DON'T EXECUTE THESE 2 CELLS
def convert_and_resample(mp3_file_path, output_dir, sample_rate=16000):
    # Load MP3 file
    audio, sr = librosa.load(mp3_file_path, sr=None)  # Load with original sample rate
    # Resample to 16 kHz if necessary
    if sr != sample_rate:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=sample_rate)

    # Save the resampled audio to a WAV file
    output_file_path = os.path.join(output_dir, os.path.basename(mp3_file_path).replace('.mp3', '.wav'))
    sf.write(output_file_path, audio, sample_rate)


In [None]:
#@title CODE FOR RESAMPLING AND CONVERTING THE MP3 FILES, DON'T EXECUTE THESE 2 CELLS
# Define the path to your MP3 files and the output directory for WAV files
mp3_directory = '/content/drive/MyDrive/Datasets/common_voice/clips'
wav_directory = '/content/drive/MyDrive/Datasets/common_voice/clips_wav'
os.makedirs(wav_directory, exist_ok=True)

# Convert and resample all MP3 files in the directory
for filename in os.listdir(mp3_directory):
    if filename.endswith('.mp3'):
        mp3_file_path = os.path.join(mp3_directory, filename)
        convert_and_resample(mp3_file_path, wav_directory)

# Data preperation


In [None]:
pd.set_option('display.max_colwidth', None)
data = load_dataset("csv", data_files='/content/drive/MyDrive/Datasets/common_voice/validated.tsv', delimiter='\t')
data = data.remove_columns(['sentence_domain', 'up_votes', 'down_votes', 'age', 'gender', 'accents', 'variant', 'locale', 'segment','client_id','sentence_id'])

# Prepare our dataset for tokenization and processing

Translating our audio files into an array of values which is interpretable for machines

In [None]:
# ANVÄNDS för att skapa audio dataset som innehåller paths och audio arrays av audiofilerna
# Define the directory containing your WAV files
wav_directory = '/content/drive/MyDrive/Datasets/common_voice/clips_wav'

# List all WAV files in the directory
audio_files = [os.path.join(wav_directory, f) for f in os.listdir(wav_directory) if f.endswith('.wav')]

# Create a dataset from the audio file paths
audio_dataset = Dataset.from_dict({"audio": audio_files})

# Cast the 'audio' column to use the Audio feature
audio_dataset = audio_dataset.cast_column("audio", Audio(sampling_rate=16000))


Next 4 cells is a process to add the information below into the original dataset in the correct order:

*   Paths to the audio files with ".wav" format
*   Arrays of audio values
*   Sentences


In [None]:
# ANVÄNDS för att lägga över paths och arrays i listor för att enkelt kunna arbeta med
# my_list för paths och my_list_arr för audio arrays
my_list = []
my_list_arr = []
for entry in audio_dataset['audio']:
  path = entry['path'].replace('/content/drive/MyDrive/Datasets/common_voice/clips_wav/', "")
  my_list.append(path)
  my_list_arr.append(entry['array'])

In [None]:
#ANVÄNDS för att skapa lister som skall bli sorterade
my_list_sorted = [None] * len(data['train'])
my_list_arr_sorted = [None] * len(data['train'])

In [None]:
#ANVÄNDS för att sortera audio filerna såsom de ligger tillsammans med sentences i orginal dataset
i = 0
for entry_org in data['train']:
    org_path = entry_org['path']
    org_path = org_path.replace(".mp3", ".wav")
    try:
      # check if string is present in list
      index = my_list.index(org_path)
      my_list_sorted[i] = my_list[index]
      my_list_arr_sorted[i] = my_list_arr[index]
      i = i+1
    except ValueError:
        print(f'{org_path} is not present in the list')

my_list_arr_sorted = [np.array(arr, dtype='float32') for arr in my_list_arr_sorted]

In [None]:
# ANVÄNDS för att lägga ihop rätt paths med ".wav" format samt audio arraysen i orginal dataset
data = data.remove_columns(['path'])
data['train'] = data['train'].add_column('path', my_list_sorted)
data['train'] = data['train'].add_column('array', my_list_arr_sorted) #1

In [None]:
#printing
array = np.array(data['train'][0]['array'], dtype='float32')
print("Data type of the array:", array.dtype)
print("Length of the array:", len(array))
print(array)

Data type of the array: float32
Length of the array: 96192
[ 0.0000000e+00  0.0000000e+00  0.0000000e+00 ...  0.0000000e+00
 -3.0517578e-05  0.0000000e+00]


# Prepare sentences for tokenization

Functions for removing characters and creating dictionary

In [None]:
# Removing special characters for easier training
# Works with load_dataset

pattern = r'[^\w\s\']'

def remove_special_characters(batch):
    batch["sentence"] = re.sub(pattern, '', batch["sentence"]).lower()
    return batch

In [None]:
# Extracting all chars for our tokenizer
def extract_all_chars(batch):
  all_text = " ".join(batch["sentence"])
  vocab = list(set(all_text))
  return {"vocab": [vocab], "all_text": [all_text]}

Remove special characters and extract all different chars in "sentences" data.

In [None]:
data = data.map(remove_special_characters)
#Generate our vocabulary for tokens
#Generate one long text from all sentences to extract all characters
vocabs = data.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=data.column_names["train"])
vocab_list = list(set(vocabs["train"]["vocab"][0]))
vocab_dict = {v: k for k, v in enumerate(vocab_list)}

Map:   0%|          | 0/1877 [00:00<?, ? examples/s]

In [None]:
# deleting white space for a more visible token, its a common approach
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]

# Adding an unknown token so that the model later can deal with characters not
# in the training set.
# Adding a padding token also, corresponds to CTC's "blank token", it's a
# core coponent
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)


In [None]:
print(vocab_dict)

{"'": 0, 'r': 1, 'a': 2, 't': 3, 'h': 4, 'x': 5, 'i': 6, 'g': 7, 'u': 8, 'b': 9, 'k': 11, 'd': 12, 'q': 13, 'f': 14, 'm': 15, 'c': 16, 'p': 17, 'o': 18, 'v': 19, 'j': 20, 's': 21, 'e': 22, 'y': 23, 'z': 24, 'l': 25, 'n': 26, 'w': 27, '|': 10, '[UNK]': 28, '[PAD]': 29}


In [None]:
#save the dictionary
with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)

Now initiating wav2vec2 tokenizer, feature extractor and processor using our own json-file

In [None]:
tokenizer_fine = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")
feature_extractor_fine = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=False)
processor_fine = Wav2Vec2Processor(feature_extractor=feature_extractor_fine, tokenizer=tokenizer_fine)

# Fortsättning, att göra:


*   Träna model
*   Ev skriva audio files till fil på ett effektivare sätt



Start processing the data. Processor combines feature extractor and tokenizer

In [None]:
def prepare_dataset(batch):
    #audio = batch["audio"]

    # batched output is "un-batched" to ensure mapping is correct
    batch["input_values"] = processor_fine(batch["array"], sampling_rate=16000).input_values[0]

    with processor_fine.as_target_processor():
        batch["labels"] = processor_fine(batch["sentence"]).input_ids
    return batch

In [None]:
data['train'] = data['train'].map(prepare_dataset, remove_columns=["path"], num_proc=1)


Map:   0%|          | 0/1877 [00:00<?, ? examples/s]



In [None]:
print(data['train'])

Dataset({
    features: ['sentence', 'array', 'input_values', 'labels'],
    num_rows: 1877
})


In [None]:
#This is copied code for data collator
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """
    def __init__(self, processor, padding=True, max_length=None, max_length_labels=None, pad_to_multiple_of=None, pad_to_multiple_of_labels=None):
        self.processor = processor
        self.padding = padding
        self.max_length = max_length
        self.max_length_labels = max_length_labels
        self.pad_to_multiple_of = pad_to_multiple_of
        self.pad_to_multiple_of_labels = pad_to_multiple_of_labels

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor_fine, padding=True)

In [None]:
wer_metric = load_metric("wer")

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this metric from the next major release of `datasets`.


In [None]:
def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:
# Importing Wav2Vec pretrained model and orignial tokenizer
#tokenizer = Wav2Vec2Tokenizer.from_pretrained("facebook/wav2vec2-base-960h")
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-base-960h",
    ctc_loss_reduction="mean",
    pad_token_id=processor_fine.tokenizer.pad_token_id,
    )

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.60k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/378M [00:00<?, ?B/s]

Some weights of the model checkpoint at facebook/wav2vec2-base-960h were not used when initializing Wav2Vec2ForCTC: ['wav2vec2.encoder.pos_conv_embed.conv.weight_g', 'wav2vec2.encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing Wav2Vec2ForCTC from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing Wav2Vec2ForCTC from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You sho

In [None]:
model.freeze_feature_extractor()



Training the model

In [None]:
training_args = TrainingArguments(
  output_dir=repo_name,
  group_by_length=True,
  per_device_train_batch_size=32,
  evaluation_strategy="steps",
  num_train_epochs=30,
  fp16=True,
  gradient_checkpointing=True,
  save_steps=500,
  eval_steps=500,
  logging_steps=500,
  learning_rate=1e-4,
  weight_decay=0.005,
  warmup_steps=1000,
  save_total_limit=2,
)

NameError: name 'repo_name' is not defined

Code for recording our voice in google colab


In [None]:
#
# original code: https://github.com/magenta/ddsp/blob/master/ddsp/colab/colab_utils.py
#


# Copyright 2020 The DDSP Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

def record_audio(seconds=3,
                 sample_rate=44100,
                 normalize_db=0.1):
    """Record audio from the browser in colab using javascript.
    Based on: https://gist.github.com/korakot/c21c3476c024ad6d56d5f48b0bca92be
    Args:
      seconds: Number of seconds to record.
      sample_rate: Resample recorded audio to this sample rate.
      normalize_db: Normalize the audio to this many decibels. Set to None to skip
        normalization step.
    Returns:
      An array of the recorded audio at sample_rate.
    """
    # Use Javascript to record audio.
    record_js_code = """
      const sleep  = time => new Promise(resolve => setTimeout(resolve, time))
      const b2text = blob => new Promise(resolve => {
        const reader = new FileReader()
        reader.onloadend = e => resolve(e.srcElement.result)
        reader.readAsDataURL(blob)
      })
      var record = time => new Promise(async resolve => {
        stream = await navigator.mediaDevices.getUserMedia({ audio: true })
        recorder = new MediaRecorder(stream)
        chunks = []
        recorder.ondataavailable = e => chunks.push(e.data)
        recorder.start()
        await sleep(time)
        recorder.onstop = async ()=>{
          blob = new Blob(chunks)
          text = await b2text(blob)
          resolve(text)
        }
        recorder.stop()
      })
      """
    print('Starting recording for {} seconds...'.format(seconds))
    display(disp.Javascript(record_js_code))
    audio_string = output.eval_js('record(%d)' % (seconds * 1000.0))
    print('Finished recording!')
    audio_bytes = base64.b64decode(audio_string.split(',')[1])
    return audio_bytes_to_np(audio_bytes,
                             sample_rate=sample_rate,
                             normalize_db=normalize_db)

def audio_bytes_to_np(wav_data,
                      sample_rate=44100,
                      normalize_db=0.1):
    """Convert audio file data (in bytes) into a numpy array.
    Saves to a tempfile and loads with librosa.
    Args:
      wav_data: A byte stream of audio data.
      sample_rate: Resample recorded audio to this sample rate.
      normalize_db: Normalize the audio to this many decibels. Set to None to skip
        normalization step.
    Returns:
      An array of the recorded audio at sample_rate.
    """
    # Parse and normalize the audio.
    audio = AudioSegment.from_file(io.BytesIO(wav_data))
    audio.remove_dc_offset()
    if normalize_db is not None:
        audio.normalize(headroom=normalize_db)
    # Save to tempfile and load with librosa.
    with tempfile.NamedTemporaryFile(suffix='.wav') as temp_wav_file:
        fname = temp_wav_file.name
        audio.export(fname, format='wav')
        audio_np, unused_sr = librosa.load(fname, sr=sample_rate)
    return audio_np

In [None]:
#@title Record
#@markdown * Set recording time:

SAMPLE_RATE = 16000
record_seconds =   5#@param {type:"number", min:1, max:10, step:1}

def _record_audio(b):
  global audio
  clear_output()
  audio = record_audio(record_seconds, sample_rate=SAMPLE_RATE)
  display(Audio(audio, rate=SAMPLE_RATE))

button = widgets.Button(description="Start recording...")
button.on_click(_record_audio)
display(button)

Button(description='Start recording...', style=ButtonStyle())

In [None]:
write('output.wav', 16000, audio)

In [None]:
audio_in, rate = librosa.load("output.wav", sr = 16000)

In [None]:
# Taking an input value
input_values = tokenizer(audio_in, return_tensors = "pt").input_values
# Storing logits (non-normalized prediction values)
logits = model(input_values).logits
# Storing predicted ids
prediction = torch.argmax(logits, dim = -1)
# Passing the prediction to the tokenizer decode to get the transcription
transcription = tokenizer.batch_decode(prediction)[0]

print(transcription)

tensor([[-0.0005, -0.0005, -0.0005,  ...,  0.0016,  0.0011,  0.0004]])
CAN I PET THAT DOG


In [None]:
def correct_grammar(text):
    result = nlp(text)
    return result[0]['generated_text']

In [None]:
# Load a pre-trained model for grammar correction
nlp = pipeline("text2text-generation", model="prithivida/grammar_error_correcter_v1")



config.json:   0%|          | 0.00/1.39k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/892M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.89k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/1.79k [00:00<?, ?B/s]

In [None]:
corrected_transcription = correct_grammar(transcription)
print(corrected_transcription)



CAN I PET THAT DOG?
