In [38]:
! pip install datasets==3.6.0



In [39]:
! pip install --no-deps git+https://github.com/salute-developers/GigaAM.git pywer

Collecting git+https://github.com/salute-developers/GigaAM.git
  Cloning https://github.com/salute-developers/GigaAM.git to /tmp/pip-req-build-pdt03mei
  Running command git clone --filter=blob:none --quiet https://github.com/salute-developers/GigaAM.git /tmp/pip-req-build-pdt03mei
  Resolved https://github.com/salute-developers/GigaAM.git to commit 6a8b511f753670ed38af6529bb89bbdc2191ba6a
  Preparing metadata (setup.py) ... [?25l[?25hdone


In [40]:
from datasets import load_dataset

In [41]:
fleurs = load_dataset("google/fleurs", "ru_ru")

In [42]:
from torch.utils.data import Dataset, DataLoader
import torch
from typing import Dict

In [43]:
class AudioDataset(Dataset):
    def __init__(self, dataset, dataset_part: str="train"):
       self.dataset = dataset

       if dataset_part == "train":
        self.dataset = self.dataset['train']
       elif dataset_part == "validation":
        self.dataset = self.dataset['validation']
       else:
          self.dataset = self.dataset['test']

    def __len__(self) -> int:
        return len(self.dataset)

    def __getitem__(self, idx: int) -> Dict:
        sample = self.dataset[idx]

        return {
            'audio': torch.tensor(sample['audio']['array'], dtype=torch.float32),
            'num_samples': torch.tensor(sample['num_samples'], dtype=torch.int32),
            'transcription': sample['transcription'],
        }

In [44]:
def collate_fn(batch):
    # Для полей с разной длиной (например, audio) нужно добавить паддинг
    audio = [item['audio'] for item in batch]
    audio_padded = torch.nn.utils.rnn.pad_sequence(audio, batch_first=True)

    return audio_padded, torch.stack([item['num_samples'] for item in batch]), [item['transcription'] for item in batch]

In [45]:
train_dataset = AudioDataset(fleurs, dataset_part='train')

In [46]:
train_loader = DataLoader(
            train_dataset,
            batch_size=4,
            shuffle=True,
            num_workers=1,
            collate_fn=collate_fn
        )

In [47]:
for batch in train_loader:
  audio, num_samples, texts = batch
  print(audio)
  print(num_samples)
  print(texts)
  break

tensor([[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
          0.0000e+00,  0.0000e+00],
        [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
          0.0000e+00,  0.0000e+00],
        [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -2.2054e-06,
          4.4703e-06, -4.4703e-06],
        [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ...,  0.0000e+00,
          0.0000e+00,  0.0000e+00]])
tensor([163200, 142080, 224640, 122880], dtype=torch.int32)
['видимость также может быть ограничена вследствие снегопада метели конденсата или льда на стёклах транспортного средства', 'несмотря на эти обвинения ма легко победил выступая за более тесные связи с материковой частью китая', 'во время этих чудовищных штормов бушуют ветра со скоростью до 480 км/ч 133 м/с; 300 миль в час', 'впоследствии эдинбургский шерифский суд предъявил адекое обвинение в убийстве ее сына']


In [48]:
! pip install hydra-core



In [49]:
import os

In [50]:
from gigaam import GigaAMASR
import gigaam

CACHE_DIR = os.path.expanduser("~/.cache/gigaam")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_name, model_path = gigaam._download_model('ctc', CACHE_DIR)

ckpt = torch.load(model_path, map_location='cpu', weights_only=False)

ckpt["cfg"].encoder.flash_attn = False
model = GigaAMASR(ckpt['cfg'])

model.load_state_dict(ckpt["state_dict"], strict=False)
model = model.eval()

if device.type != "cpu":
  model.encoder = model.encoder.half()

model = model.to(device)

In [51]:
from typing import List

In [52]:
import re
import string

def convert_arabic_number_to_words(number_str):
    """
    Преобразует арабское число в слова
    """
    try:
        num = int(number_str)
    except ValueError:
        return number_str

    # Базовые словари
    units = ['', 'один', 'два', 'три', 'четыре', 'пять', 'шесть', 'семь', 'восемь', 'девять']
    teens = ['десять', 'одиннадцать', 'двенадцать', 'тринадцать', 'четырнадцать',
             'пятнадцать', 'шестнадцать', 'семнадцать', 'восемнадцать', 'девятнадцать']
    tens = ['', '', 'двадцать', 'тридцать', 'сорок', 'пятьдесят',
            'шестьдесят', 'семьдесят', 'восемьдесят', 'девяносто']
    hundreds = ['', 'сто', 'двести', 'триста', 'четыреста', 'пятьсот',
                'шестьсот', 'семьсот', 'восемьсот', 'девятьсот']

    if num == 0:
        return 'ноль'

    words = []

    # Обрабатываем тысячи
    if num >= 1000:
        thousands = num // 1000
        if thousands == 1:
            words.append('тысяча')
        elif thousands in [2, 3, 4]:
            words.append(units[thousands] + ' тысячи')
        else:
            words.append(convert_arabic_number_to_words(str(thousands)) + ' тысяч')
        num %= 1000

    # Обрабатываем сотни
    if num >= 100:
        words.append(hundreds[num // 100])
        num %= 100

    # Обрабатываем десятки и единицы
    if num >= 20:
        words.append(tens[num // 10])
        if num % 10 > 0:
            words.append(units[num % 10])
    elif num >= 10:
        words.append(teens[num - 10])
    elif num > 0:
        words.append(units[num])

    return ' '.join(words)

def convert_roman_number_to_words(roman_str):
    """
    Преобразует римское число в слова
    """
    roman_numerals = {
        'I': 1, 'V': 5, 'X': 10, 'L': 50,
        'C': 100, 'D': 500, 'M': 1000
    }

    roman_str = roman_str.upper()
    total = 0
    prev_value = 0

    for char in reversed(roman_str):
        if char not in roman_numerals:
            return roman_str

        value = roman_numerals[char]
        if value < prev_value:
            total -= value
        else:
            total += value
        prev_value = value

    return convert_arabic_number_to_words(str(total))

def replace_latin_with_russian(text):
    """
    Заменяет латинские буквы на похожие русские
    """
    latin_to_russian = {
        'a': 'а', 'b': 'б', 'c': 'к', 'd': 'д', 'e': 'е',
        'f': 'ф', 'g': 'г', 'h': 'х', 'i': 'и', 'j': 'й',
        'k': 'к', 'l': 'л', 'm': 'м', 'n': 'н', 'o': 'о',
        'p': 'п', 'q': 'к', 'r': 'р', 's': 'с', 't': 'т',
        'u': 'у', 'v': 'в', 'w': 'в', 'x': 'кс', 'y': 'у', 'z': 'з'
    }

    for latin, russian in latin_to_russian.items():
        text = text.replace(latin, russian)
        text = text.replace(latin.upper(), russian.upper())

    return text

def preprocess_text(text):
    """
    Предобрабатывает текст по заданным правилам:
    1. Оставляет только русские буквы в нижнем регистре
    2. Заменяет знаки препинания на пробелы
    3. Заменяет ё на е
    4. Преобразует числа в слова
    5. Заменяет английские буквы на русские
    """

    # Шаг 1: Заменяем ё на е
    text = text.replace('ё', 'е').replace('Ё', 'е')

    # Шаг 2: Заменяем латинские буквы на русские
    text = replace_latin_with_russian(text)

    # Шаг 3: Преобразуем римские цифры
    # Ищем римские цифры (от I до MMMCMXCIX)
    roman_pattern = r'\b[IVXLCDM]+\b'
    text = re.sub(roman_pattern, lambda m: convert_roman_number_to_words(m.group()), text, flags=re.IGNORECASE)

    # Шаг 4: Преобразуем арабские цифры
    # Ищем целые числа
    arabic_pattern = r'\b\d+\b'
    text = re.sub(arabic_pattern, lambda m: convert_arabic_number_to_words(m.group()), text)

    # Шаг 5: Заменяем все знаки препинания на пробелы
    punctuation_chars = string.punctuation + '—–«»„“‚‘'
    for char in punctuation_chars:
        text = text.replace(char, ' ')

    # Шаг 6: Приводим к нижнему регистру
    text = text.lower()

    # Шаг 7: Удаляем все символы, кроме русских букв и пробелов
    text = re.sub(r'[^а-я\s]', '', text)

    # Шаг 8: Убираем лишние пробелы
    text = re.sub(r'\s+', ' ', text).strip()

    return text

In [61]:
def get_model_vocab(model):
    model_vocab = {sym: idx for idx, sym in enumerate(model.decoding.tokenizer.vocab)}
    return model_vocab

def pad_list(nested_list, padding_element=33):
    max_len = max(len(sublist) for sublist in nested_list)

    padded_list = [
        sublist + [padding_element] * (max_len - len(sublist))
        for sublist in nested_list
    ]

    return padded_list

def get_texts_idxs(texts: List[str], model_vocab: Dict[str, str], blank_token=33) -> torch.Tensor:
  texts_idxs = []

  for text in texts:
    text = preprocess_text(text)

    text_idxs = list([model_vocab[sym] for sym in text])
    texts_idxs.append(text_idxs)

  texts_idxs = pad_list(texts_idxs, padding_element=blank_token)
  return torch.tensor(texts_idxs, dtype=torch.int)

def get_gigaam_logprobs(model, wav_batch, wav_lengths, return_transcriptions=False):
    wav_batch = wav_batch.to(model._device)
    wav_lengths = wav_lengths.to(model._device)

    encoded, encoded_len = model.forward(wav_batch, wav_lengths)

    logprobs = model.head(encoded)

    if return_transcriptions:
        transcriptions = model.decoding.decode(model.head, encoded, encoded_len)
        return logprobs, encoded_len, transcriptions
    else:
        return logprobs, encoded_len

In [54]:
import torch.nn as nn

In [55]:
def _compute_ctc_loss(logprobs, encoded_len, transcripts, transcript_lengths):
  # Проверяем и выравниваем длины
  encoded_len = tuple(encoded_len.to('cpu').numpy())

  # Убеждаемся, что encoded_len не превышает длину logprobs по времени
  T = logprobs.size(1)  # временная размерность после transpose
  encoded_len = tuple(min(el, T) for el in encoded_len)

  # CTCLoss требует логиты в формате (T, N, C)
  logprobs = logprobs.transpose(0, 1)  # Теперь форма (T, N, C)

  BLANK_IDX = 33
  ctc_loss = nn.CTCLoss(blank=BLANK_IDX, reduction='mean', zero_infinity=True)

  # Вычисляем потерю
  loss = ctc_loss(
      logprobs,           # (T, N, C)
      transcripts,        # (N, S) -> целочисленные индексы
      encoded_len,        # (N,) -> длины выходных последовательностей
      transcript_lengths  # (N,) -> длины целевых последовательностей
  )

  return loss

In [62]:
for batch in train_loader:
   audios, audio_lengths, texts = batch

   model_vocab = get_model_vocab(model)

   #TODO: maybe move it to the dataloader?
   texts = get_texts_idxs(texts, model_vocab)

   transcript_lengths=(len(sample) for sample in texts)

   logprobs, encoded_len = get_gigaam_logprobs(model, audios, audio_lengths)

   loss =  _compute_ctc_loss(
                logprobs,
                encoded_len,
                texts,
                transcript_lengths=tuple(transcript_lengths)
            )

   print(loss)
   break

tensor(6.6494, grad_fn=<MeanBackward0>)
