In [1]:

import math
import numpy
import librosa
from enum import Enum
import struct

#from scipy.spatial.distance import cdist

NUM_MFCC = 13
NUM_MELS = 40
SAMPLE_RATE = 16000
NUM_FFT = 2048
FFT_ORDER  = 512
LOWER_FREQ = 133.333300
HIGHEST_FREQ = 6855.497600
WINDOW_LENGTH = int(0.10 * SAMPLE_RATE)
HOP_LENGTH = int(0.04 * SAMPLE_RATE)
EMPHASIS_FACTOR = 0.97
DTW_MARGIN = 60

class AlignmentAlgorithm(Enum):
    DTW_STRIPE = "DTW_STRIPE"
    DTW_EXACT = "DTW_EXACT"

DTW_ALGORITHM = AlignmentAlgorithm.DTW_EXACT


"""
This module contains the implementation
of dynamic time warping (DTW) algorithms
to align two audio waves, represented by their
Mel-frequency cepstral coefficients (MFCCs).

This module contains the following classes:

* :class:`~aeneas.dtw.DTWAlgorithm`,
  an enumeration of the available algorithms;
* :class:`~aeneas.dtw.DTWAligner`,
  the actual wave aligner;
* :class:`~aeneas.dtw.DTWExact`,
  a DTW aligner implementing the exact (full) DTW algorithm;
* :class:`~aeneas.dtw.DTWStripe`,
  a DTW aligner implementing the Sachoe-Chiba band heuristic.

To align two wave files:

1. build an :class:`~aeneas.dtw.DTWAligner` object,
   passing in the constructor
   the paths of the two wave files
   or their MFCC representations;
2. call :func:`~aeneas.dtw.DTWAligner.compute_path`
   to compute the min cost path between
   the MFCC representations of the two wave files.

.. warning:: This module might be refactored in a future version
"""

class aeneas_dtw():

    def __init__(
        self,
        params, q_audio, r_audio
    ):
        self.params = params
        self.query_audio = q_audio 
        self.ref_audio = r_audio
        self._setup_dtw()

    def compute_accumulated_cost_matrix(self):
        """
        Compute the accumulated cost matrix, and return it.

        Return ``None`` if the accumulated cost matrix cannot be computed
        because one of the two waves is empty after masking (if requested).

        :rtype: :class:`numpy.ndarray` (2D)
        :raises: RuntimeError: if both the C extension and
                               the pure Python code did not succeed.

        .. versionadded:: 1.2.0
        """
        if self.dtw is None:
            assert 0
        return self.dtw.compute_accumulated_cost_matrix()

    def compute_path(self):
        """
        Compute the min cost path between the two waves, and return it.

        Return the computed path as a tuple with two elements,
        each being a :class:`numpy.ndarray` (1D) of ``int`` indices: ::

        ([r_1, r_2, ..., r_k], [s_1, s_2, ..., s_k])

        where ``r_i`` are the indices in the real wave
        and ``s_i`` are the indices in the synthesized wave,
        and ``k`` is the length of the min cost path.

        Return ``None`` if the accumulated cost matrix cannot be computed
        because one of the two waves is empty after masking (if requested).

        :rtype: tuple (see above)
        :raises: RuntimeError: if both the C extension and
                               the pure Python code did not succeed.
        """
        wave_path = self.dtw.compute_path()
        return wave_path    

    def _setup_dtw(self):
        """
        Set the DTW object up.
        """

        # setup
        window_shift = self.params.hop_length/self.params.sr
        delta = int(2 * self.params.dtw_margin / window_shift)
        
        # set the selected algorithm
        if self.params.dtw_algorithm == AlignmentAlgorithm.DTW_EXACT:
            self.dtw = DTWExact(
                self.query_audio.mfcc,
                self.ref_audio.mfcc                
            )
        else:
            self.dtw = DTWStripe(
                self.query_audio.mfcc,
                self.ref_audio.mfcc,
                delta
            )

