<a href="https://colab.research.google.com/github/tihunn/emotion_testing/blob/main/exp/dasha_dataprocessing_ipynb%22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

datacls.py

In [1]:
from dataclasses import dataclass
from typing import Any

# -----------------------------------------------------------------------------
#                               AGGREGATION
# -----------------------------------------------------------------------------


@dataclass
class DawidSkeneEntryDataclass:
    task: str
    worker: str
    label: Any


@dataclass
class DawidSkeneResultDataclass:
    task: str
    pred: str


@dataclass
class MarkupDataclass:
    hash_id: str
    audio_path: str
    duration: str
    annotator_emo: str
    golden_emo: str
    speaker_text: str
    speaker_emo: str
    source_id: str
    audio_path: str
    annotator_emo: str
    annotator_id: str


@dataclass
class AggDataclass:
    hash_id: str
    audio_path: str
    duration: str
    emotion: str
    golden_emo: str
    speaker_text: str
    speaker_emo: str
    source_id: str


# -----------------------------------------------------------------------------
#                               FEATURES
# -----------------------------------------------------------------------------


@dataclass
class DataWithFeaturesEntryclass:
    wav_path: str
    wav_id: str


# -----------------------------------------------------------------------------
#                               EXP
# -----------------------------------------------------------------------------


@dataclass
class DataForExp:
    id: str
    tensor: str
    wav_length: str
    label: int
    emotion: str

dawidskene.py

In [2]:
!pip install crowd-kit

Collecting crowd-kit
  Downloading crowd_kit-1.4.1-py3-none-any.whl.metadata (10 kB)
