In [22]:
import re
from tqdm import tqdm
import argparse
from string import punctuation

import torch
import yaml
import numpy as np
from torch.utils.data import DataLoader
from g2p_en import G2p
from pypinyin import pinyin, Style

from utils.model import get_vocoder
from utils.tools import to_device, synth_samples
from dataset import TextDataset
from text import text_to_sequence

import os
from model import ScheduledOptim


In [23]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [24]:
def read_lexicon(lex_path):
    lexicon = {}
    with open(lex_path) as f:
        for line in f:
            temp = re.split(r"\s+", line.strip("\n"))
            word = temp[0]
            phones = temp[1:]
            if word.lower() not in lexicon:
                lexicon[word.lower()] = phones
    return lexicon


In [158]:
def preprocess_english(text, preprocess_config):
    text = text.rstrip(punctuation)
    lexicon = read_lexicon(preprocess_config["path"]["lexicon_path"])

    g2p = G2p()
    phones = []
    words = re.split(r"([,;.\-\?\!\s+])", text)
    idx = []
    for w in words:
        len_before = len(phones)
        if w.lower() in lexicon:
            phones += lexicon[w.lower()]
        else:
            phones += list(filter(lambda p: p != " ", g2p(w)))
        if w != " ":
            c_new_phones = len(phones) - len_before
            idx.append(c_new_phones)
            
    phones = "{" + "}{".join(phones) + "}"
    phones = re.sub(r"\{[^\w\s]?\}", "{sp}", phones)
    phones = phones.replace("}{", " ")
    words = [w for w in words if w != " "]
    print("Raw Text Sequence: {}".format(text))
    print("Phoneme Sequence: {}".format(phones))
    print("Words: {}".format(words))
    print("Idx: {}".format(idx))
    sequence = np.array(
        text_to_sequence(
            phones, preprocess_config["preprocessing"]["text"]["text_cleaners"]
        )
    )

    return np.array(sequence), words, idx


In [26]:
def synthesize(model, step, configs, vocoder, batchs, control_values):
    preprocess_config, model_config, train_config = configs
    pitch_control, energy_control, duration_control = control_values

    for batch in tqdm(batchs, total=len(batchs), desc="batches:> "):
        batch = to_device(batch, device)
        with torch.no_grad():
            # Forward
            output = model(
                *(batch[2:]),
                p_control=pitch_control,
                e_control=energy_control,
                d_control=duration_control
            )
            synth_samples(
                batch,
                output,
                vocoder,
                model_config,
                preprocess_config,
                train_config["path"]["result_path"],
            )


In [104]:
import os
import json
import copy
import math
from collections import OrderedDict

import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

from utils.tools import get_mask_from_lengths, pad



