This example is a demonstration of Crowdom's extended data labeling workflow for annotation tasks, using your pre-existing custom UI for projects. If you are not familiar with basic annotation task workflow – visit [basic audio transcript workflow example](../audio_transcript/audio_transcript.ipynb).

We will use an ASR model to produce first versions of transcript. Then these transcripts would be checked by Toloka workers, and in case the model made some mistakes, tasks would be reworked by Toloka workers also. 

For this example, we chose a more complex audio transcription task - we ask workers to both choose whether there is speech on the audio and write down the words they hear if there are any.

Additionally, this example illustrates how to use more precise calculation of task duration. 

# Setup environment

In [1]:
%pip install crowdom

In [2]:
%pip install -i https://pypi.yandex-team.ru/simple/ yandex-speechkit

In [None]:
from datetime import timedelta
import os
import json
import pandas as pd

from typing import Dict, Optional

import toloka.client as toloka

from crowdom import base, datasource, client, mapping, objects, pricing, params as labeling_params

## Logging customization

In [5]:
import yaml
import logging.config

In [6]:
with open('logging.yaml') as f:
    logging.config.dictConfig(yaml.full_load(f.read()))

## Crowdsourcing platfrom authorization

In [7]:
from IPython.display import clear_output, display

In [8]:
token = os.getenv('TOLOKA_TOKEN') or input('Enter your token: ')
toloka_client = client.create_toloka_client(token=token)
clear_output()

# Labeling task definition

We transcribe `Audio` into `AudioClass` and `Text`. Moreover, `Text` is not always required – there will be no text for audios with silence only.

In [10]:
class AudioClass(base.Class):
    SPEECH = 'sp'
    SILENCE = 'si'
    UNCLEAR = 'mis'

    @classmethod
    def labels(cls) -> Dict['AudioClass', Dict[str, str]]:
        return {
            cls.SPEECH: {'EN': 'Speech', 'RU': 'Речь'},
            cls.SILENCE: {'EN': 'Silence', 'RU': 'Я не слышу речи'},
            cls.UNCLEAR: {'EN': 'Hard to tell', 'RU': 'Затрудняюсь ответить'}}

In [11]:
annotation_function = base.AnnotationFunction(
    inputs=(base.ObjectMeta(type=objects.Audio, name='audio'),),
    outputs=(
        base.ObjectMeta(type=objects.Text, name='transcript', required=False),
        base.ClassMeta(type=AudioClass, name='class'))
)

## Workers instruction

In [12]:
lang = 'RU'

In [13]:
with open(f'instruction_{lang}.html') as f:
    instruction = {lang: f.read()}

## Labeling task specification

In [14]:
task_spec = client.TaskSpec(
    id='audio-transcription-ex',
    function=annotation_function,
    name={'EN': 'Audio transcription', 'RU': 'Расшифровка аудио'},
    description={'EN': 'Transcribe short audios', 'RU': 'Расшифровка коротких аудио'},
    instruction=instruction)

In [15]:
task_spec_ru = client.AnnotationTaskSpec(task_spec, lang)

## Worker interface customization

If you already have some custom UI developed for your tasks, you can use it with Crowdom processes. In this case, however, you __have to__ make sure that your UI is compatible with your task function and generated task mappings.

Also, you will need to localize custom UI on your own. We recommend moving all localized strings from UI into `vars` portion of TemplateBuilder code. There's one exception - regex template validation patterns cannot be set via `vars`, so they have to be swapped by text replacement.

In [16]:
def add_vars(config: dict, config_vars: dict):
    if 'vars' not in config:
        config['vars'] = {}

    config['vars'].update(config_vars)

In [17]:
def get_tb_from_dict(view: dict) -> toloka.project.view_spec.TemplateBuilderViewSpec:
    return toloka.project.view_spec.TemplateBuilderViewSpec(
        config=toloka.structure(view, toloka.project.view_spec.TemplateBuilder))

In [18]:
with open('transcript_view.json') as f, open(f'transcript_vars_{lang}.json') as var_f:
    transcript_dict = json.loads(f.read().replace('{{text_re}}', {
        'RU': '^[а-яё ?]*$',
        'KK': '^[а-яёәғқңөұүһі ?]*$',
    }[lang]))
    transcript_vars = json.load(var_f)['vars']
    add_vars(transcript_dict, transcript_vars)
    transcript_view = get_tb_from_dict(transcript_dict)

In [19]:
task_spec_ru.overload_view(transcript_view)