class DTWStripe():

    def __init__(self, m1, m2, delta):
        self.m1 = m1
        self.m2 = m2
        self.delta = delta

    def compute_path(self):
        try:
            cost_matrix, centers = self._compute_cost_matrix()
            accumulated_cost_matrix = self._compute_accumulated_cost_matrix(cost_matrix, centers)
            best_path = self._compute_best_path(accumulated_cost_matrix, centers)
            return best_path
        except Exception as exc:
            print("An unexpected error occurred while running pure Python code", exc, False, None)
        return (False, None)

    def _compute_cost_matrix(self):
        # discard first MFCC component
        mfcc1 = self.m1[1:, :]
        mfcc2 = self.m2[1:, :]
        norm2_1 = numpy.sqrt(numpy.sum(mfcc1 ** 2, 0))
        norm2_2 = numpy.sqrt(numpy.sum(mfcc2 ** 2, 0))
        n = mfcc1.shape[1]
        m = mfcc2.shape[1]
        delta = self.delta
        if delta > m:
            delta = m
        cost_matrix = numpy.zeros((n, delta))
        centers = numpy.zeros(n, dtype=int)
        for i in range(n):
            # center j at row i
            center_j = (m * i) // n
            # COMMENTED self.log([u"Center at row %d is %d", i, center_j])
            range_start = max(0, center_j - (delta // 2))
            range_end = range_start + delta
            if range_end > m:
                range_end = m
                range_start = range_end - delta
            centers[i] = range_start
            # COMMENTED self.log([u"Range at row %d is %d %d", i, range_start, range_end])
            for j in range(range_start, range_end):
                tmp = mfcc1[:, i].transpose().dot(mfcc2[:, j])
                tmp /= norm2_1[i] * norm2_2[j]
                cost_matrix[i][j - range_start] = 1 - tmp
        return (cost_matrix, centers)

    def _compute_accumulated_cost_matrix(self, cost_matrix, centers):
        # create accumulated cost matrix
        #
        # a[i][j] = c[i][j] + min(c[i-1][j-1], c[i-1][j], c[i][j-1])
        #
        return self._compute_acm_in_place(cost_matrix, centers)

    def _compute_acm_in_place(self, cost_matrix, centers):
        n, delta = cost_matrix.shape
        current_row = numpy.copy(cost_matrix[0, :])
        # COMMENTED cost_matrix[0][0] = current_row[0]
        for j in range(1, delta):
            cost_matrix[0][j] = current_row[j] + cost_matrix[0][j - 1]
        # fill table
        for i in range(1, n):
            current_row = numpy.copy(cost_matrix[i, :])
            offset = centers[i] - centers[i - 1]
            for j in range(delta):
                cost0 = numpy.inf
                if (j + offset) < delta:
                    cost0 = cost_matrix[i - 1][j + offset]
                cost1 = numpy.inf
                if j > 0:
                    cost1 = cost_matrix[i][j - 1]
                cost2 = numpy.inf
                if ((j + offset - 1) < delta) and ((j + offset - 1) >= 0):
                    cost2 = cost_matrix[i - 1][j + offset - 1]
                cost_matrix[i][j] = current_row[j] + min(cost0, cost1, cost2)
        return cost_matrix

    def _compute_best_path(self, acc_matrix, centers):
        # get dimensions
        n, delta = acc_matrix.shape
        i = n - 1
        j = delta - 1 + centers[i]
        path = [(i, j)]
        # compute best (min cost) path
        while (i > 0) or (j > 0):
            if i == 0:
                path.append((0, j - 1))
                j -= 1
            elif j == 0:
                path.append((i - 1, 0))
                i -= 1
            else:
                offset = centers[i] - centers[i - 1]
                r_j = j - centers[i]
                cost0 = numpy.inf
                if (r_j + offset) < delta:
                    cost0 = acc_matrix[i - 1][r_j + offset]
                cost1 = numpy.inf
                if r_j > 0:
                    cost1 = acc_matrix[i][r_j - 1]
                cost2 = numpy.inf
                if (r_j > 0) and ((r_j + offset - 1) < delta) and ((r_j + offset - 1) >= 0):
                    cost2 = acc_matrix[i - 1][r_j + offset - 1]
                costs = [
                    cost0,
                    cost1,
                    cost2
                ]
                moves = [
                    (i - 1, j),
                    (i, j - 1),
                    (i - 1, j - 1)
                ]
                min_cost = numpy.argmin(costs)
                # COMMENTED self.log([u"Selected min cost move %d", min_cost])
                min_move = moves[min_cost]
                path.append(min_move)
                i, j = min_move
        return path

class DTWExact():

    def __init__(self, m1, m2):
        self.m1 = m1
        self.m2 = m2

    def compute_accumulated_cost_matrix(self):
        cost_matrix = self._compute_cost_matrix()
        accumulated_cost_matrix = self._compute_accumulated_cost_matrix(cost_matrix)
        return accumulated_cost_matrix

    def compute_path(self):
        accumulated_cost_matrix = self.compute_accumulated_cost_matrix()
        best_path = self._compute_best_path(accumulated_cost_matrix)
        return best_path

    def _compute_cost_matrix(self):
        # discard first MFCC component
        mfcc1 = self.m1[1:, :]
        mfcc2 = self.m2[1:, :]
        norm2_1 = numpy.sqrt(numpy.sum(mfcc1 ** 2, 0))
        norm2_2 = numpy.sqrt(numpy.sum(mfcc2 ** 2, 0))
        # compute dot product
        cost_matrix = mfcc1.transpose().dot(mfcc2)
        # normalize
        norm_matrix = numpy.outer(norm2_1, norm2_2)
        cost_matrix = 1 - (cost_matrix / norm_matrix)
        return cost_matrix

    def _compute_accumulated_cost_matrix(self, cost_matrix):
        # create accumulated cost matrix
        #
        # a[i][j] = c[i][j] + min(c[i-1][j-1], c[i-1][j], c[i][j-1])
        #
        return self._compute_acm_in_place(cost_matrix)

    def _compute_acm_in_place(self, cost_matrix):
        n, m = cost_matrix.shape
        current_row = numpy.copy(cost_matrix[0, :])
        # COMMENTED cost_matrix[0][0] = current_row[0]
        for j in range(1, m):
            cost_matrix[0][j] = current_row[j] + cost_matrix[0][j - 1]
        for i in range(1, n):
            current_row = numpy.copy(cost_matrix[i, :])
            cost_matrix[i][0] = cost_matrix[i - 1][0] + current_row[0]
            for j in range(1, m):
                cost_matrix[i][j] = current_row[j] + min(
                    cost_matrix[i - 1][j],
                    cost_matrix[i][j - 1],
                    cost_matrix[i - 1][j - 1]
                )
        return cost_matrix

    def _compute_best_path(self, acc_matrix):
        # get dimensions
        n, m = acc_matrix.shape
        i = n - 1
        j = m - 1
        path = [(i, j)]
        # compute best (min cost) path
        while (i > 0) or (j > 0):
            if i == 0:
                path.append((0, j - 1))
                j -= 1
            elif j == 0:
                path.append((i - 1, 0))
                i -= 1
            else:
                costs = [
                    acc_matrix[i - 1][j],
                    acc_matrix[i][j - 1],
                    acc_matrix[i - 1][j - 1]
                ]
                moves = [
                    (i - 1, j),
                    (i, j - 1),
                    (i - 1, j - 1)
                ]
                min_cost = numpy.argmin(costs)
                # COMMENTED self.log([u"Selected min cost move %d", min_cost])
                min_move = moves[min_cost]
                path.append(min_move)
                i, j = min_move
        return path

class aeneas_mfcc():
    """
    A class for computing Mel-frequency cepstral coefficients (MFCCs).
    """

    CUTOFF = 0.00001
    """ Cut-off threshold """

    MEL_10 = 2595.0
    """ Base Mel frequency """

    TAG = u"MFCC"

    def __init__(self, params):

        # store parameters in local attributes
        self.filter_bank_size = params.n_mel
        self.mfcc_size = params.n_mfcc
        self.fft_order = params.fft_order
        self.lower_frequency = params.low_freq
        self.upper_frequency = params.high_freq
        self.emphasis_factor = params.emphasis_factor
        self.window_length = params.win_length/params.sr
        self.window_shift = params.hop_length/params.sr

        # initialize DCT matrix
        self._create_dct_matrix()

        # initialized later by compute_from_data()
        self.data = None
        self.sample_rate = None
        self.filters = None
        self.hamming_window = None

    @classmethod
    def _hz2mel(cls, frequency):
        """
        Convert the given frequency in Hz to the Mel scale.

        :param float frequency: the Hz frequency to convert
        :rtype: float
        """
        return cls.MEL_10 * math.log10(1.0 + (frequency / 700.0))

    @classmethod
    def _mel2hz(cls, mel):
        """
        Convert the given Mel value to Hz frequency.

        :param float mel: the Mel value to convert
        :rtype: float
        """
        return 700.0 * (10 ** (mel / cls.MEL_10) - 1)

    def _create_dct_matrix(self):
        """
        Create the not-quite-DCT matrix as used by Sphinx,
        and store it in ```self.s2dct```.
        """
        self.s2dct = numpy.zeros((self.mfcc_size, self.filter_bank_size))
        for i in range(0, self.mfcc_size):
            freq = numpy.pi * float(i) / self.filter_bank_size
            self.s2dct[i] = numpy.cos(freq * numpy.arange(0.5, 0.5 + self.filter_bank_size, 1.0, 'float64'))
        self.s2dct[:, 0] *= 0.5
        self.s2dct = self.s2dct.transpose()

    def _create_mel_filter_bank(self):
        """
        Create the Mel filter bank,
        and store it in ``self.filters``.

        Note that it is a function of the audio sample rate,
        so it cannot be created in the class initializer,
        but only later in :func:`aeneas.mfcc.MFCC.compute_from_data`.
        """
        self.filters = numpy.zeros((1 + (self.fft_order // 2), self.filter_bank_size), 'd')
        dfreq = float(self.sample_rate) / self.fft_order
        nyquist_frequency = self.sample_rate / 2
        if self.upper_frequency > nyquist_frequency:
            self.log_exc(u"Upper frequency %f exceeds Nyquist frequency %f" % (self.upper_frequency, nyquist_frequency), None, True, ValueError)
        melmax = aeneas_mfcc._hz2mel(self.upper_frequency)
        melmin = aeneas_mfcc._hz2mel(self.lower_frequency)
        dmelbw = (melmax - melmin) / (self.filter_bank_size + 1)
        filt_edge = aeneas_mfcc._mel2hz(melmin + dmelbw * numpy.arange(self.filter_bank_size + 2, dtype='d'))

        # TODO can this code be written more numpy-style?
        #      (the performance loss is negligible, it is just ugly to see)
        for whichfilt in range(0, self.filter_bank_size):
            # int() casts to native int instead of working with numpy.float64
            leftfr = int(round(filt_edge[whichfilt] / dfreq))
            centerfr = int(round(filt_edge[whichfilt + 1] / dfreq))
            rightfr = int(round(filt_edge[whichfilt + 2] / dfreq))
            fwidth = (rightfr - leftfr) * dfreq
            height = 2.0 / fwidth
            if centerfr != leftfr:
                leftslope = height / (centerfr - leftfr)
            else:
                leftslope = 0
            freq = leftfr + 1
            while freq < centerfr:
                self.filters[freq, whichfilt] = (freq - leftfr) * leftslope
                freq = freq + 1
            # the next if should always be true!
            if freq == centerfr:
                self.filters[freq, whichfilt] = height
                freq = freq + 1
            if centerfr != rightfr:
                rightslope = height / (centerfr - rightfr)
            while freq < rightfr:
                self.filters[freq, whichfilt] = (freq - rightfr) * rightslope
                freq = freq + 1

    def _pre_emphasis(self):
        """
        Pre-emphasize the entire signal at once by self.emphasis_factor,
        overwriting ``self.data``.
        """
        self.data = numpy.append(self.data[0], self.data[1:] - self.emphasis_factor * self.data[:-1])

    def compute_from_data(self, data, sample_rate):
        """
        Compute MFCCs for the given audio data.

        The audio data must be a 1D :class:`numpy.ndarray`,
        that is, it must represent a monoaural (single channel)
        array of ``float64`` values in ``[-1.0, 1.0]``.

        :param data: the audio data
        :type  data: :class:`numpy.ndarray` (1D)
        :param int sample_rate: the sample rate of the audio data, in samples/s (Hz)
        :raises: ValueError: if the data is not a 1D :class:`numpy.ndarray` (i.e., not mono),
                             or if the data is empty
        :raises: ValueError: if the upper frequency defined in the ``rconf`` is
                             larger than the Nyquist frequenct (i.e., half of ``sample_rate``)
        """
        def _process_frame(self, frame):
            """
            Process each frame, returning the log(power()) of it.
            """
            # apply Hamming window
            frame *= self.hamming_window
            # compute RFFT
            fft = numpy.fft.rfft(frame, self.fft_order)
            # equivalent to power = fft.real * fft.real + fft.imag * fft.imag
            power = numpy.square(numpy.absolute(fft))
            #
            # return the log(power()) of the transformed vector
            # v1
            # COMMENTED logspec = numpy.log(numpy.dot(power, self.filters).clip(self.CUTOFF, numpy.inf))
            # COMMENTED return numpy.dot(logspec, self.s2dct) / self.filter_bank_size
            # v2
            return numpy.log(numpy.dot(power, self.filters).clip(self.CUTOFF, numpy.inf))

        if len(data.shape) != 1:
            self.log_exc(u"The audio data must be a 1D numpy array (mono).", None, True, ValueError)
        if len(data) < 1:
            self.log_exc(u"The audio data must not be empty.", None, True, ValueError)

        self.data = data
        self.sample_rate = sample_rate

        # number of samples in the audio
        data_length = len(self.data)

        # frame length in number of samples
        frame_length = int(self.window_length * self.sample_rate)

        # frame length must be at least equal to the FFT order
        frame_length_padded = max(frame_length, self.fft_order)

        # frame shift in number of samples
        frame_shift = int(self.window_shift * self.sample_rate)

        # number of MFCC vectors (one for each frame)
        # this number includes the last shift,
        # where the data will be padded with zeros
        # if the remaining samples are less than frame_length_padded
        number_of_frames = int((1.0 * data_length) / frame_shift)

        # create Hamming window
        self.hamming_window = numpy.hamming(frame_length_padded)

        # build Mel filter bank
        self._create_mel_filter_bank()

        # pre-emphasize the entire audio data
        self._pre_emphasis()

        # allocate the MFCCs matrix
        # v1
        # COMMENTED mfcc = numpy.zeros((number_of_frames, self.mfcc_size), 'float64')
        # v2
        mfcc = numpy.zeros((number_of_frames, self.filter_bank_size), 'float64')

        # compute MFCCs one frame at a time
        for frame_index in range(number_of_frames):
            # COMMENTED print("Computing frame %d / %d" % (frame_index, number_of_frames))

            # get the start and end indices for this frame,
            # do not overrun the data length
            frame_start = frame_index * frame_shift
            frame_end = min(frame_start + frame_length_padded, data_length)

            # frame is zero-padded if the remaining samples
            # are less than its length
            frame = numpy.zeros(frame_length_padded)
            frame[0:(frame_end - frame_start)] = self.data[frame_start:frame_end]

            # process the frame
            mfcc[frame_index] = _process_frame(self, frame)

        # v1
        # COMMENTED return mfcc
        # v2
        # return the dot product with the DCT matrix
        return numpy.dot(mfcc, self.s2dct) / self.filter_bank_size


def mfccs_to_secs(params, val):
    return val * params.hop_length / params.sr

def secs_to_mfccs(params, val):
    return int(val * params.sr / params.hop_length)
    
class AlignmentPair:

    def __init__(self, params, q_audio, r_audio):

        self.params = params
        self.query_audio = q_audio
        self.ref_audio = r_audio
        
        assert len(q_audio.labels) == len(r_audio.labels)
                    
    def _calc_alignment_deviation(self, query_labels, 
                                 aligned_indices, 
                                 aligned_indices_mfcc):
    
        total_st_diff, total_end_diff = 0,0
        total_st_diff_mfcc, total_end_diff_mfcc = 0,0
        params = self.params
            
        assert len(query_labels) == len(aligned_indices)
        
        for i in range(0, len(query_labels)):
            exp_st = query_labels[i]["start"]
            exp_end = query_labels[i]["end"]
            act_st = aligned_indices[i]["start"]
            act_end = aligned_indices[i]["end"]
            
            exp_st_mfcc = secs_to_mfccs(params, query_labels[i]["start"])
            exp_end_mfcc = secs_to_mfccs(params, query_labels[i]["end"])
            act_st_mfcc = aligned_indices_mfcc[i]["start"]
            act_end_mfcc = aligned_indices_mfcc[i]["end"]
            
            total_st_diff += abs(exp_st - act_st)
            total_end_diff += abs(exp_end - act_end)
    
            total_st_diff_mfcc += abs(exp_st_mfcc - act_st_mfcc)
            total_end_diff_mfcc += abs(exp_end_mfcc - act_end_mfcc)
    
        return total_st_diff, total_end_diff, total_st_diff_mfcc, total_end_diff_mfcc
    
    def _find_alignment_range(self, start, end, wp, search_start):

        alignment_st, alignment_end = -1, -1
        for i in range(search_start, len(wp)):

            if (wp[i][1] == start and alignment_st == -1):
                assert alignment_end == -1
                alignment_st = wp[i][0]
                
            if (wp[i][1] > end):
                if (end != wp[i-1][1]):
                    print("end %d i %d val %d curr %d" 
                          %(end, i, wp[i-1][1], wp[i][1]))
                    assert 0
                alignment_end = wp[i-1][0]
                break

        if (alignment_st == -1):
            print("ERROR : i %d start %d len %d" 
                  %(i, search_start, len(wp)))
        if (alignment_end == -1):
            alignment_end = wp[i][0]
            
        return alignment_st, alignment_end, i-1

    def _get_alignment_indices(self, wp, ref_labels):
        aligned_indices = []
        aligned_indices_mfcc = []
        search_start = 0
        for i in range(0, len(ref_labels)):
            ref_st = secs_to_mfccs(self.params, ref_labels[i]["start"])
            ref_end = secs_to_mfccs(self.params, ref_labels[i]["end"])
            alignment_st, alignment_end, search_start = \
            self._find_alignment_range(ref_st, ref_end, wp, search_start)
            aligned_indices_mfcc.append(
                                {"start": alignment_st, 
                               "end": alignment_end})
            aligned_indices.append(
                            {"start": mfccs_to_secs(self.params, alignment_st), 
                             "end": mfccs_to_secs(self.params, alignment_end)})
        return aligned_indices, aligned_indices_mfcc

    def get_alignment_deviation_normal(self):
        wp = self.wp[::-1]
        aligned_indices, aligned_indices_mfcc = \
            self._get_alignment_indices(wp, self.ref_audio.labels)
        return self._calc_alignment_deviation(self.query_audio.labels, 
                                     aligned_indices, 
                                     aligned_indices_mfcc)

    def get_alignment_deviation_swap(self):
        wp = [(point[1], point[0]) for point in self.wp]
        wp = wp[::-1]
        aligned_indices, aligned_indices_mfcc = \
            self._get_alignment_indices(wp, self.query_audio.labels)
        return self._calc_alignment_deviation(self.ref_audio.labels, 
                                     aligned_indices, 
                                     aligned_indices_mfcc)
    
    def dump_warping_path(self, f_name):
        
        with open(f_name, "wb") as f:
            f.write(struct.pack("<i", len(self.wp)))
            for x, y in self.wp:
              f.write(struct.pack("<ii", x, y))
              
    def set_warping_path(self, f_name):
        self.wp= []
        with open(f_name, "rb") as f:
            # Read the number of elements
            num_elements = struct.unpack("<i", f.read(4))[0]
            print(num_elements)
        
            for _ in range(num_elements):
              x, y = struct.unpack("<ii", f.read(8))
              self.wp.append((x, y))    

class AlignmentPairLibrosa(AlignmentPair):

    def get_warping_path(self):

        params = self.params
        q_audio = self.query_audio
        r_audio = self.ref_audio
        
        g_c_val = True
        if (params.dtw_algorithm == AlignmentAlgorithm.DTW_EXACT):
            g_c_val = False
        
        assert len(q_audio.labels) == len(r_audio.labels)
                
        #distance = cdist(q_audio.mfcc.T, r_audio.mfcc.T)
        #D, self.wp = librosa.sequence.dtw(C = distance, subseq=False)
        _, self.wp = librosa.sequence.dtw(q_audio.mfcc, 
                                          r_audio.mfcc,
                                          global_constraints=g_c_val,
                                          backtrack=True)

class AlignmentPairAeneas(AlignmentPair):

    def get_warping_path(self):

        params = self.params
        q_audio = self.query_audio
        r_audio = self.ref_audio
        
        assert len(q_audio.labels) == len(r_audio.labels)
                
        a_dtw = aeneas_dtw(params, q_audio, r_audio)
        self.wp = a_dtw.compute_path()

class AudioText:
    
    def __init__(self, params, filename, labelname):
        self.params = params
        self.filename = filename
        self.labelname = labelname
        self.labels  = self._construct_labels(labelname)
        self.audio, sample_rate = librosa.load(self.filename, sr=params.sr)

    def _construct_labels(self, labelname):
        
        begin, end = -1,-1
        labels = []
        word="NULL"
        with open(labelname, "r", encoding="utf-8") as fp:
            lines = fp.readlines()
            line_idx = 0
            for line in lines:
                word = 'begin'
                if line.find(word) != -1:
                    parts = line.split(": \"")
                    assert(end == -1)
                    assert(begin == -1)            
                    begin = float(parts[1][0:-3])
                word = 'end'
                if line.find(word) != -1:
                    parts = line.split(": \"")
                    assert(end == -1)
                    assert(begin != -1)            
                    end = float(parts[1][0:-3])
                word = 'lines'
                if line.find(word) != -1:
                    word = lines[line_idx + 1]
                    labels.append({"start": begin, 
                                   "end": end, "text": word[5:-2]})
                    begin, end = -1,-1
                line_idx += 1
        return labels    
    
class AudioTextLibrosa(AudioText):
    def calc_mfcc(self, params):
        self.mfcc = librosa.feature.mfcc(
            y=self.audio,
            sr=params.sr,
            n_mfcc=params.n_mfcc,
            win_length=params.win_length,
            hop_length=params.hop_length,
            n_mels = params.n_mel,
            n_fft = params.n_fft,
            fmin = params.low_freq,
            fmax = params.high_freq,
            center=False,
        )
        self.__middle_begin = 0
        self.__middle_end = self.mfcc.shape[1]

class AudioTextAeneas(AudioText):
    def calc_mfcc(self, params):
        self.mfcc = aeneas_mfcc(params
            ).compute_from_data(
            self.audio, params.sr
        ).transpose()
                
class Alignment:

    def __init__(self,
               sr=SAMPLE_RATE, 
               n_fft=NUM_FFT,
               hop_length=HOP_LENGTH,
               window_length=WINDOW_LENGTH, 
               n_mfcc=NUM_MFCC, 
               n_mel=NUM_MELS, 
               low_freq=LOWER_FREQ, 
               high_freq=HIGHEST_FREQ,
               algorithm=DTW_ALGORITHM
               ):
        self.sr = sr
        self.n_fft = n_fft
        self.win_length = window_length
        self.hop_length = hop_length
        self.n_mfcc = n_mfcc
        self.n_mel = n_mel
        self.low_freq = low_freq
        self.high_freq = high_freq
        self.dtw_algorithm = algorithm
        
class AlignmentLibrosa(Alignment):

    def audio_text(self, filename, labelname):
        return AudioTextLibrosa(self, filename, labelname)
    
    def alignment_pair(self, q_audio, r_audio):
        return AlignmentPairLibrosa(self, q_audio, r_audio)
    
class AlignmentAeneas(Alignment):
    def __init__(self, 
                sr=SAMPLE_RATE, 
                n_fft=NUM_FFT,
                hop_length=HOP_LENGTH,
                window_length=WINDOW_LENGTH, 
                n_mfcc=NUM_MFCC, 
                n_mel=NUM_MELS, 
                low_freq=LOWER_FREQ, 
                high_freq=HIGHEST_FREQ,
                algorithm=DTW_ALGORITHM,
                fft_order=FFT_ORDER,
                emphasis_factor = EMPHASIS_FACTOR,
                dtw_margin=DTW_MARGIN):
        super().__init__(sr=SAMPLE_RATE, 
                       n_fft=n_fft,
                       hop_length=hop_length,
                       window_length=window_length, 
                       n_mfcc=n_mfcc, 
                       n_mel=n_mel, 
                       low_freq=low_freq, 
                       high_freq=high_freq,
                       algorithm=algorithm
                       )
        self.fft_order = fft_order
        self.emphasis_factor = emphasis_factor
        self.dtw_margin = dtw_margin
        
    def audio_text(self, filename, labelname):
        return AudioTextAeneas(self, filename, labelname)
    
    def alignment_pair(self, q_audio, r_audio):
        return AlignmentPairAeneas(self, q_audio, r_audio)


1. Create an alignment object for librosa based alignment.
2. Parse the dataset directory and create an audio text object for each of the 14 datasets.
   Load the audio, calulate the mfcc's and read the json file prepared using finetune aeneas
   and get the start and end labels.

In [2]:
g_l = AlignmentLibrosa(algorithm=AlignmentAlgorithm.DTW_STRIPE)

In [41]:
import os
import glob
at_l_arr = []
dir = r"C:\Users\Lenovo\Desktop\sishya\audio alignment vs\audio_alignment - nama\dataset"
for sb in os.listdir(dir):
    f_a = r"%s\audio.mp3"%(os.path.join(dir, sb))       
    f_l = r"%s\label.json"%(os.path.join(dir, sb))

    at_l = g_l.audio_text(f_a, f_l)
    at_l.calc_mfcc(g_l)
    at_l_arr.append(at_l)


1. For the 14 datasets, do an alignment with self.
2. Get the warping path and dump it as a binary file.
3. From the warping path, calculate the deviation.
   That is - the start and end lables of the reference as well as the query are known.
   See where the start and end of the references map to in the query. These will be the aligned indices.
   The difference from the expected (obtained from finetuning query audio) is the alignment deviation.
4. Swap the query and reference. This is just a sanity check in this experiment. It's purpose will be clear
   in the next experiment.


In [56]:
for r_idx in range(0, len(at_l_arr)):
    print("Doing for r:", r_idx)
    ap_l = g_l.alignment_pair(at_l_arr[r_idx], at_l_arr[r_idx])
    ap_l.get_warping_path()
    ap_l.dump_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\librosa_stripe\%d_%d.bin" %(r_idx, r_idx))
    print("normal deviation :")
    print(ap_l.get_alignment_deviation_normal())
    print("swap  deviation :")
    print(ap_l.get_alignment_deviation_swap())
    

Doing for r: 0
normal deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
swap  deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
Doing for r: 1
normal deviation :
(1.3800000000003765, 1.5000000000002673, 0, 3)
swap  deviation :
(1.3800000000003765, 1.5000000000002673, 0, 3)
Doing for r: 2
normal deviation :
(2.4599999999998694, 2.5799999999997603, 0, 3)
swap  deviation :
(2.4599999999998694, 2.5799999999997603, 0, 3)
Doing for r: 3
normal deviation :
(1.6599999999997337, 1.7799999999997382, 0, 3)
swap  deviation :
(1.6599999999997337, 1.7799999999997382, 0, 3)
Doing for r: 4
normal deviation :
(1.5199999999990403, 1.6399999999991586, 0, 3)
swap  deviation :
(1.5199999999990403, 1.6399999999991586, 0, 3)
Doing for r: 5
normal deviation :
(1.839999999999077, 1.959999999998968, 0, 3)
swap  deviation :
(1.839999999999077, 1.959999999998968, 0, 3)
Doing for r: 6
normal deviation :
(3.560000000000267, 3.680000000000158, 0, 3)
swap  deviation :
(3.560000000000267, 3.68000000000

Read from the binary file and ensure that the deviation values match.

In [57]:
for r_idx in range(0, len(at_l_arr)):
    print("Checking form bin file for r:", r_idx)
    ap_l_b = g_l.alignment_pair(at_l_arr[r_idx], at_l_arr[r_idx])
    ap_l_b.set_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\librosa_stripe\%d_%d.bin" %(r_idx, r_idx))
    print("bin file normal deviation :")
    print(ap_l_b.get_alignment_deviation_normal())
    print("bin file swap  deviation :")
    print(ap_l_b.get_alignment_deviation_swap())
    

Checking form bin file for r: 0
30251
bin file normal deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
bin file swap  deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
Checking form bin file for r: 1
28773
bin file normal deviation :
(1.3800000000003765, 1.5000000000002673, 0, 3)
bin file swap  deviation :
(1.3800000000003765, 1.5000000000002673, 0, 3)
Checking form bin file for r: 2
31376
bin file normal deviation :
(2.4599999999998694, 2.5799999999997603, 0, 3)
bin file swap  deviation :
(2.4599999999998694, 2.5799999999997603, 0, 3)
Checking form bin file for r: 3
25193
bin file normal deviation :
(1.6599999999997337, 1.7799999999997382, 0, 3)
bin file swap  deviation :
(1.6599999999997337, 1.7799999999997382, 0, 3)
Checking form bin file for r: 4
38596
bin file normal deviation :
(1.5199999999990403, 1.6399999999991586, 0, 3)
bin file swap  deviation :
(1.5199999999990403, 1.6399999999991586, 0, 3)
Checking form bin file for r: 5
30313
bin file normal deviation :
(

RESULTS AND OBSERVATIONS:

1. The deviations were minimal.
2. In most cases the mfcc indices mathched correctly, but in some very few cases they did not.
   This is the limitation of the algorithm.
3. Even in cases where mfcc indices matched, the timestamps did not match.
   This is because of the error in conversion when timestamps get converted to mfcc indices and back to milliseconds.

Time taken for the experiment:
Each alignment finished in about a minute.


DOING THE SAME FOR AENEAS

Create an object for doing aeneas alignment. And do the alignment with self and 
dump the warp path and check the deviation values.

In [58]:
g_a = AlignmentAeneas(algorithm=AlignmentAlgorithm.DTW_STRIPE)

In [59]:
import os
import glob
at_a_arr = []
dir = r"C:\Users\Lenovo\Desktop\sishya\audio alignment vs\audio_alignment - nama\dataset"
for sb in os.listdir(dir):
    f_a = r"%s\audio.mp3"%(os.path.join(dir, sb))       
    f_l = r"%s\label.json"%(os.path.join(dir, sb))

    at_a = g_a.audio_text(f_a, f_l)
    at_a.calc_mfcc(g_a)
    at_a_arr.append(at_a)


In [60]:
for r_idx in range(0, len(at_a_arr)):
    print("Doing for r:", r_idx)
    ap_a = g_a.alignment_pair(at_a_arr[r_idx], at_a_arr[r_idx])
    ap_a.get_warping_path()
    ap_a.dump_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\aeneas_stripe\%d_%d.bin" %(r_idx, r_idx))
    print("normal deviation :")
    print(ap_a.get_alignment_deviation_normal())
    print("swap  deviation :")
    print(ap_a.get_alignment_deviation_swap())
    

Doing for r: 0
normal deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
swap  deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
Doing for r: 1
normal deviation :
(2.6200000000001014, 2.660000000000065, 31, 32)
swap  deviation :
(31.820000000000885, 31.86000000000085, 761, 762)
Doing for r: 2
normal deviation :
(2.4599999999998694, 2.499999999999833, 0, 1)
swap  deviation :
(2.4599999999998694, 2.499999999999833, 0, 1)
Doing for r: 3
normal deviation :
(1.6599999999997337, 1.6999999999996973, 0, 1)
swap  deviation :
(1.6599999999997337, 1.6999999999996973, 0, 1)
Doing for r: 4
normal deviation :
(1.5199999999990403, 1.559999999999004, 0, 1)
swap  deviation :
(1.5199999999990403, 1.559999999999004, 0, 1)
Doing for r: 5
normal deviation :
(1.839999999999077, 1.8799999999990407, 0, 1)
swap  deviation :
(1.839999999999077, 1.8799999999990407, 0, 1)
Doing for r: 6
normal deviation :
(3.560000000000267, 3.6000000000002306, 0, 1)
swap  deviation :
(3.560000000000267, 3.60000000

In [61]:
for r_idx in range(0, len(at_a_arr)):
    print("Checking form bin file for r:", r_idx)
    ap_a_b = g_a.alignment_pair(at_a_arr[r_idx], at_a_arr[r_idx])
    ap_a_b.set_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\aeneas_stripe\%d_%d.bin" %(r_idx, r_idx))
    print("bin file normal deviation :")
    print(ap_a_b.get_alignment_deviation_normal())
    print("bin file swap  deviation :")
    print(ap_a_b.get_alignment_deviation_swap())
    

Checking form bin file for r: 0
30267
bin file normal deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
bin file swap  deviation :
(1.179999999999815, 1.1999999999997968, 0, 0)
Checking form bin file for r: 1
31509
bin file normal deviation :
(2.6200000000001014, 2.660000000000065, 31, 32)
bin file swap  deviation :
(31.820000000000885, 31.86000000000085, 761, 762)
Checking form bin file for r: 2
31378
bin file normal deviation :
(2.4599999999998694, 2.499999999999833, 0, 1)
bin file swap  deviation :
(2.4599999999998694, 2.499999999999833, 0, 1)
Checking form bin file for r: 3
25195
bin file normal deviation :
(1.6599999999997337, 1.6999999999996973, 0, 1)
bin file swap  deviation :
(1.6599999999997337, 1.6999999999996973, 0, 1)
Checking form bin file for r: 4
38598
bin file normal deviation :
(1.5199999999990403, 1.559999999999004, 0, 1)
bin file swap  deviation :
(1.5199999999990403, 1.559999999999004, 0, 1)
Checking form bin file for r: 5
30315
bin file normal deviation :
(

RESULTS AND OBSERVATIONS:
1. The second dataset had a big deviation.
2. And the swapped warping path gave a mismatching deviation value.
THIS HAS TO BE LOOKED AT.

Time taken for the experiments:
Each alignment took about 5-6 mins, much more than that for librosa.


NEXT EXPERIMENT - Seeing which reference has the minimum deviation.

1. For each dataset, calculate the alignment against 13 other datasets.
2. This should be 14*13=182 alignment calculations.
3. But this can be halved, as the alignment ds0 vs ds1 is same as ds1 vs ds0, with the warping path points swapped.
4. So, calculate just 91 alignments and do a swap of the warping path to get the other alignment.
5. Then for each alignment calculate the deviations from the expected.
6. Print the total deviations for the 14 references.

FIRST LIBROSA

In [73]:
import numpy as np
dev_for_ref = np.zeros(len(at_l_arr), dtype=float)
cnt_for_ref = np.zeros(len(at_l_arr), dtype=int)

for r_idx in range(0, len(at_l_arr)):
    print("Doing for r:", r_idx)
    for q_idx in range(r_idx+1, len(at_l_arr)):
        print("Doing for q:", q_idx)
        ap_l = g_l.alignment_pair(at_l_arr[q_idx], at_l_arr[r_idx])
        ap_l.get_warping_path()
        ap_l.dump_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\librosa_stripe\%d_%d.bin" %(q_idx, r_idx))
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_l.get_alignment_deviation_normal()
        st_diff = st_diff / len(ap_l.query_audio.labels)
        end_diff = end_diff / len(ap_l.query_audio.labels)
        
        print("normal deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(q_idx, r_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[r_idx] += (st_diff+end_diff)/2
        cnt_for_ref[r_idx] += 1
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_l.get_alignment_deviation_swap()

        st_diff = st_diff / len(ap_l.ref_audio.labels)
        end_diff = end_diff / len(ap_l.ref_audio.labels)
        
        print("swap deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(r_idx, q_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[q_idx] += (st_diff+end_diff)/2
        cnt_for_ref[q_idx] += 1
        

Doing for r: 0
Doing for q: 1
normal deviation q 1 ref 0 : st 0.188254 end 0.190159 avg 0.189206 mfcc_st 2072 mfcc_end 2093
swap deviation q 0 ref 1 : st 0.075057 end 0.082268 avg 0.078662 mfcc_st 832 mfcc_end 912
Doing for q: 2
normal deviation q 2 ref 0 : st 0.195782 end 0.219546 avg 0.207664 mfcc_st 2165 mfcc_end 2428
swap deviation q 0 ref 2 : st 0.152880 end 0.185034 avg 0.168957 mfcc_st 1677 mfcc_end 2032
Doing for q: 3
normal deviation q 3 ref 0 : st 0.081950 end 0.082222 avg 0.082086 mfcc_st 904 mfcc_end 907
swap deviation q 0 ref 3 : st 0.103537 end 0.110748 avg 0.107143 mfcc_st 1135 mfcc_end 1215
Doing for q: 4
normal deviation q 4 ref 0 : st 0.151927 end 0.152381 avg 0.152154 mfcc_st 1688 mfcc_end 1693
swap deviation q 0 ref 4 : st 0.116417 end 0.123628 avg 0.120023 mfcc_st 1273 mfcc_end 1353
Doing for q: 5
normal deviation q 5 ref 0 : st 0.217868 end 0.218503 avg 0.218186 mfcc_st 2416 mfcc_end 2423
swap deviation q 0 ref 5 : st 0.210023 end 0.217234 avg 0.213628 mfcc_st 230

In [76]:
dev_for_ref/len(at_l_arr)

array([0.13395044, 0.13147716, 0.16866861, 0.14410755, 5.4747943 ,
       0.21817298, 0.16976352, 0.1887496 , 0.14534176, 0.14917072,
       8.58350988, 0.14084548, 0.19133787, 0.14588273])

Checking if the warping path has been dumped correctly.
Reading from the bin file and seeing that the deviation values asre the same.

In [77]:
import numpy as np
dev_for_ref = np.zeros(len(at_l_arr), dtype=float)
cnt_for_ref = np.zeros(len(at_l_arr), dtype=int)

for r_idx in range(0, len(at_l_arr)):
    print("Checking form bin file for r:", r_idx)
    for q_idx in range(r_idx+1, len(at_l_arr)):
        print("Checking for q:", q_idx)
        ap_l_b = g_l.alignment_pair(at_l_arr[q_idx], at_l_arr[r_idx])
        ap_l_b.set_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\librosa_stripe\%d_%d.bin" %(q_idx, r_idx))
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_l_b.get_alignment_deviation_normal()
        st_diff = st_diff / len(ap_l_b.query_audio.labels)
        end_diff = end_diff / len(ap_l_b.query_audio.labels)
        
        print("normal deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(q_idx, r_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[r_idx] += (st_diff+end_diff)/2
        cnt_for_ref[r_idx] += 1
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_l_b.get_alignment_deviation_swap()

        st_diff = st_diff / len(ap_l_b.ref_audio.labels)
        end_diff = end_diff / len(ap_l_b.ref_audio.labels)
        
        print("swap deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(r_idx, q_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[q_idx] += (st_diff+end_diff)/2
        cnt_for_ref[q_idx] += 1
        

Checking form bin file for r: 0
Checking for q: 1
33543
normal deviation q 1 ref 0 : st 0.188254 end 0.190159 avg 0.189206 mfcc_st 2072 mfcc_end 2093
swap deviation q 0 ref 1 : st 0.075057 end 0.082268 avg 0.078662 mfcc_st 832 mfcc_end 912
Checking for q: 2
33333
normal deviation q 2 ref 0 : st 0.195782 end 0.219546 avg 0.207664 mfcc_st 2165 mfcc_end 2428
swap deviation q 0 ref 2 : st 0.152880 end 0.185034 avg 0.168957 mfcc_st 1677 mfcc_end 2032
Checking for q: 3
30921
normal deviation q 3 ref 0 : st 0.081950 end 0.082222 avg 0.082086 mfcc_st 904 mfcc_end 907
swap deviation q 0 ref 3 : st 0.103537 end 0.110748 avg 0.107143 mfcc_st 1135 mfcc_end 1215
Checking for q: 4
39300
normal deviation q 4 ref 0 : st 0.151927 end 0.152381 avg 0.152154 mfcc_st 1688 mfcc_end 1693
swap deviation q 0 ref 4 : st 0.116417 end 0.123628 avg 0.120023 mfcc_st 1273 mfcc_end 1353
Checking for q: 5
32229
normal deviation q 5 ref 0 : st 0.217868 end 0.218503 avg 0.218186 mfcc_st 2416 mfcc_end 2423
swap deviation

In [78]:
dev_for_ref/len(at_l_arr)

array([0.13395044, 0.13147716, 0.16866861, 0.14410755, 5.4747943 ,
       0.21817298, 0.16976352, 0.1887496 , 0.14534176, 0.14917072,
       8.58350988, 0.14084548, 0.19133787, 0.14588273])

NEXT, DOING THE SAME FOR AENEAS. FIRST CALCULATING THE WARP PATHS (AND SWAPS) AND DEVIATIONS

In [79]:
dev_for_ref = np.zeros(len(at_a_arr), dtype=float)
cnt_for_ref = np.zeros(len(at_a_arr), dtype=int)

for r_idx in range(0, len(at_a_arr)):
    print("Doing for r:", r_idx)
    for q_idx in range(r_idx+1, len(at_a_arr)):
        print("Doing for q:", q_idx)
        ap_a = g_a.alignment_pair(at_a_arr[q_idx], at_a_arr[r_idx])
        ap_a.get_warping_path()
        ap_a.dump_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\aeneas_stripe\%d_%d.bin" %(q_idx, r_idx))
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_a.get_alignment_deviation_normal()
        st_diff = st_diff / len(ap_a.query_audio.labels)
        end_diff = end_diff / len(ap_a.query_audio.labels)
        
        print("normal deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(q_idx, r_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[r_idx] += (st_diff+end_diff)/2
        cnt_for_ref[r_idx] += 1
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_a.get_alignment_deviation_swap()

        st_diff = st_diff / len(ap_a.ref_audio.labels)
        end_diff = end_diff / len(ap_a.ref_audio.labels)
        
        print("swap deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(r_idx, q_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[q_idx] += (st_diff+end_diff)/2
        cnt_for_ref[q_idx] += 1
        

Doing for r: 0
Doing for q: 1
normal deviation q 1 ref 0 : st 0.198685 end 0.200227 avg 0.199456 mfcc_st 2188 mfcc_end 2205
swap deviation q 0 ref 1 : st 0.111519 end 0.118912 avg 0.115215 mfcc_st 1233 mfcc_end 1315
Doing for q: 2
normal deviation q 2 ref 0 : st 0.204399 end 0.227891 avg 0.216145 mfcc_st 2261 mfcc_end 2521
swap deviation q 0 ref 2 : st 0.155601 end 0.187574 avg 0.171587 mfcc_st 1709 mfcc_end 2062
Doing for q: 3
normal deviation q 3 ref 0 : st 0.137007 end 0.137279 avg 0.137143 mfcc_st 1507 mfcc_end 1510
swap deviation q 0 ref 3 : st 0.184082 end 0.191474 avg 0.187778 mfcc_st 2025 mfcc_end 2107
Doing for q: 4
normal deviation q 4 ref 0 : st 0.208798 end 0.212698 avg 0.210748 mfcc_st 2303 mfcc_end 2346
swap deviation q 0 ref 4 : st 0.144717 end 0.152109 avg 0.148413 mfcc_st 1588 mfcc_end 1670
Doing for q: 5
normal deviation q 5 ref 0 : st 0.223220 end 0.228753 avg 0.225986 mfcc_st 2474 mfcc_end 2535
swap deviation q 0 ref 5 : st 0.179456 end 0.186848 avg 0.183152 mfcc_st

In [80]:
dev_for_ref/len(at_a_arr)

array([0.17207159, 0.26950761, 0.20834467, 2.5084872 , 1.77275024,
       0.22668934, 0.2721056 , 0.20295432, 0.21681568, 0.21564302,
       0.19016359, 0.16601879, 0.22535147, 0.16718173])

Checking if the warping path has been dumped correctly.
Reading from the bin file and seeing that the deviation values asre the same.

In [81]:
dev_for_ref = np.zeros(len(at_a_arr), dtype=float)
cnt_for_ref = np.zeros(len(at_a_arr), dtype=int)

for r_idx in range(0, len(at_a_arr)):
    print("Checking for r:", r_idx)
    for q_idx in range(r_idx+1, len(at_a_arr)):
        print("Checking for q:", q_idx)
        ap_a_b = g_a.alignment_pair(at_a_arr[q_idx], at_a_arr[r_idx])
        ap_a_b.set_warping_path(r"C:\Users\Lenovo\Desktop\sishya\prints\aeneas_stripe\%d_%d.bin" %(q_idx, r_idx))
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_a_b.get_alignment_deviation_normal()
        st_diff = st_diff / len(ap_a_b.query_audio.labels)
        end_diff = end_diff / len(ap_a_b.query_audio.labels)
        
        print("normal deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(q_idx, r_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[r_idx] += (st_diff+end_diff)/2
        cnt_for_ref[r_idx] += 1
        
        st_diff, end_diff, st_diff_mfcc, end_diff_mfcc = ap_a_b.get_alignment_deviation_swap()

        st_diff = st_diff / len(ap_a.ref_audio.labels)
        end_diff = end_diff / len(ap_a.ref_audio.labels)
        
        print("swap deviation q %d ref %d : st %f end %f avg %f mfcc_st %d mfcc_end %d" 
            %(r_idx, q_idx, st_diff, end_diff, (st_diff+end_diff)/2, st_diff_mfcc, end_diff_mfcc))
        dev_for_ref[q_idx] += (st_diff+end_diff)/2
        cnt_for_ref[q_idx] += 1
        

Checking for r: 0
Checking for q: 1
34360
normal deviation q 1 ref 0 : st 0.198685 end 0.200227 avg 0.199456 mfcc_st 2188 mfcc_end 2205
swap deviation q 0 ref 1 : st 0.111519 end 0.118912 avg 0.115215 mfcc_st 1233 mfcc_end 1315
Checking for q: 2
35019
normal deviation q 2 ref 0 : st 0.204399 end 0.227891 avg 0.216145 mfcc_st 2261 mfcc_end 2521
swap deviation q 0 ref 2 : st 0.155601 end 0.187574 avg 0.171587 mfcc_st 1709 mfcc_end 2062
Checking for q: 3
33071
normal deviation q 3 ref 0 : st 0.137007 end 0.137279 avg 0.137143 mfcc_st 1507 mfcc_end 1510
swap deviation q 0 ref 3 : st 0.184082 end 0.191474 avg 0.187778 mfcc_st 2025 mfcc_end 2107
Checking for q: 4
40208
normal deviation q 4 ref 0 : st 0.208798 end 0.212698 avg 0.210748 mfcc_st 2303 mfcc_end 2346
swap deviation q 0 ref 4 : st 0.144717 end 0.152109 avg 0.148413 mfcc_st 1588 mfcc_end 1670
Checking for q: 5
34162
normal deviation q 5 ref 0 : st 0.223220 end 0.228753 avg 0.225986 mfcc_st 2474 mfcc_end 2535
swap deviation q 0 ref 5

In [83]:
dev_for_ref/len(at_a_arr)

array([0.17207159, 0.26950761, 0.20834467, 2.5084872 , 1.77275024,
       0.22668934, 0.2721056 , 0.20295432, 0.21681568, 0.21564302,
       0.19016359, 0.16601879, 0.22535147, 0.16718173])

Doing the experiments at segment level