class VarianceAdaptor(nn.Module):
    """Variance Adaptor"""

    def __init__(self, preprocess_config, model_config):
        super(VarianceAdaptor, self).__init__()
        self.duration_predictor = VariancePredictor(model_config)
        self.length_regulator = LengthRegulator()
        self.pitch_predictor = VariancePredictor(model_config)
        self.energy_predictor = VariancePredictor(model_config)

        self.pitch_feature_level = preprocess_config["preprocessing"]["pitch"][
            "feature"
        ]
        self.energy_feature_level = preprocess_config["preprocessing"]["energy"][
            "feature"
        ]
        assert self.pitch_feature_level in ["phoneme_level", "frame_level"]
        assert self.energy_feature_level in ["phoneme_level", "frame_level"]

        pitch_quantization = model_config["variance_embedding"]["pitch_quantization"]
        energy_quantization = model_config["variance_embedding"]["energy_quantization"]
        n_bins = model_config["variance_embedding"]["n_bins"]
        assert pitch_quantization in ["linear", "log"]
        assert energy_quantization in ["linear", "log"]
        with open(
            os.path.join(preprocess_config["path"]["preprocessed_path"], "stats.json")
        ) as f:
            stats = json.load(f)
            pitch_min, pitch_max = stats["pitch"][:2]
            energy_min, energy_max = stats["energy"][:2]

        if pitch_quantization == "log":
            self.pitch_bins = nn.Parameter(
                torch.exp(
                    torch.linspace(np.log(pitch_min), np.log(pitch_max), n_bins - 1)
                ),
                requires_grad=False,
            )
        else:
            self.pitch_bins = nn.Parameter(
                torch.linspace(pitch_min, pitch_max, n_bins - 1),
                requires_grad=False,
            )
        if energy_quantization == "log":
            self.energy_bins = nn.Parameter(
                torch.exp(
                    torch.linspace(np.log(energy_min), np.log(energy_max), n_bins - 1)
                ),
                requires_grad=False,
            )
        else:
            self.energy_bins = nn.Parameter(
                torch.linspace(energy_min, energy_max, n_bins - 1),
                requires_grad=False,
            )

        self.pitch_embedding = nn.Embedding(
            n_bins, model_config["transformer"]["encoder_hidden"]
        )
        self.energy_embedding = nn.Embedding(
            n_bins, model_config["transformer"]["encoder_hidden"]
        )

    def get_pitch_embedding(self, x, target, mask, control):
        prediction = self.pitch_predictor(x, mask)
        
        if target is not None:
            embedding = self.pitch_embedding(torch.bucketize(target, self.pitch_bins))
        else:
            prediction = prediction * control
            embedding = self.pitch_embedding(
                torch.bucketize(prediction, self.pitch_bins)
            )
        print(f"Pitch: {prediction} \n{prediction.shape}")
        return prediction, embedding

    def get_energy_embedding(self, x, target, mask, control):
        prediction = self.energy_predictor(x, mask)
        
        if target is not None:
            embedding = self.energy_embedding(torch.bucketize(target, self.energy_bins))
        else:
            prediction = prediction * control
            embedding = self.energy_embedding(
                torch.bucketize(prediction, self.energy_bins)
            )
        print(f"Energy: {prediction} \n{prediction.shape}")
        return prediction, embedding

    def forward(
        self,
        x,
        src_mask,
        mel_mask=None,
        max_len=None,
        pitch_target=None,
        energy_target=None,
        duration_target=None,
        p_control=1.0,
        e_control=1.0,
        d_control=1.0,
    ):

        log_duration_prediction = self.duration_predictor(x, src_mask)
        if self.pitch_feature_level == "phoneme_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                x, pitch_target, src_mask, p_control
            )
            x = x + pitch_embedding
        if self.energy_feature_level == "phoneme_level":
            energy_prediction, energy_embedding = self.get_energy_embedding(
                x, energy_target, src_mask, e_control
            )
            x = x + energy_embedding

        if duration_target is not None:
            x, mel_len = self.length_regulator(x, duration_target, max_len)
            duration_rounded = duration_target
        else:
            duration_rounded = torch.clamp(
                (torch.round(torch.exp(log_duration_prediction) - 1) * d_control),
                min=0,
            )
            print(f"Duration: {duration_rounded}")
            x, mel_len = self.length_regulator(x, duration_rounded, max_len)
            mel_mask = get_mask_from_lengths(mel_len)

        if self.pitch_feature_level == "frame_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                x, pitch_target, mel_mask, p_control
            )
            x = x + pitch_embedding
        if self.energy_feature_level == "frame_level":
            energy_prediction, energy_embedding = self.get_energy_embedding(
                x, energy_target, mel_mask, e_control
            )
            x = x + energy_embedding

        return (
            x,
            pitch_prediction,
            energy_prediction,
            log_duration_prediction,
            duration_rounded,
            mel_len,
            mel_mask,
        )


class LengthRegulator(nn.Module):
    """Length Regulator"""

    def __init__(self):
        super(LengthRegulator, self).__init__()

    def LR(self, x, duration, max_len):
        output = list()
        mel_len = list()
        for batch, expand_target in zip(x, duration):
            expanded = self.expand(batch, expand_target)
            output.append(expanded)
            mel_len.append(expanded.shape[0])

        if max_len is not None:
            output = pad(output, max_len)
        else:
            output = pad(output)

        return output, torch.LongTensor(mel_len).to(device)

    def expand(self, batch, predicted):
        out = list()

        for i, vec in enumerate(batch):
            expand_size = predicted[i].item()
            out.append(vec.expand(max(int(expand_size), 0), -1))
        out = torch.cat(out, 0)

        return out

    def forward(self, x, duration, max_len):
        output, mel_len = self.LR(x, duration, max_len)
        return output, mel_len


