In [None]:
from zipfile import ZipFile
# Unpack the dataset archive into the current working directory; the context
# manager closes the archive once extraction is done.
with ZipFile("main_dataset_folder.zip", "r") as archive:
    archive.extractall("./")
print("Extraction of zip file complete")

In [None]:
from datasets import  load_dataset, Audio

# Read the metadata CSV with the generic "csv" builder and take its "train" split.
csv_path = "/content/main_dataset_folder/main_dataset.csv"
dataset = load_dataset("csv", data_files=csv_path, split="train")

dataset

In [None]:
# Columns that the rest of the notebook never reads.
unused_columns = ["Unnamed: 0", "id", "filename", "start", "end", "duration"]
dataset = dataset.remove_columns(unused_columns)

In [None]:
# Display the dataset summary to check the result of the column removal.
dataset

In [None]:
import os

# Folder the archive was extracted into. The metadata CSV is read from
# /content/main_dataset_folder, so the audio paths are resolved against the
# same folder (the previous value, "/main_dataset_folder", pointed at the
# filesystem root instead).
# NOTE(review): assumes `file_path` values in the CSV are relative to this
# folder — confirm against the CSV contents.
AUDIO_ROOT = "/content/main_dataset_folder"


def fix_path(example):
    """Prefix the example's ``file_path`` with ``AUDIO_ROOT``.

    Args:
        example: A dataset row containing a ``file_path`` string.

    Returns:
        The same row with ``file_path`` joined onto ``AUDIO_ROOT``. An
        already-absolute ``file_path`` is left unchanged by ``os.path.join``,
        so re-running the cell does not prefix the path twice.
    """
    example["file_path"] = os.path.join(AUDIO_ROOT, example["file_path"])
    return example


dataset = dataset.map(fix_path)

In [None]:
import numpy as np
import librosa
import soundfile as sf
import os

def load_audio_with_librosa(example):
    """Attach the decoded waveform of ``example["file_path"]`` as ``example["audio"]``.

    The clip is loaded through librosa as mono float32 audio at 16 kHz. If
    loading raises, the error is printed and 16000 zero samples are stored
    instead, so the row is kept rather than dropped.

    Args:
        example: A dataset row containing a ``file_path`` entry.

    Returns:
        The same row with an ``audio`` dict holding ``array`` and
        ``sampling_rate``.
    """
    target_sr = 16000

    try:
        samples, _ = librosa.load(
            example["file_path"],
            sr=target_sr,  # 16kHz resample
            mono=True,
            dtype=np.float32,
        )
        waveform = samples.astype(np.float32)
    except Exception as e:
        print(f"Error loading audio file {example['file_path']}: {e}")
        # Fallback placeholder: 16000 zero-valued float32 samples.
        waveform = np.zeros(target_sr, dtype=np.float32)

    example["audio"] = {"array": waveform, "sampling_rate": target_sr}
    return example

# Apply the loader to every row so each one gains an "audio" entry.
dataset = dataset.map(load_audio_with_librosa)


In [None]:
# Spot-check the first resolved audio path.
dataset["file_path"][0]

In [None]:
from transformers import SpeechT5Processor

# Checkpoint identifier used for the processor here and for the model loaded
# later in the notebook.
checkpoint = "microsoft/speecht5_tts"
processor = SpeechT5Processor.from_pretrained(checkpoint)

In [None]:
# Keep a direct handle on the processor's tokenizer for the vocabulary checks below.
tokenizer = processor.tokenizer

In [None]:
def extract_all_chars(batch):
    """Collect the distinct characters that occur in a batch of raw transcripts.

    ``None`` entries in ``batch["text"]`` are skipped and every other entry is
    converted with ``str`` before being joined with single spaces.

    Args:
        batch: A batched mapping with a ``text`` list.

    Returns:
        A dict with ``vocab`` (one list of unique characters) and ``all text``
        (one string holding the joined transcripts), each wrapped in a list of
        length one.
    """
    all_text = " ".join(str(entry) for entry in batch["text"] if entry is not None)
    unique_chars = set(all_text)
    return {"vocab": [list(unique_chars)], "all text": [all_text]}

