# Лабораторная работа 1. Подсчет ошибки распознавания

WER (Word Error Rate) является метрикой для оценки качества систем распознавания речи. Она показывает процент ошибочных слов в гипотезе распознавания по сравнению с эталонным текстом. WER учитывает три типа ошибок: вставки, удаления и замены. 
$$ WER = {I + D + S \over D + S + C} $$
где I, D, S - количество втавок, удалений и замен, соответственно. C - количество правильно распознанных слов

Лабораторная работа состоит из трех частей. Первая часть (функция подсчета WER) обязательная, остальные дополнительные. Всего за работу можно получить максимум 20 баллов. 4 за сдачу в срок и 16 за задания: 
* функция подсчета WER (тест 1.a, 1.b) - 8 баллов
* функция подсчета WER и ошибки пунктуации (тест 2.a) - 4 балла
* функция подсчета SA-WER (тест 3.а) - 4 балла

# 1. Word Error Rate (8 баллов)

## 1.a. подсчет WER 


Функция должна принимать две строки в качестве входных данных: эталонный текст и распознанный текст. Эталонный текст - это то, что произносится в аудиозаписи, а гипотеза распознавания - это текст, полученный от системы распознавания речи. Для корректного вычисления ошибки распознавания необходимо удалить все символы пунктуации и привести все слова к нижнему регистру.



In [4]:
import string

def calculate_wer(reference_text: str, recognized_text: str) -> float:
    # Приведение текста к нижнему регистру, удаление символов пунктуации
    reference_text = reference_text.lower().translate(str.maketrans('', '', string.punctuation))
    recognized_text = recognized_text.lower().translate(str.maketrans('', '', string.punctuation))
    
    # Разбивка на слова
    reference_words = reference_text.split()
    recognized_words = recognized_text.split()
    
    # Инициализация матрицы для вычисления расстояния Левенштейна между двумя списками слов
    distance_matrix = [[0] * (len(recognized_words) + 1) for _ in range(len(reference_words) + 1)]
    
    # Наполнение первой строки матрицы
    for i in range(len(reference_words) + 1):
        distance_matrix[i][0] = i

     # Наполнение первого столбца матрицы
    for j in range(len(recognized_words) + 1):
        distance_matrix[0][j] = j

    # Заполнение матрицы расстояний методом динамического программирования
    for i in range(1, len(reference_words) + 1):
        for j in range(1, len(recognized_words) + 1):
            insertion = distance_matrix[i][j - 1] + 1
            deletion = distance_matrix[i - 1][j] + 1
            substitution = distance_matrix[i - 1][j - 1] + (0 if reference_words[i - 1] == recognized_words[j - 1] else 1)
            
            distance_matrix[i][j] = min(insertion, deletion, substitution)

    # Расчет WER
    # Общее число слов в эталонном тексте
    N = len(reference_words)

    # Общее число вставок, удалений и замен
    I = distance_matrix[-1][-1]

    # WER в процентах
    wer = I / N * 100

    return wer

wer = calculate_wer('Привет привет', 'студент привет')
print(f"Word Error Rate: {wer:.2f}%")

Word Error Rate: 50.00%


In [5]:
def assert_wer(ref, hyp, ideal_wer):
    wer = calculate_wer(ref, hyp)
    assert round(wer, 2) == round(ideal_wer, 2), f"for '{hyp=}' and '{ref=}' {ideal_wer=}, calculate_wer {wer=}"
    
def test_wer():
    assert_wer('привет студент', 'привет студент', 0)
    assert_wer('привет! Студент.', 'Привет, студент?', 0)
    assert_wer('привет студент', 'студент', 50)
    assert_wer('привет студент', '', 100)
    assert_wer('привет студент', 'студент привет', 100)
    assert_wer('привет', 'привет студент', 100)
    assert_wer('привет студент привет как дела', 'студент привет', 60)
    assert_wer('привет студент привет как дела', 'привет как дела', 40)
    assert_wer('привет студент привет как дела ', 'привет студент дела ', 40)
    assert_wer('привет студент привет как дела '*100, 'привет студент дела '*100, 40)

    print(f"Test 1.a passed")
    
