# Install packages

In [None]:
!pip install nemo_toolkit[all]
!pip install pydub
!pip install wget
!pip install zipfile
!pip install transformers
!pip install torch
!pip install pympi-ling
!pip install librosa
!pip install soundfile

# Imports

In [None]:
from pydub import AudioSegment
from pydub.silence import split_on_silence, detect_nonsilent
import nemo.collections.asr as nemo_asr
import wget
import time
import datetime
import os
import shutil
import pympi
import zipfile
from transformers import AutoModelForTokenClassification, AutoTokenizer, Pipeline
from nltk import wordpunct_tokenize
from collections import Counter
import torch
import librosa
import soundfile as sf
import numpy as np
from google.colab import files

# Split sound into fragments

In [None]:
def _merge_short_intervals(result):
    """Merge short non-silent intervals into their neighbours.

    ``result`` is a list of ``[start_ms, end_ms]`` pairs. Intervals longer
    than one second are kept as they are. A shorter interval is, in order of
    preference: glued onto the previously kept interval when the gap to it is
    under one second; joined with the following interval when that one starts
    less than one second later; kept on its own when it is longer than 10 ms;
    otherwise dropped.
    """
    final = []
    skip_next = False
    last_index = len(result) - 1
    for i, interval in enumerate(result):
        if skip_next:
            # This interval was already absorbed by the previous iteration.
            skip_next = False
            continue
        begin, end = interval[0], interval[1]
        if end - begin > 1000:
            # Copy so that later in-place merging never touches the input list.
            final.append([begin, end])
            continue
        # The last interval has no successor to merge with.
        next_item = result[i + 1] if i != last_index else None
        if final and begin - final[-1][1] < 1000:
            # Close enough to the previously kept interval: extend it.
            final[-1][1] = end
        elif next_item is not None and 0 < next_item[0] - end < 1000:
            final.append([begin, next_item[1]])
            skip_next = True
        elif end - begin > 10:
            final.append([begin, end])
    return final


def get_intervals(path):
    """Load a recording at 16 kHz and find the non-silent intervals in it.

    The audio is loaded with librosa at a 16 kHz sampling rate, written to
    ``/content/audio_16.wav`` and re-opened with pydub as a single-channel
    segment. Non-silent regions are detected relative to the recording's
    average loudness and short regions are merged with their neighbours.

    Args:
        path: Path to the input audio file.

    Returns:
        A tuple ``(sound, intervals)`` where ``sound`` is the mono
        ``AudioSegment`` and ``intervals`` is a list of
        ``[start_ms, end_ms]`` pairs.
    """
    print('Start record processing...')
    start = time.time()
    aud, sr = librosa.load(path, sr=16000)
    new_path = '/content/audio_16.wav'
    sf.write(new_path, aud, sr)
    sound = AudioSegment.from_wav(new_path)
    sound = sound.set_channels(1)
    # Anything 14 dB below the average loudness for at least 700 ms counts as silence.
    result = detect_nonsilent(sound, min_silence_len=700, silence_thresh=sound.dBFS-14, seek_step=5)
    final = _merge_short_intervals(result)
    end = time.time()
    total_time = end-start
    print('Done in ' + str(datetime.timedelta(seconds=total_time)))

    return sound, final

In [None]:
def split_sound(sound, chunks):
    """Export every interval of ``sound`` as a separate WAV file.

    The files are written to an ``audio_segments`` folder inside the current
    working directory; an existing folder with that name is removed first.

    Args:
        sound: Sliceable audio object whose slices provide ``export``.
        chunks: Sequence of ``[start_ms, end_ms]`` pairs.

    Returns:
        A list with one dict per chunk holding ``audio_filepath``,
        ``duration``, ``xmin`` and ``xmax`` (the last three in seconds).
    """
    print('Start record chuncking...')
    started_at = time.time()

    segments_dir = os.path.join(os.getcwd(), r'audio_segments')
    # Always start from an empty folder so stale segments never leak in.
    if os.path.isdir(segments_dir):
        shutil.rmtree(segments_dir)
    os.makedirs(segments_dir)

    manifest_full = []
    for index, chunk in enumerate(chunks):
        begin_ms, end_ms = chunk[0], chunk[1]
        wav_path = os.path.join(segments_dir, f'{index}.wav')
        sound[begin_ms:end_ms].export(wav_path, format='wav')
        entry = {
            'audio_filepath': wav_path,
            'duration': (end_ms - begin_ms) / 1000,
            'xmin': begin_ms / 1000,
            'xmax': end_ms / 1000,
        }
        manifest_full.append(entry)

    elapsed = time.time() - started_at
    print('Done in ' + str(datetime.timedelta(seconds=elapsed)))
    print('Folder with audio segments: ' + segments_dir)
    return manifest_full