# Run the character extraction over the dataset with batch_size=-1 and drop
# the original columns so only the extraction outputs remain.
vocabs = dataset.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=dataset.column_names,
)

# Characters seen in the transcripts vs. tokens known to the tokenizer.
dataset_vocab = set(vocabs["vocab"][0])
tokenizer_vocab = set(tokenizer.get_vocab())

In [None]:
# Characters that occur in the dataset but are not tokens in the tokenizer vocabulary.
dataset_vocab- tokenizer_vocab

In [None]:
# Inspect one raw transcript (row index 5696 is hard-coded).
dataset["text"][5696]

In [None]:
import re

def normalize_text(text):
    """Lower-case a transcript, strip punctuation and collapse whitespace.

    Args:
        text: The transcript; non-string values are converted with ``str``.

    Returns:
        ``""`` when ``text`` is ``None`` or blank. Otherwise the lower-cased
        text with every character other than word characters, whitespace and
        apostrophes removed, and runs of whitespace collapsed to one space.
    """
    if text is None:
        return ""

    as_string = str(text)
    if not as_string.strip():
        return ""

    without_punctuation = re.sub(r'[^\w\s\']', '', as_string.lower())
    # split()/join collapses whitespace runs and trims both ends.
    return ' '.join(without_punctuation.split())

def add_normalized_text(example):
    """Store the normalized form of ``example["text"]`` under ``normalized_text``.

    Args:
        example: A dataset row containing a ``text`` entry.

    Returns:
        The same row with the added ``normalized_text`` entry.
    """
    normalized = normalize_text(example["text"])
    example["normalized_text"] = normalized
    return example

# Add the "normalized_text" column to every row.
dataset = dataset.map(add_normalized_text)

In [None]:
def extract_all_chars(batch):
    """Collect the distinct characters that occur in a batch of normalized transcripts.

    This redefinition replaces the earlier ``extract_all_chars`` and reads the
    ``normalized_text`` column instead of ``text``.

    Args:
        batch: A batched mapping with a ``normalized_text`` list of strings.

    Returns:
        A dict with ``vocab`` (one list of unique characters) and ``all_text``
        (one string holding the space-joined transcripts), each wrapped in a
        list of length one.
    """
    joined = " ".join(batch["normalized_text"])
    unique_chars = set(joined)
    return {"vocab": [list(unique_chars)], "all_text": [joined]}


# Repeat the character extraction, this time over the normalized transcripts,
# again with batch_size=-1 and with the original columns removed.
vocabs = dataset.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=dataset.column_names,
)

# Characters seen in the normalized transcripts vs. tokens known to the tokenizer.
dataset_vocab = set(vocabs["vocab"][0])
tokenizer_vocab = set(tokenizer.get_vocab())

In [None]:
# Characters still missing from the tokenizer vocabulary after normalization.
dataset_vocab - tokenizer_vocab

In [None]:
# (source, replacement) pairs applied in order to transliterate characters
# into plain Latin letters.
replacements = [
    ("â", "a"),
    ("ç", "ch"),
    ("ğ", "gh"),
    ("ı", "i"),
    ("î", "i"),
    ("ö", "oe"),
    ("ş", "sh"),
    ("ü", "ue"),
    ("û", "u"),
]


def cleanup_text(inputs):
    """Apply every pair in ``replacements`` to ``inputs["normalized_text"]``.

    Args:
        inputs: A dataset row containing a ``normalized_text`` string.

    Returns:
        The same row object, with ``normalized_text`` rewritten in place.
    """
    cleaned = inputs["normalized_text"]
    for src, dst in replacements:
        cleaned = cleaned.replace(src, dst)
    inputs["normalized_text"] = cleaned
    return inputs

# Rewrite "normalized_text" in every row using the replacement table.
dataset = dataset.map(cleanup_text)