## Custom worker interface preview

In [20]:
example_url = 'https://tlk.s3.yandex.net/ext_dataset/noisy_speech/noisy_tested_wav/p232_299.wav'
example_audio = (objects.Audio(url=example_url),)

example_check = (objects.Audio(url=example_url), objects.Text('текст для примера'), AudioClass.SPEECH)

client.TaskPreview(example_audio, task_spec=task_spec_ru).display_link()

In [21]:
client.TaskPreview(example_check, task_spec=task_spec_ru).display_link()

# Importing source data

In [22]:
input_objects = datasource.read_tasks('tasks.json', task_spec_ru.task_mapping)

In [23]:
control_objects = datasource.read_tasks(
    'control_tasks.json',
    task_spec_ru.task_mapping,
    has_solutions=True,
)

# Custom task duration calculation

In the simplest case, we use static `task_duration_hint` to estimate duration of each task. `task_duration_hint` influences both some time control measures and pricing of your task. 

However, your input objects may very in the amount of time they need to be processed by workers, - for example, you can have datasets with different average audio duration, and choosing a static `task_duration_hint` may be a problem. 

In this case, you can specify a custom task duration calculation function, that will be used for more accurate time control. This function should take a tuple of input objects and return `datetime.timedelta` duration estimation. We recommend fallback to some safe static task duration in case calculation fails. Also, it would be a great idea to cache your calculation function, if it is an IO-bound operation.

In [None]:
%pip install wave

In [24]:
import requests
import io
import wave
from functools import lru_cache
from multiprocessing.pool import ThreadPool

In [25]:
@lru_cache(maxsize=None)
def get_audio_duration(audio: objects.Audio) -> timedelta:
    url = audio.url
    response = requests.get(url)
    with wave.open(io.BytesIO(response.content), 'r') as file:
        frames = file.getnframes()
        rate = file.getframerate()
        duration = frames / float(rate)
    return timedelta(seconds=duration)

Input of this function would be a tuple of input objects, consistent with your task function. In this case, there would be a single `Audio` object in this tuple.

Calculation for annotation step:

In [26]:
fallback_task_duration_hint = timedelta(seconds=20)  # a safe estimate in case your calculation fails 

def calculate_annotation_task_duration(input_objects: mapping.Objects) -> timedelta:
    # here input objects are a length 1 tuple: (audio,)
    try:
        audio_duration = get_audio_duration(input_objects[0])
        return audio_duration * 2  # workers need to listen to audio a couple times and write a transcript, 
                                   # which may take more time depending on the worker's device
    except: 
        return fallback_task_duration_hint

In [27]:
calculate_annotation_task_duration(input_objects[0])

datetime.timedelta(seconds=12, microseconds=816000)

If markup and check steps can take singnificantly different time to complete, you can provide a different task calculation function for check step. In this case, input of the function is a tuple with objects corresponding to both inputs and outputs of your function. In this case, there would be a tuple with `Audio`, `Text` and `AudioClass` objects.

Calculation for check step:

In [28]:
def calculate_check_task_duration(input_objects: mapping.Objects) -> timedelta:
    # here input objects are a length 3 tuple: (audio, text, audio class)
    try:
        audio_duration = get_audio_duration(input_objects[0])
        return audio_duration * 3  # workers need to listen to audio more carefully, checking the transcript
    except: 
        return fallback_task_duration_hint

We need a static average `task_duration_hint` to determine pricing. If there are many input objects and your function is IO-bound, it is better to calculate it in parallel, using threads.

In [29]:
thread_pool = ThreadPool(min(40, len(input_objects)))
durations = thread_pool.map(calculate_annotation_task_duration, input_objects)

task_duration_hint = sum(durations, start=timedelta(seconds=0)) / len(durations)
task_duration_hint

datetime.timedelta(seconds=12, microseconds=686080)

In [30]:
check_durations = thread_pool.map(calculate_check_task_duration, [control_inputs for (control_inputs, _) in control_objects])
check_task_duration_hint = sum(check_durations, start=timedelta(seconds=0)) / len(check_durations)
check_task_duration_hint

datetime.timedelta(seconds=16, microseconds=532640)

# Model

In [31]:
import boto3
from botocore.config import Config

In [32]:
s3_client = boto3.session.Session().client(
    service_name='s3',
    endpoint_url='https://storage.yandexcloud.net',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID') or input('Enter your AWS access key ID: '),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY') or input('Enter your AWS secret access key: '),
    config=Config(retries=dict(max_attempts=30)),
)

