In [None]:
# !pip install crowd-kit==1.0.0

In [None]:
import hashlib
from typing import Iterable, Tuple, List, Dict, Set
from functools import lru_cache
from collections import Counter, defaultdict

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import seaborn as sns
import IPython.display as ipd
import nltk
from tqdm.auto import tqdm
from crowdkit.aggregation import ROVER

In [None]:
# !mkdir ./data
# !wget https://raw.githubusercontent.com/vadim0912/MLIntro2022_Spring/main/lecture08/data/noisy_text_aggregation_* -P data/
# !wget -r https://raw.githubusercontent.com/vadim0912/MLIntro2022_Spring/main/lecture08/data/audio -P data/audio

# Problem Statement

Дано:
* три модели распознавания речи:
    * `qnet`: [QuartzNet](https://arxiv.org/abs/1910.10261)
    * `w2v`: [wav2vec XLS-R](https://arxiv.org/pdf/2111.09296.pdf)
    * `w2v_tts`: тот же [wav2vec XLS-R](https://arxiv.org/pdf/2111.09296.pdf), но при дообучении использовались синтезированные (Text-To-Speech) данные
   
   
 * модели имеют разную структуру (QuartzNet — сверточная, wav2vec — Трансформер) и обучались на разных данных => из их предсказаний можно построить композицию, которая сильнее любого кандидата в отдельности
 * В тренировочном наборе данных ~ 60 тысяч примеров с референсной транскрипцией `text` (ground truth) и гипотезой каждой из моделей
 * В тестовом наборе данных ~ 20 тысяч примеров с гипотезами от каждой из моделей, но без референсной транскрипции (ее нужно предсказать)
 * Также доступен миллион референсных фраз из того же домена (запросы к ассистентам), но без предсказаний моделей

Задача: улучшить распознавание речи с помощью:
 * агрегации транскрипций
 * выбора лучшей транскрипции
 * исправления ошибок в транскрипциях

In [None]:
# Training data is stored as JSON Lines (one JSON record per line), hence lines=True.
train_df = pd.read_json("data/noisy_text_aggregation_train.jsonl", lines=True)

train_df.head()

In [None]:
# Names of the DataFrame columns that hold the hypothesis of each ASR model.
MODEL_LIST = ["qnet", "w2v", "w2v_tts"]

In [None]:
# Preview the first training rows: a hypothesis cell is shaded green when it equals
# the reference transcription exactly and red otherwise; an audio player for the
# utterance is shown below each row.
for i, row in train_df.head(5).iterrows():

    correct_models = [model for model in MODEL_LIST if row[model] == row["text"]]
    wrong_models = [model for model in MODEL_LIST if model not in correct_models]

    row_df = (
        train_df
        .drop(columns=["task", "text"])
        # `iterrows` yields index *labels*, so the row is selected by label (`.loc`);
        # positional `.iloc` only worked while the index was an untouched RangeIndex.
        .loc[[i]]
        .style.set_properties(
            subset=correct_models,
            **{'background-color': '#aaffaa'}
        )
        .set_properties(
            subset=wrong_models,
            **{'background-color': '#ffaaaa'}
        )
        .set_properties(width="150px")
    )
    ipd.display(row_df)
    ipd.display(ipd.Audio(f"data/audio/{row['task']}.wav"))

In [None]:
# Test data is stored as JSON Lines (one JSON record per line), hence lines=True.
test_df = pd.read_json("data/noisy_text_aggregation_test.jsonl", lines=True)

test_df.head()

In [None]:
# The CSV is read without a header row, so pandas labels its columns with integers;
# column 0 is what the language model is later trained on.
text_data = pd.read_csv("data/noisy_text_aggregation_text_only.csv", header=None)

text_data.head(10)

In [None]:
def hash_reminder(str_, base: int=10) -> int:
    """
    Map a string to a stable remainder modulo ``base``.

    The encoded bytes of ``str_`` are hashed with MD5 and the digest, read as a
    big-endian integer, is reduced modulo ``base``. Unlike the built-in ``hash``
    the result is the same in every interpreter session.
    """
    digest = hashlib.md5(str_.encode()).digest()
    return int.from_bytes(digest, "big") % base

# Deterministic split keyed on the task id: hash buckets 0-7 stay in the training
# set, buckets 8-9 form the validation set.
# NOTE: this cell rebinds `train_df`; re-running it without reloading the data
# makes every remaining row fall into the training part and leaves `val_df` empty.
train_mask = train_df['task'].map(hash_reminder).le(7)

val_df = train_df.loc[~train_mask]
train_df = train_df.loc[train_mask]

# Metrics

$$
\mathrm{L}(a, b) = 
\begin{cases}
    |a|,& \text{if } |b| = 0, ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ \text{# second sequence is empty} \\
    |b|,& \text{if } |a| = 0, ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ \text{# first sequence is empty} \\
    \mathrm{L}(\mathrm{tail}(a), \mathrm{tail}(b)),& \text{if } \mathrm{head}(a) = \mathrm{head}(b), ~ ~ \text{# first elements of two sequences are equal} \\
    1 + \min 
    \begin{cases} 
        \mathrm{L}(\mathrm{tail}(a), b), ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ \text{# deletion from first sequence} \\ 
        \mathrm{L}(a, \mathrm{tail}(b)), ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ \text{# insertion into first sequence} \\ 
        \mathrm{L}(\mathrm{tail}(a), \mathrm{tail}(b)); ~ ~ ~ ~ \text{# substitution}
    \end{cases} & \text{, otherwise.}
\end{cases}
$$

In [None]:
def edit_distance(ref: Iterable, hyp: Iterable) -> int:
    """
    Naive recursive Levenshtein distance between two sequences.

    Every call branches into three recursive calls, so the running time is
    exponential in the input length; use it on very short inputs only.
    Both arguments must support ``len`` and slicing.
    """
    # One side is exhausted: whatever is left on the other side is pure edits.
    if not ref:
        return len(hyp)
    if not hyp:
        return len(ref)

    ref_tail, hyp_tail = ref[1:], hyp[1:]

    # Heads are aligned with each other: free on a match, one substitution otherwise.
    match_or_substitution = edit_distance(ref_tail, hyp_tail) + (ref[0] != hyp[0])
    # Head of `hyp` has no counterpart in `ref`: insertion.
    insertion = edit_distance(ref, hyp_tail) + 1
    # Head of `ref` has no counterpart in `hyp`: deletion.
    deletion = edit_distance(ref_tail, hyp) + 1

    return min(match_or_substitution, insertion, deletion)


def edit_distance(ref: Iterable, hyp: Iterable, plot: bool=False) -> int:
    """
    Levenshtein distance between two sequences via dynamic programming.

    Time and memory are O(len(ref) * len(hyp)). Both arguments must support
    ``len`` and iteration (strings for character level, lists of words for
    word level).

    Args:
        ref: reference sequence.
        hyp: hypothesis sequence.
        plot: when True, additionally draw the full DP table as an annotated
            heatmap (rows are labelled with `hyp` items, columns with `ref` items).

    Returns:
        The minimal number of insertions, deletions and substitutions that turn
        one sequence into the other, as a built-in ``int``.
    """
    n_hyp, n_ref = len(hyp), len(ref)

    # dist[i, j] is the distance between hyp[:i] and ref[:j].
    dist = np.zeros((n_hyp + 1, n_ref + 1), dtype=np.int32)

    # Against an empty sequence the distance is just the prefix length.
    dist[:, 0] = np.arange(n_hyp + 1)
    dist[0, :] = np.arange(n_ref + 1)

    for i, h in enumerate(hyp, start=1):
        for j, r in enumerate(ref, start=1):
            dist[i, j] = min(
                dist[i - 1, j - 1] + (r != h),  # match / substitution
                dist[i, j - 1] + 1,             # skip a `ref` item
                dist[i - 1, j] + 1              # skip a `hyp` item
            )
    if plot:
        sns.heatmap(
            pd.DataFrame(
                dist,
                index=[' '] + list(hyp),
                columns=[' '] + list(ref)
            ),
            annot=True,
            cmap='coolwarm_r',
            linewidth=2
        )
        plt.tick_params(
            axis='both', which='major', labelsize=14, left=False, labelbottom=False, 
            bottom=False, top=False, labeltop=True
        )
        plt.yticks(rotation=0)

    # Convert the NumPy scalar to a plain int so callers that accumulate the
    # result do not silently end up with fixed-width int32 arithmetic.
    return int(dist[-1, -1])

In [None]:
# Character-level distance between two phrases; plot=True also draws the DP table as a heatmap.
edit_distance('мама мыла раму', 'мама раму', plot=True)

* Подходит ли само по себе расстояние Левенштейна в качестве метрики? Почему?

In [None]:
def error_rate(refs: Iterable[Iterable], hyps: Iterable[Iterable]) -> float:
    """
    Corpus-level error rate: total edit distance divided by total reference length.

    Pairs whose reference is empty are skipped entirely (they contribute neither
    errors nor length), because the rate is normalised by reference length.

    Args:
        refs: tokenized references; must support ``len`` (used for the progress bar).
        hyps: tokenized hypotheses, paired with `refs` by position.

    Returns:
        ``sum(edit_distance(ref, hyp)) / sum(len(ref))`` over non-empty references.

    Raises:
        ZeroDivisionError: if there is no non-empty reference, so the rate is undefined.
    """
    total_errors, total_ref_len = 0, 0

    for ref, hyp in tqdm(zip(refs, hyps), total=len(refs)):
        if len(ref) == 0:
            continue
        # Accumulate in a built-in int: adding NumPy scalars would turn the
        # accumulator into a fixed-width integer and the result into np.float64.
        total_errors += int(edit_distance(ref, hyp))
        total_ref_len += len(ref)

    if total_ref_len == 0:
        raise ZeroDivisionError("error rate is undefined: all references are empty")
    return total_errors / total_ref_len


def wer(refs: Iterable[str], hyps: Iterable[str]) -> float:
    """
    Word Error Rate: `error_rate` computed over whitespace-separated words.

    `refs` are the reference transcriptions and `hyps` the hypotheses, in that order.
    """
    ref_words = [reference.split() for reference in refs]
    hyp_words = [hypothesis.split() for hypothesis in hyps]
    return error_rate(ref_words, hyp_words)


def cer(refs: Iterable[str], hyps: Iterable[str]) -> float:
    """
    Character Error Rate: `error_rate` computed directly over the characters
    of each string (`refs` are references, `hyps` are hypotheses).
    """
    return error_rate(refs=refs, hyps=hyps)

* Может ли Error Rate быть больше 1?
* Что дольше считать: WER или CER?

In [None]:
# `wer` takes (references, hypotheses) and normalises by the reference length,
# so the ground-truth `text` column goes first and the model output second.
method2wer = {model: wer(val_df['text'], val_df[model]) for model in MODEL_LIST}

# Alignment

In [None]:
def align(ref: Iterable, hyp: Iterable) -> List[Tuple[str, str, str]]:
    """
    Align `hyp` against `ref` along a minimum-edit-distance path.

    Returns the edit operations in left-to-right order. Each item is
    ``(action, ref_token, hyp_token)`` where ``action`` is one of

    * ``'C'`` - the tokens are equal (correct),
    * ``'S'`` - ``hyp_token`` substitutes ``ref_token``,
    * ``'D'`` - ``ref_token`` is missing from `hyp` (``hyp_token`` is ``'#'``),
    * ``'I'`` - ``hyp_token`` is extra in `hyp` (``ref_token`` is ``'%'``).

    When several operations are equally cheap the diagonal move (C/S) wins,
    then deletion, then insertion.
    """
    n_hyp, n_ref = len(hyp), len(ref)

    # costs[row, col] is the edit distance between hyp[:row] and ref[:col].
    costs = np.zeros((n_hyp + 1, n_ref + 1), dtype=np.int32)
    costs[:, 0] = np.arange(n_hyp + 1)
    costs[0, :] = np.arange(n_ref + 1)

    # steps[row][col] is the last operation on the chosen path into that cell.
    steps = [[None] * (n_ref + 1) for _ in range(n_hyp + 1)]
    for row, hyp_token in enumerate(hyp, start=1):
        steps[row][0] = ('I', '%', hyp_token)
    for col, ref_token in enumerate(ref, start=1):
        steps[0][col] = ('D', ref_token, '#')

    for row, hyp_token in enumerate(hyp, start=1):
        for col, ref_token in enumerate(ref, start=1):
            # Start from the diagonal move and replace it only by a strictly
            # cheaper alternative, which keeps the C/S > D > I tie-break order.
            if ref_token == hyp_token:
                best_cost = costs[row - 1, col - 1]
                best_step = ('C', ref_token, hyp_token)
            else:
                best_cost = costs[row - 1, col - 1] + 1
                best_step = ('S', ref_token, hyp_token)

            deletion_cost = costs[row, col - 1] + 1
            if deletion_cost < best_cost:
                best_cost, best_step = deletion_cost, ('D', ref_token, '#')

            insertion_cost = costs[row - 1, col] + 1
            if insertion_cost < best_cost:
                best_cost, best_step = insertion_cost, ('I', '%', hyp_token)

            costs[row, col] = best_cost
            steps[row][col] = best_step

    # Walk back from the bottom-right corner, then flip into reading order.
    path = []
    row, col = n_hyp, n_ref
    while row or col:
        step = steps[row][col]
        path.append(step)
        action = step[0]
        if action != 'I':   # C, S and D consume a reference token
            col -= 1
        if action != 'D':   # C, S and I consume a hypothesis token
            row -= 1

    path.reverse()
    return path

In [None]:
# Character-level alignment: plain strings are iterated character by character.
align('машинное обучение', 'мышиное облучение')

In [None]:
# Word-level alignment: both phrases are split into lists of words first.
align('мама мыла раму с мылом'.split(), 'мама мыла с млом'.split())

# Aggregation

In [None]:
from crowdkit.aggregation import ROVER

**R**ecognizer **O**utput **V**oting **E**rror **R**eduction

https://ieeexplore.ieee.org/document/659110

https://arxiv.org/pdf/2107.01091.pdf

In [None]:
def get_rover_df(df: pd.DataFrame, model_cols: List[str], tmp_col: str="__tmp") -> pd.DataFrame:
    """
    Reshape a wide table (one hypothesis column per model) into a long table
    with one row per (task, model) pair.

    Args:
        df: frame with a ``task`` column and one column per entry of `model_cols`;
            a ``text`` column, if present, is discarded. `df` is not modified.
        model_cols: names of the hypothesis columns.
        tmp_col: name of the scratch column used while reshaping.

    Returns:
        Frame with columns ``task``, ``performer`` (the model name) and ``text``
        (that model's hypothesis). Rows keep the original index, repeated once
        per model, in `model_cols` order.
    """
    def collect_hypotheses(row):
        return [(model, row[model]) for model in model_cols]

    wide = df.copy().drop(columns="text", errors="ignore")
    wide[tmp_col] = wide.apply(collect_hypotheses, axis=1)

    long = wide.drop(columns=model_cols).explode(tmp_col)
    pairs = long[tmp_col]

    return pd.DataFrame({
        "task": long["task"],
        "performer": pairs.map(lambda pair: pair[0]),
        "text": pairs.map(lambda pair: pair[1]),
    })

In [None]:
val_rover_df = get_rover_df(val_df, model_cols=MODEL_LIST)

In [None]:
# Character-level ROVER: every transcription is tokenized into single characters
# and the aggregated character sequence is joined back into a string.
rover_result = ROVER(
    tokenizer=list,
    detokenizer="".join,
    silent=False,
).fit_predict(val_rover_df)

In [None]:
# Attach the aggregated transcription to the validation rows by task id.
# NOTE: `rover_result` is rebound here, so this cell is meant to run exactly once
# after the aggregation cell above.
rover_result = val_df.merge(rover_result.reset_index(), on='task')

In [None]:
# `wer` takes (references, hypotheses): ground-truth `text` first, aggregated output second.
method2wer['ROVER'] = wer(rover_result['text'], rover_result['agg_text'])

In [None]:
method2wer

* Как можно улучшить ROVER ?

# Error Correction

https://norvig.com/spell-correct.html

In [None]:
def one_edit_words(word: str) -> Set[str]:
    """
    Return the set of strings reachable from `word` by one character deletion,
    substitution or insertion, using the lowercase Russian alphabet without 'ё'.

    Substituting a letter with itself is not excluded, so the result may
    contain `word` itself.
    """
    alphabet = 'абвгдежзийклмнопрстуфхцчшщъыьэюя'
    candidates = set()

    for pos in range(len(word) + 1):
        head, tail = word[:pos], word[pos:]

        # Insert every letter in front of position `pos` (including the very end).
        for letter in alphabet:
            candidates.add(head + letter + tail)

        if tail:
            rest = tail[1:]
            # Drop the character at `pos` ...
            candidates.add(head + rest)
            # ... or replace it with every letter.
            for letter in alphabet:
                candidates.add(head + letter + rest)

    return candidates


# Word frequencies over the reference transcriptions of the training split;
# the keys double as the dictionary of known words for `correct_word`.
word_counts = Counter(
    word
    for utterance in train_df['text'].str.split()
    for word in utterance
)


@lru_cache(maxsize=None)
def correct_word(word: str) -> str:
    """
    Replace an unknown word with its most frequent known one-edit neighbour.

    A word present in `word_counts` is returned unchanged. Otherwise all
    candidates from `one_edit_words` that occur in `word_counts` are
    considered and the most frequent one is returned; if there is none,
    `word` is returned unchanged.

    Ties between equally frequent candidates are resolved alphabetically, so
    the result does not depend on set iteration order (which varies between
    interpreter sessions because of string hash randomisation).

    Results are memoised; the cache must be cleared (`correct_word.cache_clear()`)
    if `word_counts` is rebuilt.
    """
    if word in word_counts:
        return word

    known_candidates = [
        candidate
        for candidate in one_edit_words(word)
        if word_counts[candidate] > 0
    ]
    if not known_candidates:
        return word

    # Highest count first; alphabetical order as a deterministic tie-break.
    return min(known_candidates, key=lambda candidate: (-word_counts[candidate], candidate))

In [None]:
# Spell-correct every whitespace-separated token of the `w2v` hypotheses independently
# and glue the tokens back together with single spaces.
w2v_corrected = val_df['w2v'].apply(
    lambda x: " ".join([correct_word(w) for w in x.split()])
)

method2wer['w2v_corrected'] = wer(val_df['text'], w2v_corrected)

In [None]:
method2wer

# Rescoring

In [None]:
class LaplaceLanguageModel:
    """
    N-gram language model with additive (add-delta) smoothing.

    For every observed (n-1)-token context the model stores the smoothed
    probabilities of the tokens seen after it. Probability mass not covered by
    the seen tokens is spread evenly over the vocabulary tokens that were not
    seen after that context.
    """

    def __init__(
            self, 
            tokenized_texts: Iterable[Iterable[str]], 
            n: int, 
            delta: float = 0.0, 
            BOS: str='<BOS>',
            EOS: str='<EOS>'
        ):
        """
        Args:
            tokenized_texts: iterable of token sequences to count n-grams from.
            n: n-gram order (context length is ``n - 1``).
            delta: additive smoothing constant.
            BOS: padding symbol placed before each sequence.
            EOS: padding symbol placed after each sequence.
        """
        self.n = n
        self.BOS = BOS
        self.EOS = EOS
        ngram_counts: Dict[Tuple[str, ...], Dict[str, int]] = self.build_ngram_counts(
            tokenized_texts, n, BOS, EOS
        )

        # Every token that was ever observed as the last element of an n-gram.
        self.vocab = {
            token for distribution in ngram_counts.values() for token in distribution
        }

        self.probs = defaultdict(Counter)

        vocab_size = len(self.vocab)
        for prefix, distribution in ngram_counts.items():
            norm: float = sum(distribution.values()) + delta * vocab_size
            self.probs[prefix] = {
                token: (count + delta) / norm for token, count in distribution.items()
            }

    @staticmethod
    def build_ngram_counts(
        tokenized_texts: Iterable[Iterable[str]], 
        n: int,
        BOS: str,
        EOS: str
    ) -> Dict[Tuple[str, ...], Dict[str, int]]:
        """
        Count, for every (n-1)-token context, how often each token follows it.

        Sequences are padded on both sides with `BOS` / `EOS` by `nltk.ngrams`.
        """
        counts = defaultdict(Counter)

        for text in tokenized_texts:

            ngrams = nltk.ngrams(
                text, n=n, pad_left=True, pad_right=True, left_pad_symbol=BOS, right_pad_symbol=EOS
            )

            for ngram in ngrams:
                prev, token = ngram[:-1], ngram[-1]
                counts[prev][token] += 1

        return counts

    def __get_observed_token_distribution(self, prefix: List[str]) -> Dict[str, float]:
        """
        Return the stored distribution for the last ``n - 1`` tokens of `prefix`
        (left-padded with BOS when shorter), or an empty dict for an unseen context.
        """
        context_len = self.n - 1
        prefix = prefix[max(0, len(prefix) - context_len):]
        prefix = [self.BOS] * (context_len - len(prefix)) + prefix
        # `.get` instead of indexing: indexing the defaultdict would insert an
        # empty entry for every unseen context and grow the model on each query.
        return self.probs.get(tuple(prefix), {})

    def _missing_token_prob(self, distribution: Dict[str, float]) -> float:
        """Probability assigned to each single token absent from `distribution`."""
        # Clip at zero: rounding can push the sum of stored probabilities above 1.
        missing_prob_total = max(0.0, 1.0 - sum(distribution.values()))
        return missing_prob_total / max(1, len(self.vocab) - len(distribution))

    def get_token_distribution(self, prefix: List[str]) -> Dict[str, float]:
        """Return the probability of every vocabulary token after `prefix`."""
        distribution: Dict[str, float] = self.__get_observed_token_distribution(prefix)
        missing_prob = self._missing_token_prob(distribution)
        return {token: distribution.get(token, missing_prob) for token in self.vocab}

    def get_next_token_prob(self, prefix: List[str], next_token: str):
        """Return the probability of `next_token` after `prefix`."""
        distribution: Dict[str, float] = self.__get_observed_token_distribution(prefix)

        if next_token in distribution:
            return distribution[next_token]
        return self._missing_token_prob(distribution)

    def score_sequence(self, tokens: List[str], min_logprob: float = np.log(10 ** -50.)) -> float:
        """
        Length-normalised log-probability of `tokens` followed by EOS.

        Each per-token log-probability is floored at `min_logprob`. The sum
        (which includes the EOS term) is divided by ``len(tokens)``; for an
        empty sequence the EOS term alone is returned, so an empty hypothesis
        is not awarded the best possible score of 0.0.
        """
        tokens = list(tokens)
        prefix = [self.BOS] * (self.n - 1)
        logprobs_sum = 0.0
        for token in tokens + [self.EOS]:
            prob = self.get_next_token_prob(prefix, token)
            # Avoid log(0) warnings; zero probability is floored below anyway.
            logprob = np.log(prob) if prob > 0 else -np.inf
            logprobs_sum += max(logprob, min_logprob)
            prefix = prefix[1:] + [token]
        return float(logprobs_sum / max(1, len(tokens)))

In [None]:
# Bigram model (n=2) with a small smoothing constant.
# NOTE(review): `text_data[0]` is passed as-is; presumably it holds raw strings, in
# which case each text is iterated character by character and this is a
# character-level model (consistent with `list(text)` used when scoring) - confirm.
lm = LaplaceLanguageModel(
    n=2,
    tokenized_texts=text_data[0],
    delta=1e-5
)

In [None]:
for text in ('мама мыла раму', 'мамо мыла раму', 'машинное обучение', 'маинное обучение'):
    score = lm.score_sequence(list(text))
    print(f"{text}\t\t{score:.2f}")

In [None]:
# Rescoring: for every utterance keep the hypothesis that the language model scores
# highest. The winning position within MODEL_LIST is translated back to the column
# name; indexing `row` with the bare integer would address the row by position.
max_likelihood_utterances = val_df.apply(
    lambda row: row[
        MODEL_LIST[
            np.argmax([
                lm.score_sequence(tokens=list(row[model])) for model in MODEL_LIST
            ])
        ]
    ], 
    axis=1
)

In [None]:
method2wer['dummy_rescoring'] = wer(val_df['text'], max_likelihood_utterances)

In [None]:
method2wer

# Oracle WER

Если представить, что мы идеально выбираем лучшую из трех гипотез (Оракул), каким будет Word Error Rate?\
Таким образом оценим нижнюю границу WER для Rescoring-системы.

In [None]:
def get_best_transcription(ref: Iterable[str], hyps: Iterable[Iterable[str]]):
    """
    Return the hypothesis from `hyps` with the smallest edit distance to `ref`.

    When several hypotheses are equally close, the earliest one is returned.
    """
    distances = [edit_distance(ref, hyp) for hyp in hyps]
    best_index = distances.index(min(distances))
    return hyps[best_index]

In [None]:
# Oracle selection: for every utterance pick, with access to the reference, the
# model hypothesis that has the fewest word-level edits, and join it back into a string.
oracle_hyp = val_df.apply(
    lambda row: " ".join(
        get_best_transcription(
            ref=row['text'].split(),
            hyps=[row[model].split() for model in MODEL_LIST]
        )
    ),
    axis=1
)

In [None]:
method2wer['oracle_wer'] = wer(val_df['text'], oracle_hyp)

In [None]:
method2wer

In [None]:
# Hard-coded snapshot of `method2wer` values; this cell does not recompute anything.
{'qnet': 0.7652783922138658,
 'w2v': 0.5601030720835285,
 'w2v_tts': 0.620017745017745,
 'ROVER': 0.5634646316005613,
 'w2v_corrected': 0.5412983965584669,
 'dummy_rescoring': 0.6509581540868205,
 'oracle_wer': 0.49483770043019165}

# Prediction

In [None]:
test_rover_df = get_rover_df(test_df, model_cols=MODEL_LIST)

In [None]:
# Character-level ROVER over the test hypotheses; the aggregated text is exposed
# as the `prediction` column next to the task id.
test_result = (
    ROVER(tokenizer=list, detokenizer="".join, silent=False)
    .fit_predict(test_rover_df)
    .reset_index()
    .rename(columns={'agg_text': 'prediction'})
)

In [None]:
# Surname of the submitter; it becomes part of the output file name.
username = ""  # TODO: put your surname here

# Fail early with a clear message instead of writing a file with an empty name part.
if not username:
    raise ValueError("Set `username` to your surname before exporting the predictions")

test_result.to_json(
    f"noisy_text_aggregation_test_prediction_{username}.jsonl",
    lines=True, orient="records"
)

# TODO

* провести эксперименты с разными подходами
* аккуратно валидироваться и тестироваться
* сформировать файл с предсказаниями
* <font color='red'>в переменной `username` указать фамилию</font>
* прикрепить на портале jupyter-notebook / .py-file / colab-link и файл с предсказаниями