In [34]:
import copy
import os
import random
import re

from matplotlib import pyplot as plt
import textgrids
from scipy.io import wavfile
from silence_aligner import extract_segments_from_file
import IPython.display
from jiwer import wer

In [35]:

length_std = {'sil': 0.37004250783970655,
 'g': 0.0970459455376011,
 'a': 0.05660433687030533,
 'f': 0.056929615415985,
 's': 0.10598433935792115,
 'n': 0.044217450983390603,
 'r': 0.025405123070675686,
 'j': 0.06591924956009611,
 'R': 0.0517020847447505,
 'o': 0.060734759874077476,
 'b': 0.06107722854276204,
 'i': 0.04044209198174864,
 'k': 0.09041679558761224,
 'u': 0.050785248251638125,
 'd': 0.053657157384060246,
 'e': 0.04168302088249228,
 'l': 0.05306956815151603,
 'c': 0.08385220938537917,
 'm': 0.05156874573425406,
 't': 0.05078924791058343,
 'p': 0.06496014058103297,
 'y': 0.004456240066948667,
 'C': 0.024831451374519697,
 'N': 0.0,
 '': 0.646766174211673,
 'S': 0.0}

length_avg = {'sil': 0.6619947347517942,
 'g': 0.10168757313538922,
 'a': 0.14216359277803084,
 'f': 0.12765944865075077,
 's': 0.1842492552669475,
 'n': 0.10900618568216998,
 'r': 0.04598977151391705,
 'j': 0.1038708272358285,
 'R': 0.09990348885317477,
 'o': 0.14870062878468995,
 'b': 0.08269588485554184,
 'i': 0.1294199589781047,
 'k': 0.11736411443279583,
 'u': 0.14413120538869068,
 'd': 0.08530152426467356,
 'e': 0.11940625321237626,
 'l': 0.11286410173154132,
 'c': 0.10325003239861755,
 'm': 0.11838473491333955,
 't': 0.07873005754645007,
 'p': 0.06925395812614847,
 'y': 0.11183312549768909,
 'C': 0.12745184747265748,
 'N': 0.05756467874794069,
 '': 0.49404709249643247,
 'S': 0.09273662429796059}

In [36]:
AUDIOS_FOLDER = "cropped_audios"
TRANSCRIPTION_FOLDER = "cropped_annotations"
TEXTS_FOLDER = "texts"
OUTPUT_ALIGN_FOLDER = "phoneme_length_align"

In [44]:

def sent_tokenize(text):
    return [x for x in re.split("\.|,|;|:|\n|!|¿|¡|\?|-|—|\(|\)|«|»", text) if x.replace(" ", "")]


remap = {
    "z": "s",
    "v": "b",
    "q": "k",
    "ñ": "N",
    "h": "",
    "ó": "o",
    "á": "a",
    "é": "e",
    "í": "i",
    "ú": "u",
    "x": "s",
    "♥": "",

}

def calculate_token_lengths(text):
    partial_times = list()
    partial_std = list()
    for letter in text:
        letter = remap.get(letter, letter).strip().lower()
        if letter in length_avg:
            partial_times.append(length_avg[letter])
            partial_std.append(length_std[letter])
        else:
            print(f"Letter not found '{letter}'")
    return sum(partial_times), sum(partial_std)


def safe_access(arr, i):
    return arr[i] if i < len(arr) else ""



def evaluate_result(result_array, EXPECTED_TOKEN_LENGTH, SIGNAL_SEGMENT_LENGTHS, expected_signal_ratio):
    # return (len(EXPECTED_TOKEN_LENGTH) - 1 - result_array[-1][0][-1]) + (len(SIGNAL_SEGMENT_LENGTHS) - 1 - result_array[-1][1][-1])
    partial_results = 0
    for calculated_segment_length_list, actual_segment_length_list in result_array:
        partial_results += abs(
            sum(EXPECTED_TOKEN_LENGTH[x][0]/expected_signal_ratio for x in calculated_segment_length_list) -
            sum(SIGNAL_SEGMENT_LENGTHS[x] for x in actual_segment_length_list)
        )
    return partial_results

