In [1]:
import os

# Restrict this kernel to GPU index 2. This cell runs before the cell that
# imports torch, so the restriction is in place when CUDA is first used.
os.environ.update({'CUDA_VISIBLE_DEVICES': '2'})

In [2]:
from dynamicbatch_ttspipeline.f5_tts.load import (
    load_f5_tts,
    load_vocoder,
    target_sample_rate,
    hop_length,
    nfe_step,
    cfg_strength,
    sway_sampling_coef,
)
from dynamicbatch_ttspipeline.f5_tts.utils import (
    chunk_text,
    convert_char_to_pinyin,
)
from pydub import AudioSegment, silence
import torchaudio
import torch
import torch.nn.functional as F
from transformers import pipeline
import numpy as np
import librosa

In [3]:
# Checkpoint identifier handed to `load_f5_tts` when the TTS model is loaded.
model_name = 'mesolitica/Malaysian-F5-TTS'
# Precision passed to the ASR pipeline; the TTS model load call specifies its
# own dtype explicitly.
torch_dtype = torch.bfloat16

In [5]:
# All models and tensors in this notebook are placed on this device.
device = 'cuda'

# NOTE(review): the TTS model is loaded in float16 although `torch_dtype` is
# bfloat16 -- this looks deliberate; confirm before unifying the two.
model = load_f5_tts(
    model_name=model_name,
    device=device,
    dtype=torch.float16,
)
model

Fetching 4 files:   0%|          | 0/4 [00:00<?, ?it/s]

