# ContentVec を transformers ベースで使えるようにする

Lyodos 著

Version 1.0.0 (2024-07-14)

> このノートブックにある手順は、 https://huggingface.co/lengyue233/content-vec-best/blob/main/convert.py による。

ContentVec の重みは、公式で配布されているもの（ContentVec_legacy 500 と書いてあるリンク）を落とす。

https://github.com/auspicious3000/contentvec

https://ibm.ent.box.com/s/z1wgl1stco8ffooyatzdwsqn2psd9lrr

ただし公式のネットワーク定義だと ONNX 化した際に不具合が出るので、transformers パッケージに準拠して HuBERT を作り、
そちらに重みを移植する。



> なお、ここからの作業中に以下のエラーが出る場合は protobuf のバージョンが tensorflow に対して新しすぎるので、TF 自体を上げるか、protobuf をダウングレードするか、あるいは ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python' を設定する。

```
TypeError: Descriptors cannot be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
 1. Downgrade the protobuf package to 3.20.x or lower.
 2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).

More information: https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
```

In [None]:
# ただしこの環境変数を入れると重くなる
import os

os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'


----

### まず ContentVec のモデルの重みをロードしてみる

落としたファイルを "checkpoint_best_legacy_500.pt" として以下のフォルダに配置したものとする。
ここから、まず `fairseq` の方式でモデルをタスクとしてロードする。

> 蛇足：最初は混乱するのだが、`ContentVec` という名前のゼロから開発された新規構造のモデルがあるわけではない。
PyTorch ベースで系列モデルを扱うための `fairseq` というツールキットがあり、`ContentVec` のなかでも "legacy" と書いてある重みについては、
このツールキットに従う形でモデルを初期化して利用できる。
> また `ContentVec` のモデル構造としては、`fairseq` を通じて提供されている様々なモデル構造の中でも `HuBERT` を採用している。
> ただし `fairseq` で提供される `HuBERT` は、ネットワークの定義方法の都合で ONNX に書き出すと不具合が生じる。
> なので、同様に `HuBERT` 構造を提供しているツールキットであり、かつ ONNX に書き出せる形で定義コードが書かれている `transformers` を使う。
>  `fairseq` 用に作られた重み（チェックポイント）の辞書をそのまま `transformers` で読むことは困難なので、辞書の項目名を書き換えてやるわけだ。



In [None]:
from pathlib import Path
from fairseq import checkpoint_utils # なければ https://pypi.org/project/fairseq/


DATASET_ROOT_PATH = Path("/home/lyodos/study/dataset") # このフォルダ名はユーザーの実情に合わせて書き変えること

best_legacy_500_path = DATASET_ROOT_PATH / "checkpoints" / "classic-vc" / "checkpoint_best_legacy_500.pt"

models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
    [str(best_legacy_500_path)], 
    suffix = "",
)

model = models[0]
model.eval()


----

### 辞書の移植用に一時的に使用する HubertModelWithFinalProj モデルクラスの定義

次に、transformers パッケージが提供する HubertModel に基づいて、新規のモデルクラス HubertModelWithFinalProj を定義する。

HubertModel の元定義は以下にある。
https://github.com/huggingface/transformers/blob/main/src/transformers/models/hubert/modeling_hubert.py#L1330

HubertModelWithFinalProj は、HubertModel に self.final_proj という層を足したものである。
ContentVec 公式で配布されているモデルは、この final_proj という線形層を持つが、`transformers` パッケージで定義されている構造には存在しない。

* ちなみに `__init__` で追加しているだけであって `forward` を書き換えていないので、いわゆる orphan であり推論結果には影響しない。

* 重みを移植するとき層の構造が違うぞと叱られないためだけに、つじつま合わせに付けている。

* 最終的には ClassicVC ではこのネットワーク定義は使用しない。
`final_proj` は不要なので、そのまま `transformers` の `HubertModel` を使うだけ。



In [None]:
import torch

from transformers import HubertConfig, HubertModel
from torch import nn

class HubertModelWithFinalProj(HubertModel):
    def __init__(self, config):
        super().__init__(config)
        self.final_proj = nn.Linear(config.hidden_size, config.classifier_proj_size)

# デフォルトの config でインスタンス化
hubert = HubertModelWithFinalProj(HubertConfig())



すでに作ってある model から、新しく作った hubert に重みを移植する。
まず mapping として、層名を読み替える辞書（新：旧）を作る。


