# Clonage de Voix avec Bark et HuBERT
## Ce notebook présente une étude sur le clonage de voix en utilisant des modèles avancés comme Bark et HuBERT. Le clonage de voix permet de créer une réplique numérique d'une voix humaine en reproduisant fidèlement ses caractéristiques distinctives, comme le timbre, l'intonation, et le style de parole.

## Installation des Dépendances
### Nous commençons par installer les bibliothèques nécessaires pour le projet, y compris bark-voice-cloning-HuBERT-quantizer pour le modèle HuBERT et bark pour la synthèse vocale.

In [1]:
# Install the required libraries: the HuBERT quantizer package first, then Bark
!pip install git+https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer
!pip install git+https://github.com/suno-ai/bark.git

Collecting git+https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer
  Cloning https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer to /tmp/pip-req-build-42adlb34
  Running command git clone --filter=blob:none --quiet https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer /tmp/pip-req-build-42adlb34
  Resolved https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer to commit 4f42e44480fb076a52ddeb1f5ec6132d3c1ad25a
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting audiolm-pytorch==1.1.4 (from bark_hubert_quantizer==0.0.4)
  Downloading audiolm_pytorch-1.1.4-py3-none-any.whl.metadata (1.2 kB)
Collecting fairseq (from bark_hubert_quantizer==0.0.4)
  Downloading fairseq-0.12.2.tar.gz (9.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.6/9.6 MB[0m [31m29.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?2

## Chargement et Préparation des Modèles
### Nous chargeons les modèles Bark et HuBERT pour générer et manipuler l'audio.

In [2]:
from bark import SAMPLE_RATE, generate_audio, preload_models
from IPython.display import Audio

# Load the Bark models
preload_models()

text_2.pt:   0%|          | 0.00/5.35G [00:00<?, ?B/s]

  checkpoint = torch.load(ckpt_path, map_location=device)


tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/996k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.96M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/625 [00:00<?, ?B/s]



coarse_2.pt:   0%|          | 0.00/3.93G [00:00<?, ?B/s]

fine_2.pt:   0%|          | 0.00/3.74G [00:00<?, ?B/s]

  WeightNorm.apply(module, name, dim)
Downloading: "https://dl.fbaipublicfiles.com/encodec/v0/encodec_24khz-d7cc33bc.th" to /root/.cache/torch/hub/checkpoints/encodec_24khz-d7cc33bc.th
100%|██████████| 88.9M/88.9M [00:00<00:00, 133MB/s] 


## Génération Audio à Partir de Texte
### Nous générons un exemple audio en utilisant Bark à partir d'un texte simple.

In [4]:
# Generate audio from a text prompt
text_prompt = """
    Hello, my name is Serpy. And, uh — and I like pizza. [laughs] 
    But I also have other interests such as playing tic tac toe.
"""
audio_array = generate_audio(text_prompt)

# Play back the generated audio (last expression of the cell, so the notebook renders it)
Audio(audio_array, rate=SAMPLE_RATE)

  with InferenceContext(), torch.inference_mode(), torch.no_grad(), autocast():
100%|██████████| 620/620 [00:08<00:00, 71.37it/s]
100%|██████████| 32/32 [00:32<00:00,  1.01s/it]


## Clonage de Voix : Préparation de l'Audio et des Modèles
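### Ensuite, nous chargeons le codec audio de Bark et préparons l'audio à cloner (conversion au taux d'échantillonnage et au nombre de canaux du codec). Les modèles HuBERT et CustomTokenizer sont configurés dans les cellules suivantes.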

In [5]:
from bark.generation import load_codec_model
from encodec.utils import convert_audio
import torchaudio
import torch

# Run on the GPU when torch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Load the codec model through Bark; it is placed on the GPU only when `device` is 'cuda'
model = load_codec_model(use_gpu=(device == 'cuda'))

# Read the reference recording, convert it to the codec's sample rate and
# channel count, then move it to `device`
audio_filepath = '/kaggle/input/audiosamy/telecharger.wav'
wav, sr = torchaudio.load(audio_filepath)
wav = convert_audio(wav, sr, model.sample_rate, model.channels)
wav = wav.to(device)

## Génération de Tokens Sémantiques avec HuBERT
### Nous utilisons le modèle HuBERT pour extraire les tokens sémantiques de l'audio.

In [10]:
"""
Custom tokenizer model.
Author: https://www.github.com/gitmylo/
License: MIT
"""

import json
import os.path
from zipfile import ZipFile

import numpy
import torch
from torch import nn, optim
from torch.serialization import MAP_LOCATION


class CustomTokenizer(nn.Module):
    """LSTM-based classifier that turns feature vectors into discrete token ids.

    ``version`` selects the architecture:

    * ``0``: 2-layer LSTM followed by the output linear layer.
    * ``1``: 2-layer LSTM, an intermediate 4096-wide linear layer, then the
      output linear layer.

    Any other value creates no LSTM, so ``forward`` will fail for it.
    """

    def __init__(self, hidden_size=1024, input_size=768, output_size=10000, version=0):
        super().__init__()
        next_size = input_size
        if version in (0, 1):
            self.lstm = nn.LSTM(input_size, hidden_size, 2, batch_first=True)
            next_size = hidden_size
        if version == 1:
            self.intermediate = nn.Linear(hidden_size, 4096)
            next_size = 4096

        self.fc = nn.Linear(next_size, output_size)
        # Log-softmax over dim 1, which is the class axis for 2-D (N, input_size) input.
        self.softmax = nn.LogSoftmax(dim=1)
        self.optimizer: optim.Optimizer = None  # created by prepare_training()
        self.lossfunc = nn.CrossEntropyLoss()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.version = version

    def forward(self, x):
        """Return log-probabilities over the ``output_size`` classes for every row of ``x``."""
        x, _ = self.lstm(x)
        if self.version == 1:
            x = self.intermediate(x)
        x = self.fc(x)
        x = self.softmax(x)
        return x

    @torch.no_grad()
    def get_token(self, x):
        """
        Used to get the most likely token for every input vector.
        :param x: An array with shape (N, input_size) where N is a whole number greater or equal to 1, and input_size is the input size used when creating the model.
        :return: An array with shape (N,) where N is the same as N from the input. Every number in the array is a whole number in range 0...output_size - 1 where output_size is the output size used when creating the model.
        """
        return torch.argmax(self(x), dim=1)

    def prepare_training(self):
        """Create the Adam optimizer (learning rate 0.001) used by ``train_step``."""
        self.optimizer = optim.Adam(self.parameters(), 0.001)

    def train_step(self, x_train, y_train, log_loss=False):
        """Run one optimisation step on a single (features, labels) pair.

        The one-hot targets are built on the device of the model output, so
        the step works on CPU as well as on GPU.

        :param x_train: Feature tensor of shape (N, input_size).
        :param y_train: Sequence of N class indices. When its length differs from the number of predictions, the longer side is trimmed: leading labels are dropped, or trailing predictions are dropped.
        :param log_loss: When true, print the loss of this step.
        :raises RuntimeError: If ``prepare_training`` has not been called.
        """
        if self.optimizer is None:
            raise RuntimeError('prepare_training() must be called before train_step()')

        # Zero the gradients
        self.zero_grad()

        # Forward pass
        y_pred = self(x_train)

        # Align labels and predictions when their lengths differ
        y_train_len = len(y_train)
        y_pred_len = y_pred.shape[0]
        if y_train_len > y_pred_len:
            y_train = y_train[y_train_len - y_pred_len:]
        elif y_train_len < y_pred_len:
            y_pred = y_pred[:y_train_len, :]

        # Build the one-hot targets on the same device as the predictions
        target_device = y_pred.device
        targets = torch.as_tensor(y_train, device=target_device).long()
        y_train_hot = torch.zeros(len(targets), self.output_size, device=target_device)
        y_train_hot[torch.arange(len(targets), device=target_device), targets] = 1

        # Calculate the loss
        loss = self.lossfunc(y_pred, y_train_hot)

        # Print loss
        if log_loss:
            print('Loss', loss.item())

        # Backward pass
        loss.backward()

        # Update the weights
        self.optimizer.step()

    def save(self, path):
        """Write the state dict to ``path`` and append a ``<stem>/.info`` JSON entry.

        The entry holds the constructor arguments and is appended to the file
        with ``ZipFile`` in append mode.
        """
        info_path = '.'.join(os.path.basename(path).split('.')[:-1]) + '/.info'
        torch.save(self.state_dict(), path)
        data_from_model = Data(self.input_size, self.hidden_size, self.output_size, self.version)
        with ZipFile(path, 'a') as model_zip:
            model_zip.writestr(info_path, data_from_model.save())

    @staticmethod
    def load_from_checkpoint(path, map_location: MAP_LOCATION = None):
        """Rebuild a tokenizer from a checkpoint written by ``save``.

        :param path: Checkpoint file. Without a ``/.info`` entry the default constructor arguments are used.
        :param map_location: Forwarded to ``torch.load`` so a checkpoint can be loaded on a machine lacking the device it was saved from; when truthy the model is also moved there.
        :return: The model with its weights loaded.
        """
        data_from_model = None
        with ZipFile(path) as model_zip:
            info_files = [name for name in model_zip.namelist() if name.endswith('/.info')]
            if info_files:
                data_from_model = Data.load(model_zip.read(info_files[0]).decode('utf-8'))

        if data_from_model is None:
            # Old checkpoint without metadata: fall back to the defaults
            model = CustomTokenizer()
        else:
            model = CustomTokenizer(data_from_model.hidden_size, data_from_model.input_size, data_from_model.output_size, data_from_model.version)
        model.load_state_dict(torch.load(path, map_location=map_location))
        if map_location:
            model = model.to(map_location)
        return model



class Data:
    """Constructor arguments of a tokenizer, serialisable to and from JSON."""
    input_size: int
    hidden_size: int
    output_size: int
    version: int

    def __init__(self, input_size=768, hidden_size=1024, output_size=10000, version=0):
        self.input_size, self.hidden_size = input_size, hidden_size
        self.output_size, self.version = output_size, version

    @staticmethod
    def load(string):
        """Build a ``Data`` from the JSON text produced by ``save``."""
        fields = json.loads(string)
        return Data(
            input_size=fields['input_size'],
            hidden_size=fields['hidden_size'],
            output_size=fields['output_size'],
            version=fields['version'],
        )

    def save(self):
        """Return the four fields as a JSON object string."""
        names = ('input_size', 'hidden_size', 'output_size', 'version')
        return json.dumps({name: getattr(self, name) for name in names})


def auto_train(data_path, save_path='model.pth', load_model: str | None = None, save_epochs=1,
               max_epochs: int | None = None):
    """Train a ``CustomTokenizer`` on the ``.npy`` pairs found in ``<data_path>/ready``.

    Each ``<name>_semantic_features.npy`` file (inputs) is paired with the
    ``<name>_semantic.npy`` file (labels) that shares its prefix; files
    without a partner are ignored. Training runs on CUDA when available and
    on the CPU otherwise.

    :param data_path: Directory that contains the ``ready`` folder; checkpoints are written here too.
    :param save_path: Checkpoint file name, relative to ``data_path``. It is overwritten after every round, and a ``_epoch_<n>`` copy is kept as well.
    :param load_model: Optional checkpoint to resume from; a new ``version=1`` model is created when it is missing.
    :param save_epochs: Number of passes over the data between two saves.
    :param max_epochs: Number of save rounds to run. ``None`` (the default) keeps the original behaviour of training until interrupted.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    if load_model and os.path.isfile(load_model):
        print('Loading model from', load_model)
        model_training = CustomTokenizer.load_from_checkpoint(load_model, device)
    else:
        print('Creating new model.')
        model_training = CustomTokenizer(version=1).to(device)
    save_path = os.path.join(data_path, save_path)
    # splitext only looks at the file name, so dots in directory names are safe
    base_save_path = os.path.splitext(save_path)[0]

    sem_string = '_semantic.npy'
    feat_string = '_semantic_features.npy'

    # Pair labels and features by shared prefix rather than by listing order,
    # which os.listdir does not guarantee.
    data_x, data_y = [], []
    ready = os.path.join(data_path, 'ready')
    for input_file in sorted(os.listdir(ready)):
        if not input_file.endswith(sem_string):
            continue
        feat_path = os.path.join(ready, input_file[:-len(sem_string)] + feat_string)
        if not os.path.isfile(feat_path):
            continue
        data_y.append(numpy.load(os.path.join(ready, input_file)))
        data_x.append(numpy.load(feat_path))
    model_training.prepare_training()

    epoch = 1
    while max_epochs is None or epoch <= max_epochs:
        for _ in range(save_epochs):
            for step, (x, y) in enumerate(zip(data_x, data_y)):
                # Print loss every 50 steps
                model_training.train_step(torch.tensor(x).to(device), torch.tensor(y).to(device), step % 50 == 0)
        model_training.save(save_path)
        model_training.save(f'{base_save_path}_epoch_{epoch}.pth')
        print(f'Epoch {epoch} completed')
        epoch += 1

In [11]:
# From https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer

import os.path
import shutil
import urllib.request

import huggingface_hub


class HuBERTManager:
    @staticmethod
    def make_sure_hubert_installed(download_url: str = 'https://dl.fbaipublicfiles.com/hubert/hubert_base_ls960.pt', file_name: str = 'hubert.pt'):
        install_dir = os.path.join('/kaggle/working/', 'models', 'hubert')
        if not os.path.isdir(install_dir):
            os.makedirs(install_dir, exist_ok=True)
        install_file = os.path.join(install_dir, file_name)
        if not os.path.isfile(install_file):
            print('Downloading HuBERT base model')
            urllib.request.urlretrieve(download_url, install_file)
            print('Downloaded HuBERT')
        return install_file


    @staticmethod
    def make_sure_tokenizer_installed(model: str = 'quantifier_hubert_base_ls960_14.pth', repo: str = 'GitMylo/bark-voice-cloning', local_file: str = 'tokenizer.pth'):
        install_dir = os.path.join('/kaggle/working/', 'models', 'hubert')
        if not os.path.isdir(install_dir):
            os.makedirs(install_dir, exist_ok=True)
        install_file = os.path.join(install_dir, local_file)
        if not os.path.isfile(install_file):
            print('Downloading HuBERT custom tokenizer')
            huggingface_hub.hf_hub_download(repo, model, local_dir=install_dir, local_dir_use_symlinks=False)
            shutil.move(os.path.join(install_dir, model), install_file)
            print('Downloaded tokenizer')
        return install_file

In [12]:
"""
Modified HuBERT model without kmeans.
Original author: https://github.com/lucidrains/
Modified by: https://www.github.com/gitmylo/
License: MIT
"""

# Modified code from https://github.com/lucidrains/audiolm-pytorch/blob/main/audiolm_pytorch/hubert_kmeans.py

from pathlib import Path

import torch
from torch import nn
from einops import pack, unpack

import fairseq

from torchaudio.functional import resample

from audiolm_pytorch.utils import curtail_to_multiple

import logging
logging.root.setLevel(logging.ERROR)


def exists(val):
    """Return True when *val* is anything other than None."""
    return not (val is None)


def default(val, d):
    """Return *val*, or the fallback *d* when *val* is None."""
    if exists(val):
        return val
    return d


class CustomHubert(nn.Module):
    """
    Wrapper around a fairseq HuBERT checkpoint that returns the model's
    features directly (the k-means step of the original code is disabled).

    checkpoint and kmeans can be downloaded at https://github.com/facebookresearch/fairseq/tree/main/examples/hubert
    or you can train your own
    """

    def __init__(
        self,
        checkpoint_path,
        target_sample_hz=16000,
        seq_len_multiple_of=None,
        output_layer=9,
        device=None
    ):
        """
        :param checkpoint_path: Path of the fairseq checkpoint; it must exist.
        :param target_sample_hz: Sample rate the input is resampled to in ``forward``.
        :param seq_len_multiple_of: When set, ``forward`` curtails the input length to a multiple of this value.
        :param output_layer: Passed to the fairseq model as ``output_layer``.
        :param device: Optional device the loaded model is moved to.
        """
        super().__init__()
        self.target_sample_hz = target_sample_hz
        self.seq_len_multiple_of = seq_len_multiple_of
        self.output_layer = output_layer

        model_path = Path(checkpoint_path)

        assert model_path.exists(), f'path {checkpoint_path} does not exist'

        # Load onto the CPU so the checkpoint opens whatever device it was
        # saved from; the model is moved to `device` just below.
        # NOTE(review): recent torch releases warn about torch.load's
        # `weights_only` default here — confirm the behaviour on upgrade.
        checkpoint = torch.load(checkpoint_path, map_location='cpu')
        load_model_input = {checkpoint_path: checkpoint}
        model, *_ = fairseq.checkpoint_utils.load_model_ensemble_and_task(load_model_input)

        if device is not None:
            model[0].to(device)

        self.model = model[0]
        self.model.eval()

    @property
    def groups(self):
        return 1

    @torch.no_grad()
    def forward(
        self,
        wav_input,
        flatten=True,
        input_sample_hz=None
    ):
        """
        Compute HuBERT features for a waveform.

        :param wav_input: Waveform tensor, on the same device as the model. NOTE(review): presumably (batch, samples) — confirm against the caller.
        :param flatten: When true return the packed features; otherwise unpack them back to the model's original leading shape.
        :param input_sample_hz: Sample rate of ``wav_input``; when given, the input is resampled to ``target_sample_hz``.
        :return: The ``'x'`` entry of the model output, on the device of ``wav_input``.
        """
        device = wav_input.device

        if exists(input_sample_hz):
            wav_input = resample(wav_input, input_sample_hz, self.target_sample_hz)

        if exists(self.seq_len_multiple_of):
            wav_input = curtail_to_multiple(wav_input, self.seq_len_multiple_of)

        embed = self.model(
            wav_input,
            features_only=True,
            mask=False,  # thanks to @maitycyrus for noticing that mask is defaulted to True in the fairseq code
            output_layer=self.output_layer
        )

        embed, packed_shape = pack([embed['x']], '* d')

        # No k-means here: the features themselves are returned. detach()
        # gives the same values as the former numpy round trip without
        # copying through host memory.
        features = embed.detach().to(device)

        if flatten:
            return features

        features, = unpack(features, packed_shape, '*')
        return features

In [15]:
# Make sure the HuBERT checkpoint and the custom tokenizer are on disk;
# each call downloads its file only when missing and returns the file's path.
hubert_manager = HuBERTManager()
hubert_manager.make_sure_hubert_installed()
hubert_manager.make_sure_tokenizer_installed()

Downloading HuBERT base model
Downloaded HuBERT
Downloading HuBERT custom tokenizer


For more details, check out https://huggingface.co/docs/huggingface_hub/main/en/guides/download#download-files-to-local-folder.


quantifier_hubert_base_ls960_14.pth:   0%|          | 0.00/104M [00:00<?, ?B/s]

Downloaded tokenizer


'/kaggle/working/models/hubert/tokenizer.pth'

In [16]:

# Make sure the HuBERT checkpoint is on disk (it is downloaded only when
# missing), then build the feature extractor from the path the manager returns
# instead of repeating that path by hand.
hubert_manager = HuBERTManager()
hubert_checkpoint_path = hubert_manager.make_sure_hubert_installed()
hubert_model = CustomHubert(checkpoint_path=hubert_checkpoint_path).to(device)


  checkpoint = torch.load(checkpoint_path)
  state = torch.load(f, map_location=torch.device("cpu"))
  WeightNorm.apply(module, name, dim)


## Extraction des Codes et Tokens
### Les codes et tokens sont extraits et préparés pour être utilisés dans la génération d'audio cloné.

In [17]:
# Extract the semantic vectors from the reference audio (resampled from the codec's sample rate)
semantic_vectors = hubert_model.forward(wav, input_sample_hz=model.sample_rate)

# Turn the semantic vectors into semantic tokens with the custom tokenizer
tokenizer = CustomTokenizer.load_from_checkpoint('/kaggle/working/models/hubert/tokenizer.pth').to(device)
semantic_tokens = tokenizer.get_token(semantic_vectors)

# Extract the discrete codes from the codec model
with torch.no_grad():
    # unsqueeze(0) adds a leading dimension before encoding
    encoded_frames = model.encode(wav.unsqueeze(0))
# Concatenate the codes of all frames along the last axis, drop singleton dimensions, move to the CPU
codes = torch.cat([encoded[0] for encoded in encoded_frames], dim=-1).squeeze().cpu()


  model.load_state_dict(torch.load(path))


## Sauvegarde des Prompts
### Nous sauvegardons les prompts générés pour les utiliser comme historique dans la génération audio.

In [20]:
import numpy as np

# Name and location of the saved voice prompt
voice_name = 'output'
output_path = '/kaggle/working/' + voice_name + '.npz'

# Bring every tensor back to host memory as a NumPy array;
# the coarse prompt keeps only the first two rows of the codes
semantic_prompt = semantic_tokens.cpu().numpy()
coarse_prompt = codes[:2, :].cpu().numpy()
fine_prompt = codes.cpu().numpy()

# Write the three prompts into a single .npz archive
np.savez(
    output_path,
    fine_prompt=fine_prompt,
    coarse_prompt=coarse_prompt,
    semantic_prompt=semantic_prompt,
)


## Génération d'Audio avec la Voix Clonée
### Enfin, nous utilisons les prompts sauvegardés pour générer un nouvel audio avec la voix clonée.

In [21]:
from bark.api import generate_audio
from bark.generation import preload_models

# Preload the Bark models, requesting the GPU for each of them
preload_models(text_use_gpu=True, coarse_use_gpu=True, fine_use_gpu=True, codec_use_gpu=True)

# Generate audio with the cloned voice: the saved .npz file is passed as the history prompt
text_prompt = "Bonjour, ceci est un test de voix clonée!"
audio_array = generate_audio(text_prompt, history_prompt=output_path, text_temp=0.7, waveform_temp=0.7)

# Save the generated audio as a WAV file
from scipy.io.wavfile import write as write_wav
output_audio_path = '/kaggle/working/cloned_audio.wav'
write_wav(output_audio_path, SAMPLE_RATE, audio_array)

  with InferenceContext(), torch.inference_mode(), torch.no_grad(), autocast():
100%|██████████| 312/312 [00:04<00:00, 71.48it/s]
100%|██████████| 16/16 [00:17<00:00,  1.11s/it]


In [23]:
# Play the cloned-voice audio in the notebook
Audio(audio_array,rate=SAMPLE_RATE)

### L'audio donné en paramètre contenait une musique de fond ; on retrouve encore cette musique de fond dans la voix clonée.

### Nouvelle voix

In [None]:
# Load the audio and convert it to the codec's sample rate and channel count
audio_filepath = '/kaggle/input/audiosamy/telecharger.wav'
wav, sr = torchaudio.load(audio_filepath)
wav = convert_audio(wav, sr, model.sample_rate, model.channels).to(device)