test_wer() 

Test 1.a passed


## 1.b. Построение выравнивания
Реализованная в части 1.a. функция выдает только суммарное значение ошибки распознавания, не давая понимания, в чем состоят основные проблемы распознавания. 

Значение WER получается из трех видов ошибок: 
* вставка (insertion)
* удаление (deletion)
* замена (substitution)

Каждый тип ошибок имеет свое значение и указывает на определенные недостатки системы. Например, большое количество вставок означает, что ASR слышит речь там где ее нет. А большое количество удалений показывает, что целевая речь пропускается и не транскрибируется. 

Кроме числовых значений каждой ошибки, для анализа результатов работы системы может пригодиться выравнивание эталонной текстовки и гипотезы распознавания относительно друг друга. 

пример выравнивания: 

```
>>> tabulate(ali)

Я сегодня  ***   учуcь  в  универе
Я    с    завтра учусь *** универе  
C    S      I      C    D    C    
```

Реализуйте функцию, которая кроме числового значения WER возвращает выравнивание, а также значения каждого типа ошибок распознавания (вставки, удаления, замены).

In [21]:
!pip3.10 install tabulate

Defaulting to user installation because normal site-packages is not writeable
Collecting tabulate
  Downloading tabulate-0.9.0-py3-none-any.whl (35 kB)
Installing collected packages: tabulate
Successfully installed tabulate-0.9.0


In [6]:
def calculate_wer_with_alignment(reference_text: str, recognized_text: str):
    
    # Перенесите сюда код из задания 1.a.
    #wer = 

    # используя distance_matrix восстановите путь (набор операций), который соответстует найденому WER 
    #TODO
    # ali[0]=  разбитый по словам референс. Втавки отабражаются в эталонном выравнивании с помощью "***"
    # ali[1] = разбитая по словам гипотеза.
    # ali[2] = аннотация 
    assert len(ali[0]) == len(ali[1]) == len(ali[2]), f"wrong ali {ali}"
    
    return {"wer" : wer,
            "cor": correct, 
            "del": deletion,
            "ins": insertion,
            "sub": substitution,
            "ali": ali}

In [7]:
from tabulate import tabulate
import string

def calculate_wer_with_alignment(reference_text: str, recognized_text: str):
    
    # Приведение текста к нижнему регистру, удаление символов пунктуации
    reference_text = reference_text.lower().translate(str.maketrans('', '', string.punctuation))
    recognized_text = recognized_text.lower().translate(str.maketrans('', '', string.punctuation))

    # Разбивка на слова
    reference_words = reference_text.split()
    recognized_words = recognized_text.split()
    
    # Инициализация матрицы для вычисления расстояния Левенштейна между двумя списками слов
    distance_matrix = [[0] * (len(recognized_words) + 1) for _ in range(len(reference_words) + 1)]
    
    # Наполнение первой строки матрицы
    for i in range(len(reference_words) + 1):
        distance_matrix[i][0] = i

     # Наполнение первого столбца матрицы
    for j in range(len(recognized_words) + 1):
        distance_matrix[0][j] = j

    # Заполнение матрицы расстояний методом динамического программирования
    for i in range(1, len(reference_words) + 1):
        for j in range(1, len(recognized_words) + 1):
            insertion = distance_matrix[i][j - 1] + 1
            deletion = distance_matrix[i - 1][j] + 1
            substitution = distance_matrix[i - 1][j - 1] + (0 if reference_words[i - 1] == recognized_words[j - 1] else 1)
            
            distance_matrix[i][j] = min(insertion, deletion, substitution)
    
    # Восстановление пути для выравнивания и типов ошибок
    # Инициализация пустых списков для хранения выровненных слов и типов операций
    ali = [[], [], []]
    
    # Инициализация счетчиков для различных типов ошибок
    insertion_count, deletion_count, substitution_count, correct_count = 0, 0, 0, 0

    # Инициализация индексов для итерации по словам
    i, j = len(reference_words), len(recognized_words)

    while i > 0 or j > 0:
        # Определение возможных шагов для текущей позиции в матрице Левенштейна
        diag_move = distance_matrix[i - 1][j - 1] if i > 0 and j > 0 else float('inf')
        vert_move = distance_matrix[i - 1][j] if i > 0 else float('inf')
        hori_move = distance_matrix[i][j - 1] if j > 0 else float('inf')

        # Нахождение минимального шага
        min_move = min(diag_move, vert_move, hori_move)

        # по диагонали (замена или совпадение)
        if min_move == diag_move:
            if reference_words[i - 1] == recognized_words[j - 1]:
                # Совпадение
                ali[2].append('C')
                correct_count += 1
            else:
                # Замена
                ali[2].append('S')
                substitution_count += 1
            ali[0].append(reference_words[i - 1])
            ali[1].append(recognized_words[j - 1])
            i -= 1
            j -= 1

        # по вертикали (удаление)
        elif min_move == vert_move:
            ali[0].append(reference_words[i - 1])
            ali[1].append("***")
            ali[2].append('D')
            deletion_count += 1
            i -= 1

        # по горизонтали (вставка)
        else:
            ali[0].append("***")
            ali[1].append(recognized_words[j - 1])
            ali[2].append('I')
            insertion_count += 1
            j -= 1

    ali[0] = ali[0][::-1]
    ali[1] = ali[1][::-1]
    ali[2] = ali[2][::-1]

    
    N = len(reference_words)
    wer = (insertion_count + deletion_count + substitution_count) / N * 100

    return {
        "wer": wer,
        "cor": correct_count,
        "del": deletion_count,
        "ins": insertion_count,
        "sub": substitution_count,
        "ali": ali
    }