# Transcribe

In [None]:
def get_transcriptions(manifest, model_type):
    """Download the ASR checkpoint for ``model_type`` and transcribe every segment.

    Args:
        manifest: List of dicts, each with an ``audio_filepath`` key.
        model_type: ``'zapdvin'`` or ``'opochka'``.

    Returns:
        The same ``manifest`` list; every entry gains a ``transcription`` key.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """
    print('Start transcribing...')
    start = time.time()
    # Download URL and local file name for each supported checkpoint. Every
    # model gets its own file name so one checkpoint is never mistaken for another.
    checkpoints = {
        'zapdvin': (
            'https://storage.yandexcloud.net/dialect-speech-recognition-and-feature-detection/finetuned_conformer.nemo?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=YCAJEUEYBLmE9j6wqGUjKoE1r%2F20230529%2Fru-central1%2Fs3%2Faws4_request&X-Amz-Date=20230529T152617Z&X-Amz-Expires=1209600&X-Amz-Signature=C45CBC6562D8880D5EFFC05A811DC9BB211938AA3CA890EC8DC690952203128F&X-Amz-SignedHeaders=host',
            '/content/conformer_zapdvin.nemo',
        ),
        'opochka': (
            'https://storage.yandexcloud.net/dialect-speech-recognition-and-feature-detection/finetuned_conformer_opochka.nemo?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=YCAJEUEYBLmE9j6wqGUjKoE1r%2F20230529%2Fru-central1%2Fs3%2Faws4_request&X-Amz-Date=20230529T153308Z&X-Amz-Expires=1209600&X-Amz-Signature=8F163739C6D5923A09B2A6888FD0023D0278AB7D2CF7D6590EDEBA01C7ADAF89&X-Amz-SignedHeaders=host',
            '/content/conformer_opochka.nemo',
        ),
    }
    if model_type not in checkpoints:
        raise ValueError(
            'Unknown model_type {!r}; expected one of {}'.format(model_type, sorted(checkpoints))
        )
    url, checkpoint_path = checkpoints[model_type]
    # NOTE(review): the URLs carry X-Amz-Date / X-Amz-Expires query parameters,
    # so they look time-limited — confirm they are still valid before running.
    filename = wget.download(url, out=checkpoint_path)
    asr_model = nemo_asr.models.EncDecRNNTBPEModel.restore_from(restore_path=filename)
    audio_paths = [entry['audio_filepath'] for entry in manifest]
    transcriptions = asr_model.transcribe(paths2audio_files=audio_paths)
    # The per-file texts are read from the first element of the result —
    # presumably the best hypotheses; verify against the installed NeMo version.
    for i, item in enumerate(manifest):
        item['transcription'] = transcriptions[0][i]
    end = time.time()
    total_time = end-start
    print('Done in ' + str(datetime.timedelta(seconds=total_time)))
    return manifest

# Get TextGrid

