In [1]:
import os
import wave
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pathlib
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer
from pydub import AudioSegment
from pydub.playback import play
import librosa
import winsound
from scipy.io import wavfile
import random

# loading text files
df = pd.read_csv("Text/metadata_TZ_.csv")
print(df.dtypes)

df = df[['filename','transcription']]
df.head(5)

filename          object
transcription     object
filepath          object
sample_rate        int64
duration         float64
dtype: object




Unnamed: 0,filename,transcription
0,SWH-05-20101106_16k-emission_swahili_05h30_-_0...,rais wa tanzania jakaya mrisho kikwete
1,SWH-05-20101106_16k-emission_swahili_05h30_-_0...,yanayo andaliwa nami pendo pondo idhaa ya kisw...
2,SWH-05-20101106_16k-emission_swahili_05h30_-_0...,inayokutangazia moja kwa moja kutoka jijini da...
3,SWH-05-20101106_16k-emission_swahili_05h30_-_0...,juma hili bara la afrika limeshuhudia raia wa ...
4,SWH-05-20101106_16k-emission_swahili_05h30_-_0...,wakipiga kura ya maoni ilikufanya mabadiliko ya


In [2]:
split = int(len(df) * 0.8)
df_train_text = df[:split]
df_val_text = df[split:]

# df_train.describe
print(f"Size of the training set: {len(df_train_text)}")
print(f"Size of the testing set: {len(df_val_text)}")



Size of the training set: 8144
Size of the testing set: 2036


SETTING UP THE SWAHILI VOCABULARY TO INCLUDE IN THE MODEL

In [3]:
# The set of characters accepted in the transcription
char = [x for x in "aeioubcdghjklmnprstvwyz'?! "]

# Mapping characters to intergers
char_to_num = keras.layers.StringLookup(vocabulary=char, oov_token="")

# mapping intergers back to original characters 
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"size ={char_to_num.vocabulary_size()} "
)

The vocabulary is: ['', 'a', 'e', 'i', 'o', 'u', 'b', 'c', 'd', 'g', 'h', 'j', 'k', 'l', 'm', 'n', 'p', 'r', 's', 't', 'v', 'w', 'y', 'z', "'", '?', '!', ' '] size =28 


In [4]:
char_to_num

<keras.layers.preprocessing.string_lookup.StringLookup at 0x2699b9156a0>

AUDIO DATA LOADING

In [5]:
# loading the audio files
# Directory
folder_path = 'tz_swh_train'

wav_paths = os.listdir(folder_path)

file_paths = [os.path.join(folder_path, f) for f in wav_paths if f.endswith(".wav")]

audio_data = tf.data.Dataset.from_tensor_slices(file_paths)

# check if file exists
if len(file_paths) > 0:
    print('Audio files succesfully uploaded')
else:
    print("Files Note loaded")

# function loading the audio files
def load_audio(file_path):
    audio_binary = tf.io.read_file(file_path)
    waveform, sample_rate = tf.audio.decode_wav(audio_binary)
    return waveform, sample_rate


#  Map the dataset with the load_audio function
audio_data = audio_data.map(load_audio)

# Print the first audio file
for waveform, sample_rate in audio_data.take(1):
    print("Waveform shape: ", waveform.shape)
    print("Sample rate: ", sample_rate.numpy())



Audio files succesfully uploaded
Waveform shape:  (50240, 1)
Sample rate:  16000


AUDIO PLAYBACK, FIRST 3 AUDIO FILES

In [6]:

audio_files = os.listdir(folder_path)
# Playing the first audio files
# Play the first three audio files
for i, swahili in enumerate(audio_files):
    audio_path = os.path.join(folder_path, swahili)
    if swahili.endswith(".wav") and i < 1:
        sound = os.path.join(folder_path, swahili)
        winsound.PlaySound(sound, winsound.SND_FILENAME + winsound.SND_LOOP)
        winsound.PlaySound(None, 0)
    

AUDIO PROCESSING VIA TENSORFLOW

In [7]:
# An integer scalar Tensor. The window length in samples
frame_length = 256
# An integer scalar Tensor. The number of samples to step
frame_step = 160
# An integer scalar Tensor. the size of the FFT to apply
# If not provided, uses the smallest power of 2 enclosing frame_length.
fft_length = 1024