Downloading crowd_kit-1.4.1-py3-none-any.whl (89 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.2/89.2 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: crowd-kit
Successfully installed crowd-kit-1.4.1


In [3]:
from pathlib import Path
from typing import List

import pandas as pd
from crowdkit.aggregation import DawidSkene as CrowdKitDawidSkene


def get_dawidskene_pred(
    data: List[DawidSkeneEntryDataclass],
    threshold: float,
    meta_path: Path,
    n_iter: int = 100,
) -> List[DawidSkeneResultDataclass]:
    labels = {row.label for row in data}
    assert "task" not in labels, 'Labels cant contains the name "task"!'
    aggregated_labels = CrowdKitDawidSkene(n_iter=n_iter).fit_predict_proba(
        pd.DataFrame(data)
    )
    aggregated_labels.to_csv(meta_path, sep="\t")

    aggregated_labels_list = aggregated_labels.reset_index().to_dict("records")
    aggregated_data = []
    for row in aggregated_labels_list:
        tmp_dict = {val: key for key, val in row.items() if key in labels}
        max_item_proba = max(tmp_dict)
        if max_item_proba >= threshold:
            key_with_max_value = tmp_dict[max_item_proba]
            aggregated_row = DawidSkeneResultDataclass(
                task=row["task"],
                pred=key_with_max_value,
            )
            aggregated_data.append(aggregated_row)
    return aggregated_data

aggregation.py

In [4]:
import json
import os
from enum import Enum
from pathlib import Path
from typing import Dict, List


HEADER = "\t".join(
    [
        "hash_id",
        "wav_path",
        "duration",
        "emotion",
        "golden_emo",
        "speaker_text",
        "speaker_emo",
        "source_id",
    ]
)

HEADER_EXP = "\t".join(["id", "tensor", "wav_lengh", "label"])


class Emotion(Enum):
    ANGRY = 0
    SAD = 1
    NEUTRAL = 2
    POSITIVE = 3


def read_data_markup(dataset_path: Path, use_tsv: bool) -> List[MarkupDataclass]:
    markup_data = []
    if use_tsv:
        with open(
            dataset_path.parent / (dataset_path.stem + ".tsv"), "r", encoding="utf-8"
        ) as file:
            headers = file.readline().rstrip("\r\n").split("\t")
            for line in file:
                line_data = line.strip("\r\n").split("\t")
                string = dict(zip(headers, line_data))
                row = MarkupDataclass(**string)
                markup_data.append(row)
    else:
        with open(
            dataset_path.parent / (dataset_path.stem + ".jsonl"), "r", encoding="utf-8"
        ) as file:
            for line in file:
                row = MarkupDataclass(**json.loads(line))
                markup_data.append(row)
    return markup_data


def agg_data_to_file(
    file_path: Path, agg_data: List[AggDataclass], use_tsv: bool
) -> None:
    if use_tsv:
        with open(
            file_path.parent / (file_path.stem + ".tsv"), "w", encoding="utf-8"
        ) as file:
            print(HEADER, file=file, end=os.linesep)
            for row in agg_data:
                print("\t".join(row.__dict__.values()), file=file, end=os.linesep)
    else:
        with open(
            file_path.parent / (file_path.stem + ".jsonl"), "w", encoding="utf-8"
        ) as file:
            for row in agg_data:
                line = json.dumps(row.__dict__, ensure_ascii=False)
                print(line, file=file, end=os.linesep)


def exp_data_to_file(
    file_path: Path, exp_data: List[DataForExp], use_tsv: bool
) -> None:
    if use_tsv:
        with open(
            file_path.parent / (file_path.stem + ".tsv"), "w", encoding="utf-8"
        ) as file:
            print(HEADER_EXP, file=file, end=os.linesep)
            for row in exp_data:
                line = "\t".join(list(map(str, row.__dict__.values())))
                print(line, file=file, end=os.linesep)
    else:
        with open(
            file_path.parent / (file_path.stem + ".jsonl"), "w", encoding="utf-8"
        ) as file:
            for row in exp_data:
                line = json.dumps(row.__dict__, ensure_ascii=False)
                print(line, file=file, end=os.linesep)


def filter_data(
    markup_data: List[MarkupDataclass],
    aggregated_data_dict: Dict[str, str],
    dataset: str,
) -> List[AggDataclass]:
    agg_data = []
    used_wavs = set()
    for row in markup_data:
        if row.hash_id in used_wavs:
            continue
        if row.hash_id in aggregated_data_dict:
            good_agg_row = AggDataclass(
                hash_id=row.hash_id,
                audio_path=str(Path("..", "..", dataset, row.audio_path)),
                duration=row.duration,
                emotion=aggregated_data_dict[row.hash_id],
                golden_emo=row.golden_emo,
                speaker_text=row.speaker_text,
                speaker_emo=row.speaker_emo,
                source_id=row.source_id,
            )
            agg_data.append(good_agg_row)
        used_wavs.add(row.hash_id)
    return agg_data


def make_exp_data(agg_data: List[AggDataclass]) -> List[DataForExp]:
    exp_data = []
    for row in agg_data:
        if (
            not isinstance(row.golden_emo, str) or row.golden_emo == ""
        ) and row.emotion != "other":
            exp_row = DataForExp(
                id=row.hash_id,
                tensor=str(Path("..", "..", "features", row.hash_id + ".npy")),
                wav_length=row.duration,
                label=Emotion[row.emotion.upper()].value,
                emotion=row.emotion,
            )
            exp_data.append(exp_row)
    return exp_data


def aggregate_data(
    data_path: Path, out_path: Path, use_tsv: bool, dawidskene_threshold: float
) -> None:

    markup_data = [ "crowd_train", "crowd_test"]
    data = {}
    all_data = []
    for dataset in markup_data:
        data[dataset] = read_data_markup(
            dataset_path=Path(data_path, dataset, "raw_" + dataset),
            use_tsv=use_tsv,
        )
        all_data += data[dataset]

    data_for_agg = []
    for row in all_data:
        row_for_agg = DawidSkeneEntryDataclass(
            task=row.hash_id,
            worker=row.annotator_id,
            label=row.annotator_emo,
        )
        data_for_agg.append(row_for_agg)

    aggregated_data = get_dawidskene_pred(
        data=data_for_agg,
        threshold=dawidskene_threshold,
        meta_path=data_path / "meta.tsv",
    )

    aggregated_data_dict = {row.task: row.pred for row in aggregated_data}

    exp_data = {}
    for dataset in markup_data:
        agg_data = filter_data(
            markup_data=data[dataset],
            aggregated_data_dict=aggregated_data_dict,
            dataset=dataset,
        )
        exp_data[dataset] = make_exp_data(agg_data=agg_data)
        exp_data_to_file(
            file_path=out_path / dataset.rsplit("_", maxsplit=1)[-1] / dataset,
            exp_data=exp_data[dataset],
            use_tsv=use_tsv,
        )
        agg_data_to_file(
            file_path=out_path / "aggregated_dataset" / dataset,
            agg_data=agg_data,
            use_tsv=use_tsv,
        )
    # exp_data_to_file(
    #     file_path=out_path / "train" / "train",
    #     exp_data=exp_data["podcast_train"] + exp_data["crowd_train"],
    #     use_tsv=use_tsv,
    # )
    # exp_data_to_file(
    #     file_path=Path(out_path / "test" / "test"),
    #     exp_data=exp_data["podcast_test"] + exp_data["crowd_test"],
    #     use_tsv=use_tsv,
    # )

calc_features.py

In [5]:
from pathlib import Path
from typing import List, Set

import librosa
import numpy as np
from tqdm import tqdm


def create_features(
    data: List[DataWithFeaturesEntryclass],
    wavs_names: Set[str],
    features_dump_path: Path,
    dataset_name: str,
    recalculate_feature: bool,
    hop_length_coef: float = 0.01,
    win_length_coef: float = 0.02,
    sample_rate: int = 16000,
    n_mels: int = 64,
) -> None:
    """
    As an input all models use standard speech features:
    64 Mel-filterbank calculated from 20ms windows with a 10ms overlap.
    """
    if recalculate_feature:
        if len(data) != len(wavs_names):
            print(
                f"{len(wavs_names) - len(data)} wav files are missing for {dataset_name}"
            )
        hop_length = int(sample_rate * hop_length_coef)
        win_length = int(sample_rate * win_length_coef)
        for row in tqdm(data):
            data, rate = librosa.load(row.wav_path, sr=sample_rate)
            if len(data) != 0:
                spec = librosa.feature.melspectrogram(
                    y=data,
                    sr=rate,
                    hop_length=hop_length,
                    n_fft=win_length,
                    n_mels=n_mels,
                )
            else:
                raise AttributeError
            mel_spec = librosa.power_to_db(spec, ref=np.max)
            np.save(features_dump_path / f"{row.wav_id}.npy", mel_spec[None])
        print(
            f"({len(data)}/{len(wavs_names)}) features have been calculated for {dataset_name}"
        )
    else:
        ready_features = {elm.stem for elm in features_dump_path.glob("*.npy")}
        wav_to_features = {wav for wav in wavs_names if not wav in ready_features}
        data_to_culc = [wav for wav in data if wav.wav_id in wav_to_features]

        if len(data_to_culc) != len(wav_to_features):
            print(
                f"{len(wav_to_features) - len(data_to_culc)} wav files are missing for {dataset_name}"
            )

        if not data_to_culc:
            print(
                f"All({len({wav for wav in wavs_names if wav in ready_features})}/{len(wavs_names)}) features have been calculated for {dataset_name}"
            )
            return

        hop_length = int(sample_rate * hop_length_coef)
        win_length = int(sample_rate * win_length_coef)
        for row in tqdm(data_to_culc):
            data, rate = librosa.load(row.wav_path, sr=sample_rate)
            if len(data) != 0:
                spec = librosa.feature.melspectrogram(
                    y=data,
                    sr=rate,
                    hop_length=hop_length,
                    n_fft=win_length,
                    n_mels=n_mels,
                )
            else:
                raise AttributeError
            mel_spec = librosa.power_to_db(spec, ref=np.max)
            np.save(features_dump_path / f"{row.wav_id}.npy", mel_spec[None])


def load_features(
    wavs_path: Path,
    wavs_names: Set[str],
    result_dir: Path,
    dataset_name: str,
    recalculate_feature: bool,
) -> None:
    wavs = []
    for elm in wavs_path.glob("*.wav"):
        wavs.append(DataWithFeaturesEntryclass(wav_path=str(elm), wav_id=elm.stem))
    create_features(
        data=wavs,
        wavs_names=wavs_names,
        features_dump_path=result_dir / "features",
        dataset_name=dataset_name,
        recalculate_feature=recalculate_feature,
    )

processing.py by gpt

In [6]:
from pathlib import Path
import numpy as np

def processing(
    dataset_path: str,
    use_tsv: bool = False,
    recalculate_features: bool = False,
    threshold: float = 0.9,
) -> None:
    """
    Processing raw data for training
    """
    if threshold > 1 or threshold < 0:
        raise AttributeError("Threshold must be between 0 and 1")

    np.seterr(divide="ignore")

    public_data = Path(dataset_path)
    result_dir = public_data / f"processed_dataset_0{int(threshold*100)}"

    # создаём подпапки
    path_names = ["train", "aggregated_dataset", "test"]
    for path_name in path_names:
        (result_dir / path_name).mkdir(parents=True, exist_ok=True)

    (public_data / "features").mkdir(parents=True, exist_ok=True)

    # обрабатываем все наборы
    data_types = ["crowd_train", "crowd_test"]
    for data_type in data_types:
        wavs_path = public_data / data_type / "wavs"
        data = read_data_markup(
            dataset_path=public_data / data_type / ("raw_" + data_type),
            use_tsv=use_tsv,
        )
        wavs_names = {Path(row.audio_path).stem for row in data}
        load_features(
            wavs_path=wavs_path,
            wavs_names=wavs_names,
            result_dir=public_data,
            dataset_name=data_type,
            recalculate_feature=recalculate_features,
        )

    # агрегируем
    aggregate_data(public_data, result_dir, use_tsv, threshold)


create a small datasets

In [7]:
# !rm -r /content/drive/MyDrive/crowd_small #

In [8]:
import os
import json
import shutil
import random

def split_crowd_dataset(n_test: int):
    base_dir = "/content/drive/MyDrive/crowd/crowd_test"
    small_dir = "/content/drive/MyDrive/crowd_small"

    # создаём папки
    os.makedirs(os.path.join(small_dir, "crowd_test"), exist_ok=True)
    os.makedirs(os.path.join(small_dir, "crowd_train"), exist_ok=True)
    os.makedirs(os.path.join(small_dir, "crowd_test", 'wavs'), exist_ok=True)
    os.makedirs(os.path.join(small_dir, "crowd_train", 'wavs'), exist_ok=True)

    # пути к новым jsonl
    test_jsonl = os.path.join(small_dir, "crowd_test", "raw_crowd_test.jsonl")
    train_jsonl = os.path.join(small_dir, "crowd_train", "raw_crowd_train.jsonl")

    # создаём/очищаем jsonl, чтобы файлы появились сразу
    open(test_jsonl, "w", encoding="utf-8").close()
    open(train_jsonl, "w", encoding="utf-8").close()

    # читаем исходный jsonl
    src_file = os.path.join(base_dir, "raw_crowd_test.jsonl")
    with open(src_file, "r", encoding="utf-8") as f:
        lines = [json.loads(line) for line in f]

    # перемешаем для случайного выбора
    random.shuffle(lines)

    # делим на test и train
    test_lines = lines[0:20]
    train_lines = lines[100:200]

    # копируем аудио для теста
    for item in test_lines:
        audio_rel = item["audio_path"]
        src_audio = os.path.join(base_dir, audio_rel)
        dst_audio = os.path.join(small_dir, "crowd_test", audio_rel)
        shutil.copy(src_audio, dst_audio)

    # копируем аудио для train
    for item in train_lines:
        audio_rel = item["audio_path"]
        src_audio = os.path.join(base_dir, audio_rel)
        dst_audio = os.path.join(small_dir, "crowd_train", audio_rel)
        shutil.copy(src_audio, dst_audio)

    # сохраняем jsonl
    with open(test_jsonl, "w", encoding="utf-8") as f:
        for item in test_lines:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

    with open(train_jsonl, "w", encoding="utf-8") as f:
        for item in train_lines:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

    print(f"✅ Готово! {len(test_lines)} файлов в test и {len(train_lines)} в train.")

# пример вызова:
split_crowd_dataset(1)


✅ Готово! 20 файлов в test и 100 в train.


На функцию выше благодаря gpt я потратил 6 часов времени, без него было бы быстрее... но я добился своего за пол часа работы над кусоком его первого гавна на основе моего почти нормального промпт. Вот это почти всё изкаверкало...

run process

In [10]:
processing(
    dataset_path="/content/drive/MyDrive/crowd_small",  # путь к твоему датасету
    use_tsv=False,
    recalculate_features=False,
    threshold=0.9,
)


100%|██████████| 100/100 [00:22<00:00,  4.47it/s]
100%|██████████| 20/20 [00:01<00:00, 19.80it/s]


ну а сейчас что так долго, во, бля ошибка, заебало. ну у меня нету подкаста и тут и там... сука кажется часть кода вообще потеряна. надо проверить просто.

итак наконец эта функция завершилась без ошибок и я вижу другие проблемы я сделал 120 файлов вижу только 2.

эта проблема возникает из-за агрегации то есть несколько анататоров размечает один файл, а я вырвал 120 файлов и по стречке к ним, а не все упоминания файла.