In [None]:
from speechkit.common.utils import configure_credentials
from speechkit.common import Product
from speechkit import model_repository
from speechkit.stt import RecognitionConfig, AudioProcessingType

In [34]:
configure_credentials(yc_ai_token=f'Api-Key {os.getenv("ASR_API_KEY")}')

In [36]:
model = model_repository.recognition_model(product=Product.Yandex)

In [37]:
config = RecognitionConfig(mode=AudioProcessingType.Full, language='ru-RU')

In [38]:
from multiprocessing.pool import ThreadPool
from pydub import AudioSegment
import io

In [39]:
def recognize_record(s3_url) -> str:
    bucket, key = s3_url[len('https://storage.yandexcloud.net/'):].split('/', 1)
    audio_bytes = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
    audio = AudioSegment.from_wav(io.BytesIO(audio_bytes))
    result = model.transcribe(audio, config)
    return ' '.join(chunk.raw_text for chunk in result)

In [40]:
def recognize_voice_recordings(tasks: list[tuple[objects.Audio]]) -> list[tuple[objects.Text, AudioClass]]:
    if not tasks:
        return []
    pool = ThreadPool(processes=min(len(tasks), 40))
    recognized_texts = pool.map(recognize_record, [audio.url for (audio,) in tasks])
    return [(objects.Text(text), AudioClass.SPEECH if text else AudioClass.SILENCE) for text in recognized_texts]

In [41]:
from crowdom import worker

In [42]:
model_worker = worker.Model(name='asr:general', func=recognize_voice_recordings)

In [43]:
recognize_voice_recordings([
    (
        objects.Audio(url='https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/data/9cb0c9bc-b3c5-4984-b799-724f100250d1.wav'),
    )
])

[(Text(text='наша задача теперь добиться отношения к воде и биоразнообразию как к стратегическим ресурсам'), <AudioClass.SPEECH: 'sp'>)]

# Labeling efficiency optimization

Specify your custom task duration estimation functions:

In [None]:
params_form = labeling_params.get_annotation_interface(
    task_spec=task_spec_ru,
    check_task_duration_hint=check_task_duration_hint ,
    annotation_task_duration_hint=task_duration_hint,
    toloka_client=toloka_client,
)

In [68]:
check_params, annotation_params = params_form.get_params()

Specify your model:

In [69]:
annotation_params.model = model_worker

And your custom task duration functions:

In [70]:
check_params.task_duration_function = calculate_check_task_duration
annotation_params.task_duration_function = calculate_annotation_task_duration

# Labeling of your data

In [None]:
client.define_task(task_spec_ru, toloka_client)

In [76]:
assert control_objects, 'No control objects supplied'
assert isinstance(control_objects[0], tuple)

try:
    task_spec_ru.check.task_mapping.validate_objects(control_objects[0][0])
except:
    control_objects = [(task + solution, (base.BinaryEvaluation(ok=True),)) for (task, solution) in control_objects]

In [None]:
raw_results, worker_weights = client.launch_annotation(
    task_spec_ru,
    annotation_params,
    check_params,
    input_objects,
    control_objects,
    toloka_client,
)

In [78]:
results = client.AnnotationResults(input_objects, raw_results, task_spec_ru, worker_weights)

## Results

In [79]:
with pd.option_context("max_colwidth", 100):
    display(results.predict())

Unnamed: 0,audio,transcript,class
0,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/9cb0c9bc-b3c...,наша задача теперь добиться отношения к воде и биоразнообразию как к стратегическим ресурсам,sp
1,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/d7f0b9dc-b4d...,вьетнам привержен широкому и эффективному международному сотрудничеству в целях решения проблемы...,sp
2,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/f361e245-cc6...,бразилия готова взять на себя обязанность постоянного члена совета,sp
4,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/03f38960-da2...,когда мы вернемся домой нам придется столкнуться с множеством конкурирующих интересов,sp
6,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/94c1e9c8-3ac...,я также призываю организацию объединенных наций полностью поддержать африку в этом отношении,sp
7,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/fc4f3e2c-053...,николай всеволодович вовсе впрочем не улыбался а напротив слушал нахмуренные несколько нетерпеливо,sp
8,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/4773f0ac-82c...,мы считаем что это глобальный вопрос который требует эффективного решения,sp
9,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/db49db03-7f5...,в этом ключе предпринимались многочисленные инициативы направленные на достижение прогресса в об...,sp
10,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/9bf4db95-ab8...,мы заслушали больше выступлений государственных членов чем в предыдущие сессии,sp
12,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/0f2bb114-2a9...,наша позиция остается неизменной,sp