def encode_single_sample(wav_file, label):
    wav_file, label = sample
    ####################################
    ###    Audio processing for tensorflow 
    ####################################
    # 1. Read wav files
    file = tf.io.read_file(audio_files + wav_file + audio_data)
    # 2. Decode the wav file
    audio = tf.audio.decode_wav(file)
    
    audio = tf.squeeze(audio, axis=-1)
    # change the file type to float
    audio = tf.cast(audio, tf.float32)
    
    spectogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    # we only need the magnitude, which can be derived by applying tf.abs
    spectogram = tf.abs(spectogram)
    spectogram = tf.math.pow(spectogram, 0.5)
    # normalization
    means = tf.math.reduce_mean(spectogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectogram, 1, keepdims=True)
    spectogram = (spectogram - means) / (stddevs + 1e-10)


    label = tf.strings.lower(label)
    label = tf.string.unicode_split(label, input_encoding="UTF-8")
    label = char_to_num(label)
    return spectogram, label


SPLITTING THE TRAIN AND TEST SET FOR MODEL TRAINING AND TESTING USING TENSORFLOW

In [None]:
# Set the spectrogram parameters
frame_length = 0.025
frame_stride = 0.010
n_fft = 512
hop_length = int(frame_stride * 16000)
win_length = int(frame_length * 16000)

# For audio spliting
train_size = 0.8
test_size = 0.2

random.shuffle(audio_files)
split_index = int(test_size * len(audio_files))
train_audio_files = audio_files[:split_index]
test_audio_files = audio_files[split_index:]


ds = tf.data.Dataset.from_tensor_slices((df['filename'], df['transcription']))
batch_size = 32
dataset = ds.batch(batch_size)
# print(audio_path)

train_spectrograms = []
test_spectrograms = []
for folder_path in train_audio_files:
   rate, data = wavfile.read(os.path.join(folder_path, ' '.join(audio_files)))
   spectrogram = tf.signal.stft(data, frame_length=win_length, frame_step=hop_length)
   train_spectrograms.append(spectogram)



In [None]:
fig = plt.figure(figsize=(8, 5))
for batch in train_dataset.take(1):
   print(train_dataset)
   spectogram = batch[0][0].numpy()
   spectogram = np.array([np.trim_zeros(x) for x in np.transpose(spectogram)])
   label = batch[1][0]
   # spectogram
   label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
   ax = plt.subplot(2, 1, 1)
   ax.imshow(spectogram, vmax=1)
   ax.set_title(label)
   ax.axis("off")
   # wav
   file = tf.io.read_file(audio_files + list(df_train['filename'])[0] + audio_data)
   audio = tf.audio.decode_wav(file)
   audio = audio.numpy()
   ax = plt.subplot(2, 1, 2)
   plt.plot(audio)
   ax.set_title("signal wave")
   ax.set_xlim(0, len(audio))
   display.display(display.Audio(np.transpose(audio), rate=16000))
plt.show()

<_TakeDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.string, name=None), TensorSpec(shape=(None,), dtype=tf.string, name=None))>


TypeError: iteration over a 0-d array

<Figure size 800x500 with 0 Axes>

CTCLOSS detection algorithmn

In [None]:
def CTCLoss(y_true, y_pred):
   batch_len = tf.cast(tf.shape(y_true)[0], dtype='int64')
   input_length = tf.cast(tf.shape(y_pred)[1], dytpe="int64")
   label_length = tf.cast(tf.shape(y_true)[0], dtype='int64')

   input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
   label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

   loss = keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
   return loss

Model Build

In [None]:
def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=138):

   input_spectogram = layers.Input((None, input_dim), name="input")

   x = layers.Reshape((-1, 193, 1), name="expand_dim")(input_spectogram)

   x = layers.Conv2D(
      filters=32,
      kernel_size=[11, 41],
      strides=[2, 2],
      padding="same",
      use_bias=False,
      name="conv_1",
   )(x)
   x = layers.BatchNormalization(name="conv_1_bn")(x)
   x = layers.ReLU(name="conv_1_relu")(x)

   x = layers.Conv2D(
      filters=32,
      kernel_size=[11, 21],
      strides=[1, 2],
      padding="same",
      use_bias=False,
      name="conv_2",
   )(x)
   x = layers.BatchNormalization(name="conv_2_bn")(x)
   x = layers.ReLU(name="conv_2_relu")(x)
   x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)

   for i in range(1, rnn_layers + 1):
      recurrent = layers.GRU(
         units=rnn_units,
         activation="tanh",
         recurrent_activation="sigmoid",
         use_bias=True,
         return_sequences=True,
         reset_after=True,
         name=f"gru_{i}",
      )
      x = layers.Bidirectional(
         recurrent, name=f"bidirectional_{i}", merge_mode="concat"
      )(x)
      if i < rnn_layers:
         x = layers.Dropout(rate=0.5)(x)
   x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
   x = layers.ReLU(name="dense_1_relu")(x)
   x = layers.Dropout(rate=0.5)(x)

   output = layers.Dense(units=output_dim + 1, activation="softmax")(x)

   model = keras.Model(input_spectogram, output, name="DeepSpeech_2")

   opt = keras.optimizers.Adam(learning_rate=1e-4)

   model.compile(optimizer=opt, loss=CTCLoss)
   return model


