# 演習：声質変換

## 環境構築

In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
cd /content/drive/'My Drive'/vc_exercise2019

[Errno 2] No such file or directory: '/content/drive/My Drive/vc_exercise2019'
/content


In [3]:
!pip3 install pyworld
!pip3 install pysptk
!pip3 install dtw

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyworld
  Downloading pyworld-0.3.2.tar.gz (214 kB)
[K     |████████████████████████████████| 214 kB 8.2 MB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
Building wheels for collected packages: pyworld
  Building wheel for pyworld (PEP 517) ... [?25l[?25hdone
  Created wheel for pyworld: filename=pyworld-0.3.2-cp37-cp37m-linux_x86_64.whl size=611567 sha256=5e8ec2b9cd44e71245bccdc2c976c1074000ba9b8d29c88845561269eb97570c
  Stored in directory: /root/.cache/pip/wheels/5e/4c/9c/79f7408701787a80a2cbab87d7257fd0081fe601251abaa42b
Successfully built pyworld
Installing collected packages: pyworld
Successfully installed pyworld-0.3.2
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pysptk
  Downloadi

## 特徴量の分析

In [4]:
import os
import sys
import glob

from scipy.io import wavfile # for wavfile I/O
import pyworld as pw
import numpy as np
import pysptk as sptk

In [5]:
spklist = ["SF", "TF"]  # speaker list [source female speaker, target female speaker]
featlist = ["mgc","f0","ap"]

In [6]:
# Making directories for speech features
for s in spklist:
    for f in featlist:
        if not os.path.exists("data/{}/{}".format(s,f)):
            os.mkdir("data/{}/{}".format(s,f))

FileNotFoundError: ignored

In [None]:
for s in spklist:
    wavlist = os.listdir("data/{}/wav".format(s))
    for wf in wavlist:
        # WORLD analysis for each file
        print("spekaer: {} file: {}".format(s,wf))
        fs, data = wavfile.read("data/{}/wav/{}".format(s,wf))
        data = data.astype(np.float)

        f0, t = pw.harvest(data, fs)
        sp = pw.cheaptrick(data, f0, t, fs)
        ap = pw.d4c(data, f0, t, fs)

        alpha = 0.42
        dim = 24
        mgc = sptk.sp2mc(sp, dim, alpha)

        bn, _ = os.path.splitext(wf)

        with open("data/{}/mgc/{}.mgc".format(s,bn),"wb") as f:
            mgc.tofile(f)
        with open("data/{}/f0/{}.f0".format(s,bn),"wb") as f:
            f0.tofile(f)
        with open("data/{}/ap/{}.ap".format(s,bn),"wb") as f:
            ap.tofile(f)

spekaer: SF file: atr503_a11.wav
spekaer: SF file: atr503_a07.wav
spekaer: SF file: atr503_a38.wav
spekaer: SF file: atr503_a06.wav
spekaer: SF file: atr503_a10.wav
spekaer: SF file: atr503_a39.wav
spekaer: SF file: atr503_a05.wav
spekaer: SF file: atr503_a12.wav
spekaer: SF file: atr503_a04.wav
spekaer: SF file: atr503_a13.wav
spekaer: SF file: atr503_a03.wav
spekaer: SF file: atr503_a17.wav
spekaer: SF file: atr503_a16.wav
spekaer: SF file: atr503_a47.wav
spekaer: SF file: atr503_a14.wav
spekaer: SF file: atr503_a29.wav
spekaer: SF file: atr503_a49.wav
spekaer: SF file: atr503_a15.wav
spekaer: SF file: atr503_a02.wav
spekaer: SF file: atr503_a28.wav
spekaer: SF file: atr503_a48.wav
spekaer: SF file: atr503_a01.wav
spekaer: SF file: atr503_a46.wav
spekaer: SF file: atr503_a44.wav
spekaer: SF file: atr503_a50.wav
spekaer: SF file: atr503_a25.wav
spekaer: SF file: atr503_a45.wav
spekaer: SF file: atr503_a18.wav
spekaer: SF file: atr503_a42.wav
spekaer: SF file: atr503_a41.wav
spekaer: S

## フレーム毎時間アラインメント

In [None]:
import os
import sys
import array

from dtw import dtw
import numpy as np
import pysptk as sptk

In [None]:
srcspk = "SF"
tgtspk = "TF"

mgclist = os.listdir("data/{}/mgc".format(srcspk))

if not os.path.isdir("data/{}/data".format(srcspk)):
    os.mkdir("data/{}/data".format(srcspk))
if not os.path.isdir("data/{}/data".format(tgtspk)):
    os.mkdir("data/{}/data".format(tgtspk))

In [None]:
def distfunc(x,y):
    # Euclid distance except first dim
    return np.linalg.norm(x[1:]-y[1:])

In [None]:
dim = 25 # mgc dim + 1
for mf in mgclist:
    print(mf)
    bn, _ = os.path.splitext(mf)
    srcfile = "data/{}/mgc/{}".format(srcspk,mf)
    tgtfile = "data/{}/mgc/{}".format(tgtspk,mf)

    with open(srcfile,"rb") as f:
        x = np.fromfile(f, dtype="<f8", sep="")
        x = x.reshape(len(x)//dim,dim)
    with open(tgtfile,"rb") as f:
        y = np.fromfile(f, dtype="<f8", sep="")
        y = y.reshape(len(y)//dim,dim)
    print("framelen: (x,y) = {} {}".format(len(x),len(y)))
    _,_,_, twf = dtw(x,y,distfunc)
    srcout = "data/{}/data/{}.dat".format(srcspk,bn)
    tgtout = "data/{}/data/{}.dat".format(tgtspk,bn)

    with open(srcout,"wb") as f:
        x[twf[0]].tofile(f)
    with open(tgtout,"wb") as f:
        y[twf[1]].tofile(f)

## 音声変換モデルの学習

In [None]:
# Listing training/evaluation data
!mkdir -p conf
!ls data/SF/data/ | head -45 | sed -e 's/\.dat//' > conf/train.list
!ls data/SF/data/ | tail -5 | sed -e 's/\.dat//' > conf/eval.list

In [None]:
import numpy as np
import torch
from torch import nn, optim
from torch.nn import functional as F
import os
import sys
import time

In [None]:
def get_dataset(dim=25):
    x = []
    y = []
    datalist = []
    with open("conf/train.list","r") as f:
        for line in f:
            line = line.rstrip()
            datalist.append(line)

    for d in datalist:
        print(d)
        with open("data/SF/data/{}.dat".format(d),"rb") as f:
            dat = np.fromfile(f,dtype="<f8",sep="")
            x.append(dat.reshape(len(dat)//dim,dim))
        with open("data/TF/data/{}.dat".format(d),"rb") as f:
            dat = np.fromfile(f,dtype="<f8",sep="")
            y.append(dat.reshape(len(dat)//dim,dim))
    return x,y

In [None]:
class VCDNN(nn.Module):
        def __init__(self, dim=25, n_units=256):
            super(VCDNN, self).__init__()
            self.fc = nn.ModuleList([
                           nn.Linear(dim, n_units),
                           nn.Linear(n_units, n_units),
                           nn.Linear(n_units, dim)
            ])
            
        def forward(self, x):
            h1 = F.relu(self.fc[0](x))
            h2 = F.relu(self.fc[1](h1))
            h3 = self.fc[2](h2)
            return h3
        
        def get_predata(self, x):
            _x = torch.from_numpy(x.astype(np.float32))
            return self.forward(_x).detach().numpy()

In [None]:
x_train, y_train = get_dataset()
# parameters for training
n_epoch = 50
dim = 25
n_units = 128
N = len(x_train)

model = VCDNN(dim,n_units)
model.double()
optimizer = optim.Adam(model.parameters())

loss_fn = nn.MSELoss()

# loop
model.train()

losses = []
sum_loss = 0

for epoch in range(1, n_epoch + 1):
    sum_loss = 0

    for i in range(0, N):
        x_batch =torch.from_numpy(x_train[i])
        y_batch = torch.from_numpy(y_train[i])
        
        optimizer.zero_grad()
        
        predict_y_batch = model(x_batch)
        loss = loss_fn(predict_y_batch, y_batch)
        loss.backward()
        optimizer.step()
        sum_loss += loss.item()
        
        average_loss = sum_loss / N
        losses.append(average_loss)

        print("epoch: {}/{}  loss: {}".format(epoch, n_epoch, average_loss))

if not os.path.isdir("model"):
    os.mkdir("model")
torch.save(model.state_dict(), "model/vcmodel.model")


In [None]:
!ls ./model/

vcmodel.model


## 学習したモデルによる音声の変換

In [None]:
import numpy as np
import pysptk as sptk
import pyworld as pw
from scipy.io import wavfile
import os
import sys
import time

In [None]:
dim = 25
n_units = 128

model = VCDNN(dim,n_units)
_ = model.load_state_dict(torch.load("model/vcmodel.model"))

In [None]:
# test data
x = []
datalist = []
with open("conf/eval.list","r") as f:
    for line in f:
        line = line.rstrip()
        datalist.append(line)

for d in datalist:
    with open("data/SF/mgc/{}.mgc".format(d),"rb") as f:
        dat = np.fromfile(f,dtype="<f8",sep="")
        x.append(dat.reshape(len(dat)//dim,dim))

if not os.path.isdir("result"):
    os.mkdir("result")
if not os.path.isdir("result/wav"):
    os.mkdir("result/wav")

fs = 16000
fftlen = 512
alpha = 0.42
for i in range(0,len(datalist)):
    outfile = "result/wav/{}.wav".format(datalist[i])
    with open("data/SF/f0/{}.f0".format(datalist[i]),"rb") as f:
        f0 = np.fromfile(f, dtype="<f8", sep="")
    with open("data/SF/ap/{}.ap".format(datalist[i]),"rb") as f:
        ap = np.fromfile(f, dtype="<f8", sep="")
        ap = ap.reshape(len(ap)//(fftlen+1),fftlen+1)
    y = model.get_predata(x[i])
    y = y.astype(np.float64)
    sp = sptk.mc2sp(y, alpha, fftlen*2)
    owav = pw.synthesize(f0, sp, ap, fs)
    owav = np.clip(owav, -32768, 32767)
    wavfile.write(outfile, fs, owav.astype(np.int16))

In [None]:
!ls result/wav

atr503_a46.wav	atr503_a47.wav	atr503_a48.wav	atr503_a49.wav	atr503_a50.wav
