<a href="https://colab.research.google.com/github/somewhereovertherainbo/audio_processing/blob/main/scoring_transcripts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Algorithm

1. get audio file as a path
2. Extract audio file from audio path
3. send this file to mod_wsola algorithm to get y, inp_pos, out_pos
4. send y to whisper model to transcribe
5. Get the transcriptions
6. append words, wst, wet from transcriptions
7. process wst, wet
8. do the reverse mapping
9. return three important lists

In [1]:
# Installs and imports

!pip install torch librosa pydub
import torch
import librosa
from pydub import AudioSegment
!pip install git+https://github.com/openai/whisper.git
import whisper
import numpy as np

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl (121.6 MB)
Collecting nvidia-curand-cu12==10.3.2.106 (from torch)
  Using cached nvidia_

In [2]:
# %%writefile mod_wsola.py


### Modified WSOLA Algorithm that returns input positions and output positions as well.


import numpy as np


def win_func(win_type='hann', win_size=4096, zero_pad=0):
    """Generate diverse type of window function

    Parameters
    ----------

    win_type : str
               the type of window function.
               Currently, Hann and Sin are supported.
    win_size : int > 0 [scalar]
               the size of window function.
               It doesn't contains the length of zero padding.
    zero_pad : int > 0 [scalar]
               the total length of zero-pad.
               Zeros are equally distributed
               for both left and right of the window.

    Returns
    -------

    win : numpy.ndarray([shape=(win_size)])
          the window function generated.
    """

    if win_type == 'hann':
        win = np.hanning(win_size)
    elif win_type == 'sin':
        win = np.sin(np.pi * np.arange(win_size) / (win_size - 1))
    else:
        raise Exception("Please use the valid window type. (hann, sin)")

    win = np.pad(win, zero_pad // 2, 'constant')

    return win



import numpy as np
from warnings import warn


def _validate_audio(audio):
    """validate the input audio and modify the order of channels.

    Parameters
    ----------

    audio : numpy.ndarray [shape=(channel, num_samples) or (num_samples)\
                           or (num_samples, channel)]
            the input audio sequence to validate.

    Returns
    -------

    audio : numpy.ndarray [shape=(channel, num_samples)]
            the validataed output audio sequence.
    """
    if audio.ndim == 1:
        audio = np.expand_dims(audio, 0)
    elif audio.ndim > 2:
        raise Exception("Please use the valid audio source. "
                        + "Number of dimension of input should be less than 3.")
    elif audio.shape[0] > audio.shape[1]:
        warn('it seems that the 2nd axis of the input audio source '
             + 'is a channel. it is recommended that fix channel '
             + 'to the 1st axis.', stacklevel=3)
        audio = audio.T

    return audio


def _validate_scale_factor(audio, s):
    """Validate the scale factor s and
    convert the fixed scale factor to anchor points.

    Parameters
    ----------

    audio : numpy.ndarray [shape=(num_channels, num_samples) \
                           or (num_samples) or (num_samples, num_channels)]
            the input audio sequence.
    s : number > 0 [scalar] or numpy.ndarray [shape=(2, num_points) \
        or (num_points, 2)]
        the time stretching factor. Either a constant value (alpha)
        or an (2 x n) (or (n x 2)) array of anchor points
        which contains the sample points of the input signal in the first row
        and the sample points of the output signal in the second row.

    Returns
    -------

    anc_points : numpy.ndarray [shape=(2, num_points)]
                 anchor points which contains the sample points
                 of the input signal in the first row
                 and the sample points of the output signal in the second row.
    """
    if np.isscalar(s):
        anc_points = np.array([[0, np.shape(audio)[1] - 1],
                               [0, np.ceil(s * np.shape(audio)[1]) - 1]])
    elif s.ndim == 2:
        if s.shape[0] == 2:
            anc_points = s
        elif s.shape[1] == 2:
            warn('it seems that the anchor points '
                 + 'has shape (num_points, 2). '
                 + 'it is recommended to '
                 + 'have shape (2, num_points).', stacklevel=3)
            anc_points = s.T
    else:
        raise Exception('Please use the valid anchor points. '
                        + '(scalar or pair of input/output sample points)')

    return anc_points


def _validate_f0(audio, f0):
    """Validate the input f0 is suitable for input audio.

    Parameters
    ----------

    audio : numpy.ndarray [shape=(num_channels, num_samples) or \
                           (num_samples) or (num_samples, num_channels)]
            the input audio sequence.
    f0 : numpy.ndarray [shape=(num_channels, num_pitches) or \
                        (num_pitches) or (num_pitches, num_channels)]
         the f0 sequence that used for TD-PSOLA. If f0 is 1D array,
         the f0 of all audio channels are regarded as the same f0.

    Returns
    -------

    f0 : numpy.ndarray [shape=(num_channels, num_freqs)]
         the f0 sequence that used for TD-PSOLA.
    """
    n_chan = audio.shape[0]

    if f0.ndim == 1:
        f0 = np.tile(f0, (n_chan, 1))
    elif f0.ndim == 2:
        if f0.shape[0] == n_chan:
            pass
        elif f0.shape[1] == n_chan:
            warn('it seems that the f0 has shape (num_pitches, num_channels). '
                 + 'it is recommended to '
                 + 'have shape (num_channels, num_pitches).', stacklevel=3)
            f0 = f0.T
        else:
            raise Exception("The number of channels of f0 value "
                            + "should 1 or same as the input audio.")
    else:
        raise Exception("Please use the valid f0 value. "
                        + "Number of dimension of f0 "
                        + "should be less than 3.")

    return f0

import numpy as np
from scipy.interpolate import interp1d
# from .utils import win_func
# from .utils import _validate_audio, _validate_scale_factor


def mod_wsola(x, s, win_type='hann',
          win_size=1024, syn_hop_size=512, tolerance=512):
    """Modify the length of the audio sequence using the WSOLA algorithm.

    Parameters
    ----------
    x : numpy.ndarray [shape=(channel, num_samples) or (num_samples)]
        The input audio sequence to modify.
    s : number > 0 [scalar] or numpy.ndarray [shape=(2, num_points)]
        The time stretching factor. Either a constant value (alpha)
        or a 2 x n array of anchor points containing the sample points
        of the input signal in the first row and the sample points of the
        output signal in the second row.
    win_type : str, optional
               Type of the window function. Options are 'hann' and 'sin'.
               Default is 'hann'.
    win_size : int > 0, optional
               Size of the window function. Default is 1024.
    syn_hop_size : int > 0, optional
                   Hop size of the synthesis window, usually half of the window size.
                   Default is 512.
    tolerance : int >= 0, optional
                Number of samples the window positions in the input signal may
                be shifted to avoid phase discontinuities when overlap-adding
                them to form the output signal. Default is 512.

    Returns
    -------
    y : numpy.ndarray [shape=(channel, num_samples) or (num_samples)]
        The modified output audio sequence.
    input_positions : numpy.ndarray
                      The input positions corresponding to each output position.
    output_positions : numpy.ndarray
                      The output positions in the output signal.
    """
    # Validate the input audio and scale factor.
    x = _validate_audio(x)
    anc_points = _validate_scale_factor(x, s)

    n_chan = x.shape[0]
    output_length = int(anc_points[1, -1]) + 1

    win = win_func(win_type=win_type, win_size=win_size, zero_pad=0)

    sw_pos = np.arange(0, output_length + win_size // 2, syn_hop_size)
    ana_interpolated = interp1d(anc_points[1, :], anc_points[0, :],
                                fill_value='extrapolate')
    aw_pos = np.round(ana_interpolated(sw_pos)).astype(int)
    ana_hop = np.insert(aw_pos[1:] - aw_pos[:-1], 0, 0)

    y = np.zeros((n_chan, output_length))

    min_fac = np.min(syn_hop_size / ana_hop[1:])

    # Padding the input audio sequence.
    left_pad = int(win_size // 2 + tolerance)
    right_pad = int(np.ceil(1 / min_fac) * win_size + tolerance)
    x_padded = np.pad(x, ((0, 0), (left_pad, right_pad)), 'constant')

    aw_pos += tolerance

    deltas = np.zeros(len(aw_pos))
    input_positions = []
    output_positions = []

    # Applying WSOLA to each channel
    for c, x_chan in enumerate(x_padded):
        y_chan = np.zeros(output_length + 2 * win_size)
        ow = np.zeros(output_length + 2 * win_size)

        delta = 0

        for i in range(len(aw_pos) - 1):
            x_adj = x_chan[aw_pos[i] + delta: aw_pos[i] + win_size + delta]
            y_chan[sw_pos[i]: sw_pos[i] + win_size] += x_adj * win
            ow[sw_pos[i]: sw_pos[i] + win_size] += win

            nat_prog = x_chan[aw_pos[i] + delta + syn_hop_size:
                              aw_pos[i] + delta + syn_hop_size + win_size]

            next_aw_range = np.arange(aw_pos[i+1] - tolerance,
                                      aw_pos[i+1] + win_size + tolerance)

            x_next = x_chan[next_aw_range]

            cross_corr = np.correlate(nat_prog, x_next)
            max_index = np.argmax(cross_corr)

            delta = tolerance - max_index
            deltas[i] = delta

            # Record input-output position mapping
            input_positions.append(aw_pos[i] + delta)
            output_positions.append(sw_pos[i])

        # Calculate last frame
        x_adj = x_chan[aw_pos[-1] + delta: aw_pos[-1] + win_size + delta]
        y_chan[sw_pos[-1]: sw_pos[-1] + win_size] += x_adj * win
        ow[sw_pos[-1]: sw_pos[-1] + win_size] += win

        ow[ow < 1e-3] = 1

        y_chan = y_chan / ow
        y_chan = y_chan[win_size // 2:]
        y_chan = y_chan[: output_length]

        y[c, :] = y_chan

    # Add the last positions
    input_positions.append(aw_pos[-1] + delta)
    output_positions.append(sw_pos[-1])

    return y.squeeze(), np.array(input_positions), np.array(output_positions)


### Code to generate transcripts using whisper large v2 model

# %%writefile whatever.py

import whisper
import torch
from pydub import AudioSegment

def generate_transcriptions(audio_path):

    # def split_audio(audio, chunk_length=30, overlap=0):
    #     audio = audio
    #     chunks = []
    #     step = chunk_length - overlap
    #     for start in range(0, len(audio), step * 1000):
    #         end = start + chunk_length * 1000
    #         chunks.append(audio[start:end])
    #     return chunks

    def transcribe_chunks(audio_path, model, device):
        transcriptions = []
        # for chunk in chunks:
        audio = whisper.load_audio(audio_path)
        audio = whisper.pad_or_trim(audio)
        audio = torch.tensor(audio).to(device)
        result = model.transcribe(audio, language='sa', word_timestamps=True)
        transcriptions.append(result)
        return transcriptions

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = whisper.load_model("large-v2").to(device)

    # chunks = split_audio(audio)
    # print(f"Split audio into {len(chunks)} chunks")

    transcriptions = transcribe_chunks(audio_path, model, device)
    # print(f"Transcribed chunks: {transcriptions}")

    return transcriptions


In [3]:
def get_lists(speed_factor, audio_path ):

  """
  Slows down the audio file from audio path and splits it into 30 sec chunks.
  These chunks are transcribed spearately using whisper large-v2 model.
  The lists of words, word start times, word end times from each of the chunks is merged.
  Returns lists named words, word_start_times, word_end_times.

  Inputs: speed_factor, audio_path
  Output: words, word_start_times, word_end_times

  """

  # Get the audio
  audio = AudioSegment.from_file(audio_path)

  # converting audio into numpy nd array
  audio_array, sr = librosa.load(audio_path, sr= 16000)

  # Slow it down for processing
  audio, input_positions, output_positions = mod_wsola(audio_array, speed_factor)

  # Generate transcripts
  transcriptions = generate_transcriptions(audio_path)

  # Get the words, word start times, word end times
  words = []
  word_start_times = []
  word_end_times = []
  for transcription in transcriptions:
    for segment in transcription['segments']:
      for word in segment['words']:
        # print(word['word'])
        words.append(word['word'])
        # print(word['start'])
        word_start_times.append(word['start'])
        # print(word['end'])
        word_end_times.append(word['end'])

  # Process words, word_start_times, word_end_times
  # idxs = np.where(np.array(word_start_times == 0))[0]

  # gg = []
  # gg.extend(word_start_times[0:idxs[0]])
  # # print(len(gg))
  # gg.extend(word_start_times[idxs[0]:idxs[1]]+word_end_times[idxs[0]])
  # # print(len(gg))
  # gg.extend(word_start_times[idxs[1]:]+word_end_times[idxs[0]]+word_end_times[idxs[1]])
  # # print(len(gg))

  # ge = []
  # ge.extend(word_end_times[0:idxs[0]])
  # # print(len(gg))
  # ge.extend(word_end_times[idxs[0]:idxs[1]]+word_end_times[idxs[0]])
  # # print(len(gg))
  # ge.extend(word_end_times[idxs[1]:]+word_end_times[idxs[0]]+word_end_times[idxs[1]])


  def mod_time_mapping_wsola(input_positions, output_positions):
      """Map time stamps from the input to the stretched output using exact positions.

      Parameters
      ----------
      input_positions : numpy.ndarray
                        The input positions corresponding to each output position.
      output_positions : numpy.ndarray
                        The output positions in the output signal.

      Returns
      -------
      input_to_output : function
                        A function that maps input time stamps to output time stamps.
      output_to_input : function
                        A function that maps output time stamps to input time stamps.
      """
      # Create a list of tuples for quick lookup
      position_map = list(zip(input_positions, output_positions))

      # Define the mapping functions
      def input_to_output(input_time):
        # Find the closest input position
        closest_idx = np.argmin(np.abs(input_positions - input_time))
        return output_positions[closest_idx]

      def output_to_input(output_time):
        # Find the closest output position
        closest_idx = np.argmin(np.abs(output_positions - output_time))
        return input_positions[closest_idx]

      return input_to_output, output_to_input


  word_start_times_reversed = []
  word_end_times_reversed = []

  input_to_output, output_to_input = mod_time_mapping_wsola(input_positions, output_positions)

  for i in word_start_times:
    word_start_times_reversed.append(output_to_input(i*sr)/sr)

  for i in word_end_times:
    word_end_times_reversed.append(output_to_input(i*sr)/sr)

  return words, word_start_times_reversed, word_end_times_reversed

In [4]:
w, wst, wet = get_lists(float(9/10), audio_path = '/content/Kanda4_Mantra_no_sil_pydub_sil_0004.wav')

100%|██████████████████████████████████████| 2.87G/2.87G [00:28<00:00, 109MiB/s]


In [5]:
for a,b,c in zip(w,wst,wet):
  print(a,b,c)

 सही 0.0284375 0.4239375
 दिवस्सपुरूतिव्यारूतस्था 0.4239375 2.5570625
 महीक्षेमंरोदासी 2.5570625 4.302
 असकाभागतु 4.302 5.1955625
 महान 5.1955625 5.90775
 मही 5.90775 6.2596875
 असकाभागत्विजातो 6.2596875 7.9501875
 द्याम् 7.9501875 8.5739375
 सत्वपार्थीवन्चरजाहम् 8.5739375 9.4255


In [12]:
from difflib import SequenceMatcher

def match(a, b):
  return SequenceMatcher(None, a, b).ratio()

sent = ''.join(w)
# print(sent)

match(sent, 'स हि दि॒वः स पृ॑थि॒व्या ऋ॑त॒स्था म॒ही क्षेमं॒ रोद॑सी अस्कभायत् म॒हान्म॒ही अस्क॑भाय॒द्वि जा॒तो द्यां सद्म॒ पार्थि॑वं च॒ रजः॑')

0.6782608695652174

In [7]:
w

[' सही',
 ' दिवस्सपुरूतिव्यारूतस्था',
 ' महीक्षेमंरोदासी',
 ' असकाभागतु',
 ' महान',
 ' मही',
 ' असकाभागत्विजातो',
 ' द्याम्',
 ' सत्वपार्थीवन्चरजाहम्']

In [3]:
# transcriptions = generate_transcriptions('/content/Kanda4_Mantra_no_sil_pydub_sil_0004.wav')

100%|██████████████████████████████████████| 2.87G/2.87G [00:28<00:00, 109MiB/s]


In [12]:
a,b = librosa.load('/content/Kanda4_Mantra_no_sil_pydub_sil_0004.wav', sr = 16000)

In [None]:
# def get_score(lists):
#   ###
#   return score