In [None]:
# Mount Google Drive at /content/drive so the following cells can read and
# write files stored there (Colab-only helper).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import locale

# Replace locale.getpreferredencoding with a stub that always reports "UTF-8".
# NOTE(review): presumably a workaround for Colab shell commands (`!pip ...`)
# failing when the runtime locale is not UTF-8 — confirm it is still needed.
locale.getpreferredencoding = lambda: "UTF-8"

In [None]:
!pip install pydub

In [None]:
!pip install pyannote.audio

## Setup

In [None]:
import pandas as pd
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display
from pyannote.audio import Pipeline
import os
from pydub import AudioSegment
import json

In [None]:
# --- Speaker diarization: split every recording into one WAV per speaker turn ---

# Hugging Face access token. Prefer exporting it as the HF_TOKEN environment
# variable so the secret never ends up in the saved notebook; the placeholder
# is only used when the variable is not set.
hf_token = os.environ.get("HF_TOKEN", "hugging face authentication token")
# Directory containing the input .wav recordings.
audio_dir = "path to the audio files"
# Root directory that receives one sub-folder of segments per recording.
partitions_dir = "path where you wanna save the partitions of each file"

pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=hf_token)

# send pipeline to GPU (when available), otherwise stay on the CPU
import torch
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

files = os.listdir(audio_dir)
print(len(files))
for audio_file in files:
  # os.path.join works whether or not audio_dir ends with a separator.
  audio_path = os.path.join(audio_dir, audio_file)
  name, ext = os.path.splitext(audio_file)
  segments_dir = os.path.join(partitions_dir, name)
  os.makedirs(segments_dir, exist_ok=True)

  diarization = pipeline(audio_path)
  # Load the recording once per file instead of once per speaker turn.
  full_audio = AudioSegment.from_wav(audio_path)

  list_of_dict = []
  # Segment k of the JSON list is exported as "<k>.wav"; both are produced in
  # the same iteration so the two stay aligned.
  for index, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
    list_of_dict.append({"start":turn.start, "stop":turn.end, "speaker":speaker , "transcript":None})
    # pydub slices in milliseconds; turn.start / turn.end are in seconds.
    segment_audio = full_audio[int(turn.start*1000):int(turn.end*1000)]
    segment_audio.export(os.path.join(segments_dir, f"{index}.wav"), format="wav")

  # Segment metadata; "transcript" is filled in later by the transcription cell.
  with open(os.path.join(segments_dir, f"{name}.json"), "w") as json_file:
    json.dump(list_of_dict, json_file)


In [None]:
# Number of entries (one folder per processed recording) under the partitions root.
sum(1 for _ in os.scandir("path where you wanna save the partitions of each file"))

In [None]:
# Characters the model is allowed to emit (Arabic letters plus the space).
characters = list("غظضذخثةتشقرصفعسمنلكيطحزوؤهدجبىائءإآأ ")

# Character -> integer id lookup; the empty string is the out-of-vocabulary token.
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")

# Integer id -> character lookup, built from the very same vocabulary so the
# two mappings are exact inverses.
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

print(
    "The vocabulary is: {} (size ={})".format(
        char_to_num.get_vocabulary(), char_to_num.vocabulary_size()
    )
)

Next, we create the function that describes the transformation that we apply to each
element of our dataset.

In [None]:
# STFT window length in samples (passed to tf.signal.stft in encode_single_sample).
frame_length = 240 # previously 256 400
# STFT hop size in samples, i.e. how far the window advances between frames.
frame_step = 120 # previously 160 200
# FFT size applied to each window. The model input width is derived from it
# as fft_length // 2 + 1 when the model is built.

fft_length = 256 # previously 384

# NOTE(review): not referenced anywhere else in the visible notebook; presumably
# the sample rate the recordings are expected to have — confirm.
sample_rate = 16000
def encode_single_sample(wav_file):
    """Load one WAV segment and convert it to a normalised spectrogram.

    Args:
        wav_file: File name of the segment without the ".wav" extension
            (a string or scalar string tensor). It is appended to the
            module-level `wavs_path`, which must be defined before this
            function is called/traced and must end with a path separator.

    Returns:
        A float32 tensor of shape (frames, fft_length // 2 + 1) holding the
        square-rooted STFT magnitudes, standardised per frame.
    """
    ###########################################
    ##  Process the Audio
    ##########################################
    # 1. Read wav file
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    # 2. Decode the wav file. desired_channels=1 keeps only the first channel,
    #    so multi-channel recordings no longer make the squeeze below fail.
    #    The file's own sample rate is discarded; no resampling is performed.
    audio, _ = tf.audio.decode_wav(file, desired_channels=1)
    audio = tf.squeeze(audio, axis=-1)
    # 3. Change type to float
    audio = tf.cast(audio, tf.float32)
    # 4. Get the spectrogram
    stfts = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )

    # 5. We only need the magnitude, which can be derived by applying tf.abs;
    #    the square root compresses its dynamic range.
    spectrogram = tf.abs(stfts)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    # 6. Normalisation: zero mean / unit variance across the frequency axis of
    #    every frame; the epsilon avoids division by zero on silent frames.
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)
    return spectrogram