In [80]:
with pd.option_context("max_colwidth", 100):
    display(results.predict_proba())

Unnamed: 0,audio,transcript,class,confidence
0,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/9cb0c9bc-b3c...,наша задача теперь добиться отношения к воде и биоразнообразию как к стратегическим ресурсам,sp,1.0
1,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/d7f0b9dc-b4d...,вьетнам привержен широкому и эффективному международному сотрудничеству в целях решения проблемы...,sp,1.0
2,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/f361e245-cc6...,бразилия готова взять на себя обязанность постоянного члена совета,sp,1.0
3,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/f361e245-cc6...,бразилия готова взять на себя обязанности постоянного члена совета,sp,3.333333e-11
4,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/03f38960-da2...,когда мы вернемся домой нам придется столкнуться с множеством конкурирующих интересов,sp,1.0
5,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/03f38960-da2...,когда мы вернемся домой нам придется столкнуться с множеством конкурирующих интересных,sp,3.333333e-11
6,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/94c1e9c8-3ac...,я также призываю организацию объединенных наций полностью поддержать африку в этом отношении,sp,1.0
7,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/fc4f3e2c-053...,николай всеволодович вовсе впрочем не улыбался а напротив слушал нахмуренные несколько нетерпеливо,sp,1.0
8,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/4773f0ac-82c...,мы считаем что это глобальный вопрос который требует эффективного решения,sp,1.0
9,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/db49db03-7f5...,в этом ключе предпринимались многочисленные инициативы направленные на достижение прогресса в об...,sp,1.0


In [81]:
with pd.option_context('max_colwidth', 150), pd.option_context('display.max_rows', 100):
    display(results.worker_labels())

Unnamed: 0,audio,transcript,class,annotator,annotation_overlap,confidence,evaluation_overlap,eval,evaluator
0,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/9cb0c9bc-b3c5-4984-b799-724f100250d1.wav,наша задача теперь добиться отношения к воде и биоразнообразию как к стратегическим ресурсам,sp,asr:general,1,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
1,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/d7f0b9dc-b4d6-4412-b9c5-6e6cdcdaf947.wav,вьетнам привержен широкому и эффективному международному сотрудничеству в целях решения проблемы неинфекционных заболеваний,sp,asr:general,1,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
2,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/f361e245-cc6e-42f2-9eba-153224a4d221.wav,бразилия готова взять на себя обязанность постоянного члена совета,sp,6d85abd870df2592ef79175f99b5b93c,2,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
3,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/f361e245-cc6e-42f2-9eba-153224a4d221.wav,бразилия готова взять на себя обязанности постоянного члена совета,sp,asr:general,2,3.333333e-11,1,False,6d85abd870df2592ef79175f99b5b93c
4,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/03f38960-da28-4f06-96f7-aef7e29811a7.wav,когда мы вернемся домой нам придется столкнуться с множеством конкурирующих интересов,sp,6d85abd870df2592ef79175f99b5b93c,2,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
5,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/03f38960-da28-4f06-96f7-aef7e29811a7.wav,когда мы вернемся домой нам придется столкнуться с множеством конкурирующих интересных,sp,asr:general,2,3.333333e-11,1,False,6d85abd870df2592ef79175f99b5b93c
6,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/94c1e9c8-3ac2-44d9-bb27-1a8f9cd5f5a3.wav,я также призываю организацию объединенных наций полностью поддержать африку в этом отношении,sp,asr:general,1,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
7,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/fc4f3e2c-0534-4708-95a7-09ca5c43c8e6.wav,николай всеволодович вовсе впрочем не улыбался а напротив слушал нахмуренные несколько нетерпеливо,sp,asr:general,1,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
8,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/4773f0ac-82c4-40c8-904a-1f0df81fa0e2.wav,мы считаем что это глобальный вопрос который требует эффективного решения,sp,asr:general,1,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
9,https://storage.yandexcloud.net/crowdom-public/examples/audio_transcript_ex/db49db03-7f5e-48b6-bb15-b0d2f5a64a73.wav,в этом ключе предпринимались многочисленные инициативы направленные на достижение прогресса в области ядерного разоружения,sp,asr:general,1,1.0,1,True,6d85abd870df2592ef79175f99b5b93c
