# 概要

本notebookでは[JVS corpus](https://sites.google.com/site/shinnosuketakamichi/research-topics/jvs_corpus)のデータセットに対し、[pyannote](https://github.com/pyannote/pyannote-audio)を用いて話者情報のembeddingを取得し、類似した話者のデータセットを検索できるかを検証する。

JVS corpusの中には複数のデータセットがある。以下のデータセットそれぞれに対してembeddingを取得する。

- 話者間で共通する読み上げ音声 100 発話を集めたparallel100
- 話者間で全く異なる読み上げ音声 30 発話を集めたnonpara30

話者ごとにembeddingの平均をとり、l2正規化したものを話者embeddingとする。

これに対し、[embedding projector](https://projector.tensorflow.org/)によるプロットと、[Word Tour](https://arxiv.org/abs/2205.01954)による一次元上での並び替えを行い、類似した音声を取得できるか確認する。

# 設定

## ライブラリのimport

In [1]:
import json
import os

import numpy as np
import pandas as pd

from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
from pyannote.audio import Model, Inference
from IPython.display import Audio
from tqdm.notebook import tqdm

## ディレクトリの配置

poc

 |--dataset/jvs/jvs_ver1

 |--working/VoiceVectorSearch_PoC_notebook.ipynb

 |--output

In [2]:
# Make sure the shared output directory exists before any artifact is written;
# exist_ok=True makes re-running this cell harmless.
output_dir = "../output/"
os.makedirs(output_dir, exist_ok=True)

## HuggingFaceよりembeddingモデルのロード

In [3]:
import torch

# Read the Hugging Face access token from the environment so a real secret
# never has to be pasted into (and committed with) the notebook.  The
# placeholder remains the fallback, so the cell behaves exactly as before
# when HF_TOKEN is not set.
token = os.environ.get("HF_TOKEN", "your-huggingface-token")
model = Model.from_pretrained("pyannote/embedding",
                              use_auth_token=token)

# Run on the GPU when one is available and fall back to the CPU otherwise,
# instead of failing outright on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# window="whole": the whole file is embedded at once (see pyannote.audio docs).
inference = Inference(model, window="whole", device=device)

# parallel100での検証

共通する読み上げ音声から得られたembeddingの比較により、類似する音声を取得できるかを調べる。

これによりRVC等のモデルに対し共通のコーパスで変換した音声を用いてモデルを検索できるか確かめる。

## embeddingの取得

In [4]:
# Extract one embedding per utterance for every speaker (jvs001 .. jvs100).
parallel_embs = []
for person in tqdm(range(1, 101)):
    dir_ = f"../dataset/jvs/jvs_ver1/jvs{person:03}/parallel100/wav24kHz16bit/"
    emb = []
    for i in range(1, 101):
        path = dir_ + f'VOICEACTRESS100_{i:03}.wav'
        if not os.path.exists(path):
            # A missing recording is reported and replaced by a zero vector:
            # the per-speaker array keeps its shape and the zero row adds
            # nothing to the sum taken below.
            print(path, "is not exist")
            emb.append(np.zeros(512))
            continue
        emb.append(inference(path))
    parallel_embs.append(np.stack(emb, axis=0))
parallel_embs = np.stack(parallel_embs, axis=0)

# Sum the embeddings of each speaker, then L2-normalise the summed vector.
speaker_sums = parallel_embs.sum(axis=1)
speaker_norms = np.linalg.norm(speaker_sums, ord=2, axis=1, keepdims=True)
parallel_embs_normed = speaker_sums / speaker_norms

# Write the speaker metadata and the speaker embeddings as TSV files.
speaker_meta = pd.read_csv("../dataset/jvs/jvs_ver1/gender_f0range.txt", delimiter=" ")
speaker_meta.to_csv("../output/jvs_speaker_meta.tsv", sep="\t", index=None, encoding="utf-8")
parallel_frame = pd.DataFrame(parallel_embs_normed).astype(np.float16)
parallel_frame.to_csv("../output/jvs_parallel100_emb.tsv", sep="\t", index=None, header=None, encoding="utf-8")

  0%|          | 0/100 [00:00<?, ?it/s]

../dataset/jvs/jvs_ver1/jvs030/parallel100/wav24kHz16bit/VOICEACTRESS100_045.wav is not exist
../dataset/jvs/jvs_ver1/jvs074/parallel100/wav24kHz16bit/VOICEACTRESS100_094.wav is not exist
../dataset/jvs/jvs_ver1/jvs089/parallel100/wav24kHz16bit/VOICEACTRESS100_019.wav is not exist


## Word Tourによる一次元化

In [5]:
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2

# Solve a travelling-salesman problem over the cosine distance
# ((1 - cosine similarity) / 2) to lay the speaker embeddings out in 1-D.

# The rows were L2-normalised above, so their pairwise dot products are the
# cosine similarities.
num_speakers = len(parallel_embs_normed)
cos_sim = np.einsum("nd,md->nm", parallel_embs_normed, parallel_embs_normed)

# The identity mask (sized from the data rather than a hard-coded 100) forces
# the diagonal to zero.  The 1e9 factor scales the [0, 1] distances up before
# they are truncated to the integer arc costs handed to the routing solver.
scaled_dist = (1 - cos_sim) / 2. * (1. - np.eye(num_speakers)) * 1e9

data = {}
data["distance_matrix"] = scaled_dist.astype(np.int64).tolist()
data["num_vehicles"] = 1
data["depot"] = 0


def distance_callback(from_index, to_index):
    """Look up the arc cost between two routing indices.

    Both arguments are routing-variable indices; they are translated to
    distance-matrix node indices through the module-level ``manager`` and
    then used to index the module-level ``data['distance_matrix']``.
    """
    src, dst = (manager.IndexToNode(idx) for idx in (from_index, to_index))
    matrix = data['distance_matrix']
    return matrix[src][dst]

def get_routes(solution, routing, manager):
    """Return the node sequence visited by the first vehicle.

    Every vehicle's route is walked from its start index until the routing
    model reports the end index, converting each routing index to a node
    index on the way.  Only the route of vehicle 0 is returned.
    """
    def _walk(vehicle):
        # Follow the NextVar chain of one vehicle from start to end.
        cursor = routing.Start(vehicle)
        visited = [manager.IndexToNode(cursor)]
        while not routing.IsEnd(cursor):
            cursor = solution.Value(routing.NextVar(cursor))
            visited.append(manager.IndexToNode(cursor))
        return visited

    per_vehicle = [_walk(vehicle) for vehicle in range(routing.vehicles())]
    return per_vehicle[0]

# Single-vehicle routing model over every node of the distance matrix.
manager = pywrapcp.RoutingIndexManager(
    len(data['distance_matrix']), data['num_vehicles'], data['depot'])
routing = pywrapcp.RoutingModel(manager)

# Arc costs are read from the distance matrix through the registered callback.
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC

solution = routing.SolveWithParameters(search_parameters)

# Drop the final entry of the tour (the route's end node) and turn the
# 0-based node ids into speaker directory names such as "jvs001".
tour = get_routes(solution, routing, manager)
routes = [f"jvs{node + 1:03}" for node in tour[:-1]]
print(routes)

['jvs001', 'jvs070', 'jvs013', 'jvs005', 'jvs012', 'jvs022', 'jvs071', 'jvs028', 'jvs037', 'jvs086', 'jvs068', 'jvs078', 'jvs081', 'jvs074', 'jvs054', 'jvs098', 'jvs097', 'jvs023', 'jvs100', 'jvs020', 'jvs033', 'jvs079', 'jvs002', 'jvs091', 'jvs059', 'jvs056', 'jvs096', 'jvs072', 'jvs007', 'jvs092', 'jvs014', 'jvs017', 'jvs055', 'jvs067', 'jvs066', 'jvs082', 'jvs065', 'jvs084', 'jvs064', 'jvs095', 'jvs030', 'jvs090', 'jvs015', 'jvs010', 'jvs018', 'jvs058', 'jvs069', 'jvs019', 'jvs024', 'jvs025', 'jvs060', 'jvs094', 'jvs085', 'jvs004', 'jvs093', 'jvs026', 'jvs063', 'jvs062', 'jvs061', 'jvs029', 'jvs036', 'jvs083', 'jvs043', 'jvs038', 'jvs053', 'jvs039', 'jvs051', 'jvs057', 'jvs027', 'jvs016', 'jvs008', 'jvs040', 'jvs035', 'jvs032', 'jvs041', 'jvs089', 'jvs042', 'jvs076', 'jvs031', 'jvs050', 'jvs003', 'jvs073', 'jvs046', 'jvs047', 'jvs021', 'jvs009', 'jvs088', 'jvs034', 'jvs006', 'jvs048', 'jvs045', 'jvs080', 'jvs049', 'jvs044', 'jvs075', 'jvs011', 'jvs077', 'jvs087', 'jvs099', 'jvs052']

## 音声の聞き比べ

In [11]:
from IPython.display import Audio
# i picks the speaker (a position in `routes`); x picks the script number.
i = 0
x = 1
# Audio("../dataset/jvs/jvs_ver1/{}/parallel100/wav24kHz16bit/VOICEACTRESS100_{:03}.wav".format(routes[i], x), autoplay=True)

# nonpara30での検証

話者間で異なる読み上げ音声から得られたembeddingの比較により、類似する音声を取得できるかを調べる。

これにより任意の音声から類似した音声のデータセットを検索できるか確かめる。

## embeddingの取得

In [7]:
# Extract one embedding per .wav file found in each speaker's nonpara30 folder.
nonpara_embs = []
for person in tqdm(range(1, 101)):
    dir_ = f"../dataset/jvs/jvs_ver1/jvs{person:03}/nonpara30/wav24kHz16bit/"
    emb = []
    for file in os.listdir(dir_):
        # Only audio files are embedded; anything else in the folder is skipped.
        if file.endswith(".wav"):
            path = dir_ + file
            emb.append(inference(path))
    nonpara_embs.append(np.stack(emb, axis=0))
nonpara_embs = np.stack(nonpara_embs, axis=0)

# Sum the embeddings of each speaker, then L2-normalise the summed vector.
speaker_sums = nonpara_embs.sum(axis=1)
speaker_norms = np.linalg.norm(speaker_sums, ord=2, axis=1, keepdims=True)
nonpara_embs_normed = speaker_sums / speaker_norms

# Write the speaker embeddings as a headerless TSV file.
nonpara_frame = pd.DataFrame(nonpara_embs_normed).astype(np.float16)
nonpara_frame.to_csv("../output/jvs_nonpara30_emb.tsv", sep="\t", index=None, header=None, encoding="utf-8")


  0%|          | 0/100 [00:00<?, ?it/s]

## Word Tourによる一次元化

In [8]:
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2

# Solve a travelling-salesman problem over the cosine distance
# ((1 - cosine similarity) / 2) to lay the speaker embeddings out in 1-D.

# The rows were L2-normalised above, so their pairwise dot products are the
# cosine similarities.
num_speakers = len(nonpara_embs_normed)
cos_sim = np.einsum("nd,md->nm", nonpara_embs_normed, nonpara_embs_normed)

# The identity mask (sized from the data rather than a hard-coded 100) forces
# the diagonal to zero.  The 1e9 factor scales the [0, 1] distances up before
# they are truncated to the integer arc costs handed to the routing solver.
scaled_dist = (1 - cos_sim) / 2. * (1. - np.eye(num_speakers)) * 1e9

data = {}
data["distance_matrix"] = scaled_dist.astype(np.int64).tolist()
data["num_vehicles"] = 1
data["depot"] = 0


def distance_callback(from_index, to_index):
    """Return the matrix distance for the arc from_index -> to_index.

    The routing indices are first mapped to distance-matrix node indices via
    the module-level ``manager``; the cost is then read from the module-level
    ``data['distance_matrix']``.
    """
    origin = manager.IndexToNode(from_index)
    destination = manager.IndexToNode(to_index)
    costs_from_origin = data['distance_matrix'][origin]
    return costs_from_origin[destination]

def get_routes(solution, routing, manager):
    """Collect each vehicle's visited nodes and return the first vehicle's list.

    For every vehicle the route is followed from its start index, through the
    solution's next-variable values, until the routing model flags the end
    index.  Each routing index is converted to a node index as it is visited.
    """
    all_routes = []
    for vehicle_id in range(routing.vehicles()):
        position = routing.Start(vehicle_id)
        nodes = [manager.IndexToNode(position)]
        while True:
            if routing.IsEnd(position):
                break
            position = solution.Value(routing.NextVar(position))
            nodes += [manager.IndexToNode(position)]
        all_routes += [nodes]
    return all_routes[0]

# Single-vehicle routing model over every node of the distance matrix.
node_count = len(data['distance_matrix'])
manager = pywrapcp.RoutingIndexManager(node_count, data['num_vehicles'], data['depot'])
routing = pywrapcp.RoutingModel(manager)

# Arc costs are read from the distance matrix through the registered callback.
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

search_parameters = pywrapcp.DefaultRoutingSearchParameters()
first_strategy = routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
search_parameters.first_solution_strategy = first_strategy

solution = routing.SolveWithParameters(search_parameters)

# Drop the final entry of the tour (the route's end node) and turn the
# 0-based node ids into speaker directory names such as "jvs001".
visited_nodes = get_routes(solution, routing, manager)[:-1]
routes = [f"jvs{node + 1:03}" for node in visited_nodes]
print(routes)

['jvs001', 'jvs052', 'jvs099', 'jvs089', 'jvs041', 'jvs054', 'jvs074', 'jvs081', 'jvs076', 'jvs097', 'jvs023', 'jvs031', 'jvs028', 'jvs071', 'jvs022', 'jvs042', 'jvs027', 'jvs057', 'jvs051', 'jvs094', 'jvs004', 'jvs085', 'jvs093', 'jvs082', 'jvs039', 'jvs019', 'jvs024', 'jvs025', 'jvs072', 'jvs092', 'jvs007', 'jvs017', 'jvs055', 'jvs066', 'jvs014', 'jvs067', 'jvs053', 'jvs065', 'jvs038', 'jvs060', 'jvs061', 'jvs029', 'jvs096', 'jvs056', 'jvs059', 'jvs091', 'jvs002', 'jvs043', 'jvs083', 'jvs036', 'jvs084', 'jvs026', 'jvs069', 'jvs062', 'jvs063', 'jvs064', 'jvs095', 'jvs030', 'jvs090', 'jvs015', 'jvs010', 'jvs058', 'jvs018', 'jvs016', 'jvs008', 'jvs040', 'jvs035', 'jvs087', 'jvs077', 'jvs011', 'jvs032', 'jvs005', 'jvs013', 'jvs070', 'jvs009', 'jvs088', 'jvs073', 'jvs078', 'jvs046', 'jvs047', 'jvs003', 'jvs050', 'jvs048', 'jvs034', 'jvs006', 'jvs021', 'jvs098', 'jvs068', 'jvs075', 'jvs044', 'jvs049', 'jvs079', 'jvs086', 'jvs037', 'jvs045', 'jvs080', 'jvs033', 'jvs020', 'jvs100', 'jvs012']

## 音声の聞き比べ

In [12]:
from IPython.display import Audio
# i picks the speaker (a position in `routes`); x picks the script number.
# NOTE(review): the commented-out path below points at parallel100 although
# this section orders speakers by nonpara30 embeddings — confirm intended.
i = 0
x = 1
# Audio("../dataset/jvs/jvs_ver1/{}/parallel100/wav24kHz16bit/VOICEACTRESS100_{:03}.wav".format(routes[i], x), autoplay=True)

# embedding projectorへの反映

In [10]:
# Both tensors are described by the same per-speaker metadata file.
metadata_url = "https://raw.githubusercontent.com/nadare881/voice-changer-vector-search/main/poc/output/jvs_speaker_meta.tsv"

# Tensor name -> URL of the TSV holding that tensor's speaker embeddings.
tensor_urls = {
    "Parallel100": "https://raw.githubusercontent.com/nadare881/voice-changer-vector-search/main/poc/output/jvs_parallel100_emb.tsv",
    "Nonpara30": "https://raw.githubusercontent.com/nadare881/voice-changer-vector-search/main/poc/output/jvs_nonpara30_emb.tsv",
}

# One entry per tensor, with keys in the order tensorName, tensorShape,
# tensorPath, metadataPath.
config_json = {
    "embeddings": [
        {
            "tensorName": tensor_name,
            "tensorShape": [100, 512],
            "tensorPath": tensor_url,
            "metadataPath": metadata_url,
        }
        for tensor_name, tensor_url in tensor_urls.items()
    ]
}

with open("../output/embeddding_projector_config.json", "w", encoding="utf-8") as f:
    json.dump(config_json, f, indent=2, ensure_ascii=False)

結果は[こちら(embedding projector)](https://projector.tensorflow.org/?config=https://raw.githubusercontent.com/nadare881/voice-changer-vector-search/main/poc/output/embeddding_projector_config.json)