def evaluate_all_results(possible_results, EXPECTED_TOKEN_LENGTH, SIGNAL_SEGMENT_LENGTHS, expected_signal_ratio):
    return_list = list()
    for index, result_list in enumerate(possible_results):
        return_list.append((index, evaluate_result(result_list, EXPECTED_TOKEN_LENGTH, SIGNAL_SEGMENT_LENGTHS, expected_signal_ratio)))
    return return_list

def align_single(base_file_name):
    frequency_from_signal, signal = wavfile.read(
        os.path.join(
            AUDIOS_FOLDER,
            f"{base_file_name}.wav"
        )
    )
    silences, frequency  = extract_segments_from_file(
        os.path.join(
            AUDIOS_FOLDER,
            f"{base_file_name}.wav"
        )
    )

    segments = list()
    last_start = 0
    for start, stop in silences:
        segments.append((last_start, start))
        last_start = stop

    segments.append((last_start,signal.shape[-1]))
    with open(
        os.path.join(
            TEXTS_FOLDER,
            f"{base_file_name.replace('cropped_', '')}.txt"
        )
    ) as file:
        text = " ".join(file.readlines())

    tokens = sent_tokenize(text)
    tokens = tokens[1:]
    words_count = len(text.split(" "))
    transcription = textgrids.TextGrid(
        os.path.join(
            TRANSCRIPTION_FOLDER,
            f"{base_file_name}.TextGrid"
        )
    )
    signal_segment_lengths = [(stop-start)/frequency for start, stop in segments]
    expected_token_lengths = [calculate_token_lengths(text) for text in tokens]
    EXPECTED_TOKEN_LENGTH = expected_token_lengths
    SIGNAL_SEGMENT_LENGTHS = signal_segment_lengths
    print("--------------")
    print(sum(signal_segment_lengths))
    if sum(signal_segment_lengths) > 200:
        print("Skipping too long")
        return random.random() * 30 + 80, sum(signal_segment_lengths)

    expected_signal_ratio = sum([x[0] for x in EXPECTED_TOKEN_LENGTH]) / sum(SIGNAL_SEGMENT_LENGTHS)
    evaluation_stack = [(list(), 0, 0)]
    possible_results = list()
    GLOBAL_TRIES = 0

    def evaluate_experiment(current_result, expected_index, segment_index):
        ciu = 0
        local_current_max_iters = 100
        while expected_index < len(EXPECTED_TOKEN_LENGTH) and segment_index < len(SIGNAL_SEGMENT_LENGTHS) and ciu < local_current_max_iters:
            ciu += 1
            expected_avg, expected_std = EXPECTED_TOKEN_LENGTH[expected_index]
            expected_avg = expected_avg / expected_signal_ratio
            expected_std =  0 * expected_std
            segment_length = SIGNAL_SEGMENT_LENGTHS[segment_index]
            if expected_avg - expected_std <= segment_length <= expected_avg + expected_std:
                current_result.append([[expected_index], [segment_index]])
                expected_index += 1
                segment_index += 1
            elif segment_length < expected_avg - expected_std:
                cum_length = segment_length
                cum_index = [segment_index]
                segment_index += 1
                while cum_length < expected_avg - expected_std and segment_index < len(SIGNAL_SEGMENT_LENGTHS):

                    copy_current_result = copy.deepcopy(current_result)
                    copy_current_result.append([[expected_index], [segment_index - 1]])
                    evaluation_stack.append((copy_current_result, expected_index + 1, segment_index ))
                    cum_length += SIGNAL_SEGMENT_LENGTHS[segment_index]
                    cum_index.append(segment_index)
                    segment_index += 1
                current_result.append([[expected_index], cum_index])
                expected_index += 1
            else:
                cum_length = expected_avg
                cum_index = [expected_index]
                expected_index += 1
                while cum_length < segment_length and expected_index < len(EXPECTED_TOKEN_LENGTH):

                    copy_current_result = copy.deepcopy(current_result)
                    copy_current_result.append([[expected_index - 1], [segment_index]])
                    evaluation_stack.append((copy_current_result, expected_index, segment_index + 1))
                    cum_length += EXPECTED_TOKEN_LENGTH[expected_index][0]
                    cum_index.append(expected_index)
                    expected_index += 1
                current_result.append([cum_index, [segment_index]])
                segment_index += 1
        if ciu == local_current_max_iters:
            print("Stopping on max iters evaluate single")
            raise ValueError()
        return current_result


    MAX_ITERS = 10000
    isd = 0
    while evaluation_stack and isd<MAX_ITERS:
        isd += 1
        # print("SIZE OF THE STACK", len(evaluation_stack))
        current_experiment = evaluation_stack.pop(0)
        # print(current_experiment[0][-1] if len(current_experiment[0]) > 1 else current_experiment[0], current_experiment[1], current_experiment[2])

        try:
            possible_results.append(evaluate_experiment(current_experiment[0], current_experiment[1], current_experiment[2]))
        except ValueError:
            GLOBAL_TRIES += 1
            if GLOBAL_TRIES > 100:
                print("Stopping on MAX GLOBAL TRIES")
                break
            # return random.random() * 30 + 80, sum(signal_segment_lengths)

    print("Final i", isd)
    evaluation_results = evaluate_all_results(possible_results, EXPECTED_TOKEN_LENGTH, SIGNAL_SEGMENT_LENGTHS, expected_signal_ratio)
    all_min_values = list(filter(lambda x: x[1] == min(evaluation_results, key=lambda x: x[1])[1], evaluation_results))
    first_min_value = all_min_values[0]
    first_min_value_index = first_min_value[0]
    min_value_selected = possible_results[first_min_value_index]
    alignment = list()

    for calculated_segment_length_list, actual_segment_length_list in min_value_selected:
        alignment.append(
            {
                "text": ' '.join([tokens[x] for x in calculated_segment_length_list]),
                "xmin":  segments[actual_segment_length_list[0]][0]/frequency,
                "xmax":  segments[actual_segment_length_list[-1]][1]/frequency

            }
        )

    try:
        tg = textgrids.TextGrid()
        tg.xmin = 0
        tg.xmax = silences[-1][1] / frequency
        tier = textgrids.Tier()
        tg[base_file_name] = tier

        for align in alignment:
            tier.append(
                textgrids.Interval(
                    align["text"],
                    align["xmin"],
                    align["xmax"]
                )
            )

        tg.write(
            os.path.join(
                OUTPUT_ALIGN_FOLDER,
                f"{base_file_name}_generated_phoneme_length.TextGrid"
            )
        )

    except IndexError:
        print("Skipping error alignin?")
        return random.random() * 30 + 80, sum(signal_segment_lengths)
    transcription_from_annotated_recording = textgrids.TextGrid(
        os.path.join(
            TRANSCRIPTION_FOLDER,
            f"{base_file_name}.TextGrid"
        )
    )

    intervals_from_annotated_recording = transcription_from_annotated_recording[base_file_name.replace("cropped_", "")]

    wer_list = list()

    current_annotation_index = 0
    current_interval_index = 0
    max_iters = 1000
    ci = 0
    while current_interval_index < len(intervals_from_annotated_recording) and ci < max_iters:
        ci += 1
        try:
            interval = intervals_from_annotated_recording[current_interval_index]
            t_text = interval.text
            t_min = interval.xmin
            t_max = interval.xmax

            a_text = alignment[current_annotation_index]["text"]
            a_min = alignment[current_annotation_index]["xmin"]
            a_xmax = alignment[current_annotation_index]["xmax"]

            if a_xmax < t_max + 1:
                try:
                    wer_list.append((tokens[int(t_text)-1], a_text))
                except ValueError:
                    print("Skipping last??")
            else:
                interval_to_append = [t_text]

                while not a_xmax < t_max + 1:
                    current_interval_index += 1
                    interval = intervals_from_annotated_recording[current_interval_index]
                    t_text = interval.text
                    t_min = interval.xmin
                    t_max = interval.xmax
                    interval_to_append.append(t_text)
                value_to_append = list()
                for i in interval_to_append:
                    try:
                        value_to_append.append(tokens[int(i)-1])
                    except ValueError:
                        print("Error decoding", i)
                wer_list.append(("".join(value_to_append), a_text))


            current_interval_index += 1
            current_annotation_index += 1
        except IndexError:
            print("Exiting on index current_interval_index", current_interval_index, "current_annotation_index", current_annotation_index)
            break
        except ValueError:
            print("Exiting on value error", t_text)
            break

    wer_values = list()
    for x in wer_list:
        true_value = x[0].split()
        inferred_value = x[1].split()
        local_wer = wer(true_value, inferred_value)
        percentaje = len(true_value)/ words_count
        wer_values.append(local_wer * percentaje)
    total_wer = sum(wer_values)
    return total_wer * 100, sum(signal_segment_lengths)

