In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow import keras
import matplotlib as plt
from IPython import display


In [None]:
!pip install jiwer
from jiwer import wer

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting jiwer
  Downloading jiwer-3.0.1-py3-none-any.whl (21 kB)
Collecting rapidfuzz==2.13.7
  Downloading rapidfuzz-2.13.7-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m21.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-3.0.1 rapidfuzz-2.13.7


In [None]:
data_url="https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"
data_path=keras.utils.get_file("LJSpeech-1.1",data_url,untar=True)

Downloading data from https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2


In [None]:
wav_path=data_path+"/wavs"
metadata_path=data_path+"/metadata.csv"

In [None]:
metadata_df=pd.read_csv(metadata_path,sep="|" ,header=None,quoting=1)

In [None]:
metadata_df.tail()

In [None]:
 metadata_df.columns=["file_name","transcription","normalized_transcription"]
 metadata_df=metadata_df[["file_name","transcription"]]
 metadata_df=metadata_df.sample(frac=1).reset_index(drop=True)
 metadata_df.head(5)

Unnamed: 0,file_name,transcription
0,LJ047-0214,"End quote. Mr. Bouck pointed out, however, tha..."
1,LJ040-0220,could have led anyone to predict the outburst ...
2,LJ018-0174,and the prominent part he played secured for h...
3,LJ029-0068,The adequacy of the intelligence system mainta...
4,LJ047-0162,that Oswald was living alone in Dallas because...


In [None]:
 #We now split the data into training and validation set
split=int(len(metadata_df)*0.90)
df_train=metadata_df[:split]
df_val=metadata_df[split:]

print(f"size of the trianing set:{len(df_train)}")
print(f"size of the testing set:{len(df_val)}")

size of the trianing set:11790
size of the testing set:1310


In [None]:
#preprocessing
#We first prepare the vocabulary needed

In [None]:
#The set of characters accepted in the transcription
characters=[x for x in "abcdefghijklmnopqrstuvwxyz'?! "]
#Mapping characters to integers
char_to_num=keras.layers.StringLookup(vocabulary=characters,oov_token="")
#mapping integers back to original characters
num_to_char=keras.layers.StringLookup(vocabulary=char_to_num.get_vocabulary(),oov_token="",invert=True)
print(f"The vocabulary is:{char_to_num.get_vocabulary()}"f"(size={char_to_num.vocabulary_size()}")