# get the model
model = build_model(
   input_dim = fft_length // 2 + 1,
   output_dim = char_to_num.vocabulary_size(),
   rnn_units = 512,
)
model.summary(line_length=110)

Model: "DeepSpeech_2"
______________________________________________________________________________________________________________
 Layer (type)                                    Output Shape                                Param #          
 input (InputLayer)                              [(None, None, 193)]                         0                
                                                                                                              
 expand_dim (Reshape)                            (None, None, 193, 1)                        0                
                                                                                                              
 conv_1 (Conv2D)                                 (None, None, 97, 32)                        14432            
                                                                                                              
 conv_1_bn (BatchNormalization)                  (None, None, 97, 32)                     

Training and Evaluation

In [None]:
def decode_batch_predictions(pred, input_shape):
   input_len = np.ones(pred.shape[0]) * input_shape[1]
   results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
   output_text = []
   for result in results:
      result = tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
      output_text.append(result)
   return output_text

class CallbackEval(keras.callbacks.Callback):

   def __init__(self, dataset):
      super().__init__()
      self.dataset = dataset
   
   def on_epoch_end(self, epoch: int, logs=None):
      predictions = []
      targets = []
      for batch in self.dataset:
         X, y = batch
         batch_predictions = model.predict(X)
         batch_predictions = decode_batch_predictions(batch_predictions, X.shape)
         predictions.extend(batch_predictions)
         for label in y:
            label = (
               tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
            )
            targets.append(label)
      wer_score = wer(targets, predictions)
      print("-" * 100)
      print(f"word error rate: {wer_score: 4f}")
      print("-" * 100)
      for i in np.random.randint(0, len(predictions), 2):
         print(f"Target :  {targets[i]}")
         print(f"Prediction: {predictions[i]}")
         print("-" * 100)

Begin Training Sequence

In [None]:
# Define the number of epochs.
epochs = 50
# Callback function to check transcription on the val set.
validation_callback = CallbackEval(test_dataset)
# Train the model
history = model.fit(
   train_dataset,
   validation_data=test_dataset,
   epochs=epochs,
   callbacks=[validation_callback],
)

Epoch 1/50


ValueError: in user code:

    File "c:\Users\Rigobert Kiata\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 1284, in train_function  *
        return step_function(self, iterator)
    File "c:\Users\Rigobert Kiata\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 1268, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\Rigobert Kiata\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 1249, in run_step  **
        outputs = model.train_step(data)
    File "c:\Users\Rigobert Kiata\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 1050, in train_step
        y_pred = self(x, training=True)
    File "c:\Users\Rigobert Kiata\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "c:\Users\Rigobert Kiata\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\layers\reshaping\reshape.py", line 115, in _fix_unknown_dimension
        raise ValueError(msg)

    ValueError: Exception encountered when calling layer 'expand_dim' (type Reshape).
    
    total size of new array must be unchanged, input_shape = [], output_shape = [-1, 193, 1]
    
    Call arguments received by layer 'expand_dim' (type Reshape):
      • inputs=tf.Tensor(shape=(None,), dtype=float32)


Inference

In [None]:
predictions = []
targets = []
for batch in test_dataset:
   X, y = batch
   batch_predictions = model.predict(X)
   batch_predictions = decode_batch_predictions(batch_predictions)
   predictions.extend(batch_predictions)
   for label in y:
      label = tf.strings.reduce_join(num_to_char(label)).numpy.decode("utf-8")
      targets.append(label)
wer_score = wer(targets, predictions)
print("-" * 100)
print("word error rate: {wer_score:.4f}")
print("-" * 100)
for i in np.random.randint(0, len(predictions), 5):
   print(f"Target :  {targets[i]}")
   print(f"Prediction:  {predictions[i]}")
   print("-" * 100)

ValueError: After applying the transformation, each reference should be a non-empty list of strings, with each string being a single word.