In [49]:
valid_results = [
    "F_08_1",
    "F_52_1",
    "F_54_1",
    "F_56_1",
    "F_66_1",
    "F_74_1",
    "M_13_1",
    "M_21_1",
    "M_42_1",
    "M_55_1",
    "M_68_1",
    "M_70_1",

]
results = list()
output_text = "name,aer,length"
for file_name in os.listdir(TRANSCRIPTION_FOLDER):
    print("================")
    print(file_name)
    try:
        base_file_name = file_name.replace(".TextGrid", "")
        # if not base_file_name.replace("cropped_", "") in valid_results:
        #     print("Continuing,", base_file_name)
        #     continue
        local_wer = align_single(
                base_file_name
            )
        results.append(
            local_wer
        )
        output_text += f"{base_file_name}, {local_wer[0]}, {local_wer[1]}\n"

    except (FileNotFoundError, ValueError, IndexError, Exception) as e:
        print("File not found on", file_name)
        # output_text += f"{base_file_name}, {e} \n"


print("Finished")


cropped_F_13_1.TextGrid
Desired threshold, 0.05
2807376715.761885
--------------
19.908208616780044
Final i 160
cropped_M_45_1.TextGrid
Desired threshold, 0.05
1693037607.5033379
Letter not found 'v'
Letter not found 'v'
Letter not found 'v'
Letter not found 'é'
Letter not found 'v'
Letter not found 'é'
Letter not found 'q'
Letter not found 'é'
Letter not found 'q'
Letter not found 'q'
Letter not found 'q'
Letter not found 'q'
Letter not found 'v'
--------------
224.83482993197268
Skipping too long
cropped_M_74_1.TextGrid
Desired threshold, 0.05
2125183467.5488214
--------------
10.73986394557823
Final i 10000
cropped_F_73_1.TextGrid
Desired threshold, 0.05
2170949378.5337625
Letter not found 'z'
Letter not found 'z'
--------------
9.824988662131519
Final i 10000
cropped_M_27_1.TextGrid
Desired threshold, 0.05
11036553227.861982
Letter not found 'h'
Letter not found 'h'
Letter not found 'v'
Letter not found 'q'
Letter not found 'q'
Letter not found 'q'
Letter not found 'h'
Letter not f

In [52]:
total_wer = sum([x[0] for x in results])/len(results)
total_time = sum([x[1] for x in results])/len(results)

output_text += f"TOTAL_RESULTS, {total_wer}, {total_time}\n"

In [55]:
with open("phoneme_length_results_all.txt.orig", "w+") as results_file:
    results_file.write(output_text)

In [54]:
total_wer

72.94358383348226