In [None]:
import os
import torch
from speechbrain.pretrained import EncoderClassifier

# Identifier passed as `source` when loading the SpeechBrain speaker encoder.
spk_model_name = "speechbrain/spkrec-xvect-voxceleb"

# Run the encoder on the GPU when one is available, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": device},
    savedir=os.path.join("/tmp", spk_model_name),
)

def create_speaker_embedding(waveform):
    """Compute a normalized speaker embedding for one waveform.

    The waveform is wrapped in a tensor, encoded by ``speaker_model`` with
    gradients disabled, normalized along dimension 2, squeezed, and returned
    as a NumPy array.

    Args:
        waveform: Audio samples accepted by ``torch.tensor``.

    Returns:
        The squeezed embedding as a NumPy array on the CPU.
    """
    with torch.no_grad():
        encoded = speaker_model.encode_batch(torch.tensor(waveform))
        normalized = torch.nn.functional.normalize(encoded, dim=2)
        return normalized.squeeze().cpu().numpy()

In [None]:
# Display the dataset summary before feature preparation.
dataset

In [None]:
def prepare_dataset(example):
    """Turn one dataset row into the features used for training.

    The processor receives the normalized transcript as text and the waveform
    as audio target; a speaker embedding computed from the same waveform is
    attached to the result.

    Args:
        example: A row with ``normalized_text`` and an ``audio`` dict holding
            ``array`` and ``sampling_rate``.

    Returns:
        The processor output with ``labels`` reduced to its first element and
        a ``speaker_embeddings`` entry added.
    """
    audio = example["audio"]
    waveform = audio["array"]

    features = processor(
        text=example["normalized_text"],
        audio_target=waveform,
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )

    # Strip off the batch dimension.
    features["labels"] = features["labels"][0]

    # Use SpeechBrain to obtain the x-vector.
    features["speaker_embeddings"] = create_speaker_embedding(waveform)

    return features

In [None]:
# Replace every original column with the features returned by prepare_dataset.
dataset = dataset.map(prepare_dataset, remove_columns=dataset.column_names)

In [None]:
# Display the dataset summary after feature preparation.
dataset

In [None]:
def is_not_too_long(input_ids, max_length=200):
    """Tell whether a token sequence is short enough to keep.

    Args:
        input_ids: The token id sequence of one example.
        max_length: Exclusive upper bound on the number of tokens. Defaults to
            200, the limit this filter previously hard-coded.

    Returns:
        ``True`` when ``len(input_ids)`` is strictly below ``max_length``.
    """
    return len(input_ids) < max_length

# Keep only rows whose "input_ids" pass the length check, then show how many remain.
dataset = dataset.filter(is_not_too_long, input_columns=["input_ids"])
len(dataset)

In [None]:
# Hold out 10% of the rows for evaluation. The seed is fixed so the split, and
# any row later picked from the test split by index, is the same on every run.
dataset = dataset.train_test_split(test_size=0.1, seed=42)

In [None]:
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class TTSDataCollatorWithPadding:
    """Collate prepared examples into one padded batch.

    Each feature dict must provide ``input_ids``, ``labels`` and
    ``speaker_embeddings``. Padding is delegated to ``self.processor.pad``.

    Note:
        ``__call__`` reads ``model.config.reduction_factor`` from the
        notebook-level ``model`` variable, so ``model`` must be defined before
        the collator is first called.
    """

    # Processor whose ``pad`` method builds the padded batch.
    processor: Any

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        """Pad ``features`` into a batch.

        Args:
            features: The examples to collate.

        Returns:
            The padded batch with ``labels`` masked to ``-100`` on padded
            frames, ``decoder_attention_mask`` removed, and
            ``speaker_embeddings`` stacked into one tensor.
        """
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # Collate the inputs and targets into a batch. Use the processor this
        # collator was constructed with instead of a global of the same name.
        batch = self.processor.pad(
            input_ids=input_ids, labels=label_features, return_tensors="pt"
        )

        # Replace padded target frames with -100.
        batch["labels"] = batch["labels"].masked_fill(
            batch.decoder_attention_mask.unsqueeze(-1).ne(1), -100
        )

        del batch["decoder_attention_mask"]

        # Round target lengths down to a multiple of the reduction factor and
        # truncate the labels to the longest rounded length.
        reduction_factor = model.config.reduction_factor
        if reduction_factor > 1:
            target_lengths = [len(feature["input_values"]) for feature in label_features]
            max_length = max(
                length - length % reduction_factor for length in target_lengths
            )
            batch["labels"] = batch["labels"][:, :max_length]

        # Add in the speaker embeddings.
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