In [None]:
def prepare_textgrid(manifest):
    """Write the transcribed intervals to ``result.TextGrid`` in the working directory.

    A single interval tier named ``text`` is written with one interval per
    manifest entry.

    Args:
        manifest: Non-empty list of dicts with ``xmin``, ``xmax`` and
            ``transcription`` keys.

    Raises:
        ValueError: If ``manifest`` is empty.
    """
    print('Preparing TextGrid...')
    start = time.time()
    if not manifest:
        raise ValueError('Cannot build a TextGrid from an empty manifest')
    xmin = 0.0
    xmax = manifest[-1]['xmax']
    size = len(manifest)
    header = 'File type = "ooTextFile"\nObject class = "TextGrid"\nxmin = {xmin}\nxmax = {xmax}\ntiers? <exists>\nsize = 1\nitem []:\n'.format(xmin=xmin, xmax=xmax)
    item_1 ='    item[1]:\n        class = "IntervalTier"\n        name = "{name}"\n        xmin = {xmin}\n        xmax = {xmax}\n        intervals: size = {size}\n'.format(name='text', xmin=xmin, xmax=xmax, size=size)
    interval_template = (
        '        intervals [{num}]:\n'
        '            xmin = {start}\n'
        '            xmax = {end}\n'
        '            text = "{text}"\n'
    )
    parts = [header, item_1]
    # Intervals are numbered from 1, one per manifest entry.
    for num, item in enumerate(manifest, start=1):
        # A double quote inside a quoted TextGrid string is written doubled.
        text = str(item['transcription']).replace('"', '""')
        parts.append(interval_template.format(num=num, start=item['xmin'], end=item['xmax'], text=text))
    current_directory = os.getcwd()
    final_directory = os.path.join(current_directory, r'result.TextGrid')
    with open(final_directory, 'w', encoding='utf-8') as f:
        f.writelines(parts)
    end = time.time()
    total_time = end-start
    print('Done in ' + str(datetime.timedelta(seconds=total_time)))
    print('Location of TextGrid: ' + final_directory)
    

# Get dialect features

In [None]:
class MyPipeline(Pipeline):
    """Token-classification pipeline that yields one dialect-feature label per word.

    The input text is lower-cased and split with ``wordpunct_tokenize``, tagged
    by the wrapped model, and the sub-token predictions are merged back into
    word-level labels.

    NOTE: ``preprocess`` stores the tokenizer output on the instance
    (``self.tokenized``) and ``postprocess`` reads it back, so one instance
    should handle one text at a time.
    """

    def _sanitize_parameters(self, **kwargs):
        """Split call-time kwargs into (preprocess, forward, postprocess) dicts."""
        preprocess_kwargs = {}
        if "tokenizer" in kwargs:
            preprocess_kwargs["tokenizer"] = kwargs["tokenizer"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, text):
        """Lower-case and word-split ``text``, then tokenize it into tensors."""
        self.text_splt = wordpunct_tokenize(text.lower())
        self.tokenized = self.tokenizer(self.text_splt, is_split_into_words=True, return_tensors='pt')
        return self.tokenized

    def _forward(self, model_inputs):
        """Run the model with the inputs moved to whatever device the model is on."""
        # Follow the model's device instead of assuming CUDA is present.
        device = self.model.device
        model_inputs['input_ids'] = model_inputs['input_ids'].to(device)
        model_inputs['attention_mask'] = model_inputs['attention_mask'].to(device)
        return self.model(**model_inputs)

    def postprocess(self, model_outputs):
        """Merge sub-token predictions into a ``{index: {word: label}}`` dict.

        Returns ``{0: {0: None}}`` when the text produced no tokens apart from
        the ``<s>`` / ``</s>`` markers.
        """
        tokens = self.tokenizer.convert_ids_to_tokens(list(self.tokenized["input_ids"][0]))
        # detach/cpu make the conversion to numpy safe wherever the logits live.
        predicted_label_id = torch.argmax(model_outputs.logits, axis=-1).detach().cpu().numpy()
        id2label = {0: "O", 1: "B-PHON", 2: "B-MORPH", 3: "I-MORPH",
                    4: "B-LEX", 5: "I-LEX", 6: "B-SYNT", 7: "I-SYNT"}
        labels = [id2label[i] for i in predicted_label_id[0]]

        # Rebuild the text and a parallel label string. Tokens starting with
        # '▁' open a new word; words are separated by ' ' and the labels of the
        # pieces inside one word by '|'.
        text_parts = []
        label_parts = []
        for i, token in enumerate(tokens):
            if token in ('<s>', '</s>'):
                continue
            label = labels[i]
            if token.startswith('▁'):
                label = '▁' + label
            # Index 0 is '<s>', so index 1 is the first word and gets no leading space.
            boundary = ' ' if i > 1 else ''
            text_parts.append(token.replace('▁', boundary))
            label_parts.append('|' + label.replace('▁', boundary))
        result_text = ''.join(text_parts)
        result_labels = ''.join(label_parts)

        result_labels_splt = result_labels.split(' ')
        if result_labels_splt == ['']:
            return {0: {0: None}}

        final_labels = []
        for word_labels in result_labels_splt:
            # Drop the single separator left at either edge of the word.
            if word_labels.startswith('|'):
                word_labels = word_labels[1:]
            if word_labels.endswith('|'):
                word_labels = word_labels[:-1]
            votes = word_labels.split('|')
            cnt = Counter(votes)
            if len(cnt) == 1:
                final_labels.append(votes[0])
            else:
                # Pieces disagree: ignore "O" and take the most frequent tag
                # (ties go to the tag that appears first).
                cnt.pop('O', None)
                final_labels.append(max(cnt, key=cnt.get))

        txt_splt = result_text.split(' ')
        dict_with_labels = {}
        for i, word in enumerate(txt_splt):
            dict_with_labels[i] = {word: final_labels[i]}

        return dict_with_labels