## Model

We first define the CTC Loss function.

In [None]:

def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, assuming every sample uses the full padded length.

    Args:
        y_true: Integer label tensor of shape (batch, max_label_length).
        y_pred: Softmax output of the model, shape (batch, time_steps, classes).

    Returns:
        The per-sample CTC loss as returned by keras.backend.ctc_batch_cost.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # ctc_batch_cost expects one length per sample as a (batch, 1) tensor;
    # every sample is given the full padded length.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")
    return keras.backend.ctc_batch_cost(
        y_true, y_pred, time_steps * per_sample, label_steps * per_sample
    )


We now define our model. We will define a model similar to
[DeepSpeech2](https://nvidia.github.io/OpenSeq2Seq/html/speech-recognition/deepspeech2.html).

In [None]:

def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=256):
    """Build and compile a DeepSpeech2-like acoustic model.

    Architecture: two 2D convolutions with ReLU, a stack of bidirectional
    GRUs, one dense + ReLU layer and a softmax classifier.

    Args:
        input_dim: Number of frequency bins per spectrogram frame.
        output_dim: Vocabulary size; the classifier has one extra unit.
        rnn_layers: Number of bidirectional GRU layers.
        rnn_units: Units per GRU direction.

    Returns:
        A keras.Model compiled with Adam (learning rate 1e-5) and CTCLoss.
    """
    # (filters, kernel_size, strides) of each convolutional layer, in order.
    conv_specs = (
        (96, [11, 41], [2, 2]),
        (128, [11, 21], [1, 2]),
    )

    # Variable-length sequence of spectrogram frames.
    input_spectrogram = layers.Input((None, input_dim), name="input")
    # Add a trailing channel axis so that Conv2D can be applied.
    x = layers.Reshape((-1, input_dim, 1), name="expand_dim")(input_spectrogram)

    for layer_no, (filters, kernel_size, strides) in enumerate(conv_specs, start=1):
        conv = layers.Conv2D(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding="same",
            use_bias=False,
            name=f"conv_{layer_no}",
            kernel_initializer=tf.initializers.GlorotUniform(),
        )
        x = conv(x)
        x = layers.ReLU(name=f"conv_{layer_no}_relu")(x)

    # Merge the frequency and channel axes so each time step is a flat vector.
    flat_features = x.shape[-2] * x.shape[-1]
    x = layers.Reshape((-1, flat_features))(x)

    for layer_no in range(1, rnn_layers + 1):
        x = layers.Bidirectional(
            layers.GRU(
                units=rnn_units,
                activation="tanh",
                recurrent_activation="sigmoid",
                use_bias=True,
                return_sequences=True,
                reset_after=True,
                name=f"gru_{layer_no}",
                kernel_initializer=tf.initializers.GlorotUniform(),
            ),
            name=f"bidirectional_{layer_no}",
            merge_mode="concat",
        )(x)

    x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
    x = layers.ReLU(name="dense_1_relu")(x)

    # One unit per vocabulary entry plus one extra class.
    output = layers.Dense(units=output_dim + 1, activation="softmax")(x)

    model = keras.Model(input_spectrogram, output, name="DeepSpeech_2")
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-5), loss=CTCLoss)
    return model