In [None]:
# Instantiate the collator with the processor loaded earlier.
data_collator = TTSDataCollatorWithPadding(processor=processor)

In [None]:
from transformers import SpeechT5ForTextToSpeech

# Load the text-to-speech model from the same checkpoint as the processor.
# The data collator reads `model.config.reduction_factor` from this variable.
model = SpeechT5ForTextToSpeech.from_pretrained(checkpoint)

In [None]:
from functools import partial

# Disable the cache during training since it is incompatible with gradient
# checkpointing. NOTE(review): the training arguments in this notebook set
# gradient_checkpointing=False, so this matters only if that flag is enabled.
model.config.use_cache = False

In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    # Output location and Hub upload.
    output_dir="speecht5_finetuned_tts_tr",
    push_to_hub=True,
    # Batching: 4 examples per device with 8 accumulation steps.
    per_device_train_batch_size=4,
    per_device_eval_batch_size=1,
    gradient_accumulation_steps=8,
    dataloader_pin_memory=False,
    # Optimizer and learning-rate schedule.
    optim="adamw_torch",
    learning_rate=5e-4,
    weight_decay=0.01,
    lr_scheduler_type="cosine",
    warmup_steps=150,
    max_steps=800,
    # Precision and memory options.
    fp16=True,
    gradient_checkpointing=False,
    # Evaluate and save on the same 200-step cadence.
    eval_strategy="steps",
    eval_steps=200,
    save_steps=200,
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
    # Logging.
    logging_steps=25,
    report_to=["tensorboard"],
)

In [None]:
from transformers import Seq2SeqTrainer

# Wire the model, the two dataset splits, the collator and the processor
# into the trainer.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    tokenizer=processor,
)

In [None]:
# Start fine-tuning with the configured trainer.
trainer.train()

In [None]:
# Upload the training result to the Hub.
trainer.push_to_hub()

In [None]:
# Reload the model for inference. The repository id below is a placeholder
# and must be replaced with a real "<user>/<model>" id before running this cell.
model = SpeechT5ForTextToSpeech.from_pretrained(
    "{your name here}/{your model name here}"
)

In [None]:
# Take one row of the test split and turn its stored speaker embedding into a
# tensor; unsqueeze(0) adds a leading dimension of size one.
# NOTE(review): index 304 is hard-coded and requires the test split to hold
# at least 305 rows — confirm for this dataset.
example = dataset["test"][304]
speaker_embeddings = torch.tensor(example["speaker_embeddings"]).unsqueeze(0)

In [None]:
# Sentence to synthesize; it contains a number and punctuation, both of which
# are handled by the preprocessing steps in the cells below.
text = "Merhaba, ben Mehmet. 21 yaşındayım. Umarım bu ses kaydı anlaşılırdır ve bu proje de bitmiştir"

In [None]:
# Turkish words for the numbers that the spelling routine looks up directly.
number_words = {
    0: "sıfır", 1: "bir", 2: "iki", 3: "üç", 4: "dört", 5: "beş", 6: "altı", 7: "yedi", 8: "sekiz", 9: "dokuz",
    10: "on", 11: "on bir", 12: "on iki", 13: "on üç", 14: "on dört", 15: "on beş", 16: "on altı", 17: "on yedi",
    18: "on sekiz", 19: "on dokuz", 20: "yirmi", 30: "otuz", 40: "kırk", 50: "elli", 60: "altmış", 70: "yetmiş",
    80: "seksen", 90: "doksan", 100: "yüz", 1000: "bin"
}