In [None]:
# huggingface <- fairseq
mapping = {
    "masked_spec_embed": "mask_emb",
    "encoder.layer_norm.bias": "encoder.layer_norm.bias",
    "encoder.layer_norm.weight": "encoder.layer_norm.weight",
    "encoder.pos_conv_embed.conv.bias": "encoder.pos_conv.0.bias",
    "encoder.pos_conv_embed.conv.weight_g": "encoder.pos_conv.0.weight_g",
    "encoder.pos_conv_embed.conv.weight_v": "encoder.pos_conv.0.weight_v",
    "feature_projection.layer_norm.bias": "layer_norm.bias",
    "feature_projection.layer_norm.weight": "layer_norm.weight",
    "feature_projection.projection.bias": "post_extract_proj.bias",
    "feature_projection.projection.weight": "post_extract_proj.weight",
    "final_proj.bias": "final_proj.bias",
    "final_proj.weight": "final_proj.weight",
}

# Convert encoder
for layer in range(12):
    for j in ["q", "k", "v"]:
        mapping[
            f"encoder.layers.{layer}.attention.{j}_proj.weight"
        ] = f"encoder.layers.{layer}.self_attn.{j}_proj.weight"
        mapping[
            f"encoder.layers.{layer}.attention.{j}_proj.bias"
        ] = f"encoder.layers.{layer}.self_attn.{j}_proj.bias"

    mapping[
        f"encoder.layers.{layer}.final_layer_norm.bias"
    ] = f"encoder.layers.{layer}.final_layer_norm.bias"
    mapping[
        f"encoder.layers.{layer}.final_layer_norm.weight"
    ] = f"encoder.layers.{layer}.final_layer_norm.weight"

    mapping[
        f"encoder.layers.{layer}.layer_norm.bias"
    ] = f"encoder.layers.{layer}.self_attn_layer_norm.bias"
    mapping[
        f"encoder.layers.{layer}.layer_norm.weight"
    ] = f"encoder.layers.{layer}.self_attn_layer_norm.weight"

    mapping[
        f"encoder.layers.{layer}.attention.out_proj.bias"
    ] = f"encoder.layers.{layer}.self_attn.out_proj.bias"
    mapping[
        f"encoder.layers.{layer}.attention.out_proj.weight"
    ] = f"encoder.layers.{layer}.self_attn.out_proj.weight"

    mapping[
        f"encoder.layers.{layer}.feed_forward.intermediate_dense.bias"
    ] = f"encoder.layers.{layer}.fc1.bias"
    mapping[
        f"encoder.layers.{layer}.feed_forward.intermediate_dense.weight"
    ] = f"encoder.layers.{layer}.fc1.weight"

    mapping[
        f"encoder.layers.{layer}.feed_forward.output_dense.bias"
    ] = f"encoder.layers.{layer}.fc2.bias"
    mapping[
        f"encoder.layers.{layer}.feed_forward.output_dense.weight"
    ] = f"encoder.layers.{layer}.fc2.weight"

# Convert Conv Layers
for layer in range(7):
    mapping[
        f"feature_extractor.conv_layers.{layer}.conv.weight"
    ] = f"feature_extractor.conv_layers.{layer}.0.weight"

    if layer != 0:
        continue

    mapping[
        f"feature_extractor.conv_layers.{layer}.layer_norm.weight"
    ] = f"feature_extractor.conv_layers.{layer}.2.weight"
    mapping[
        f"feature_extractor.conv_layers.{layer}.layer_norm.bias"
    ] = f"feature_extractor.conv_layers.{layer}.2.bias"

hf_keys = set(hubert.state_dict().keys())
fair_keys = set(model.state_dict().keys())

hf_keys -= set(mapping.keys())
fair_keys -= set(mapping.values())



上で作った対照表に基づき、HubertModelWithFinalProj インスタンスに重みを移植していく。


In [None]:
# try loading the weights
new_state_dict = {}

for k, v in mapping.items():
    new_state_dict[k] = model.state_dict()[v]

x = hubert.load_state_dict(new_state_dict, strict = False)
print(x)
hubert.eval()



この方法で作ったモデルが、通常通り順伝播できて、しかも ONNX 化の可能な ContentVec モデルになる。
ただし最終隠れ層の状態を取り出して VC に使おうとすると、
単なる順伝播の返り値でなく `["last_hidden_state"]` という辞書にアクセスする必要がある。



In [None]:
import torchaudio
import torchinfo

device = "cuda:0"

# VCTK コーパスからのボイスサンプルを読み込んでみる
with torch.no_grad():
    waveform, orig_sr = torchaudio.load('../wavs/p225_003.wav') # まずオリジナル周波数でロード
    wav16 = torchaudio.transforms.Resample(orig_freq = orig_sr, new_freq = 16000)(waveform).to(device)
    %time content = hubert.to(device)(wav16, output_hidden_states = True)

