<a href="https://colab.research.google.com/github/mohanravinibav/golang-gorm-postgres/blob/master/models_and_scripts/whisper_tflite_model_generation_and_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install TensorFlow, Transformers, and datasets

In [None]:
!pip install tensorflow==2.14.0
!pip install transformers
!pip install datasets

Collecting tensorflow==2.14.0
  Downloading tensorflow-2.14.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting ml-dtypes==0.2.0 (from tensorflow==2.14.0)
  Downloading ml_dtypes-0.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.20.3 (from tensorflow==2.14.0)
  Downloading protobuf-4.25.7-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting wrapt<1.15,>=1.11.0 (from tensorflow==2.14.0)
  Downloading wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting tensorboard<2.15,>=2.14 (from tensorflow==2.14.0)
  Downloading tensorboard-2.14.1-py3-none-any.whl.metadata (1.7 kB)
Collecting tensorflow-estimator<2.15,>=2.14.0 (from tensorflow==2.14.0)
  Downloading tensorflow_estimator-2.14.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting kera

## Configure model to be generated as per requirement

In [None]:
import requests

######## Set the model as per requirement
model_name = "whisper-base"          # whisper-tiny, whisper-tiny.en, whisper-base, whisper-base.en, whisper-small, whisper-small.en

######## Set the language, task, and options as per requirement
language_code = "<|en|>"             # <|en|>, <|fr|>, <|hi|>, <|ko|>, <|de|>, <|zh|>, <|ja|>, <|es|>, <|ar|>, <|ru|>, ...
task_code     = "<|transcribe|>"     # <|transcribe|>, <|translate|>
option_code   = "<|notimestamps|>"   # <|notimestamps|>, <|nocaptions|>

# URL of the JSON file which stores the code mappings
url = "https://huggingface.co/openai/whisper-large/resolve/main/added_tokens.json"

# Send a GET request to download the file (the 30-second timeout is a conservative choice)
response = requests.get(url, timeout=30)

# Stop here if the download failed; continuing with an empty mapping
# would only surface later as a KeyError when building forced_decoder_ids
response.raise_for_status()

# Parse the JSON content into a {token_string: token_id} dictionary
code_mappings = response.json()

# Construct forced_decoder_ids using the mappings
forced_decoder_ids = [
    [1, code_mappings[language_code]],
    [2, code_mappings[task_code]],
    [3, code_mappings[option_code]]
]

print(forced_decoder_ids)

[[1, 50259], [2, 50359], [3, 50363]]
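
The lookup above fails with a bare `KeyError` if a token string is misspelled or absent from the mapping. A minimal sketch of a helper that builds the same list and reports every missing token at once is shown below. The names `build_forced_decoder_ids` and `sample_mappings` are hypothetical, and the sample IDs are simply the ones printed above.

```python
from typing import Dict, List


def build_forced_decoder_ids(mappings: Dict[str, int], codes: List[str]) -> List[List[int]]:
    """Pair each special token with its decoder position, starting at 1."""
    missing = [code for code in codes if code not in mappings]
    if missing:
        raise KeyError(f"Tokens not found in the mapping: {missing}")
    # Position 0 is taken by <|startoftranscript|>, so forced tokens start at 1.
    return [[position, mappings[code]] for position, code in enumerate(codes, start=1)]


# Hypothetical subset of added_tokens.json, using the IDs printed above.
sample_mappings = {"<|en|>": 50259, "<|transcribe|>": 50359, "<|notimestamps|>": 50363}
sample_ids = build_forced_decoder_ids(
    sample_mappings, ["<|en|>", "<|transcribe|>", "<|notimestamps|>"]
)
print(sample_ids)  # [[1, 50259], [2, 50359], [3, 50363]]
```

In the notebook, the same call could take `code_mappings` and `[language_code, task_code, option_code]` as arguments.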


## Import the libraries, load the model, and run inference

In [None]:
import tensorflow as tf
import transformers
import datasets

from datasets import load_dataset
from transformers import WhisperProcessor, WhisperFeatureExtractor, TFWhisperForConditionalGeneration, WhisperTokenizer

pretrained_model = f"openai/{model_name}"
tflite_model_path = f"{model_name}.tflite"
saved_model_dir = f"tf_{model_name}_saved"

feature_extractor = WhisperFeatureExtractor.from_pretrained(pretrained_model)
tokenizer = WhisperTokenizer.from_pretrained(pretrained_model, predict_timestamps=True)
processor = WhisperProcessor(feature_extractor, tokenizer)
model = TFWhisperForConditionalGeneration.from_pretrained(pretrained_model)

# Loading dataset
ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
inputs = feature_extractor(ds[0]["audio"]["array"], sampling_rate=ds[0]["audio"]["sampling_rate"], return_tensors="tf")
input_features = inputs.input_features

# Generating Transcription
generated_ids = model.generate(input_features=input_features)
print(generated_ids)

transcription = processor.tokenizer.decode(generated_ids[0])
print(transcription)

# Save the model
# model.save(saved_model_dir)  # not needed here; the model is saved later with tf.saved_model.save()

All PyTorch model weights were used when initializing TFWhisperForConditionalGeneration.

All the weights of TFWhisperForConditionalGeneration were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFWhisperForConditionalGeneration for predictions without further training.


tf.Tensor(
[[50258 50259 50359 50363  2221    13  2326   388   391   307   264 50244
    295   264  2808  5359    11   293   321   366  5404   281  2928   702
  14943    13 50257]], shape=(1, 27), dtype=int32)
<|startoftranscript|><|en|><|transcribe|><|notimestamps|> Mr. Quilter is the apostle of the middle classes, and we are glad to welcome his gospel.<|endoftext|>


## Prompt fix: patch to make forced_decoder_ids work

In [None]:
import tensorflow as tf
import numpy as np
from transformers import TFForceTokensLogitsProcessor, TFLogitsProcessor
from typing import List, Optional, Union, Any

# Patching methods of class TFForceTokensLogitsProcessor(TFLogitsProcessor):

def my__init__(self, force_token_map: List[List[int]]):
    force_token_map = dict(force_token_map)
    # Converts the dictionary of format {index: token} containing the tokens to be forced to an array, where the
    # index of the array corresponds to the index of the token to be forced, for XLA compatibility.
    # Indexes without forced tokens will have a negative value.
    force_token_array = np.ones((max(force_token_map.keys()) + 1), dtype=np.int32) * -1
    for index, token in force_token_map.items():
        if token is not None:
            force_token_array[index] = token
    self.force_token_array = tf.convert_to_tensor(force_token_array, dtype=tf.int32)

def my__call__(self, input_ids: tf.Tensor, scores: tf.Tensor, cur_len: int) -> tf.Tensor:
    def _force_token(generation_idx):
        batch_size = scores.shape[0]
        current_token = self.force_token_array[generation_idx]

        # The original code (commented out below) fills the scores with -inf, which produces NaN
        # values once the model is exported to TFLite. Any negative fill value works here: the
        # forced token's score of 0 is then the largest, so it gets chosen.
        # new_scores = tf.ones_like(scores, dtype=scores.dtype) * -float("inf")
        new_scores = tf.ones_like(scores, dtype=scores.dtype) * -1.0
        indices = tf.stack((tf.range(batch_size), tf.tile([current_token], [batch_size])), axis=1)
        updates = tf.zeros((batch_size,), dtype=scores.dtype)
        new_scores = tf.tensor_scatter_nd_update(new_scores, indices, updates)
        return new_scores

    scores = tf.cond(
        tf.greater_equal(cur_len, tf.shape(self.force_token_array)[0]),
        # If the current length is greater than or equal to the length of force_token_array, the processor does nothing.
        lambda: tf.identity(scores),
        # Otherwise, it may force a certain token.
        lambda: tf.cond(
            tf.greater_equal(self.force_token_array[cur_len], 0),
            # Only valid (non-negative) tokens are forced
            lambda: _force_token(cur_len),
            # Otherwise, the processor does nothing.
            lambda: scores,
        ),
    )
    return scores

TFForceTokensLogitsProcessor.__init__ = my__init__
TFForceTokensLogitsProcessor.__call__ = my__call__
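
The patch relies on one observation: under greedy decoding, the forced token only needs the highest score, so a finite negative fill works as well as `-inf`. The toy sketch below illustrates this with plain Python lists instead of tensors. The helper names and the 8-entry vocabulary are hypothetical, and the sketch assumes the decoder picks the argmax; with sampling, a finite fill would no longer strictly force the token.

```python
from typing import List


def force_token_scores(vocab_size: int, forced_token: int, fill_value: float = -1.0) -> List[float]:
    """Mimic the patched _force_token for one batch row:
    fill_value everywhere, 0 at the forced token."""
    scores = [fill_value] * vocab_size
    scores[forced_token] = 0.0
    return scores


def argmax(values: List[float]) -> int:
    """Return the index of the largest value."""
    return max(range(len(values)), key=values.__getitem__)


# Toy vocabulary of 8 tokens, forcing token 5.
toy_scores = force_token_scores(vocab_size=8, forced_token=5)
print(argmax(toy_scores))  # 5
```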

## Define a model with a serving signature and save it in TF SavedModel format

In [None]:
class GenerateModel(tf.Module):
  def __init__(self, model):
    super(GenerateModel, self).__init__()
    self.model = model

  @tf.function(
    # A static batch size should not be necessary, but the export throws an exception without it (to be fixed)
    input_signature=[
      tf.TensorSpec((1, 80, 3000), tf.float32, name="input_features"),
    ],
  )
  def serving(self, input_features):
    outputs = self.model.generate(
      input_features,
      # Upper bound on the number of generated tokens.
      # Lower it (for example, to 200) if your transcriptions are short,
      # or keep it high if you expect long transcriptions.
      max_new_tokens=448,
      return_dict_in_generate=True,
      forced_decoder_ids=forced_decoder_ids,
    )
    return {"sequences": outputs["sequences"]}

generate_model = GenerateModel(model=model)
tf.saved_model.save(generate_model, saved_model_dir, signatures={"serving_default": generate_model.serving})
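
The fixed `(1, 80, 3000)` input shape in the signature can be read as one audio chunk, 80 mel bins, and 3000 frames. The arithmetic below is a sketch under assumptions not stated in this notebook: that the feature extractor uses its usual defaults of 30-second chunks and a hop length of 160 samples at the 16 kHz sampling rate. The function name `expected_frames` is hypothetical.

```python
def expected_frames(chunk_seconds: float, sampling_rate: int, hop_length: int) -> int:
    """Number of spectrogram frames for one padded audio chunk."""
    return int(chunk_seconds * sampling_rate) // hop_length


# Assumed defaults: 30 s chunks, 16 kHz audio, hop length of 160 samples.
print(expected_frames(30, 16000, 160))  # 3000
```

Shorter clips are padded to the full chunk length by the feature extractor, which is why every input matches the static signature.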

## Convert the model from TF SavedModel format to TFLite

In [None]:
# Convert the model
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
converter.target_spec.supported_ops = [
  tf.lite.OpsSet.TFLITE_BUILTINS, # enable TensorFlow Lite ops.
  tf.lite.OpsSet.SELECT_TF_OPS # enable TensorFlow ops.
]

# Learn about post training quantization
# https://www.tensorflow.org/lite/performance/post_training_quantization

# Dynamic range quantization, which reduces the model to roughly 25% of its original size
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Float16 quantization, which reduces the model to roughly 50% of its original size
# converter.target_spec.supported_types = [tf.float16]

tflite_model = converter.convert()

# Save the model
with open(tflite_model_path, 'wb') as f:
    f.write(tflite_model)
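
To check the size reduction for a particular model, it can help to measure the written file. The sketch below uses a hypothetical helper, `file_size_mb`, and demonstrates it on a throwaway file so that it runs on its own.

```python
import os
import tempfile


def file_size_mb(path: str) -> float:
    """Return the size of a file in mebibytes."""
    return os.path.getsize(path) / (1024 * 1024)


# Demonstration on a throwaway 2 MiB file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"\0" * (2 * 1024 * 1024))
demo_path = tmp.name
print(f"{file_size_mb(demo_path):.1f} MB")  # 2.0 MB
os.remove(demo_path)
```

In the notebook, `file_size_mb(tflite_model_path)` would report the size of the converted model.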

## Test the TFLite model with the TFLite Interpreter and check the transcription for the dataset sample

In [None]:
# loaded model... now with generate!
interpreter = tf.lite.Interpreter(tflite_model_path)

tflite_generate = interpreter.get_signature_runner()
generated_ids = tflite_generate(input_features=input_features)["sequences"]
# print(generated_ids)

transcription = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
print(transcription)

<|startoftranscript|><|en|><|transcribe|><|notimestamps|> Mr. Quilter is the apostle of the middle classes, and we are glad to welcome his gospel.<|endoftext|><|endoftext|><|endoftext|>... (remaining repeated <|endoftext|> tokens truncated)
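
The TFLite output above is padded with repeated `<|endoftext|>` tokens. One way to drop the padding before decoding is to cut the ID sequence at the first end-of-text token. The sketch below uses plain Python lists. The helper name `trim_after_eos` is hypothetical, and the ID 50257 is taken from the final token of the generated IDs printed earlier in this notebook.

```python
from typing import List

EOS_TOKEN_ID = 50257  # <|endoftext|>, per the generated IDs printed earlier


def trim_after_eos(token_ids: List[int], eos_token_id: int = EOS_TOKEN_ID) -> List[int]:
    """Keep tokens up to and including the first end-of-text token."""
    if eos_token_id in token_ids:
        return token_ids[: token_ids.index(eos_token_id) + 1]
    return token_ids


# Shortened example sequence with three padding tokens at the end.
padded = [50258, 50259, 50359, 50363, 2221, 13, 50257, 50257, 50257]
print(trim_after_eos(padded))  # [50258, 50259, 50359, 50363, 2221, 13, 50257]
```

Assuming `generated_ids` is a NumPy array, as returned by the signature runner, `trim_after_eos(generated_ids[0].tolist())` would apply this in the notebook. Decoding with `skip_special_tokens=True` hides the padding as well.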

## Install faster-whisper for audio processing and for testing the model

In [None]:
!git clone https://github.com/SYSTRAN/faster-whisper.git
!pip install faster-whisper

Cloning into 'faster-whisper'...
remote: Enumerating objects: 1105, done.
remote: Counting objects: 100% (501/501), done.
remote: Compressing objects: 100% (222/222), done.
remote: Total 1105 (delta 371), reused 279 (delta 279), pack-reused 604 (from 3)
Receiving objects: 100% (1105/1105), 38.07 MiB | 25.15 MiB/s, done.
Resolving deltas: 100% (697/697), done.
Collecting faster-whisper
  Downloading faster_whisper-1.1.1-py3-none-any.whl.metadata (16 kB)
Collecting ctranslate2<5,>=4.0 (from faster-whisper)
  Downloading ctranslate2-4.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting onnxruntime<2,>=1.14 (from faster-whisper)
  Downloading onnxruntime-1.20.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting av>=11 (from faster-whisper)
  Downloading av-14.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.5 kB)
Collecting coloredlogs (from onnxruntime<2,>=1.14->faster-whispe

## Test all audio files in a loop

In [None]:
import os
import tensorflow as tf
from transformers import WhisperProcessor, WhisperFeatureExtractor, WhisperTokenizer
from faster_whisper import decode_audio

# Set up paths and model (whisper-tiny, whisper-tiny.en, whisper-base, whisper-base.en, whisper-small, whisper-small.en)
# model_name = "whisper-base.en"
# pretrained_model = f"openai/{model_name}"
# tflite_model_path = f"{model_name}.tflite"

######## NOTE: Specify the folder containing audio files
!git clone https://github.com/vilassn/audio_samples.git
audio_folder_path = 'audio_samples/en'
#audio_folder_path = '/content/drive/MyDrive/Colab Notebooks/audio'

feature_extractor = WhisperFeatureExtractor.from_pretrained(pretrained_model)
tokenizer = WhisperTokenizer.from_pretrained(pretrained_model, predict_timestamps=True)
processor = WhisperProcessor(feature_extractor, tokenizer)

interpreter = tf.lite.Interpreter(tflite_model_path)
tflite_generate = interpreter.get_signature_runner()

# Number of iterations you want the loop to run
iterations = 1000

for i in range(1, iterations + 1):  # Start from 1 to print iteration number
    print(f"Iteration {i}.......................................................\n")  # Print iteration number and newline

    # Loop through all files in the folder
    for audio_file_name in os.listdir(audio_folder_path):
        audio_file_path = os.path.join(audio_folder_path, audio_file_name)

        if audio_file_name.endswith('.wav'):  # Process only .wav files
            print(f"Processing {audio_file_name}...")

            # Preprocess the audio file
            input_audio = decode_audio(audio_file_path, sampling_rate=16000)
            input_features = feature_extractor(input_audio, sampling_rate=16000, return_tensors="tf").input_features

            # Run the model
            generated_ids = tflite_generate(input_features=input_features)["sequences"]

            # Decode and print transcription
            transcription = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
            print(f"{transcription}\n")  # Add newline after each transcription

Cloning into 'audio_samples'...
remote: Enumerating objects: 20, done.
remote: Counting objects: 100% (20/20), done.
remote: Compressing objects: 100% (17/17), don
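
The test loop reads files in whatever order `os.listdir` returns, which is arbitrary, and it matches only a lowercase `.wav` suffix. A minimal sketch of a helper that returns the audio files in a stable, sorted order is shown below. The name `list_wav_files` is hypothetical, and the case-insensitive match is a deliberate difference from the loop above.

```python
import os
import tempfile
from typing import List


def list_wav_files(folder: str) -> List[str]:
    """Return full paths of the .wav files in folder, sorted by name."""
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if name.lower().endswith(".wav")
    )


# Demonstration on a throwaway folder containing two audio files and one text file.
with tempfile.TemporaryDirectory() as demo_folder:
    for name in ("b.wav", "a.WAV", "notes.txt"):
        open(os.path.join(demo_folder, name), "wb").close()
    demo_files = [os.path.basename(path) for path in list_wav_files(demo_folder)]

print(demo_files)  # ['a.WAV', 'b.wav']
```

In the notebook, iterating over `list_wav_files(audio_folder_path)` would make repeated runs print transcriptions in the same order.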