class VariancePredictor(nn.Module):
    """Duration, Pitch and Energy Predictor"""

    def __init__(self, model_config):
        super(VariancePredictor, self).__init__()

        self.input_size = model_config["transformer"]["encoder_hidden"]
        self.filter_size = model_config["variance_predictor"]["filter_size"]
        self.kernel = model_config["variance_predictor"]["kernel_size"]
        self.conv_output_size = model_config["variance_predictor"]["filter_size"]
        self.dropout = model_config["variance_predictor"]["dropout"]

        self.conv_layer = nn.Sequential(
            OrderedDict(
                [
                    (
                        "conv1d_1",
                        Conv(
                            self.input_size,
                            self.filter_size,
                            kernel_size=self.kernel,
                            padding=(self.kernel - 1) // 2,
                        ),
                    ),
                    ("relu_1", nn.ReLU()),
                    ("layer_norm_1", nn.LayerNorm(self.filter_size)),
                    ("dropout_1", nn.Dropout(self.dropout)),
                    (
                        "conv1d_2",
                        Conv(
                            self.filter_size,
                            self.filter_size,
                            kernel_size=self.kernel,
                            padding=1,
                        ),
                    ),
                    ("relu_2", nn.ReLU()),
                    ("layer_norm_2", nn.LayerNorm(self.filter_size)),
                    ("dropout_2", nn.Dropout(self.dropout)),
                ]
            )
        )

        self.linear_layer = nn.Linear(self.conv_output_size, 1)

    def forward(self, encoder_output, mask):
        out = self.conv_layer(encoder_output)
        out = self.linear_layer(out)
        out = out.squeeze(-1)

        if mask is not None:
            out = out.masked_fill(mask, 0.0)

        return out


