In [None]:
# Colab-only setup: mount the user's Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import sys

# Directory holding the project's own modules, added to the import path.
PROJECT_DIR = '/content/drive/MyDrive/TTS_2023_V3'

# Guarded so that re-running this cell does not append the same entry again.
if PROJECT_DIR not in sys.path:
    sys.path.append(PROJECT_DIR)


In [None]:
!pip install - r requirements.txt


In [None]:
import os
import random
import json
import tgt
import librosa
import numpy as np
import pyworld as pw
from scipy.interpolate import interp1d
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
import constants
import audio_processing as audio


In [None]:
class Preprocessor:
    """Extract duration, pitch, energy and mel features from aligned speech.

    Input wav/lab files are read from ``constants.RAW_DATA_PATH/<speaker>/``
    and alignments from ``constants.DATA_PATH/TextGrid/<speaker>/``. All
    outputs (``.npy`` features, ``speakers.json``, ``stats.json``,
    ``train.txt`` and ``val.txt``) are written under ``constants.DATA_PATH``.
    """

    def __init__(self, config=None):
        """Read all settings from the ``constants`` module.

        Args:
            config: Not read by this class. It is optional so that both
                ``Preprocessor()`` and ``Preprocessor(config)`` work.
        """
        self.in_dir = constants.RAW_DATA_PATH
        self.out_dir = constants.DATA_PATH
        self.val_size = constants.VAL_SIZE
        self.sampling_rate = constants.SAMPLING_RATE
        self.hop_length = constants.STFT_HOP_LENGTH

        self.STFT = audio.stft.TacotronSTFT(
            constants.STFT_FILTER_LENGTH,
            constants.STFT_HOP_LENGTH,
            constants.STFT_WIN_LENGTH,
            constants.N_MEL_CHANNELS,
            constants.SAMPLING_RATE,
            constants.MEL_FMIN,
            constants.MEL_FMAX
        )

    def build_from_path(self):
        """Process every aligned utterance and write features and metadata.

        Returns:
            list[str]: Shuffled metadata lines of the form
            ``basename|speaker|{phones}|raw text``. The first ``val_size``
            entries are written to ``val.txt`` and the rest to ``train.txt``.

        Raises:
            RuntimeError: If no utterance produced any pitch/energy values,
                so no normalization statistics can be computed.
        """
        for feature in ("mel", "pitch", "energy", "duration"):
            os.makedirs(os.path.join(self.out_dir, feature), exist_ok=True)

        print("Processing Data ...")
        out = []
        n_frames = 0
        pitch_scaler = StandardScaler()
        energy_scaler = StandardScaler()

        # Only directories are speakers; stray files would break os.listdir
        # below. Sorting makes the speaker -> id mapping reproducible.
        speaker_names = sorted(
            name for name in os.listdir(self.in_dir)
            if os.path.isdir(os.path.join(self.in_dir, name))
        )

        # Compute pitch, energy, duration, and mel-spectrogram
        speakers = {}
        for i, speaker in enumerate(speaker_names):
            speakers[speaker] = i
            speaker_dir = os.path.join(self.in_dir, speaker)
            for wav_name in tqdm(sorted(os.listdir(speaker_dir))):
                if not wav_name.endswith(".wav"):
                    continue
                # splitext keeps dots that are part of the utterance name.
                basename = os.path.splitext(wav_name)[0]
                tg_path = os.path.join(
                    self.out_dir, "TextGrid", speaker,
                    "{}.TextGrid".format(basename))
                if not os.path.exists(tg_path):
                    # No alignment for this wav: nothing to accumulate.
                    continue

                ret = self.process_utterance(speaker, basename)
                if ret is None:
                    continue
                info, pitch, energy, n = ret
                out.append(info)

                # Statistics are updated only for the utterance that was just
                # processed, never with values left over from a previous one.
                if len(pitch) > 0:
                    pitch_scaler.partial_fit(pitch.reshape((-1, 1)))
                if len(energy) > 0:
                    energy_scaler.partial_fit(energy.reshape((-1, 1)))
                n_frames += n

        # StandardScaler only defines mean_/scale_ after at least one fit.
        if not hasattr(pitch_scaler, "mean_") or not hasattr(energy_scaler, "mean_"):
            raise RuntimeError(
                "No utterance with a matching TextGrid could be processed "
                "under {}".format(self.in_dir))

        print("Computing statistic quantities ...")
        # Standardize the saved pitch/energy files in place
        pitch_mean = pitch_scaler.mean_[0]
        pitch_std = pitch_scaler.scale_[0]
        pitch_min, pitch_max = self.normalize_values(
            os.path.join(self.out_dir, "pitch"), pitch_mean, pitch_std)

        energy_mean = energy_scaler.mean_[0]
        energy_std = energy_scaler.scale_[0]
        energy_min, energy_max = self.normalize_values(
            os.path.join(self.out_dir, "energy"), energy_mean, energy_std)

        # Save files
        with open(os.path.join(self.out_dir, "speakers.json"), "w") as f:
            f.write(json.dumps(speakers))

        stats = {
            "pitch": [
                float(pitch_min),
                float(pitch_max),
                float(pitch_mean),
                float(pitch_std),
            ],
            "energy": [
                float(energy_min),
                float(energy_max),
                float(energy_mean),
                float(energy_std),
            ],
        }
        with open(os.path.join(self.out_dir, "stats.json"), "w") as f:
            f.write(json.dumps(stats))

        print("Total time: {} hours".format(
            n_frames * self.hop_length / self.sampling_rate / 3600))

        # NOTE: the split depends on the global `random` state; call
        # random.seed(...) beforehand for a reproducible train/val split.
        random.shuffle(out)
        out = [r for r in out if r is not None]

        # Write metadata
        with open(os.path.join(self.out_dir, "train.txt"), "w", encoding="utf-8") as f:
            for m in out[self.val_size:]:
                f.write(m + "\n")
        with open(os.path.join(self.out_dir, "val.txt"), "w", encoding="utf-8") as f:
            for m in out[: self.val_size]:
                f.write(m + "\n")

        return out

    def process_utterance(self, speaker, basename):
        """Compute and save the features of a single utterance.

        Args:
            speaker: Name of the speaker directory.
            basename: File name of the utterance without extension.

        Returns:
            ``None`` when the alignment contains no non-silent phone or when
            at most one voiced frame is found. Otherwise a tuple
            ``(metadata_line, pitch, energy, n_mel_frames)`` where ``pitch``
            and ``energy`` are the phoneme-level values with outliers removed
            (the files saved to disk keep all values).
        """
        wav_path = os.path.join(self.in_dir, speaker, "{}.wav".format(basename))
        text_path = os.path.join(self.in_dir, speaker, "{}.lab".format(basename))
        tg_path = os.path.join(
            self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename))

        # Get alignments
        textgrid = tgt.io.read_textgrid(tg_path, encoding="utf-8-sig")
        phone, duration, start, end = self.get_alignments(
            textgrid.get_tier_by_name("phones"))
        if start >= end:
            return None
        total_frames = sum(duration)
        text = "{" + " ".join(phone) + "}"

        # Load at the configured rate: without `sr`, librosa resamples to its
        # own default, which would not match the sample indices and the pitch
        # extraction below unless the two rates happen to coincide.
        wav, _ = librosa.load(wav_path, sr=self.sampling_rate)
        # Trim leading/trailing silence using the alignment boundaries.
        first_sample = int(self.sampling_rate * start)
        last_sample = int(self.sampling_rate * end)
        wav = wav[first_sample:last_sample].astype(np.float32)

        # Read raw text
        with open(text_path, "r", encoding="utf-8") as f:
            raw_text = f.readline().strip("\n")

        # Compute fundamental frequency, one value per hop
        pitch, t = pw.dio(
            wav.astype(np.float64),
            self.sampling_rate,
            frame_period=self.hop_length / self.sampling_rate * 1000,
        )
        pitch = pw.stonemask(wav.astype(np.float64), pitch,
                             t, self.sampling_rate)[:total_frames]

        # Interpolation below needs at least two voiced frames.
        if np.sum(pitch != 0) <= 1:
            return None

        # Compute mel-scale spectrogram and energy
        # NOTE(review): the STFT object is passed both as receiver and as
        # argument - confirm against audio_processing.stft.
        mel_spectrogram, energy = self.STFT.get_mel_from_wav(wav, self.STFT)
        mel_spectrogram = mel_spectrogram[:, :total_frames]
        energy = energy[:total_frames]

        # Fill unvoiced (zero) frames by linear interpolation between voiced
        # neighbours; the edges are padded with the nearest voiced value.
        nonzero_ids = np.where(pitch != 0)[0]
        interp_fn = interp1d(
            nonzero_ids,
            pitch[nonzero_ids],
            fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
            bounds_error=False,
        )
        pitch = interp_fn(np.arange(0, len(pitch)))

        # Phoneme-level averages
        pitch = self._phoneme_average(pitch, duration)
        energy = self._phoneme_average(energy, duration)

        # Saving files
        np.save("{}/duration/{}-duration-{}.npy".format(self.out_dir,
                speaker, basename), duration)

        np.save("{}/pitch/{}-pitch-{}.npy".format(self.out_dir,
                speaker, basename), pitch)

        np.save("{}/energy/{}-energy-{}.npy".format(self.out_dir,
                speaker, basename), energy)

        np.save("{}/mel/{}-mel-{}.npy".format(self.out_dir,
                speaker, basename), mel_spectrogram.T)

        return (
            "|".join([basename, speaker, text, raw_text]),
            self.remove_outlier(pitch),
            self.remove_outlier(energy),
            mel_spectrogram.shape[1],
        )

    @staticmethod
    def _phoneme_average(values, duration):
        """Average frame-level ``values`` over each phone's frames.

        The result is written to a new array. Averaging in place would let a
        zero-length phone overwrite frames that later phones still have to
        read, and would index past the end when there are more phones than
        frames.

        Args:
            values: 1-D frame-level sequence.
            duration: Number of frames of each phone, in order.

        Returns:
            numpy.ndarray: One value per phone; 0 for phones that cover no
            frame.
        """
        values = np.asarray(values)
        averaged = np.zeros(
            len(duration), dtype=np.result_type(values.dtype, np.float32))
        pos = 0
        for i, d in enumerate(duration):
            segment = values[pos: pos + d]
            if d > 0 and segment.size > 0:
                averaged[i] = segment.mean()
            pos += d
        return averaged

    def get_alignments(self, tier):
        """Convert a phone tier into phones and frame durations.

        Leading and trailing silent phones ("sil", "sp", "spn") are dropped;
        silent phones between spoken ones are kept.

        Args:
            tier: Tier object whose ``_objects`` items expose ``start_time``,
                ``end_time`` and ``text``.

        Returns:
            tuple: ``(phones, durations, start_time, end_time)`` where
            ``durations`` are frame counts and the times delimit the span
            from the first to the last non-silent phone. Both times are 0
            when the tier has no non-silent phone.
        """
        silent_phones = ["sil", "sp", "spn"]
        phones = []
        durations = []
        start_time = 0
        end_time = 0
        end_index = 0
        for t in tier._objects:
            s, e, p = t.start_time, t.end_time, t.text
            is_silent = p in silent_phones

            # Skip leading silent phones; the first spoken phone sets the start
            if len(phones) == 0:
                if is_silent:
                    continue
                start_time = s

            phones.append(p)
            if not is_silent:
                # Remember the last spoken phone so that trailing silence can
                # be cut off after the loop.
                end_time = e
                end_index = len(phones)

            durations.append(
                int((e - s) * self.sampling_rate / self.hop_length))

        # Trim trailing silent phones
        phones = phones[:end_index]
        durations = durations[:end_index]
        return phones, durations, start_time, end_time

    def remove_outlier(self, values):
        """Drop values outside the 1.5 * IQR fences.

        Keeps the values strictly between ``Q1 - 1.5 * IQR`` and
        ``Q3 + 1.5 * IQR``. Empty input is returned unchanged.
        """
        values = np.array(values)
        if values.size == 0:
            return values
        p25 = np.percentile(values, 25)
        p75 = np.percentile(values, 75)
        lower = 2.5 * p25 - 1.5 * p75  # Q1 - 1.5 * (Q3 - Q1)
        upper = 2.5 * p75 - 1.5 * p25  # Q3 + 1.5 * (Q3 - Q1)
        normal_indices = np.logical_and(values > lower, values < upper)
        return values[normal_indices]

    def normalize_values(self, in_dir, mean, std):
        """Standardize every ``.npy`` array in ``in_dir`` in place.

        Each file is overwritten with ``(values - mean) / std``.

        Args:
            in_dir: Directory containing the ``.npy`` files.
            mean: Value subtracted from every element.
            std: Value every element is divided by.

        Returns:
            tuple: ``(min_value, max_value)`` over all normalized values.
            When no values are found, the initial float64 limits are
            returned unchanged.
        """
        max_value = np.finfo(np.float64).min
        min_value = np.finfo(np.float64).max
        for filename in sorted(os.listdir(in_dir)):
            # Ignore anything that is not a feature file, for example
            # checkpoint folders created by the notebook environment.
            if not filename.endswith(".npy"):
                continue
            path = os.path.join(in_dir, filename)
            normalized_values = (np.load(path) - mean) / std
            np.save(path, normalized_values)
            if normalized_values.size == 0:
                # An empty array has no extrema to contribute.
                continue
            max_value = max(max_value, np.max(normalized_values))
            min_value = min(min_value, np.min(normalized_values))
        return min_value, max_value


In [None]:
# `config` is not read by the constructor (all settings come from the
# `constants` module), so it is passed explicitly as None.
preprocessor = Preprocessor(config=None)

# Keep the returned metadata in a variable so the notebook does not echo the
# whole list as cell output; only a short summary is printed.
metadata = preprocessor.build_from_path()
print("Utterances processed: {}".format(len(metadata)))


Processing Data ...


100%|██████████| 3626/3626 [37:37<00:00,  1.61it/s]


Computing statistic quantities ...
Total time: 3.7055790375409425 hours