# Instantiate the network: the input width is the number of STFT frequency
# bins and the output covers the whole character vocabulary.
model = build_model(
    rnn_units=768,
    output_dim=char_to_num.vocabulary_size(),
    input_dim=(fft_length // 2) + 1,
)
model.summary(line_length=110)

In [None]:
# Restore trained weights from an HDF5 checkpoint in the working directory.
# NOTE(review): the checkpoint presumably matches the architecture built by
# build_model above (rnn_layers=5, rnn_units=768) — confirm.
model.load_weights("./model_01_224.15.h5")

In [None]:
# Turns the raw network output into text, one string per sample.
def decode_batch_predictions(pred):
    """Greedy-CTC-decode a batch of softmax outputs into strings.

    Args:
        pred: Array of shape (batch, time_steps, classes) produced by the model.

    Returns:
        A list with one decoded UTF-8 string per sample of the batch.
    """
    # Every sample is decoded over the full (padded) number of time steps.
    input_len = np.full(pred.shape[0], pred.shape[1], dtype=np.float64)
    # Greedy search is used; beam search is an option for harder tasks.
    decoded_paths = keras.backend.ctc_decode(
        pred, input_length=input_len, greedy=True, beam_width=512
    )[0]
    best_path = decoded_paths[0]
    # Map each id sequence back to characters and join them into one string.
    return [
        tf.strings.reduce_join(num_to_char(token_ids)).numpy().decode("utf-8")
        for token_ids in best_path
    ]


# A callback class to output a few transcriptions during training
class CallbackEval(keras.callbacks.Callback):
    """Displays a batch of outputs after every epoch.

    Args:
        dataset: Iterable yielding (spectrogram_batch, label_batch) pairs; only
            its first batch is evaluated.
    """

    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch: int, logs=None):
        """Decode the first batch of the dataset and print 32 random samples.

        Samples are drawn with replacement, so the same target/prediction pair
        may be printed more than once.
        """
        # Evaluate the model this callback is attached to; fall back to the
        # notebook-level `model` only when Keras has not attached one.
        network = getattr(self, "model", None)
        if network is None:
            network = model

        predictions = []
        targets = []
        for X, y in self.dataset:
            predictions.extend(decode_batch_predictions(network.predict(X)))
            for label in y:
                targets.append(
                    tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                )
            # Only the first batch is inspected to keep the callback cheap.
            break

        if not predictions:
            # np.random.randint(0, 0, ...) would raise; skip instead of
            # aborting training when the dataset yields nothing.
            print("CallbackEval: dataset yielded no batch, nothing to display.")
            return

        for i in np.random.randint(0, len(predictions), 32):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)

In [None]:
# --- Transcribe every diarized segment and write one JSON per recording ---

# Directory containing the original recordings.
audio_dir = "path to the audio files" #Add your path here
# Root directory holding one sub-folder of segments per recording.
partitions_dir = "path where you want to save the partitions of each file" #Add your path here like this: "home/data/intervals"
# Directory that receives the final transcript JSON files.
transcripts_dir = "path where you want to save the jsons" #Add your path here
batch_size = 32

files = os.listdir(audio_dir)
print(len(files))
os.makedirs(transcripts_dir, exist_ok=True)

for file_number, audio_file in enumerate(files, start=1):
  name, ext = os.path.splitext(audio_file)
  # encode_single_sample reads the module-level `wavs_path` and concatenates
  # it with the segment name, so it must end with a path separator.
  wavs_path = os.path.join(partitions_dir, name, "")

  # Segment metadata written by the diarization step.
  with open(os.path.join(wavs_path, f"{name}.json"), 'r', encoding='utf-8') as file:
    list_of_dicts = json.load(file)

  # Entry k of the JSON was exported as "<k>.wav". Deriving the names from the
  # index (instead of os.listdir, whose order is arbitrary) guarantees that
  # prediction k belongs to entry k.
  segment_names = [str(k) for k in range(len(list_of_dicts))]

  predictions = []
  if segment_names:  # a recording without speech has nothing to transcribe
    segments = tf.data.Dataset.from_tensor_slices(np.array(segment_names))
    intervals = (
        segments.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
        .padded_batch(batch_size)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )
    with tf.device('/device:GPU:0'):
      # Batches are padded to different lengths, so predict batch by batch.
      for batch in intervals:
        batch_predictions = model.predict(batch)
        predictions.extend(decode_batch_predictions(batch_predictions))

  if len(predictions) != len(list_of_dicts):
    raise RuntimeError(
        f"{name}: got {len(predictions)} transcripts for {len(list_of_dicts)} segments"
    )

  for segment, transcript in zip(list_of_dicts, predictions):
    segment["transcript"] = transcript
    print(segment)

  # ensure_ascii=False keeps the Arabic text human-readable in the file.
  with open(os.path.join(transcripts_dir, f"{name}.json"), 'w', encoding='utf-8') as file:
    json.dump(list_of_dicts, file, ensure_ascii=False, indent=4)

  # Progress: recording name and how many recordings are done so far.
  print(name)
  print(file_number)