In [8]:
test_result = calculate_wer_with_alignment("Сегодня я изучаю Python", "Завтра я начну изучать Python")

print("WER: " + str(test_result['wer']) + "\n" +
      "Correct: " + str(test_result['cor']) + "\n" +
      "Deletions: " + str(test_result['del']) + "\n" +
      "Insertions: " + str(test_result['ins']) + "\n" +
      "Substitutions: " + str(test_result['sub']) + "\n" +
      "Alignment:\n" +
      tabulate(test_result['ali']))

WER: 75.0
Correct: 2
Deletions: 0
Insertions: 1
Substitutions: 2
Alignment:
-------  -  -----  -------  ------
сегодня  я  ***    изучаю   python
завтра   я  начну  изучать  python
S        C  I      S        C
-------  -  -----  -------  ------


In [9]:
def assert_wer_with_alignment(ref, hyp, ideal_report):
    report = calculate_wer_with_alignment(ref, hyp)
    for k, v in ideal_report.items():
        if isinstance(v, float):
            assert round(v, 2) == round(report[k], 2), f"for '{hyp=}' and '{ref=}' {ideal_report=}, calculate_wer {report=}"
        else:
            assert v == report[k], f"for '{hyp=}' and '{ref=}' {ideal_report=}, calculate_wer {report=}"

    
def test_wer_with_alignment():
    assert_wer_with_alignment('привет студент', 'привет студент',  {
            "wer" : 0,
            "cor": 2, 
            "del": 0,
            "ins": 0,
            "sub": 0,
            "ali": [["привет", "студент"],["привет", "студент"],['C', 'C']]})
    assert_wer_with_alignment('привет студент', 'студент', {
            "wer" : 50,
            "cor": 1, 
            "del": 1,
            "ins": 0,
            "sub": 0,
            "ali": [["привет", "студент"],["***", "студент"],['D', 'C']]})
    assert_wer_with_alignment('привет', 'привет студент', {
            "wer" : 100,
            "cor": 1, 
            "del": 0,
            "ins": 1,
            "sub": 0,
            "ali": [["привет", "***"],["привет", "студент"],['C', 'I']]})
    assert_wer_with_alignment('привет студент', 'пока студент',  {
            "wer" : 50,
            "cor": 1, 
            "del": 0,
            "ins": 0,
            "sub": 1,
            "ali": [["привет", "студент"],["пока", "студент"],['S', 'C']]})

    print(f"Test 1.b passed")
    
test_wer_with_alignment() 

Test 1.b passed