CFM(
  (mel_spec): MelSpec()
  (transformer): DiT(
    (time_embed): TimestepEmbedding(
      (time_embed): SinusPositionEmbedding()
      (time_mlp): Sequential(
        (0): Linear(in_features=256, out_features=1024, bias=True)
        (1): SiLU()
        (2): Linear(in_features=1024, out_features=1024, bias=True)
      )
    )
    (text_embed): TextEmbedding(
      (text_embed): Embedding(2546, 512)
      (text_blocks): Sequential(
        (0): ConvNeXtV2Block(
          (dwconv): Conv1d(512, 512, kernel_size=(7,), stride=(1,), padding=(3,), groups=512)
          (norm): LayerNorm((512,), eps=1e-06, elementwise_affine=True)
          (pwconv1): Linear(in_features=512, out_features=1024, bias=True)
          (act): GELU(approximate='none')
          (grn): GRN()
          (pwconv2): Linear(in_features=1024, out_features=512, bias=True)
        )
        (1): ConvNeXtV2Block(
          (dwconv): Conv1d(512, 512, kernel_size=(7,), stride=(1,), padding=(3,), groups=512)
          (norm)

In [6]:
# Vocoder whose `decode` turns the generated mel spectrograms into waveforms
# in the decoding cell further down.
vocoder = load_vocoder(device = device)

In [7]:
# Speech-recognition pipeline used to transcribe the reference clip, so the
# reference text does not have to be typed by hand.
asr_pipe = pipeline(
    task="automatic-speech-recognition",
    model="openai/whisper-large-v3-turbo",
    device=device,
    torch_dtype=torch_dtype,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [226]:
# Reference recording; it is later fed to the model as the conditioning audio.
audio_input = 'stress-test/anwar-ibrahim.mp3'
waveform, sr_ = torchaudio.load(audio_input)
# Average over dim 0 (presumably the channel axis -- confirm) to obtain a
# one-dimensional NumPy signal.
dwav = waveform.mean(dim=0).numpy()

In [9]:
# Transcribe the reference clip.
#
# A bare NumPy array is taken by the ASR pipeline to already be sampled at the
# model's own rate, but `dwav` is still at the file's native rate `sr_`.
# Passing the samples together with their sampling rate lets the pipeline
# resample them itself instead of silently mis-timing the audio.
# Each entry is a separate dict because the pipeline may consume (pop) the
# keys of the dict it is given.
asr_inputs = [{"raw": dwav, "sampling_rate": sr_} for _ in range(2)]
r_asr = asr_pipe(
    asr_inputs,
    chunk_length_s=30,
    batch_size=8,
    generate_kwargs={"task": "transcribe"},
    return_timestamps=False,
)

You have passed task=transcribe, but also have set `forced_decoder_ids` to [[1, None], [2, 50360]] which creates a conflict. `forced_decoder_ids` will be ignored in favor of task=transcribe.
Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.43.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token.As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


In [227]:
# Both ASR inputs are the same clip, so the first result is enough; strip the
# surrounding whitespace from the recognised text.
transcription_input = r_asr[0]['text'].strip()

In [228]:
target_rms = 0.1
ref_text = transcription_input
audio = dwav

# Root-mean-square level of the untouched reference. `rms` is kept around
# because the decoding cell compares it with `target_rms` again.
rms = np.sqrt((audio ** 2).mean())
# Only quiet references are amplified; louder ones are left as they are.
if rms < target_rms:
    audio = audio * target_rms / rms

# Bring the reference to the sample rate exported by the F5-TTS loader module.
if sr_ != target_sample_rate:
    audio = librosa.resample(audio, orig_sr=sr_, target_sr=target_sample_rate)

In [229]:
# Make sure the reference text ends with a sentence terminator (". " or "。")
# before generation text is concatenated to it.
if not ref_text.endswith((". ", "。")):
    # A trailing "." only needs the space; anything else gets the full ". ".
    ref_text += " " if ref_text.endswith(".") else ". "

ref_text

'Saya ingin nyatakan bahawa cara menangani masalah ekonomi dalam krisis tidak sama dalam keadaan biasa. Sebab itu soal defisit, soal hutang dan soal paling termasuk moratorium. '

In [230]:
# Duration of the reference clip in seconds. `audio` has already been
# resampled to `target_sample_rate` by the normalisation cell, so its length
# must be divided by that rate -- dividing by the file's native `sr_` gives a
# wrong duration whenever the two rates differ.
ref_audio_seconds = audio.shape[-1] / target_sample_rate
# Bytes of reference text per second of reference audio, multiplied by the
# seconds left out of 25 (presumably the maximum total clip length -- confirm).
max_chars = int(len(ref_text.encode("utf-8")) / ref_audio_seconds * (25 - ref_audio_seconds))

In [231]:
# Preview how one sentence is split under `max_chars`.
# The original cell read a variable `text` that is only created later as a
# loop variable, so it failed on a fresh kernel; use an explicit sample here.
preview_text = 'ketiak saya masham sangat tau tak tetapi lazat.'
gen_text_batches = chunk_text(preview_text, max_chars=max_chars)
gen_text_batches

['ketiak saya masham sangat tau tak tetapi lazat.']

In [232]:
# Length of the reference audio expressed in units of `hop_length` samples.
ref_audio_len = audio.shape[-1] // hop_length
# Divisor applied to the estimated generation length; 1 leaves it unchanged.
speed = 1

In [233]:
# Sentences to synthesise; each may be split into several chunks by `chunk_text`.
texts = [
    'helo nama saya husein bin zolkepli, ',
    'ketiak saya masham sangat tau tak tetapi lazat.',
]
final_text_lists, durations, after_durations = [], [], []

# The byte length of the reference transcript is the same for every chunk.
ref_text_len = len(ref_text.encode("utf-8"))

for text in texts:
    for gen_text in chunk_text(text, max_chars=max_chars):
        gen_text_len = len(gen_text.encode("utf-8"))
        # Estimate the new segment's length by scaling the reference length
        # with the ratio of generated-text bytes to reference-text bytes.
        after_duration = int(ref_audio_len / ref_text_len * gen_text_len / speed)

        # The model is prompted with the reference text followed by the new text.
        final_text_lists.append(convert_char_to_pinyin([ref_text + gen_text])[0])
        after_durations.append(after_duration)
        durations.append(ref_audio_len + after_duration)

In [234]:
# Pad every sequence with '.' entries up to the longest one so that all items
# in the batch have the same length.
lengths = list(map(len, final_text_lists))
maxlen = max(lengths)
batch_final_text_lists = [
    tokens + ['.'] * (maxlen - len(tokens))
    for tokens in final_text_lists
]

In [235]:
# def remove_silence_edges(audio, silence_threshold=-42):
#     # Remove silence from the start
#     non_silent_start_idx = silence.detect_leading_silence(audio, silence_threshold=silence_threshold)
#     audio = audio[non_silent_start_idx:]

#     # Remove silence from the end
#     non_silent_end_duration = audio.duration_seconds
#     for ms in reversed(audio):
#         if ms.dBFS > silence_threshold:
#             break
#         non_silent_end_duration -= 0.001
#     trimmed_audio = audio[: int(non_silent_end_duration * 1000)]

#     return trimmed_audio

In [236]:
# import matplotlib.pyplot as plt

# plt.plot(audio)

In [237]:
# audio_segment = AudioSegment.from_file('stress-test/anwar-ibrahim.mp3')
# remove_silent = remove_silence_edges(audio_segment)
# remove_silent.export('test.wav', format="wav")
# new_y, _ = torchaudio.load('test.wav')
# new_y = new_y.mean(dim=0).numpy()

In [238]:
# Add a leading batch axis, convert to a torch tensor and move it to `device`.
audio = torch.Tensor(audio[None, :]).to(device)
audio.shape

torch.Size([1, 351144])

In [239]:
# Inspect the per-item total lengths (reference length + estimated generated length).
durations

[1643, 1737]

In [240]:
# import numpy as np

# [0.0000, 0.0323, 0.0645, 0.0968, 0.1290, 0.1613, 0.1936, 0.2258, 0.2581,
#         0.2903, 0.3225, 0.3547, 0.3872, 0.4194, 0.4517, 0.4839, 0.5161, 0.5483,
#         0.5806, 0.6128, 0.6455, 0.6777, 0.7100, 0.7422, 0.7744, 0.8066, 0.8389,
#         0.8711, 0.9033, 0.9355, 0.9678, 1.0000]
# timelike = np.array([0.0000, 0.0015, 0.0054, 0.0117, 0.0205, 0.0317, 0.0459, 0.0625, 0.0811,
#         0.1021, 0.1255, 0.1514, 0.1792, 0.2090, 0.2412, 0.2754, 0.3110, 0.3486,
#         0.3877, 0.4282, 0.4712, 0.5151, 0.5601, 0.6064, 0.6533, 0.7002, 0.7490,
#         0.7988, 0.8486, 0.8989, 0.9497, 0.9995])
# timelike[1:] > timelike[:-1]
# diff = timelike[1:] > timelike[:-1]
# assert diff.all() or (~diff).all()

In [241]:
# [0.0000, 0.0322, 0.0645, 0.0967, 0.1289, 0.1611, 0.1934, 0.2256, 0.2578,
#         0.2891, 0.3223, 0.3555, 0.3867, 0.4180, 0.4512, 0.4844, 0.5156, 0.5469,
#         0.5820, 0.6133, 0.6445, 0.6797, 0.7109, 0.7422, 0.7734, 0.8047, 0.8398,
#         0.8711, 0.9023, 0.9375, 0.9688, 1.0000]
# timelike = np.array([0.0000, 0.0000, 0.0039, 0.0117, 0.0195, 0.0312, 0.0469, 0.0625, 0.0820,
#         0.1016, 0.1250, 0.1523, 0.1797, 0.2070, 0.2383, 0.2773, 0.3086, 0.3477,
#         0.3906, 0.4297, 0.4727, 0.5195, 0.5625, 0.6016, 0.6562, 0.6992, 0.7500,
#         0.7969, 0.8438, 0.8984, 0.9531, 1.0000])
# diff = timelike[1:] > timelike[:-1]
# assert diff.all() or (~diff).all()

In [242]:
# import torch

# t = torch.Tensor([0.0000, 0.0322, 0.0645, 0.0967, 0.1289, 0.1611, 0.1934, 0.2256, 0.2578,
#         0.2891, 0.3223, 0.3555, 0.3867, 0.4180, 0.4512, 0.4844, 0.5156, 0.5469,
#         0.5820, 0.6133, 0.6445, 0.6797, 0.7109, 0.7422, 0.7734, 0.8047, 0.8398,
#         0.8711, 0.9023, 0.9375, 0.9688, 1.0000]).type(torch.bfloat16)
# t

In [243]:
# Re-check the per-item total lengths right before they are passed to `model.sample`.
durations

[1643, 1737]

In [244]:
with torch.no_grad():
    # Repeat the reference audio once per text in the batch. The count is
    # derived from the batch itself rather than hard-coded, so the cell keeps
    # working when `texts` changes or a sentence is split into more chunks.
    batch_size = len(batch_final_text_lists)
    generated, _ = model.sample(
        cond=audio.repeat(batch_size, 1),
        text=batch_final_text_lists,
        # Integer lengths built directly as a long tensor on the target device.
        duration=torch.tensor(durations, dtype=torch.long, device=device),
        steps=nfe_step,
        cfg_strength=cfg_strength,
        sway_sampling_coef=-1.0,
    )

In [247]:
# Shape of the sampled batch returned by `model.sample`.
generated.shape

torch.Size([2, 1737, 100])

In [248]:
# Compare the reference loudness with the target; the decoding cell rescales
# the output only when rms < target_rms.
rms, target_rms

(0.14197704, 0.1)

In [249]:
# Shape of the sampled batch before the reference part is sliced off.
generated.shape

torch.Size([2, 1737, 100])

In [250]:
# Estimated length of the newly generated part of each item (reference excluded).
after_durations

[272, 366]

In [251]:
with torch.no_grad():
    # Keep `generated` untouched and work on new names: overwriting it with
    # its own slice made this cell strip `ref_audio_len` frames again on every
    # re-run.
    generated_tail = generated.to(torch.float32)[:, ref_audio_len:, :]
    # Swap the last two axes before handing the spectrogram to the vocoder.
    generated_mel_spec = generated_tail.permute(0, 2, 1)
    generated_wave = vocoder.decode(generated_mel_spec)
    # Undo the loudness boost that was applied to a quiet reference clip.
    if rms < target_rms:
        generated_wave = generated_wave * rms / target_rms

    # wav -> numpy. Flatten everything after the batch axis instead of a bare
    # `.squeeze()`, which would also drop the batch axis for a single-item
    # batch and break the `generated_wave[i, :n]` indexing used for playback.
    generated_wave = generated_wave.reshape(generated_wave.shape[0], -1).cpu().numpy()

In [252]:
# Shape of the decoded waveforms as a NumPy array.
generated_wave.shape

(2, 93440)

In [253]:
# Convert each estimated length from `hop_length` units to samples; these
# values are used to cut the padding off every item before playback.
actual_after_durations = [hop_length * n_frames for n_frames in after_durations]
actual_after_durations

[69632, 93696]

In [254]:
import IPython.display as ipd

# Play the first generated item, trimmed to its own estimated length.
# The reference audio was resampled to `target_sample_rate` before being fed
# to the model, so the output is played at that rate; using the input file's
# native `sr_` changes speed and pitch whenever the two rates differ.
ipd.Audio(generated_wave[0, :actual_after_durations[0]], rate = target_sample_rate)

In [256]:
# Play the second generated item, trimmed to its own estimated length.
# Playback uses `target_sample_rate` (the rate the reference was resampled to
# before generation) rather than the input file's native `sr_`.
ipd.Audio(generated_wave[1, :actual_after_durations[1]], rate = target_sample_rate)