### 사전 작업

In [4]:
import pickle
from dataclasses import dataclass
from typing import Literal
import json

# fields with None will be not used when tech is gt
@dataclass
class WaveItem:
    division1: Literal["m2m", "a2a"]
    division2: Literal["m2m", "f2f", "m2f", "f2m", "en_m2kr_f", "en_f2fr_m"]
    tech: Literal["gt", "o", "autovc", "vqmivc", "vqvc"]
    src_lang: Literal["en", "kr", "fr"]
    src_spk: str
    # src_wav: str | None
    # tgt_spk: str | None
    # tgt_wav: str | None
    my_wav: str
    id: str

with open("nat_items.pkl", "rb") as f:
    nat_items = pickle.load(f)

with open("sim_items.pkl", "rb") as f:
    sim_items = pickle.load(f)

with open("data.json", "r") as f:
    web_data = json.load(f)

In [7]:
import pandas as pd

df = pd.read_csv("https://docs.google.com/spreadsheets/d/1uvX5nI6kGfcy_pePNGrAqpsnjsPchN0gxeTCXAO7E7Q/edit#gid=785761853")



HTTPError: HTTP Error 401: Unauthorized

In [None]:
@dataclass
class NatWebData:
    sheet_col: str
    id: str
    item: WaveItem
    count: int
    scores: list[int]
    mean: float
    std: float
    dev_tag: str

@dataclass
class SimWebData:
    sheet_col: str
    id: str
    base_item: WaveItem
    item: WaveItem
    count: int
    scores: list[int]
    mean: float
    std: float
    dev_tag: str

nat_web_data: list[NatWebData] = []
sim_web_data: list[SimWebData] = []

wd_label: str
wd_data: list
for wd_label, wd_data in web_data.items():
    if wd_label.startswith("n"):
        file_path: str
        file_path, _, dev_tag = wd_data
        file_hex = file_path.replace("./data/", "").replace(".wav", "")
        nat_item = [item for item in nat_items if item.id == file_hex][0]
        nat_web_data.append(NatWebData(
            sheet_col=wd_label,
            id=file_hex,
            item=nat_item,
            count=df[wd_label].count(),
            scores=[int(i) for i in df[wd_label].tolist() if i == i],
            mean=df[wd_label].mean(),
            std=df[wd_label].std(),
            dev_tag=dev_tag,
        ))
    else:
        file_path: str
        comp_info: list[list[str]]
        sv: list[WaveItem]
        base_file_path, comp_info, _, _ = wd_data
        base_file_hex = base_file_path.replace("./data/", "").replace(".wav", "")
        base_item: WaveItem | None = None
        for sv in sim_items.values():
            for sv_item in sv:
                if sv_item.id == base_file_hex:
                    base_item = sv_item
                    break
            if base_item is not None:
                break
        assert base_item is not None
        for compi, comp in enumerate(comp_info):
            comp_file_path = comp[0]
            comp_file_hex = comp_file_path.replace("./data/", "").replace(".wav", "")
            comp_item: WaveItem | None = None
            for sv in sim_items.values():
                for sv_item in sv:
                    if sv_item.id == comp_file_hex:
                        comp_item = sv_item
                        break
                if comp_item is not None:
                    break
            assert comp_item is not None
            comp_col = df[f"{wd_label}_{compi}"]
            sim_web_data.append(SimWebData(
                sheet_col=f"{wd_label}_{compi}",
                id=comp_file_hex,
                base_item=base_item,
                item=comp_item,
                count=comp_col.count(),
                scores=[int(i) for i in comp_col.tolist() if i == i],
                mean=comp_col.mean(),
                std=comp_col.std(),
                dev_tag=comp[1],
            ))

In [None]:
from typing import Callable
import math
from dataclasses import dataclass

@dataclass
class MOSResult:
    count: int
    mean: float
    std: float

def get_nat_mos(filter: Callable[[WaveItem], bool]) -> MOSResult:
    filtered = [i for i in nat_web_data if filter(i.item)]
    scores = []
    for i in filtered:
        scores.extend(i.scores)
    if len(scores) == 0:
        print("필터링 결과 중 수집된 데이터가 없습니다.")
        return MOSResult(
            count=0,
            mean=0,
            std=0,
        )
    mean = sum(scores) / len(scores)
    std = math.sqrt(sum([(i - mean) ** 2 for i in scores]) / len(scores))
    return MOSResult(
        count=len(scores),
        mean=mean,
        std=std,
    )


def get_sim_mos(filter: Callable[[WaveItem], bool]) -> MOSResult:
    filtered = [i for i in sim_web_data if filter(i.item)]
    scores = []
    for i in filtered:
        scores.extend(i.scores)
    if len(scores) == 0:
        print("필터링 결과 중 수집된 데이터가 없습니다.")
        return MOSResult(
            count=0,
            mean=0,
            std=0,
        )
    mean = sum(scores) / len(scores)
    std = math.sqrt(sum([(i - mean) ** 2 for i in scores]) / len(scores))
    return MOSResult(
        count=len(scores),
        mean=mean,
        std=std,
    )

### 설문 결과 점검

- 빈 결과가 있는지 확인

In [None]:
for nwd in nat_web_data:
    if nwd.count == 0:
        print("""[NAT] 설문 결과가 없는 파일이 있습니다.
  - 설문 번호 : {nwd.sheet_col}
  - 파일명 : {nwd.id}.wav
  - 원본 파일명 : {nwd.item.my_wav}
  - 분류 : {nwd.item.division1} -> {nwd.item.division2}, {nwd.item.tech}
  - 원어 : {nwd.item.src_lang}
  - 원어 화자 : {nwd.item.src_spk}
  - dev 태그 : {nwd.dev_tag}""".format(nwd=nwd))

for swd in sim_web_data:
    if swd.count == 0:
        print("""[SIM] 설문 결과가 없는 파일이 있습니다.
  - 설문 번호 : {swd.sheet_col}
  - 파일명 : {swd.id}.wav
  - 원본 파일명 : {swd.item.my_wav}
  - 분류 : {swd.item.division1} -> {swd.item.division2}, {swd.item.tech}
  - 원어 : {swd.item.src_lang}
  - 원어 화자 : {swd.item.src_spk}
  - 목표 화자 : {swd.item.tgt_spk}
  - dev 태그 : {swd.dev_tag}""".format(swd=swd))

### MOS 데이터 뽑기

```python
@dataclass
class WaveItem:
    division1: Literal["m2m", "a2a"]
    division2: Literal["m2m", "f2f", "m2f", "f2m", "en_m2kr_f", "en_f2fr_m"]
    tech: Literal["gt", "acvc", "autovc", "againvc", "vqvc"]
    src_lang: Literal["en", "kr", "fr"]
    src_spk: str  # SPK_ID
    src_wav: str | None  # 파일 경로, GT인 경우 None임
    tgt_spk: str | None  # SPK_ID, GT인 경우 None임
    tgt_wav: str | None  # 파일 경로, GT인 경우 None임
    my_wav: str  # 파일 경로
```

In [None]:
def nat_filter(item: WaveItem) -> bool:
    # 필터링 조건을 여기에 추가합니다.
    # 예시: return item.tech == "gt"
    raise NotImplementedError

# Note: count는 필터링 된 결과의 총 수집된 점수의 개수임
get_nat_mos(nat_filter)

In [None]:
def sim_filter(item: WaveItem) -> bool:
    # 필터링 조건을 여기에 추가합니다.
    # 예시: return item.tech == "acvc"
    raise NotImplementedError

# Note: count는 필터링 된 결과의 총 수집된 점수의 개수임
get_sim_mos(sim_filter)