# 2. WER с пунктуацией (4 балла)
Попробуйте модифицировать WER таким образом, чтобы получившаяся метрика учитавала ошибки расстановки знаков препинания. 

Для этого надо ввести ограничение в алгоритм подсчета distance_matrix таким образом, чтобы запретить делать замену знака препинания на слово и наоборот.

Пример выравнивания 
```
Я сегодня  .   ***   ***  А ты  
Я    с    *** завтра  ?   А ты  
C    S    D_p   I    I_p  C  C    
```
Здесь суффикс _p в аннотации к ошибкам означает **ошибки пунктуации**


Задание: 
Напишите функцию, которая кроме стандартного WER считает дополнительно PunctuaionErrorRate (PER) по формуле

$$ PER = {I_p + D_p + S_p \over D_p + S_p + C_p} $$


In [10]:
from tabulate import tabulate

def tokenize_text_and_punctuation(src: str, original_case: bool = False) -> list[str]:
    # Определяем список знаков препинания
    punctuations = ".,!?;"
    result = []
    word = ""
    for char in src:
        if char in punctuations:
            if word.strip():
                result.append(word.strip())
            word = ""
            result.append(char)
        elif char.isspace():
            if word.strip():
                result.append(word.strip())
            word = ""
        else:
            word += char
    if word.strip():
        result.append(word.strip())
    
    if not original_case:
        result = [w.lower() for w in result]
    
    return result


def is_word(s: str) -> bool:
    return s not in ".,!?;"

def same(str1, str2):
    return not (is_word(str1) ^ is_word(str2))

def build_punctuation_sensitive_distance_matrix(reference_text: str, recognized_text: str) -> list[list[int]]:
    
    reference_words = tokenize_text_and_punctuation(reference_text)
    recognized_words = tokenize_text_and_punctuation(recognized_text)

    distance_matrix = [[0] * (len(recognized_words) + 1) for _ in range(len(reference_words) + 1)]
    
    for i in range(len(reference_words) + 1):
        distance_matrix[i][0] = i
    for j in range(len(recognized_words) + 1):
        distance_matrix[0][j] = j
    
    for i in range(1, len(reference_words) + 1):
        for j in range(1, len(recognized_words) + 1):
            operations = [
                distance_matrix[i][j-1] + 1,
                distance_matrix[i-1][j] + 1,
            ]
            if same(reference_words[i-1], recognized_words[j-1]):
                operations.append(distance_matrix[i-1][j-1] + (0 if reference_words[i-1] == recognized_words[j-1] else 1))
            distance_matrix[i][j] = min(operations)
    
    return distance_matrix

def calculate_wer_per(reference_text: str, recognized_text: str):
    reference_words = tokenize_text_and_punctuation(reference_text, original_case=True)
    recognized_words = tokenize_text_and_punctuation(recognized_text, original_case=True)
    distance_matrix = build_punctuation_sensitive_distance_matrix(reference_text, recognized_text)
    
    i, j = len(reference_words), len(recognized_words)
    ali = [[], [], []]

    # Расстояние Левенштейна для слов
    levenstein_distance = 0
    # Расстояние Левенштейна для знаков препинания
    levenstein_distance_per = 0

    # Поиск оптимального пути
    while i > 0 or j > 0:
        current_value = distance_matrix[i][j]
        try_insertion = distance_matrix[i][j-1]
        try_deletion = distance_matrix[i-1][j]
        try_substitution = distance_matrix[i-1][j-1]
        min_value = min([try_insertion, try_deletion, try_substitution])

        # Если минимальное значение соответствует замене или совпадению
        if try_substitution == min_value and abs(current_value - min_value) <= 1:
            i -= 1
            j -= 1
            ali[0].append(reference_words[i])
            ali[1].append(recognized_words[j])

            # Проверяем является ли это знаком препинания или словом
            if not is_word(reference_words[i]):
                ali[2].append("C_p" if current_value - min_value == 0 else "S_p")
                if current_value - min_value == 1:
                    levenstein_distance_per += 1
            else:
                ali[2].append("C" if current_value - min_value == 0 else "S")
                if current_value - min_value == 1:
                    levenstein_distance += 1

        # Если минимальное значение соответствует удалению
        elif try_deletion == min_value and abs(current_value - min_value) <= 1:
            i -= 1
            ali[0].append(reference_words[i])
            ali[1].append("***")
            if not is_word(reference_words[i]):
                ali[2].append("D_p")
                levenstein_distance_per += 1
            else:
                ali[2].append("D")
                levenstein_distance += 1

         # Если минимальное значение соответствует вставке
        else:
            j -= 1
            ali[0].append("***")
            ali[1].append(recognized_words[j])
            if not is_word(recognized_words[j]):
                ali[2].append("I_p")
                levenstein_distance_per += 1
            else:
                ali[2].append("I")
                levenstein_distance += 1

    ali[0] = ali[0][::-1]
    ali[1] = ali[1][::-1]
    ali[2] = ali[2][::-1]

    # Вычисление WER и PER
    wer = levenstein_distance / len([x for x in reference_words if is_word(x)]) * 100
    per = levenstein_distance_per / len([x for x in reference_words if not is_word(x)]) * 100 if len([x for x in reference_words if not is_word(x)]) != 0 else 0
    
    return {"wer": wer, "per": per, "ali": ali}

