In [None]:
!pip install numpy==1.25.0 transformers==4.41.0
!pip install tensorflow==2.14.0

Collecting tensorflow==2.14.0
  Using cached tensorflow-2.14.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting ml-dtypes==0.2.0 (from tensorflow==2.14.0)
  Using cached ml_dtypes-0.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.20.3 (from tensorflow==2.14.0)
  Using cached protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting wrapt<1.15,>=1.11.0 (from tensorflow==2.14.0)
  Using cached wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting tensorboard<2.15,>=2.14 (from tensorflow==2.14.0)
  Using cached tensorboard-2.14.1-py3-none-any.whl.metadata (1.7 kB)
Collecting tensorflow-estimator<2.15,>=2.14.0 (from tensorflow==2.14.0)
  Using cached tensorflow_estimator-2.14.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collectin

In [None]:
from transformers import TFWhisperForConditionalGeneration

model = TFWhisperForConditionalGeneration.from_pretrained("openai/whisper-base")
##openai/whisper-tiny

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/290M [00:00<?, ?B/s]

All PyTorch model weights were used when initializing TFWhisperForConditionalGeneration.

All the weights of TFWhisperForConditionalGeneration were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFWhisperForConditionalGeneration for predictions without further training.


In [None]:
model.save('content/tiny_saved')

In [None]:
import tensorflow as tf
import numpy as np
from transformers import TFForceTokensLogitsProcessor, TFLogitsProcessor
from typing import List, Optional, Union, Any

# Patching methods of class TFForceTokensLogitsProcessor(TFLogitsProcessor):

def my__init__(self, force_token_map: List[List[int]]):
    force_token_map = dict(force_token_map)
    # Converts the dictionary of format {index: token} containing the tokens to be forced to an array, where the
    # index of the array corresponds to the index of the token to be forced, for XLA compatibility.
    # Indexes without forced tokens will have an negative value.
    force_token_array = np.ones((max(force_token_map.keys()) + 1), dtype=np.int32) * -1
    for index, token in force_token_map.items():
        if token is not None:
            force_token_array[index] = token
    self.force_token_array = tf.convert_to_tensor(force_token_array, dtype=tf.int32)

def my__call__(self, input_ids: tf.Tensor, scores: tf.Tensor, cur_len: int) -> tf.Tensor:
    def _force_token(generation_idx):
        batch_size = scores.shape[0]
        current_token = self.force_token_array[generation_idx]

        # Original code below generates NaN values when the model is exported to tflite
        # it just needs to be a negative number so that the forced token's value of 0 is the largest
        # so it will get chosen
        #new_scores = tf.ones_like(scores, dtype=scores.dtype) * -float("inf")
        new_scores = tf.ones_like(scores, dtype=scores.dtype) * -float(1)
        indices = tf.stack((tf.range(batch_size), tf.tile([current_token], [batch_size])), axis=1)
        updates = tf.zeros((batch_size,), dtype=scores.dtype)
        new_scores = tf.tensor_scatter_nd_update(new_scores, indices, updates)
        return new_scores

    scores = tf.cond(
        tf.greater_equal(cur_len, tf.shape(self.force_token_array)[0]),
        # If the current length is geq than the length of force_token_array, the processor does nothing.
        lambda: tf.identity(scores),
        # Otherwise, it may force a certain token.
        lambda: tf.cond(
            tf.greater_equal(self.force_token_array[cur_len], 0),
            # Only valid (positive) tokens are forced
            lambda: _force_token(cur_len),
            # Otherwise, the processor does nothing.
            lambda: scores,
        ),
    )
    return scores

TFForceTokensLogitsProcessor.__init__ = my__init__
TFForceTokensLogitsProcessor.__call__ = my__call__

In [None]:
class GenerateModel(tf.Module):
  def __init__(self, model):
    super(GenerateModel, self).__init__()
    self.model = model

  @tf.function(
    # shouldn't need static batch size, but throws exception without it (needs to be fixed)
    input_signature=[
      tf.TensorSpec((1, 80, 3000), tf.float32, name="input_features"),
    ],
  )
  def serving(self, input_features):
    outputs = self.model.generate(
      input_features,
      max_new_tokens=500, #change as needed
      return_dict_in_generate=True,
      forced_decoder_ids=[[1,50264], [2, 50359], [3, 50363]], # 50264: Korean | 50359:transcribe | 50363: notimestamp
    )
    return {"sequences": outputs["sequences"]}

generate_model = GenerateModel(model=model)


saved_model_dir = 'content/tiny_saved'
tflite_model_path = 'fast-whisper-tiny-ko.tflite'

tf.saved_model.save(generate_model, saved_model_dir, signatures={"serving_default": generate_model.serving})


# Convert the model
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
converter.target_spec.supported_ops = [
  tf.lite.OpsSet.TFLITE_BUILTINS, # enable TensorFlow Lite ops.
  tf.lite.OpsSet.SELECT_TF_OPS # enable TensorFlow ops.
]

converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

#Save the model
with open(tflite_model_path, 'wb') as f:
    f.write(tflite_model)