The vocabulary is:['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', ' '](size=31


In [None]:
  char_to_num                                                                                                        

<keras.layers.preprocessing.string_lookup.StringLookup at 0x7ffa858e9790>

In [None]:
 #an integer scaler tensor.The window length in samples
 frame_length=250
 #an integer scaler tensor.The number of samples to step
 frame_step=160
 #an integer scaler tensor.The size of the FFT to apply.
 fft_length=384

 def encode_single_sample(wav_file,label):
   #1.read wav file
   file=tf.io.read_file(wav_path+wav_file+".wav")

   #2.decode the wav file
   audio,_=tf.audio.decode_wav(file)
   audio=tf.squeeze(audio,axis=1)

   #3.change type to float
   audio=tf.cast(audio,tf.float32)

   #4.get the spectrogram
   spectrogram=tf.signal.stft(audio,frame_length=frame_length,frame_step=frame_step,fft_length=fft_length)

   #5.we only need the magnitude,which can be derived by applying tf.abs
   spectrogram=tf.abs(spectrogram)
   spectrogram=tf.math.pow(spectrogram,0.5)

   #6.normalization
   means=tf.math.reduce_mean(spectrogram,1,keepdims=True)
   stddevs=tf.math.reduce_std(spectrogram,1,keepdims=True)
   spectrogram=(spectrogram-means)/(stddevs+1e-10)

   #7.convert label to lower case
   label=tf.strings.lower(label)

   #8.split the model
   label=tf.strings.unicode_split(label,input_encoding="UTF-8")

   #9.map the characters in label to numbers
   label=char_to_num(label)

   #10.return a dict as our model is expecting two inputs
   return spectrogram,label

<h3>Creating Dataset Objects</h3>


In [None]:
batch_size=32
#define the training dataset
train_dataset=tf.data.Dataset.from_tensor_slices(
    (list(df_train["file_name"]),list(df_train["transcription"]))
)
train_dataset=(
    train_dataset.map(encode_single_sample,num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)
#Define the validation dataset
validation_dataset=tf.data.Dataset.from_tensor_slices(
    (list(df_val["file_name"]),list(df_val["transcription"]))
)
validation_dataset=(
    validation_dataset.map(encode_single_sample,num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)


<h3><b>Visualizing<b></h3>

In [None]:
import matplotlib.pyplot as plt
fig=plt.figure(figsize=(8,5))
for batch in train_dataset.take(1):
  spectrogram=batch[0][0].numpy()
  spectrogram=np.array([np.trim_zeros(x) for x in np.transpose(spectrogram)])
  label=batch[1][0]

  #spectrogram
  label=tf.stringss.reduce_join(num_to_char(label)).numpy().decode("utf-8")
  ax=plt.subplot(2,1,1)
  ax.imshow(spectrogram,vmax=1)
  ax.set_title(label)
  ax.axis("off")

  #wav
  file=tf.io.read_file(wav_path+list(df_train["file_name"])[0]+".wav")
  audio,_=tf.audio.decode_wav(file)
  audio=audio.numpy()
  ax=plt.subplot(2,1,2)
  plt.plot(audio)
  ax.set_title("signal wave")
  ax.set_xlim(0,len(audio))
  display.display(display.Audio(np.transpose(audio),rate=16000))
plt.show()

In [None]:
def CTCLoss(y_true,y_pred):
  batch_len=tf.cast(tf.shape(y_true)[0],dtype="int64")
  input_length=tf.cast(tf.shape(y_pred)[1],dtype="int64")
  label_length=tf.cast(tf.shape(y_true)[1],dtype="int64")

  input_length=input_length*tf.ones(shape=(batch_len,1),dtype="int64")
  label_length=label_length*tf.ones(shape=(batch_len,1),dtype="int64")

  loss=keras.backend.ctc_batch_cost(y_true,y_pred,input_length,label_length)
  return loss

In [None]:
from tensorflow.keras import layers
from tensorflow.keras.layers import Conv2D

def build_model(input_dim,output_dim,rnn_layers=5,rnn_units=128):
  input_spectrogram=layers.Input((None,input_dim),name="input")
  x=layers.Reshape((-1,input_dim,1),name="expand_dim")(input_spectrogram)
  #convulation layer 1
  x=layers.Conv2D(
      filters=32,kernel_size=[11,41],strides=[2,2],padding="same",use_bias=False,
      name="conv_1"
  )(x)
  x=layers.BatchNormalization(name="conv_1_bn")(x)
  x=layers.ReLU(name="conv_1_relu")(x)

  #convulation layer2
  x=layers.Conv2D(
      filters=32,kernel_size=[11,41],strides=[2,2],padding="same",use_bias=False,
      name="conv_2"
  )(x)
  x=layers.BatchNormalization(name="conv_2_bn")(x)
  x=layers.ReLU(name="conv_2_relu")(x)

  x=layers.Reshape((-1,x.shape[-2]*x.shape[-1]))(x)

  #RNN layers
  for i in range(1,rnn_layers+1):
    recurrent=layers.GRU(
        units=rnn_units,
        activation="tanh",recurrent_activation="sigmoid",use_bias=True,return_sequences=True,
        reset_after=True,name=f"gru_{i}",
    )
    x=layers.Bidirectional(
        recurrent,name=f"bidirectional_{i}",merge_mode="concat"
    )(x)
    if i<rnn_layers:
      x=layers.Dropout(rate=0.5)(x)

  #Dense layer
  x=layers.Dense(units=rnn_units*2,name="dense_1")(x)
  x=layers.ReLU(name="dense_2_relu")(x)
  x=layers.Dropout(rate=0.5)(x)
  output=layers.Dense(units=output_dim+1,activation="softmax")(x)
  model=keras.Model(input_spectrogram,output,name="DeepSpeech_2")
  opt=keras.optimizers.Adam(learning_rate=1e-4)
  model.compile(optimizer=opt,loss=CTCLoss)
  return model

model=build_model(
    input_dim=fft_length//2+1,
    output_dim=char_to_num.vocabulary_size(),
    rnn_units=512,
)
model.summary(line_length=110)


Model: "DeepSpeech_2"
______________________________________________________________________________________________________________
 Layer (type)                                    Output Shape                                Param #          
 input (InputLayer)                              [(None, None, 193)]                         0                
                                                                                                              
 expand_dim (Reshape)                            (None, None, 193, 1)                        0                
                                                                                                              
 conv_1 (Conv2D)                                 (None, None, 97, 32)                        14432            
                                                                                                              
 conv_1_bn (BatchNormalization)                  (None, None, 97, 32)                     

<h3>Training and Evaluating</h3>

In [None]:
def decode_batch_prediction(pred):
  input_len=np.ones(pred.shape[0])*pred.shape[1]
  results=keras.backend.ctc_decode(pred,input_length=input_len,greedy=True)[0][0]
  output_text=[]
  for result in results:
    result=tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
    output_text.append(result)
  return output_text

  class CallbackEval(keras.callbacks.Callback):
    def on_epoch_end(self,epoch:int,logs=None):
      predictions=[]
      targets=[]
      for batch in self.dataset:
        X,y=batch
        batch_predictions=model.predict(X)
        batch_predictions=decode_batch_predictions(batch_predictions)
        predictions.extend(batch_predictions)
        for label in y:
          label=(
              tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
          )
          targets.append(label)
      wer_score=wer(targets,predictions)
      print("-"*100)
      print("-"*100)
      for i in np.random.randint(0,len(predictions),2):
        print(f"Target:{targets[i]}")
        print(f"Prediction:{predictions[i]}")
        print("-"*100)
        #training process
      epochs=2
      validation_callback=CallbackEval(validation_dataset)
      history=model.fit(train_dataset,validation_data=validation_dataset,
                  epochs=epochs,callbacks=[validation_callback],
                  )



In [42]:
#Let's check results on more validation samples
predictions=[]
targets=[]
for batch in validation_dataset:
  X,y=batch
  batch_predictions=model.predict(X)
  batch_predictions=decode_batch_predictions(batch_predictions)
  predictions.extend(batch_predictions)
  for label in y:
    label=tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
    targets.append(label)
wer_score=wer(targets,predictions)
print("-"*100)
print(f"word error rate:{wer_score:4f}")
print("-"*100)
for i in np.random.randint(0,len(predictions),5):
  print(f"target:{targets[i]}")
  print(f"prediction:")

NotFoundError: ignored