In [11]:
test_result = calculate_wer_per("Сегодня, я изучаю Python.", "Завтра, я начну изучать Python!")

print("WER: " + str(test_result['wer']) + "\n" +
      "PER: " + str(test_result['per']) + "\n" +
      "Alignment:\n" +
      tabulate(test_result['ali']))

WER: 75.0
PER: 50.0
Alignment:
-------  ---  -  -----  -------  ------  ---
Сегодня  ,    я  ***    изучаю   Python  .
Завтра   ,    я  начну  изучать  Python  !
S        C_p  C  I      S        C       S_p
-------  ---  -  -----  -------  ------  ---


In [12]:
def assert_wer_per(ref, hyp, ideal_report):
    report = calculate_wer_per(ref, hyp)
    for k, v in ideal_report.items():
        if isinstance(v, float):
            assert round(v, 2) == round(report[k], 2), f"for '{hyp=}' and '{ref=}' {ideal_report=}, calculate_wer {report=}"
        else:
            assert v == report[k], f"for '{hyp=}' and '{ref=}' {ideal_report=}, calculate_wer {report=}"

    
def test_wer_per():
    assert_wer_per('привет студент.', 'привет студент',  {
            "wer" : 0,
            "per": 100})
    assert_wer_per('привет студент.', 'студент.', {
            "wer" : 50,
            "per": 0,})
    assert_wer_per('привет студент.', 'привет. студент',  {
            "wer" : 0,
            "per": 200})
    assert_wer_per('привет студент.', '.студент?', {
            "wer" : 50,
            "per": 200, })

    print(f"Test 2 passed")
    
test_wer_per() 

Test 2 passed


# 3. Speaker-attributed Word Error Rate (4 балла)

В задаче распознавания диалоговых данных, когда говорят два диктора, важно не только распознать правильно каждое слово, но и отнести его к правильному диктору. Чаще всего такой результат получается с помощью комбинации двух независимых систем: распознавания речи и диаризация. 

При подсчете ошибки распознавания диалоговых систем в формулу WER добавляется еще один тип ошибки - S_I (speaker incorrect).

$$ SA{\text -}WER = \min{I + D + S + S_I \over D + S + C + S_I} $$

Кроме подсчета самой ошибки, sa-wer решает еще одну задачку - поиск маппинга из эталонных названий спикеров (например, имен) в предсказанные (чаще всего Idшники). Это необходимо, тк система диаризации не знает, какие названия у спикеров в эталоне. При подсчете SA-WER проверяются все возможные мапинги спикеров и выбирается тот, который соответствует минимальному значению ошибки. 