def number_to_words(number):
    """Spell out ``number`` in Turkish words.

    Values below 20 are looked up directly in ``number_words``. Larger values
    are split into tens, hundreds, thousands, millions and billions. A single
    hundred or thousand is written without a leading "bir".

    Args:
        number: The number to spell out.

    Returns:
        The spelled-out number, or ``str(number)`` when it is 10**12 or larger.
    """

    def append_rest(head, rest):
        # Add the spelled-out remainder only when it is non-zero.
        if rest:
            return head + " " + number_to_words(rest)
        return head

    if number < 20:
        return number_words[number]

    if number < 100:
        tens, unit = divmod(number, 10)
        tens_word = number_words[tens * 10]
        return tens_word + " " + number_words[unit] if unit else tens_word

    if number < 1000:
        hundreds, rest = divmod(number, 100)
        head = "yüz" if hundreds <= 1 else number_words[hundreds] + " yüz"
        return append_rest(head, rest)

    if number < 1000000:
        thousands, rest = divmod(number, 1000)
        head = "bin" if thousands <= 1 else number_to_words(thousands) + " bin"
        return append_rest(head, rest)

    # Millions and billions share one shape: "<count> <scale word>[ <rest>]".
    scales = (
        (1000000000, 1000000, " milyon"),
        (1000000000000, 1000000000, " milyar"),
    )
    for upper_bound, scale, scale_word in scales:
        if number < upper_bound:
            count, rest = divmod(number, scale)
            return append_rest(number_to_words(count) + scale_word, rest)

    return str(number)

def replace_numbers_with_words(text):
    """Replace each standalone run of digits in ``text`` with its spelled-out form.

    Args:
        text: The input string.

    Returns:
        ``text`` with every digit run delimited by word boundaries replaced by
        the result of ``number_to_words``.
    """
    digit_run = re.compile(r'\b\d+\b')
    # Find the numbers and swap each one for its words.
    return digit_run.sub(lambda found: number_to_words(int(found.group())), text)

In [None]:
# String-level variant of the cleanup step. This redefinition replaces the
# earlier row-based `cleanup_text`; it takes and returns a plain string.
def cleanup_text(text):
    """Apply every (source, replacement) pair in ``replacements`` to ``text``.

    Args:
        text: The string to clean.

    Returns:
        The string after all replacements have been applied in order.
    """
    cleaned = text
    for src, dst in replacements:
        cleaned = cleaned.replace(src, dst)
    return cleaned

In [None]:
import re
# Preprocess the sentence in the same order as the training data: spell out
# numbers, normalize (lower-case, strip punctuation), then apply the character
# replacements. The replacement table only lists lower-case characters, so it
# has to run after lower-casing; otherwise upper-case letters such as "Ş" or
# "Ö" would be lower-cased afterwards and never replaced.
converted_text = replace_numbers_with_words(text)
lowered_text = normalize_text(converted_text)
cleaned_text = cleanup_text(lowered_text)
final_text = cleaned_text
final_text

In [None]:
# Tokenize the preprocessed sentence and request PyTorch tensors.
inputs = processor(text=final_text, return_tensors="pt")

In [None]:
from transformers import SpeechT5HifiGan

# Load the HiFi-GAN vocoder and pass it to generate_speech together with the
# token ids and the speaker embedding selected earlier.
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)

In [None]:
from IPython.display import Audio
import soundfile as sf

waveform = speech.numpy()

# Save the audio to a file (e.g., 'output.wav')
sf.write('output.wav', waveform, 16000)

# The player must be the last expression of the cell: a notebook only renders
# the value of the final expression, so an earlier `Audio(...)` is discarded.
Audio(waveform, rate=16000)