In [None]:
def get_features(manifest, model_type):
    """Download the tagging model for ``model_type`` and label every transcription.

    Args:
        manifest: List of dicts, each with a ``transcription`` key.
        model_type: ``'zapdvin'`` or ``'opochka'``.

    Returns:
        The same ``manifest`` list; every entry gains ``tokens`` (the
        lower-cased ``wordpunct_tokenize`` split) and ``features`` (one label
        per word as returned by ``MyPipeline``).

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """
    print('Finding features...')
    start = time.time()
    archive_urls = {
        'zapdvin': 'https://storage.yandexcloud.net/dialect-speech-recognition-and-feature-detection/xlm_roberta_base_dial.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=YCAJEUEYBLmE9j6wqGUjKoE1r%2F20230529%2Fru-central1%2Fs3%2Faws4_request&X-Amz-Date=20230529T154001Z&X-Amz-Expires=1209600&X-Amz-Signature=5AEB7322B6A853EFB3873968A52FFF7F842D35B63466AFFB5812FC5738B80812&X-Amz-SignedHeaders=host',
        'opochka': 'https://storage.yandexcloud.net/dialect-speech-recognition-and-feature-detection/xlm_roberta_base_dial_V1_opochka.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=YCAJEUEYBLmE9j6wqGUjKoE1r%2F20230529%2Fru-central1%2Fs3%2Faws4_request&X-Amz-Date=20230529T154207Z&X-Amz-Expires=1209600&X-Amz-Signature=764989EFA172BC6F103823BCAB4EC58F135C12B417EDA7357BF46B00F0D34911&X-Amz-SignedHeaders=host',
    }
    if model_type not in archive_urls:
        raise ValueError(
            'Unknown model_type {!r}; expected one of {}'.format(model_type, sorted(archive_urls))
        )
    filename = wget.download(archive_urls[model_type], out='/content/xlm_roberta.zip')
    with zipfile.ZipFile(filename, 'r') as zip_ref:
        zip_ref.extractall('/content/xlm_roberta')
    # NOTE(review): the same checkpoint folder is used for both model types;
    # confirm the opochka archive really unpacks to this path.
    checkpoint_dir = '/content/xlm_roberta/xlm_roberta_base_dial/checkpoint-3000'
    tokenizer = AutoTokenizer.from_pretrained(checkpoint_dir, local_files_only=True)
    model = AutoModelForTokenClassification.from_pretrained(checkpoint_dir, local_files_only=True)
    # Use the GPU when there is one; otherwise run on the CPU instead of failing.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    pipeline = MyPipeline(model=model.to(device), tokenizer=tokenizer)
    for item in manifest:
        res = pipeline(item['transcription'])
        # Flatten {index: {word: label}} into a plain list of labels.
        item['tokens'] = wordpunct_tokenize(item['transcription'].lower())
        item['features'] = [label for word in res.values() for label in word.values()]
    end = time.time()
    total_time = end-start
    print('Done in ' + str(datetime.timedelta(seconds=total_time)))
    return manifest

# Write transcriptions, tokens and features to eaf file

In [None]:
def _add_evenly_spaced(elan, tier, values, start_ms, step_ms):
    """Add ``values`` to ``tier`` as consecutive annotations of ``step_ms`` each."""
    cursor = start_ms
    for value in values:
        upper = cursor + step_ms
        elan.add_annotation(id_tier=tier,
                            start=int(cursor),
                            end=int(upper),
                            value=value)
        cursor = upper