In [13]:
def calculate_sawer(reference_text, reference_speakers, recognized_text, recognized_speakers):
    # В отличие от прошлых функций на вход sawer подаются уже разбитые на слова произнесения
    # Кроме списка слов, дополнительно передается список меток спикеров
    assert isinstance(reference_text, list)
    assert isinstance(recognized_text, list)
    assert len(reference_text) == len(reference_speakers)
    assert len(recognized_text) == len(recognized_speakers)
    
    # TODO  посчитайте sawer с учетом мапинга спикеров
    # для этого посчитайте значение ошибки для каждого варианта мапинга меток дикторов 
    # и выберете тот, который соответствует минимальному SA-WER
    sawer=0
    ali=[]
    return {"sawer" : sawer, 
            "ali": ali}


In [40]:
from itertools import permutations, product

def calculate_sawer(reference_text, reference_speakers, recognized_text, recognized_speakers):
    assert isinstance(reference_text, list)
    assert isinstance(recognized_text, list)
    assert len(reference_text) == len(reference_speakers)
    assert len(recognized_text) == len(recognized_speakers)

    # Получаем уникальных дикторов из эталона и распознанного текста
    unique_ref_speakers = list(set(reference_speakers))
    unique_rec_speakers = list(set(recognized_speakers))

    # Генерируем все комбинации маппинга дикторов
    all_mappings = product(unique_rec_speakers, repeat=len(unique_ref_speakers))

    best_sawer = float("inf")
    best_mapping = {}

    for mapping in all_mappings:
        speaker_mapping = {unique_ref_speakers[i]: mapping[i] for i in range(len(unique_ref_speakers))}

        # Вставка, удаление, замена, правильный, неправильный диктор
        I, D, S, C, SI = 0, 0, 0, 0, 0 
        
        i, j = 0, 0

        while i < len(reference_text) and j < len(recognized_text):
            ref_word = reference_text[i]
            ref_spk = reference_speakers[i]
            rec_word = recognized_text[j]
            rec_spk = recognized_speakers[j]

            if ref_word == rec_word:
                if speaker_mapping.get(ref_spk) == rec_spk:
                    C += 1
                else:
                    SI += 1
                i += 1
                j += 1
            else:
                if ref_word != rec_word:
                    S += 1
                i += 1
                j += 1

        while i < len(reference_text):
            D += 1
            i += 1

        while j < len(recognized_text):
            I += 1
            j += 1

        # Расчет SA-WER
        total = D + S + C + SI
        sawer = (I + D + S + SI) / total * 100 if total != 0 else 0

        if sawer < best_sawer:
            best_sawer = sawer
            best_mapping = speaker_mapping

    for spk in unique_rec_speakers:
        if spk not in best_mapping.values():
            best_mapping[None] = spk

    return {"sawer": best_sawer, "ali": best_mapping}

In [41]:
sawer = calculate_sawer(['привет', 'с'], ['A', 'B'], ['привет', 'студент'], [1, 2])
print(sawer)

{'sawer': 50.0, 'ali': {'A': 1, 'B': 1, None: 2}}


In [42]:
def assert_sawer(reference_text, reference_speakers, recognized_text, recognized_speakers, ideal_report):
    report = calculate_sawer(reference_text, reference_speakers, recognized_text, recognized_speakers)
    for k, v in ideal_report.items():
        assert v == report[k]

    
def test_sawer():
    assert_sawer(['привет', 'студент'], ['A', 'B'], ['привет', 'студент'], [1, 2],  {
            "sawer" : 0})
    assert_sawer(['привет', 'студент'], ['A', 'A'], ['привет', 'студент'], [1, 2],  {
            "sawer" : 50})
    assert_sawer(['привет', 'студент'], ['A', 'A'], ['привет', 'студент'], [0, 0],  {
            "sawer" : 0})
    assert_sawer(['привет', 'с'], ['A', 'B'], ['привет', 'студент'], [1, 2],  {
            "sawer" : 50})
    assert_sawer(['привет', 'с'], ['A', 'B'], ['привет'], [1],  {
            "sawer" : 50})
    assert_sawer(['привет'], ['A'], ['привет', 'студент'], [1, 0],  {
            "sawer" : 100})
    assert_sawer(['привет'], ['A'], ['привет', 'студент'], [0, 0],  {
            "sawer" : 100})

    print(f"Test 3 passed")
    
test_sawer()

Test 3 passed