print(type(content))
print(len(content))
print(content.keys())
print(content["last_hidden_state"].shape)

torchinfo.summary(model = hubert, input_size = wav16.shape, depth = 4, 
                  col_names=["input_size", "output_size", "num_params"], device = "cpu")



In [None]:
# 変換されたモデルと元のモデルの出力の比較

with torch.no_grad():
    new_input = torch.randn(1, 16384)

    # こちらは hubert の順伝播。返り値のうち、隠れ層の 9 番目が必要。
    result1 = hubert(new_input, output_hidden_states = True)["hidden_states"][9]
    # 隠れ層の 9 番目で
    result1 = hubert.final_proj(result1)

    # こちらが古いモデル。
    result2 = model.extract_features(
        **{
            "source": new_input,
            "padding_mask": torch.zeros(1, 16384, dtype=torch.bool),
            # "features_only": True,
            "output_layer": 9,
        }
    )[0]
    result2 = model.final_proj(result2)

    assert torch.allclose(result1, result2, atol=1e-3)

print("Sanity check passed")



---

### 重みの保存




では、この定義部分だけをどうやって取り出すか。

実は Hubert クラス（HubertModelWithFinalProj ではなく）をそのままインスタンス化するだけでいい。

ここに上の hubert インスタンスの state dict をロードして、それからローカルに保存する。


In [None]:
from pathlib import Path
from transformers import HubertConfig, HubertModel

CE = HubertModel(HubertConfig())

CE_dict = hubert.cpu().state_dict() # 重みだけ取り出す
CE.load_state_dict(CE_dict, strict = False) # 新しいモデルインスタンスにロード
CE.eval()



テストしてみる。


In [None]:
with torch.no_grad():
    waveform, orig_sr = torchaudio.load('../wavs/p225_003.wav') # まずオリジナル周波数でロード
    wav16 = torchaudio.transforms.Resample(orig_freq = orig_sr, new_freq = 16000)(waveform).to(device)
    %time content = CE.to(device)(wav16, output_hidden_states = True)

print(type(content))
print(len(content))
print(content.keys())
print(content["last_hidden_state"].shape)

torchinfo.summary(model = CE, input_size = wav16.shape, depth = 4, 
                  col_names=["input_size", "output_size", "num_params"], device = "cpu")



重みをファイルに保存する。
訓練時はこの PyTorch 用の重みをロードして使う。


In [None]:

save_path = DATASET_ROOT_PATH / "checkpoints" / "classic-vc" / "contentvec_500_hubert.pth"

torch.save(
    CE.cpu().state_dict(),
    save_path,
)



----

### ONNX 化

この CE は通常の方法で ONNX に export できる。なお `dynamic_axes` で可変長時系列に対応させる必要がある。



In [None]:

tensor_x = torch.rand((1, 16000*4), dtype = torch.float32) * 0.2

onnx_path = DATASET_ROOT_PATH / "checkpoints" / "classic-vc" / "hubert500.onnx"

# ONNX形式にエクスポート
torch.onnx.export(
    CE, 
    tensor_x, 
    onnx_path, 
    opset_version = 17,
    input_names=['input'],
    output_names=['last_hidden_state'],
    dynamic_axes = {
        'input': {0: 'batch', 1: 'samples'},
        'last_hidden_state': {0: 'batch', 1: 'frames'}
    }
)



In [None]:
import librosa

audio_file = '../wavs/p225_003.wav'
y, sr = librosa.load(audio_file, sr = None, mono = False)
y16 = librosa.resample(y, orig_sr = sr, target_sr = 16000)

if y.ndim == 1:  # モノラル
    y16 = y16.reshape(1, -1)
elif y.ndim == 2:  # ステレオ
    y16 = y16.T

print(y16.shape)



In [None]:
import onnxruntime

sess = onnxruntime.InferenceSession(
    onnx_path, 
    providers  =[
        'CUDAExecutionProvider', 
        'CPUExecutionProvider',
    ],
)

for i in sess.get_inputs():
    print("Input name: '{}', type: {}, shape: {}".format(i.name, i.shape, i.type))

for o in sess.get_outputs():
    print("Output name: '{}', type: {}, shape: {}".format(o.name, o.shape, o.type))

%time content_onnx = sess.run(['last_hidden_state'], {"input": y16})[0]

print(y16.shape, content_onnx.shape)

import matplotlib.pyplot as plt

plt.imshow(content_torch.squeeze(), vmin = -1, vmax = 1)
plt.tight_layout()
plt.show()

plt.imshow(content_onnx.squeeze(), vmin = -1, vmax = 1)
plt.tight_layout()
plt.show()