def prepare_elan(manifest, path):
    """Write transcriptions, tokens and features to ``result.eaf`` in the working directory.

    Three tiers are created: ``annotation`` (one annotation per segment),
    ``tokens`` and ``features``. Token timestamps are not measured: the
    duration of a segment is divided equally among its tokens, and the
    features reuse the same spacing.

    Args:
        manifest: List of dicts with ``transcription``, ``tokens``,
            ``features``, ``xmin``, ``xmax`` and ``duration`` keys
            (times in seconds).
        path: Path of the media file to link from the EAF document.
    """
    print('Preparing EAF...')
    start = time.time()
    elan = pympi.Elan.Eaf(author='user')
    elan.add_linked_file(path)
    elan.remove_tiers(['default'])
    elan.add_tier('annotation', ling='default-lt', parent=None)
    elan.add_tier('tokens', ling='default-lt', parent='annotation')
    elan.add_tier('features', ling='default-lt', parent='tokens')

    for item in manifest:
        tokens = item['tokens']
        # Skip segments with nothing to annotate; an empty token list would
        # also make the per-token duration below divide by zero.
        if not item['transcription'] or not tokens:
            continue
        segment_start = item['xmin'] * 1000
        elan.add_annotation(id_tier='annotation',
                            start=int(segment_start),
                            end=int(item['xmax'] * 1000),
                            value=item['transcription'])
        word_time = item['duration'] * 1000 / len(tokens)
        _add_evenly_spaced(elan, 'tokens', tokens, segment_start, word_time)
        _add_evenly_spaced(elan, 'features', item['features'], segment_start, word_time)

    current_directory = os.getcwd()
    final_directory = os.path.join(current_directory, r'result.eaf')
    elan.to_file(file_path=final_directory)
    end = time.time()
    total_time = end-start
    print('Done in ' + str(datetime.timedelta(seconds=total_time)))
    print('Location of EAF: ' + final_directory)

# Main function

To test on a new audio file, upload it and replace the link in the url variable. As an example, a recording made in 2022 during an expedition to the Zapadnodvinsky district of the Tver region is provided.

To change the model type, change the model_type value in the cell below.

'zapdvin' - for data of the Zapadnodvinsky district, Tver region

'opochka' - for data of the Opochetsky district, Pskov region

In [None]:
def main(url=None, model_type='zapdvin'):
    """Run the whole pipeline on one recording and offer the results for download.

    Steps: download the recording, find non-silent intervals, cut the audio
    into segments, transcribe them, tag dialect features, then write
    ``result.TextGrid`` and ``result.eaf`` to the working directory.

    Args:
        url: Link to the recording. When omitted, the example recording
            from the Zapadnodvinsky district is used.
        model_type: ``'zapdvin'`` or ``'opochka'``; the same value is used
            for both the transcription and the feature model.
    """
    if url is None:
        url = 'https://storage.yandexcloud.net/dialect-speech-recognition-and-feature-detection/20220710mga1932.wav?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=YCAJEUEYBLmE9j6wqGUjKoE1r%2F20230529%2Fru-central1%2Fs3%2Faws4_request&X-Amz-Date=20230529T162054Z&X-Amz-Expires=1209600&X-Amz-Signature=84C359415EBD33413690751188EDE437DFC5624FBEEB0F0A86B55A921F5BFF42&X-Amz-SignedHeaders=host'
    path = wget.download(url, out='/content/test.wav')
    sound, chunks = get_intervals(path)
    manifest = split_sound(sound, chunks)
    manifest = get_transcriptions(manifest, model_type=model_type)
    # Release cached GPU memory before the tagging model is loaded.
    torch.cuda.empty_cache()
    manifest = get_features(manifest, model_type=model_type)
    prepare_textgrid(manifest)
    prepare_elan(manifest, path)

    # code below only for Google Chrome, for downloading in other browsers,
    # you must select the "folder" icon in the left panel and download the file

    # Both result files are written to the current working directory.
    output_dir = os.getcwd()
    files.download(os.path.join(output_dir, 'result.eaf'))
    files.download(os.path.join(output_dir, 'result.TextGrid'))

In [None]:
# Run the full pipeline when this cell/file is executed directly
# (and not when the code is imported as a module).
if __name__ == '__main__':
    main()