class Conv(nn.Module):
    """
    Convolution Module
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size=1,
        stride=1,
        padding=0,
        dilation=1,
        bias=True,
        w_init="linear",
    ):
        """
        :param in_channels: dimension of input
        :param out_channels: dimension of output
        :param kernel_size: size of kernel
        :param stride: size of stride
        :param padding: size of padding
        :param dilation: dilation rate
        :param bias: boolean. if True, bias is included.
        :param w_init: str. weight inits with xavier initialization.
        """
        super(Conv, self).__init__()

        self.conv = nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            bias=bias,
        )

    def forward(self, x):
        x = x.contiguous().transpose(1, 2)
        x = self.conv(x)
        x = x.contiguous().transpose(1, 2)

        return x


In [105]:
import os
import json

import torch
import torch.nn as nn
import torch.nn.functional as F

from transformer import Encoder, Decoder, PostNet
from utils.tools import get_mask_from_lengths


class FastSpeech2(nn.Module):
    """ FastSpeech2 """

    def __init__(self, preprocess_config, model_config):
        super(FastSpeech2, self).__init__()
        self.model_config = model_config

        self.encoder = Encoder(model_config)
        self.variance_adaptor = VarianceAdaptor(preprocess_config, model_config)
        self.decoder = Decoder(model_config)
        self.mel_linear = nn.Linear(
            model_config["transformer"]["decoder_hidden"],
            preprocess_config["preprocessing"]["mel"]["n_mel_channels"],
        )
        self.postnet = PostNet()

        self.speaker_emb = None
        if model_config["multi_speaker"]:
            with open(
                os.path.join(
                    preprocess_config["path"]["preprocessed_path"], "speakers.json"
                ),
                "r",
            ) as f:
                n_speaker = len(json.load(f))
            self.speaker_emb = nn.Embedding(
                n_speaker,
                model_config["transformer"]["encoder_hidden"],
            )

    def forward(
        self,
        speakers,
        texts,
        src_lens,
        max_src_len,
        mels=None,
        mel_lens=None,
        max_mel_len=None,
        p_targets=None,
        e_targets=None,
        d_targets=None,
        p_control=1.0,
        e_control=1.0,
        d_control=1.0,
    ):
        src_masks = get_mask_from_lengths(src_lens, max_src_len)
        mel_masks = (
            get_mask_from_lengths(mel_lens, max_mel_len)
            if mel_lens is not None
            else None
        )

        output = self.encoder(texts, src_masks)

        if self.speaker_emb is not None:
            output = output + self.speaker_emb(speakers).unsqueeze(1).expand(
                -1, max_src_len, -1
            )

        (
            output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            mel_lens,
            mel_masks,
        ) = self.variance_adaptor(
            output,
            src_masks,
            mel_masks,
            max_mel_len,
            p_targets,
            e_targets,
            d_targets,
            p_control,
            e_control,
            d_control,
        )

        output, mel_masks = self.decoder(output, mel_masks)
        output = self.mel_linear(output)

        postnet_output = self.postnet(output) + output

        return (
            output,
            postnet_output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            src_masks,
            mel_masks,
            src_lens,
            mel_lens,
        )

In [106]:
# Load configs

PREPROCESS_CONFIG_PATH = "/work/tc046/tc046/lordzuko/work/SpeakingStyle/config/BC2013/preprocess.yaml"
MODEL_CONFIG_PATH = "/work/tc046/tc046/lordzuko/work/SpeakingStyle/config/BC2013/model.yaml"
TRAIN_CONFIG_PATH = "/work/tc046/tc046/lordzuko/work/SpeakingStyle/config/BC2013/train.yaml"

In [107]:
preprocess_config = yaml.load(
    open(PREPROCESS_CONFIG_PATH, "r"), Loader=yaml.FullLoader
)
model_config = yaml.load(open(MODEL_CONFIG_PATH, "r"), Loader=yaml.FullLoader)
train_config = yaml.load(open(TRAIN_CONFIG_PATH, "r"), Loader=yaml.FullLoader)
configs = (preprocess_config, model_config, train_config)

In [108]:
def get_model(args, configs, device, train=False):
    (preprocess_config, model_config, train_config) = configs

    model = FastSpeech2(preprocess_config, model_config).to(device)
    if args["restore_step"]:
        ckpt_path = os.path.join(
            train_config["path"]["ckpt_path"],
            "{}.pth.tar".format(args["restore_step"]),
        )
        if torch.cuda.is_available():
            ckpt = torch.load(ckpt_path)
        else:
            ckpt = torch.load(ckpt_path, map_location=torch.device('cpu'))
        model.load_state_dict(ckpt["model"])

    if train:
        scheduled_optim = ScheduledOptim(
            model, train_config, model_config, args["restore_step"]
        )
        if args["restore_step"]:
            scheduled_optim.load_state_dict(ckpt["optimizer"])
        model.train()
        return model, scheduled_optim

    model.eval()
    model.requires_grad_ = False
    return model


In [109]:
args = {}
args["restore_step"] = 61000

In [110]:
# Get model
print("Loading Model...")
model = get_model(args, configs, device, train=False)
print("Model Loaded")
# Load vocoder
print("Loading Vocoder...")
vocoder = get_vocoder(model_config, device)
print("Vocoder Loaded")


Loading Model...
Model Loaded
Loading Vocoder...
Removing weight norm...
Vocoder Loaded


In [112]:
def synth_speech(text, speaker_id=0, pitch_control=1.0, energy_control=1.0, duration_control=1.0, fine_control={}):
    ids = raw_texts = [text[:100]]
    speakers = np.array([speaker_id])
    if preprocess_config["preprocessing"]["text"]["language"] == "en":
        texts = np.array([preprocess_english(text, preprocess_config)])
    elif preprocess_config["preprocessing"]["text"]["language"] == "zh":
        texts = np.array([preprocess_mandarin(text, preprocess_config)])
    text_lens = np.array([len(texts[0])])
    batchs = [(ids, raw_texts, speakers, texts, text_lens, max(text_lens))]
            
    control_values = pitch_control, energy_control, duration_control        
    print("Synthesizing ...")
    synthesize(model, args["restore_step"], configs, vocoder, batchs, control_values)

In [113]:
synth_speech(text="synthesis")

Raw Text Sequence: synthesis
Phoneme Sequence: {S IH1 N TH AH0 S AH0 S}
Words: ['synthesis']
Idx: [8]
Synthesizing ...


batches:> :   0%|          | 0/1 [00:00<?, ?it/s]

Pitch: tensor([[ 0.1739,  0.8873,  0.6487,  0.2073,  0.1662, -0.3408, -0.4615, -0.7759]],
       device='cuda:0') 
torch.Size([1, 8])
Energy: tensor([[-0.1701,  3.5771,  3.3476,  0.1656,  0.6157, -0.1228,  0.2820, -0.4996]],
       device='cuda:0') 
torch.Size([1, 8])
Duration: tensor([[14.,  6.,  6.,  5.,  4.,  8.,  6., 11.]], device='cuda:0')


batches:> : 100%|██████████| 1/1 [00:08<00:00,  8.54s/it]


In [114]:
synth_speech(text="synthesis is cool", pitch_control=1.0, energy_control=1.0)

Raw Text Sequence: synthesis is cool
Phoneme Sequence: {S IH1 N TH AH0 S AH0 S IH0 Z K UW1 L}
Words: ['synthesis', 'is', 'cool']
Idx: [8, 2, 3]
Synthesizing ...


batches:> : 100%|██████████| 1/1 [00:01<00:00,  1.74s/it]

Pitch: tensor([[ 0.1186,  1.1969,  1.3294,  1.1833,  0.5511, -0.2413, -0.5019, -0.7311,
         -0.2597, -0.6401, -0.3367,  0.4571, -0.7197]], device='cuda:0') 
torch.Size([1, 13])
Energy: tensor([[-0.4386,  2.7543,  3.3090, -0.2172,  0.5745, -0.5562, -0.0577, -0.5846,
         -0.1813, -0.4881, -1.1717, -0.0766, -0.1817]], device='cuda:0') 
torch.Size([1, 13])
Duration: tensor([[17.,  7.,  7.,  5.,  6.,  9.,  5.,  8.,  4.,  8., 11., 18.,  8.]],
       device='cuda:0')





In [82]:
synth_speech(text="synthesis is cool", pitch_control=1.0, energy_control=0.5)

Raw Text Sequence: synthesis is cool
Phoneme Sequence: {S IH1 N TH AH0 S AH0 S IH0 Z K UW1 L}
Words: ['synthesis', 'is', 'cool']
Idx: [8, 2, 3]
Synthesizing ...


batches:> :   0%|          | 0/1 [00:00<?, ?it/s]

Pitch: tensor([[ 0.0593,  0.5985,  0.6647,  0.5917,  0.2756, -0.1206, -0.2509, -0.3655,
         -0.1298, -0.3200, -0.1683,  0.2285, -0.3598]], device='cuda:0') 
torch.Size([1, 13])
Energy: tensor([[-2.5605e-01,  1.2461e+00,  1.3168e+00, -1.6128e-01,  2.3102e-01,
         -2.8790e-01, -9.2617e-04, -2.5511e-01, -8.9651e-02, -2.1404e-01,
         -5.9024e-01, -7.2405e-02, -2.3174e-02]], device='cuda:0') 
torch.Size([1, 13])
Duration: tensor([[17.,  7.,  7.,  5.,  6.,  9.,  5.,  8.,  4.,  8., 11., 18.,  8.]],
       device='cuda:0')


batches:> : 100%|██████████| 1/1 [00:03<00:00,  3.19s/it]


In [183]:
class ControlledVarianceAdapter(VarianceAdaptor):
    
    def __init__(self, preprocess_config, model_config):
        super(ControlledVarianceAdapter, self).__init__(preprocess_config, model_config)
        
    def get_pitch_embedding(self, x, target, mask, control):
        prediction = self.pitch_predictor(x, mask)
        
        if target is not None:
            embedding = self.pitch_embedding(torch.bucketize(target, self.pitch_bins))
        else:
            if isinstance(control, float):
                prediction = prediction * control
            elif isinstance(control, list):
                prediction = prediction * torch.from_numpy(np.array(control)).to(device)
            embedding = self.pitch_embedding(
                torch.bucketize(prediction, self.pitch_bins)
            )
        print(f"Pitch: {prediction} \n{prediction.shape}")
        return prediction, embedding

    def get_energy_embedding(self, x, target, mask, control):
        prediction = self.energy_predictor(x, mask)
        
        if target is not None:
            embedding = self.energy_embedding(torch.bucketize(target, self.energy_bins))
        else:
            if isinstance(control, float):
                prediction = prediction * control
            elif isinstance(control, list):
                prediction = prediction * torch.from_numpy(np.array(control)).to(device)
            embedding = self.energy_embedding(
                torch.bucketize(prediction, self.energy_bins)
            )
        print(f"Energy: {prediction} \n{prediction.shape}")
        return prediction, embedding
    
    def forward(
        self,
        x,
        src_mask,
        mel_mask=None,
        max_len=None,
        pitch_target=None,
        energy_target=None,
        duration_target=None,
        p_control=1.0,
        e_control=1.0,
        d_control=1.0,
    ):

        log_duration_prediction = self.duration_predictor(x, src_mask)
        if self.pitch_feature_level == "phoneme_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                x, pitch_target, src_mask, p_control
            )
            x = x + pitch_embedding
        if self.energy_feature_level == "phoneme_level":
            energy_prediction, energy_embedding = self.get_energy_embedding(
                x, energy_target, src_mask, e_control
            )
            x = x + energy_embedding

        if duration_target is not None:
            x, mel_len = self.length_regulator(x, duration_target, max_len)
            duration_rounded = duration_target
        else:
            if isinstance(d_control, float):
                duration_rounded = torch.clamp(
                    (torch.round(torch.exp(log_duration_prediction) - 1) * d_control),
                    min=0,
                )
            elif isinstance(d_control, list):
                duration_rounded = torch.clamp(
                    (torch.round(torch.exp(log_duration_prediction) - 1) * torch.from_numpy(np.array(d_control)).to(device)),
                    min=0,
                )
            print(f"Duration: {duration_rounded}")
            x, mel_len = self.length_regulator(x, duration_rounded, max_len)
            mel_mask = get_mask_from_lengths(mel_len)

        if self.pitch_feature_level == "frame_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                x, pitch_target, mel_mask, p_control
            )
            x = x + pitch_embedding
        if self.energy_feature_level == "frame_level":
            energy_prediction, energy_embedding = self.get_energy_embedding(
                x, energy_target, mel_mask, e_control
            )
            x = x + energy_embedding

        return (
            x,
            pitch_prediction,
            energy_prediction,
            log_duration_prediction,
            duration_rounded,
            mel_len,
            mel_mask,
        )
    

In [184]:
import os
import json

import torch
import torch.nn as nn
import torch.nn.functional as F

from transformer import Encoder, Decoder, PostNet
from utils.tools import get_mask_from_lengths


class FastSpeech2(nn.Module):
    """ FastSpeech2 """

    def __init__(self, preprocess_config, model_config):
        super(FastSpeech2, self).__init__()
        self.model_config = model_config

        self.encoder = Encoder(model_config)
        self.variance_adaptor = ControlledVarianceAdapter(preprocess_config, model_config)
        self.decoder = Decoder(model_config)
        self.mel_linear = nn.Linear(
            model_config["transformer"]["decoder_hidden"],
            preprocess_config["preprocessing"]["mel"]["n_mel_channels"],
        )
        self.postnet = PostNet()

        self.speaker_emb = None
        if model_config["multi_speaker"]:
            with open(
                os.path.join(
                    preprocess_config["path"]["preprocessed_path"], "speakers.json"
                ),
                "r",
            ) as f:
                n_speaker = len(json.load(f))
            self.speaker_emb = nn.Embedding(
                n_speaker,
                model_config["transformer"]["encoder_hidden"],
            )

    def forward(
        self,
        speakers,
        texts,
        src_lens,
        max_src_len,
        mels=None,
        mel_lens=None,
        max_mel_len=None,
        p_targets=None,
        e_targets=None,
        d_targets=None,
        p_control=1.0,
        e_control=1.0,
        d_control=1.0,
    ):
        src_masks = get_mask_from_lengths(src_lens, max_src_len)
        mel_masks = (
            get_mask_from_lengths(mel_lens, max_mel_len)
            if mel_lens is not None
            else None
        )

        output = self.encoder(texts, src_masks)

        if self.speaker_emb is not None:
            output = output + self.speaker_emb(speakers).unsqueeze(1).expand(
                -1, max_src_len, -1
            )

        (
            output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            mel_lens,
            mel_masks,
        ) = self.variance_adaptor(
            output,
            src_masks,
            mel_masks,
            max_mel_len,
            p_targets,
            e_targets,
            d_targets,
            p_control,
            e_control,
            d_control,
        )

        output, mel_masks = self.decoder(output, mel_masks)
        output = self.mel_linear(output)

        postnet_output = self.postnet(output) + output

        return (
            output,
            postnet_output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            src_masks,
            mel_masks,
            src_lens,
            mel_lens,
        )

In [185]:
def get_model2(args, configs, device, train=False):
    (preprocess_config, model_config, train_config) = configs

    model = FastSpeech2(preprocess_config, model_config).to(device)
    if args["restore_step"]:
        ckpt_path = os.path.join(
            train_config["path"]["ckpt_path"],
            "{}.pth.tar".format(args["restore_step"]),
        )
        if torch.cuda.is_available():
            ckpt = torch.load(ckpt_path)
        else:
            ckpt = torch.load(ckpt_path, map_location=torch.device('cpu'))
        model.load_state_dict(ckpt["model"])

    if train:
        scheduled_optim = ScheduledOptim(
            model, train_config, model_config, args["restore_step"]
        )
        if args["restore_step"]:
            scheduled_optim.load_state_dict(ckpt["optimizer"])
        model.train()
        return model, scheduled_optim

    model.eval()
    model.requires_grad_ = False
    return model


In [190]:
def synth_speech2(text, speaker_id=0, pitch_control=1.0, energy_control=1.0, duration_control=1.0, fine_control={}):
    ids = raw_texts = [text[:100]]
    speakers = np.array([speaker_id])
    if preprocess_config["preprocessing"]["text"]["language"] == "en":
        out = preprocess_english(text, preprocess_config)
        texts, words, idxs = np.array([out[0]]), out[1], out[2]
    elif preprocess_config["preprocessing"]["text"]["language"] == "zh":
        texts = np.array([preprocess_mandarin(text, preprocess_config)])
    text_lens = np.array([len(texts[0])])
    batchs = [(ids, raw_texts, speakers, texts, text_lens, max(text_lens))]
    print(batchs)
    print(words)
    print(idxs)
    if fine_control:
        energy_control = []
        duration_control = []
        pitch_control = []
        for i, x in enumerate(idxs):
            for _ in range(x):
                energy_control.append(fine_control["energy"][0][i])
                pitch_control.append(fine_control["pitch"][0][i])
                duration_control.append(fine_control["duration"][0][i])
        print(energy_control)
        print(pitch_control)
        print(duration_control)
        control_values = pitch_control, energy_control, duration_control
    else: 
        control_values = pitch_control, energy_control, duration_control        
    print("Synthesizing ...")
    synthesize(model2, args["restore_step"], configs, vocoder, batchs, control_values)

In [187]:
# Get model
print("Loading Model...")
model2 = get_model2(args, configs, device, train=False)
print("Model Loaded")

Loading Model...
Model Loaded


In [192]:
fine_control = {
    "pitch": [[0.7, 1, 1.3]],
    "energy": [[1.1, 1, 0.8]],
    "duration": [[0.7, 1, 1.5]]
}

In [193]:
synth_speech2(text="synthesis is cool", fine_control=fine_control)

Raw Text Sequence: synthesis is cool
Phoneme Sequence: {S IH1 N TH AH0 S AH0 S IH0 Z K UW1 L}
Words: ['synthesis', 'is', 'cool']
Idx: [8, 2, 3]
[(['synthesis is cool'], ['synthesis is cool'], array([0]), array([[131, 109, 119, 134,  73, 131,  73, 131, 108, 146, 116, 141, 117]]), array([13]), 13)]
['synthesis', 'is', 'cool']
[8, 2, 3]
[1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1, 1, 0.8, 0.8, 0.8]
[0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 1, 1, 1.3, 1.3, 1.3]
[0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 1, 1, 1.5, 1.5, 1.5]
Synthesizing ...


batches:> :   0%|          | 0/1 [00:00<?, ?it/s]

Pitch: tensor([[ 0.0830,  0.8379,  0.9305,  0.8283,  0.3858, -0.1689, -0.3513, -0.5117,
         -0.2597, -0.6401, -0.4376,  0.5942, -0.9356]], device='cuda:0',
       dtype=torch.float64) 
torch.Size([1, 13])
Energy: tensor([[-0.5355,  2.8602,  3.1640, -0.4525,  0.5363, -0.6024, -0.0158, -0.5996,
         -0.1858, -0.4825, -0.9323, -0.0455, -0.1929]], device='cuda:0',
       dtype=torch.float64) 
torch.Size([1, 13])
Duration: tensor([[11.9000,  4.9000,  4.9000,  3.5000,  4.2000,  6.3000,  3.5000,  5.6000,
          4.0000,  8.0000, 16.5000, 27.0000, 12.0000]], device='cuda:0',
       dtype=torch.float64)


batches:> : 100%|██████████| 1/1 [00:07<00:00,  7.78s/it]
